aboutsummaryrefslogtreecommitdiffstats
path: root/lib/stdlib/src/dets_server.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/stdlib/src/dets_server.erl')
-rw-r--r--lib/stdlib/src/dets_server.erl402
1 files changed, 402 insertions, 0 deletions
diff --git a/lib/stdlib/src/dets_server.erl b/lib/stdlib/src/dets_server.erl
new file mode 100644
index 0000000000..931112088e
--- /dev/null
+++ b/lib/stdlib/src/dets_server.erl
@@ -0,0 +1,402 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2001-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(dets_server).
+
+%% Disk based linear hashing lookup dictionary. Server part.
+
+-behaviour(gen_server).
+
+%% External exports.
+-export([all/0, close/1, get_pid/1, open_file/1, open_file/2, pid2name/1,
+ users/1, verbose/1]).
+
+%% Internal.
+-export([start_link/0, start/0, stop/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+%% record for not yet handled reqeusts to open or close files
+-record(pending, {tab, ref, pid, from, reqtype, clients}). % [{From,Args}]
+
+%% state for the dets server
+-record(state, {store, parent, pending}). % [pending()]
+
+-include("dets.hrl").
+
+-define(REGISTRY, dets_registry). % {Table, NoUsers, TablePid}
+-define(OWNERS, dets_owners). % {TablePid, Table}
+-define(STORE, dets). % {User, Table} and {{links,User}, NoLinks}
+
+%%-define(DEBUGF(X,Y), io:format(X, Y)).
+-define(DEBUGF(X,Y), void).
+
+-compile({inline, [{pid2name_1,1}]}).
+
+%%%----------------------------------------------------------------------
+%%% API
+%%%----------------------------------------------------------------------
+
+%% Internal.
+start_link() ->
+ gen_server:start_link({local, ?SERVER_NAME}, dets_server, [self()], []).
+
+start() ->
+ ensure_started().
+
+stop() ->
+ case whereis(?SERVER_NAME) of
+ undefined ->
+ stopped;
+ _Pid ->
+ gen_server:call(?SERVER_NAME, stop, infinity)
+ end.
+
+all() ->
+ call(all).
+
+close(Tab) ->
+ call({close, Tab}).
+
+get_pid(Tab) ->
+ ets:lookup_element(?REGISTRY, Tab, 3).
+
+open_file(File) ->
+ call({open, File}).
+
+open_file(Tab, OpenArgs) ->
+ call({open, Tab, OpenArgs}).
+
+pid2name(Pid) ->
+ ensure_started(),
+ pid2name_1(Pid).
+
+users(Tab) ->
+ call({users, Tab}).
+
+verbose(What) ->
+ call({set_verbose, What}).
+
+call(Message) ->
+ ensure_started(),
+ gen_server:call(?SERVER_NAME, Message, infinity).
+
+%%%----------------------------------------------------------------------
+%%% Callback functions from gen_server
+%%%----------------------------------------------------------------------
+
+%%----------------------------------------------------------------------
+%% Func: init/1
+%% Returns: {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%%----------------------------------------------------------------------
+init(Parent) ->
+ Store = init(),
+ {ok, #state{store=Store, parent=Parent, pending = []}}.
+
+%%----------------------------------------------------------------------
+%% Func: handle_call/3
+%% Returns: {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} | (terminate/2 is called)
+%% {stop, Reason, State} (terminate/2 is called)
+%%----------------------------------------------------------------------
+handle_call(all, _From, State) ->
+ F = fun(X, A) -> [element(1, X) | A] end,
+ {reply, ets:foldl(F, [], ?REGISTRY), State};
+handle_call({close, Tab}, From, State) ->
+ request([{{close, Tab}, From}], State);
+handle_call({open, File}, From, State) ->
+ request([{{open, File}, From}], State);
+handle_call({open, Tab, OpenArgs}, From, State) ->
+ request([{{open, Tab, OpenArgs}, From}], State);
+handle_call(stop, _From, State) ->
+ {stop, normal, stopped, State};
+handle_call({set_verbose, What}, _From, State) ->
+ set_verbose(What),
+ {reply, ok, State};
+handle_call({users, Tab}, _From, State) ->
+ Users = ets:select(State#state.store, [{{'$1', Tab}, [], ['$1']}]),
+ {reply, Users, State}.
+
+%%----------------------------------------------------------------------
+%% Func: handle_cast/2
+%% Returns: {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State} (terminate/2 is called)
+%%----------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%----------------------------------------------------------------------
+%% Func: handle_info/2
+%% Returns: {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State} (terminate/2 is called)
+%%----------------------------------------------------------------------
+handle_info({pending_reply, {Ref, Result0}}, State) ->
+ {value, #pending{tab = Tab, pid = Pid, from = {FromPid,_Tag}=From,
+ reqtype = ReqT, clients = Clients}} =
+ lists:keysearch(Ref, #pending.ref, State#state.pending),
+ Store = State#state.store,
+ Result =
+ case {Result0, ReqT} of
+ {ok, add_user} ->
+ do_link(Store, FromPid),
+ true = ets:insert(Store, {FromPid, Tab}),
+ ets:update_counter(?REGISTRY, Tab, 1),
+ {ok, Tab};
+ {ok, internal_open} ->
+ link(Pid),
+ do_link(Store, FromPid),
+ true = ets:insert(Store, {FromPid, Tab}),
+ true = ets:insert(?REGISTRY, {Tab, 1, Pid}),
+ true = ets:insert(?OWNERS, {Pid, Tab}),
+ {ok, Tab};
+ {Reply, _} -> % ok or Error
+ Reply
+ end,
+ gen_server:reply(From, Result),
+ NP = lists:keydelete(Pid, #pending.pid, State#state.pending),
+ State1 = State#state{pending = NP},
+ request(Clients, State1);
+handle_info({'EXIT', Pid, _Reason}, State) ->
+ Store = State#state.store,
+ case pid2name_1(Pid) of
+ {ok, Tab} ->
+ %% A table was killed.
+ true = ets:delete(?REGISTRY, Tab),
+ true = ets:delete(?OWNERS, Pid),
+ Users = ets:select(State#state.store, [{{'$1', Tab}, [], ['$1']}]),
+ true = ets:match_delete(Store, {'_', Tab}),
+ lists:foreach(fun(User) -> do_unlink(Store, User) end, Users),
+ {noreply, State};
+ undefined ->
+ %% Close all tables used by Pid.
+ F = fun({FromPid, Tab}, S) ->
+ {_, S1} = handle_close(S, {close, Tab},
+ {FromPid, notag}, Tab),
+ S1
+ end,
+ State1 = lists:foldl(F, State, ets:lookup(Store, Pid)),
+ {noreply, State1}
+ end;
+handle_info(_Message, State) ->
+ {noreply, State}.
+
+%%----------------------------------------------------------------------
+%% Func: terminate/2
+%% Purpose: Shutdown the server
+%% Returns: any (ignored by gen_server)
+%%----------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%%----------------------------------------------------------------------
+%% Func: code_change/3
+%% Purpose: Convert process state when code is changed
+%% Returns: {ok, NewState}
+%%----------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%----------------------------------------------------------------------
+%%% Internal functions
+%%%----------------------------------------------------------------------
+
+ensure_started() ->
+ case whereis(?SERVER_NAME) of
+ undefined ->
+ DetsSup = {dets_sup, {dets_sup, start_link, []}, permanent,
+ 1000, supervisor, [dets_sup]},
+ _ = supervisor:start_child(kernel_safe_sup, DetsSup),
+ DetsServer = {?SERVER_NAME, {?MODULE, start_link, []},
+ permanent, 2000, worker, [?MODULE]},
+ _ = supervisor:start_child(kernel_safe_sup, DetsServer),
+ ok;
+ _ -> ok
+ end.
+
+init() ->
+ set_verbose(verbose_flag()),
+ process_flag(trap_exit, true),
+ ets:new(?REGISTRY, [set, named_table]),
+ ets:new(?OWNERS, [set, named_table]),
+ ets:new(?STORE, [duplicate_bag]).
+
+verbose_flag() ->
+ case init:get_argument(dets) of
+ {ok, Args} ->
+ lists:member(["verbose"], Args);
+ _ ->
+ false
+ end.
+
+set_verbose(true) ->
+ put(verbose, yes);
+set_verbose(_) ->
+ erase(verbose).
+
+%% Inlined.
+pid2name_1(Pid) ->
+ case ets:lookup(?OWNERS, Pid) of
+ [] -> undefined;
+ [{_Pid,Tab}] -> {ok, Tab}
+ end.
+
+request([{Req, From} | L], State) ->
+ Res = case Req of
+ {close, Tab} ->
+ handle_close(State, Req, From, Tab);
+ {open, File} ->
+ do_internal_open(State, From, [File, get(verbose)]);
+ {open, Tab, OpenArgs} ->
+ do_open(State, Req, From, OpenArgs, Tab)
+ end,
+ State2 = case Res of
+ {pending, State1} ->
+ State1;
+ {Reply, State1} ->
+ gen_server:reply(From, Reply),
+ State1
+ end,
+ request(L, State2);
+request([], State) ->
+ {noreply, State}.
+
+%% -> {pending, NewState} | {Reply, NewState}
+do_open(State, Req, From, Args, Tab) ->
+ case check_pending(Tab, From, State, Req) of
+ {pending, NewState} -> {pending, NewState};
+ false ->
+ case ets:lookup(?REGISTRY, Tab) of
+ [] ->
+ A = [Tab, Args, get(verbose)],
+ do_internal_open(State, From, A);
+ [{Tab, _Counter, Pid}] ->
+ pending_call(Tab, Pid, make_ref(), From, Args,
+ add_user, State)
+ end
+ end.
+
+%% -> {pending, NewState} | {Reply, NewState}
+do_internal_open(State, From, Args) ->
+ case supervisor:start_child(dets_sup, [self()]) of
+ {ok, Pid} ->
+ Ref = make_ref(),
+ Tab = case Args of
+ [T, _, _] -> T;
+ [_, _] -> Ref
+ end,
+ pending_call(Tab, Pid, Ref, From, Args, internal_open, State);
+ Error ->
+ {Error, State}
+ end.
+
+%% -> {pending, NewState} | {Reply, NewState}
+handle_close(State, Req, {FromPid,_Tag}=From, Tab) ->
+ case check_pending(Tab, From, State, Req) of
+ {pending, NewState} -> {pending, NewState};
+ false ->
+ Store = State#state.store,
+ case ets:match_object(Store, {FromPid, Tab}) of
+ [] ->
+ ?DEBUGF("DETS: Table ~w close attempt by non-owner~w~n",
+ [Tab, FromPid]),
+ {{error, not_owner}, State};
+ [_ | Keep] ->
+ case ets:lookup(?REGISTRY, Tab) of
+ [{Tab, 1, Pid}] ->
+ do_unlink(Store, FromPid),
+ true = ets:delete(?REGISTRY, Tab),
+ true = ets:delete(?OWNERS, Pid),
+ true = ets:match_delete(Store, {FromPid, Tab}),
+ unlink(Pid),
+ pending_call(Tab, Pid, make_ref(), From, [],
+ internal_close, State);
+ [{Tab, _Counter, Pid}] ->
+ do_unlink(Store, FromPid),
+ true = ets:match_delete(Store, {FromPid, Tab}),
+ [true = ets:insert(Store, K) || K <- Keep],
+ ets:update_counter(?REGISTRY, Tab, -1),
+ pending_call(Tab, Pid, make_ref(), From, [],
+ remove_user, State)
+ end
+ end
+ end.
+
+%% Links with counters
+do_link(Store, Pid) ->
+ Key = {links, Pid},
+ case ets:lookup(Store, Key) of
+ [] ->
+ true = ets:insert(Store, {Key, 1}),
+ link(Pid);
+ [{_, C}] ->
+ true = ets:delete(Store, Key),
+ true = ets:insert(Store, {Key, C+1})
+ end.
+
+do_unlink(Store, Pid) ->
+ Key = {links, Pid},
+ case ets:lookup(Store, Key) of
+ [{_, C}] when C > 1 ->
+ true = ets:delete(Store, Key),
+ true = ets:insert(Store, {Key, C-1});
+ _ ->
+ true = ets:delete(Store, Key),
+ unlink(Pid)
+
+ end.
+
+pending_call(Tab, Pid, Ref, {FromPid, _Tag}=From, Args, ReqT, State) ->
+ Server = self(),
+ F = fun() ->
+ Res = case ReqT of
+ add_user ->
+ dets:add_user(Pid, Tab, Args);
+ internal_open ->
+ dets:internal_open(Pid, Ref, Args);
+ internal_close ->
+ dets:internal_close(Pid);
+ remove_user ->
+ dets:remove_user(Pid, FromPid)
+ end,
+ Server ! {pending_reply, {Ref, Res}}
+ end,
+ _ = spawn(F),
+ PD = #pending{tab = Tab, ref = Ref, pid = Pid, reqtype = ReqT,
+ from = From, clients = []},
+ P = [PD | State#state.pending],
+ {pending, State#state{pending = P}}.
+
+check_pending(Tab, From, State, Req) ->
+ case lists:keysearch(Tab, #pending.tab, State#state.pending) of
+ {value, #pending{tab = Tab, clients = Clients}=P} ->
+ NP = lists:keyreplace(Tab, #pending.tab, State#state.pending,
+ P#pending{clients = Clients++[{Req,From}]}),
+ {pending, State#state{pending = NP}};
+ false ->
+ false
+ end.