-module(ddfs_master).
-behaviour(gen_server).
-export([start_link/0]).
-export([get_tags/1, get_tags/3,
get_nodeinfo/1,
get_read_nodes/0,
get_hosted_tags/1,
gc_blacklist/0, gc_blacklist/1,
gc_stats/0,
choose_write_nodes/3,
new_blob/4, new_blob/5,
safe_gc_blacklist/0, safe_gc_blacklist/1,
refresh_tag_cache/0,
tag_notify/2,
tag_operation/2, tag_operation/3,
update_gc_stats/1,
update_nodes/1
]).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-define(WEB_PORT, 8011).
-compile(nowarn_deprecated_type).
-include("common_types.hrl").
-include("gs_util.hrl").
-include("config.hrl").
-include("ddfs.hrl").
-include("ddfs_tag.hrl").
-include("ddfs_gc.hrl").
-type node_info() :: {node(), {non_neg_integer(), non_neg_integer()}}.
-type gc_stats() :: none | gc_run_stats().
-record(state, {tags = gb_trees:empty() :: gb_trees:tree(),
tag_cache = false :: false | gb_sets:set(),
cache_refresher :: pid(),
nodes = [] :: [node_info()],
write_blacklist = [] :: [node()],
read_blacklist = [] :: [node()],
gc_blacklist = [] :: [node()],
safe_gc_blacklist = gb_sets:empty() :: gb_sets:set(),
gc_stats = none :: none | {gc_stats(), erlang:timestamp()}}).
-type state() :: #state{}.
-type replyto() :: {pid(), reference()}.
-export_type([gc_stats/0, node_info/0]).
%% ===================================================================
%% API functions
-spec start_link() -> {ok, pid()}.
%% Starts the singleton ddfs_master server, registered locally under the
%% module name. A concurrent start is not treated as an error: when the
%% server is already running, its pid is returned as a normal {ok, Pid}.
start_link() ->
    lager:info("DDFS master starts"),
    case gen_server:start_link({local, ?MODULE}, ?MODULE, [], []) of
        {ok, _Pid} = Started            -> Started;
        {error, {already_started, Pid}} -> {ok, Pid}
    end.
%% Synchronously run tag operation Op against the tag server for Tag,
%% routed through the ddfs_master server (default gen_server call timeout).
-spec tag_operation(term(), tagname()) -> term().
tag_operation(Op, Tag) ->
gen_server:call(?MODULE, {tag, Op, Tag}).
%% Same as tag_operation/2, but with an explicit call timeout.
-spec tag_operation(term(), tagname(), non_neg_integer() | infinity) ->
term().
tag_operation(Op, Tag, Timeout) ->
gen_server:call(?MODULE, {tag, Op, Tag}, Timeout).
%% Fire-and-forget notification delivered to the tag server for Tag.
-spec tag_notify(term(), tagname()) -> ok.
tag_notify(Op, Tag) ->
gen_server:cast(?MODULE, {tag_notify, Op, Tag}).
%% Return the {Node, {Free, Total}} diskspace info for all known nodes.
-spec get_nodeinfo(all) -> {ok, [node_info()]}.
get_nodeinfo(all) ->
gen_server:call(?MODULE, {get_nodeinfo, all}).
%% Return the readable nodes (known nodes minus the read blacklist) and
%% the size of the read blacklist. Infinite timeout: see the comment
%% above safe_get_read_nodes/0 for why callers must not time out.
-spec get_read_nodes() -> {ok, [node()], non_neg_integer()} | {error, term()}.
get_read_nodes() ->
gen_server:call(?MODULE, get_read_nodes, infinity).
%% Return the current GC blacklist.
-spec gc_blacklist() -> {ok, [node()]}.
gc_blacklist() ->
gen_server:call(?MODULE, gc_blacklist).
%% Replace the GC blacklist (async).
-spec gc_blacklist([node()]) -> ok.
gc_blacklist(Nodes) ->
gen_server:cast(?MODULE, {gc_blacklist, Nodes}).
%% Return the stats of the last GC run (none if GC has not run yet),
%% tagged with the time they were recorded.
-spec gc_stats() -> {ok, none | {gc_stats(), erlang:timestamp()}} | {error, term()}.
gc_stats() ->
gen_server:call(?MODULE, gc_stats).
%% Return the tags hosted on Host; computed by ddfs_gc:hosted_tags/1 in a
%% helper process, so this does not block the master.
-spec get_hosted_tags(host()) -> {ok, [tagname()]} | {error, term()}.
get_hosted_tags(Host) ->
gen_server:call(?MODULE, {get_hosted_tags, Host}).
%% Choose K nodes to write to; Include must be part of the result,
%% Exclude (and the server-side blacklists) never are.
-spec choose_write_nodes(non_neg_integer(), [node()], [node()]) -> {ok, [node()]}.
choose_write_nodes(K, Include, Exclude) ->
gen_server:call(?MODULE, {choose_write_nodes, K, Include, Exclude}).
%% List tags cluster-wide; Mode selects the consistency/filtering policy
%% implemented by do_get_tags/2 (gc keeps +deleted, safe filters it).
-spec get_tags(gc) -> {ok, [tagname()], [node()]} | too_many_failed_nodes;
(safe) -> {ok, [binary()]} | too_many_failed_nodes.
get_tags(Mode) ->
get_tags(?MODULE, Mode, ?GET_TAG_TIMEOUT).
%% Same, addressed to an explicit master server with an explicit timeout;
%% the call is wrapped in the 'get_tags' profiling histogram.
-spec get_tags(server(), gc, non_neg_integer()) ->
{ok, [tagname()], [node()]} | too_many_failed_nodes;
(server(), safe, non_neg_integer()) ->
{ok, [binary()]} | too_many_failed_nodes.
get_tags(Server, Mode, Timeout) ->
disco_profile:timed_run(
fun() -> gen_server:call(Server, {get_tags, Mode}, Timeout) end,
get_tags).
%% Allocate K replica locations for a new blob named Obj; returns the
%% HTTP PUT URLs to write to.
-spec new_blob(string()|object_name(), non_neg_integer(), [node()], [node()]) ->
too_many_replicas | {ok, [nonempty_string()]}.
new_blob(Obj, K, Include, Exclude) ->
gen_server:call(?MODULE, {new_blob, Obj, K, Include, Exclude}, infinity).
%% Same, addressed to an explicit master server.
-spec new_blob(server(), string()|object_name(), non_neg_integer(), [node()], [node()]) ->
too_many_replicas | {ok, [nonempty_string()]}.
new_blob(Master, Obj, K, Include, Exclude) ->
gen_server:call(Master, {new_blob, Obj, K, Include, Exclude}, infinity).
%% Return the nodes that are safely GC-blacklisted (blacklisted and
%% fully re-replicated elsewhere).
-spec safe_gc_blacklist() -> {ok, [node()]} | {error, term()}.
safe_gc_blacklist() ->
gen_server:call(?MODULE, safe_gc_blacklist).
%% Replace the safe-GC blacklist (async); the server intersects it with
%% the current GC blacklist.
-spec safe_gc_blacklist(gb_sets:set()) -> ok.
safe_gc_blacklist(SafeGCBlacklist) ->
gen_server:cast(?MODULE, {safe_gc_blacklist, SafeGCBlacklist}).
%% Record the stats of a completed GC run (async).
-spec update_gc_stats(gc_run_stats()) -> ok.
update_gc_stats(Stats) ->
gen_server:cast(?MODULE, {update_gc_stats, Stats}).
%% One entry per node: {Node, WriteAllowed, ReadAllowed}.
-type nodes_update() :: [{node(), boolean(), boolean()}].
%% Install a new cluster node configuration (async).
-spec update_nodes(nodes_update()) -> ok.
update_nodes(DDFSNodes) ->
gen_server:cast(?MODULE, {update_nodes, DDFSNodes}).
%% Internal (not exported): used by monitor_diskspace/0 to push fresh
%% per-node diskspace stats into the server state.
-spec update_nodestats(gb_trees:tree()) -> ok.
update_nodestats(NewNodes) ->
gen_server:cast(?MODULE, {update_nodestats, NewNodes}).
%% Internal (not exported): used by refresh_tag_cache/2 to install a
%% freshly computed tag-name cache.
-spec update_tag_cache(gb_sets:set()) -> ok.
update_tag_cache(TagCache) ->
gen_server:cast(?MODULE, {update_tag_cache, TagCache}).
%% Ask the cache refresher process to rebuild the tag cache now (async).
-spec refresh_tag_cache() -> ok.
refresh_tag_cache() ->
gen_server:cast(?MODULE, refresh_tag_cache).
%% ===================================================================
%% gen_server callbacks
-spec init(_) -> gs_init().
init(_Args) ->
%% Create the profiling histograms used by the timed sections below.
_ = [disco_profile:new_histogram(Name)
|| Name <- [get_tags, do_get_tags_all, do_get_tags_filter,
do_get_tags_safe, do_get_tags_gc]],
%% Linked helper processes: diskspace poller, the GC driver, and the
%% tag-cache refresher (whose pid is kept in state for later nudging).
spawn_link(fun() -> monitor_diskspace() end),
spawn_link(fun() -> ddfs_gc:start_gc(disco:get_setting("DDFS_DATA")) end),
Refresher = spawn_link(fun() -> refresh_tag_cache_proc() end),
%% Stash the HTTP PUT port in the process dictionary; read by
%% do_new_blob/6 when building blob URLs.
put(put_port, disco:get_setting("DDFS_PUT_PORT")),
{ok, #state{cache_refresher = Refresher}}.
-type choose_write_nodes_msg() :: {choose_write_nodes, non_neg_integer(), [node()], [node()]}.
-type new_blob_msg() :: {new_blob, string() | object_name(), non_neg_integer(), [node()], [node()]}.
-type tag_msg() :: {tag, ddfs_tag:call_msg(), tagname()}.
%% Spec fixes (review): 'non_neg_integer' needed call parens (the bare
%% name denotes the *atom*, not the integer type); the gc_stats reply is a
%% pair {ok, Stats} as the gc_stats/0 API spec already says; the no-nodes
%% tag error actually returned below is {error, no_nodes}, not
%% {error, nonodes}; and new_blob_msg() was missing the Exclude list that
%% the real {new_blob, ...} message carries.
-spec handle_call(dbg_state_msg(), from(), state()) ->
                         gs_reply(state());
                 ({get_nodeinfo, all}, from(), state()) ->
                         gs_reply({ok, [node_info()]});
                 (get_read_nodes, from(), state()) ->
                         gs_reply({ok, [node()], non_neg_integer()});
                 (gc_blacklist, from(), state()) ->
                         gs_reply({ok, [node()]});
                 (gc_stats, from(), state()) ->
                         gs_reply({ok, none | {gc_stats(), erlang:timestamp()}});
                 (choose_write_nodes_msg(), from(), state()) ->
                         gs_reply({ok, [node()]});
                 (new_blob_msg(), from(), state()) ->
                         gs_reply(new_blob_result());
                 (tag_msg(), from(), state()) ->
                         gs_reply({error, no_nodes}) | gs_noreply();
                 ({get_tags, gc | safe}, from(), state()) ->
                         gs_noreply();
                 ({get_hosted_tags, host()}, from(), state()) ->
                         gs_noreply();
                 (safe_gc_blacklist, from(), state()) ->
                         gs_reply({ok, [node()]}).
%% Debug hook: return the complete server state.
handle_call(dbg_get_state, _, S) ->
    {reply, S, S};
handle_call({get_nodeinfo, all}, _From, #state{nodes = Nodes} = S) ->
    {reply, {ok, Nodes}, S};
handle_call(get_read_nodes, _F, #state{nodes = Nodes, read_blacklist = RB} = S) ->
    {reply, do_get_readable_nodes(Nodes, RB), S};
handle_call(gc_blacklist, _F, #state{gc_blacklist = Nodes} = S) ->
    {reply, {ok, Nodes}, S};
handle_call(gc_stats, _F, #state{gc_stats = Stats} = S) ->
    {reply, {ok, Stats}, S};
%% Write allocation: nodes on either the write or the GC blacklist are
%% never selected as write targets.
handle_call({choose_write_nodes, K, Include, Exclude}, _,
            #state{nodes = N, write_blacklist = WBL, gc_blacklist = GBL} = S) ->
    BL = lists:umerge(WBL, GBL),
    {reply, do_choose_write_nodes(N, K, Include, Exclude, BL), S};
handle_call({new_blob, Obj, K, Include, Exclude}, _,
            #state{nodes = N, gc_blacklist = GBL, write_blacklist = WBL} = S) ->
    BL = lists:umerge(WBL, GBL),
    {reply, do_new_blob(Obj, K, Include, Exclude, BL, N), S};
%% Tag operations cannot be served when no nodes are known.
handle_call({tag, _M, _Tag}, _From, #state{nodes = []} = S) ->
    {reply, {error, no_nodes}, S};
%% The tag server replies to From itself, hence noreply here.
handle_call({tag, M, Tag}, From, S) ->
    {noreply, do_tag_request(M, Tag, From, S)};
%% Potentially slow cluster-wide tag scans run in a throwaway process
%% that replies directly to the caller, keeping the master responsive.
handle_call({get_tags, Mode}, From, #state{nodes = Nodes} = S) ->
    spawn(fun() ->
                  gen_server:reply(From, do_get_tags(Mode, [N || {N, _} <- Nodes]))
          end),
    {noreply, S};
handle_call({get_hosted_tags, Host}, From, S) ->
    spawn(fun() -> gen_server:reply(From, ddfs_gc:hosted_tags(Host)) end),
    {noreply, S};
handle_call(safe_gc_blacklist, _From, #state{safe_gc_blacklist = SBL} = S) ->
    {reply, {ok, gb_sets:to_list(SBL)}, S}.
-spec handle_cast({tag_notify, ddfs_tag:cast_msg(), tagname()}
                  | {gc_blacklist, [node()]}
                  | {safe_gc_blacklist, gb_sets:set()}
                  | {update_gc_stats, gc_stats()}
                  | {update_tag_cache, gb_sets:set()}
                  | refresh_tag_cache
                  | {update_nodes, nodes_update()}
                  | {update_nodestats, gb_trees:tree()},
                  state()) -> gs_noreply().
%% Forward a notification to the tag's server (started on demand).
handle_cast({tag_notify, M, Tag}, S) ->
    {noreply, do_tag_notify(M, Tag, S)};
%% Install a new GC blacklist; the safe-GC blacklist is pruned so it
%% always stays a subset of the GC blacklist.
handle_cast({gc_blacklist, Nodes}, #state{safe_gc_blacklist = SBL} = S) ->
    BLSet = gb_sets:from_list(Nodes),
    NewSBL = gb_sets:intersection(BLSet, SBL),
    {noreply, S#state{gc_blacklist = gb_sets:to_list(BLSet),
                      safe_gc_blacklist = NewSBL}};
%% Same invariant from the other direction: a new safe-GC blacklist is
%% intersected with the current GC blacklist.
handle_cast({safe_gc_blacklist, SafeBlacklist}, #state{gc_blacklist = BL} = S) ->
    SBL = gb_sets:intersection(SafeBlacklist, gb_sets:from_list(BL)),
    {noreply, S#state{safe_gc_blacklist = SBL}};
%% Record fresh GC stats. Fix (review): erlang:timestamp() replaces the
%% deprecated now(), and matches the declared state field type
%% {gc_stats(), erlang:timestamp()}.
handle_cast({update_gc_stats, Stats}, S) ->
    {noreply, S#state{gc_stats = {Stats, erlang:timestamp()}}};
handle_cast({update_tag_cache, TagCache}, S) ->
    {noreply, S#state{tag_cache = TagCache}};
%% Nudge the refresher process out of its idle receive.
handle_cast(refresh_tag_cache, #state{cache_refresher = Refresher} = S) ->
    Refresher ! refresh,
    {noreply, S};
handle_cast({update_nodes, NewNodes}, S) ->
    {noreply, do_update_nodes(NewNodes, S)};
handle_cast({update_nodestats, NewNodes}, S) ->
    {noreply, do_update_nodestats(NewNodes, S)}.
-spec handle_info({'DOWN', _, _, pid(), _}, state()) -> gs_noreply().
%% A monitored tag server exited: drop it from the tags tree so the next
%% request for that tag starts a fresh server (see get_tag_pid/3).
handle_info({'DOWN', _, _, Pid, _}, S) ->
{noreply, do_tag_exit(Pid, S)}.
%% ===================================================================
%% gen_server callback stubs
-spec terminate(term(), state()) -> ok.
%% Only logs the shutdown reason; no external resources to release here.
terminate(Reason, _State) ->
lager:warning("DDFS master died: ~p", [Reason]).
-spec code_change(term(), state(), term()) -> {ok, state()}.
%% No state migration needed across code upgrades.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
%% ===================================================================
%% internal functions
-spec do_get_readable_nodes([node_info()], [node()]) ->
                                   {ok, [node()], non_neg_integer()}.
%% Return the sorted list of known nodes with the read blacklist removed,
%% together with the size of the blacklist.
do_get_readable_nodes(Nodes, ReadBlacklist) ->
    Known = lists:usort([Node || {Node, _Stats} <- Nodes]),
    Blacked = gb_sets:from_ordset(ReadBlacklist),
    Readable = [N || N <- Known, not gb_sets:is_element(N, Blacked)],
    {ok, Readable, gb_sets:size(Blacked)}.
-spec do_choose_write_nodes([node_info()], non_neg_integer(), [node()], [node()], [node()]) ->
{ok, [node()]}.
%% Select K distinct write targets; Include is always part of the result,
%% Exclude and BlackList never are.
do_choose_write_nodes(Nodes, K, Include, Exclude, BlackList) ->
% Include is the list of nodes that must be included
%
% Node selection algorithm:
% 1. try to choose K nodes randomly from all the nodes which have
% more than ?MIN_FREE_SPACE bytes free space available and which
% are not excluded or blacklisted.
% 2. if K nodes cannot be found this way, choose the K emptiest
% nodes which are not excluded or blacklisted.
% NOTE(review): Free is compared against ?MIN_FREE_SPACE / 1024, which
% suggests Free is in KB while the macro is in bytes -- confirm units
% against the diskspace values reported by ddfs_node.
Primary = ([N || {N, {Free, _Total}} <- Nodes, Free > ?MIN_FREE_SPACE / 1024]
-- (Exclude ++ BlackList)),
if length(Primary) >= K ->
% Random pick among eligible nodes, topped with the mandatory Include.
% NOTE(review): K - length(Include) goes negative when Include holds
% more than K nodes -- verify callers guarantee length(Include) =< K.
{ok, Include ++ disco_util:choose_random(Primary -- Include , K - length(Include))};
true ->
% Fallback: prefer nodes with the most free space (ascending keysort
% on the {Free, Total} stats, then reversed).
Preferred = [N || {N, _} <- lists:reverse(lists:keysort(2, Nodes))],
Secondary = Include ++ lists:sublist(Preferred -- (Include ++ Exclude ++ BlackList),
K - length(Include)),
{ok, Secondary}
end.
-type new_blob_result() :: too_many_replicas | {ok, [nonempty_string()]}.
-spec do_new_blob(string()|object_name(), non_neg_integer(), [node()], [node()], [node()], [node_info()]) ->
                         new_blob_result().
%% Allocate K write locations for blob Obj and return their HTTP PUT
%% URLs. Asking for more replicas than there are known nodes always fails.
do_new_blob(_Obj, K, _Include, _Exclude, _BlackList, Nodes) when K > length(Nodes) ->
    too_many_replicas;
do_new_blob(Obj, K, Include, Exclude, BlackList, Nodes) ->
    {ok, Chosen} = do_choose_write_nodes(Nodes, K, Include, Exclude, BlackList),
    %% put_port is stored in the process dictionary by init/1.
    Port = get(put_port),
    ToUrl = fun(Node) -> ["http://", disco:host(Node), ":", Port, "/ddfs/", Obj] end,
    {ok, [ToUrl(Node) || Node <- Chosen]}.
% Tag request: Start a new tag server if one doesn't exist already. Forward
% the request to the tag server.
-spec get_tag_pid(tagname(), gb_trees:tree(), false | gb_sets:set()) ->
{pid(), gb_trees:tree()}.
%% Look up (or start and monitor) the server process for Tag, returning
%% the pid and the possibly-extended tags tree. NotFound is a hint for
%% ddfs_tag:start/2: true when a tag cache is held and the tag is known
%% to be absent from it.
get_tag_pid(Tag, Tags, Cache) ->
case gb_trees:lookup(Tag, Tags) of
none ->
NotFound = (Cache =/= false
andalso not gb_sets:is_element(Tag, Cache)),
{ok, Server} = ddfs_tag:start(Tag, NotFound),
% Monitored so handle_info/2 can prune dead tag servers.
erlang:monitor(process, Server),
{Server, gb_trees:insert(Tag, Server, Tags)};
{value, P} ->
{P, Tags}
end.
-spec do_tag_request(term(), tagname(), replyto(), state()) ->
state().
%% Forward call M for Tag to its tag server (started on demand); the tag
%% server replies to From directly. The tag is added to the cache when a
%% cache is currently held (tag_cache stays false otherwise).
do_tag_request(M, Tag, From, #state{tags = Tags, tag_cache = Cache} = S) ->
{Pid, TagsN} = get_tag_pid(Tag, Tags, Cache),
gen_server:cast(Pid, {M, From}),
S#state{tags = TagsN,
tag_cache = Cache =/= false andalso gb_sets:add(Tag, Cache)}.
-spec do_tag_notify(term(), tagname(), state()) -> state().
%% Like do_tag_request/4, but delivers a one-way {notify, M} message to
%% the tag server; no reply is expected by anyone.
do_tag_notify(M, Tag, #state{tags = Tags, tag_cache = Cache} = S) ->
{Pid, TagsN} = get_tag_pid(Tag, Tags, Cache),
gen_server:cast(Pid, {notify, M}),
S#state{tags = TagsN,
tag_cache = Cache =/= false andalso gb_sets:add(Tag, Cache)}.
-spec do_update_nodes(nodes_update(), state()) -> state().
%% Apply a cluster-config update. Each entry is {Node, WriteOk, ReadOk}:
%% WriteOk =:= false puts the node on the write blacklist, ReadOk =:=
%% false on the read blacklist. Known nodes keep their diskspace stats;
%% new nodes start at {0, 0} until monitor_diskspace/0 polls them.
do_update_nodes(NewNodes, #state{nodes = Nodes, tags = Tags} = S) ->
WriteBlacklist = lists:sort([Node || {Node, false, _} <- NewNodes]),
ReadBlacklist = lists:sort([Node || {Node, _, false} <- NewNodes]),
OldNodes = gb_trees:from_orddict(Nodes),
UpdatedNodes = lists:keysort(1, [case gb_trees:lookup(Node, OldNodes) of
none ->
{Node, {0, 0}};
{value, OldStats} ->
{Node, OldStats}
end || {Node, _WB, _RB} <- NewNodes]),
if
UpdatedNodes =/= Nodes ->
% Node set changed: stop all tag servers, drop the tag cache, and
% rebuild the cache asynchronously from the new readable node set.
_ = [gen_server:cast(Pid, {die, none}) || Pid <- gb_trees:values(Tags)],
spawn(fun() ->
{ok, ReadableNodes, RBSize} =
do_get_readable_nodes(UpdatedNodes, ReadBlacklist),
refresh_tag_cache(ReadableNodes, RBSize)
end),
S#state{nodes = UpdatedNodes,
write_blacklist = WriteBlacklist,
read_blacklist = ReadBlacklist,
tag_cache = false,
tags = gb_trees:empty()};
true ->
% Same node set: only the blacklists may have changed.
S#state{write_blacklist = WriteBlacklist,
read_blacklist = ReadBlacklist}
end.
-spec do_update_nodestats(gb_trees:tree(), state()) -> state().
%% Merge freshly-polled diskspace stats into the node list: nodes present
%% in NewNodes receive their new stats, all others keep their old ones.
do_update_nodestats(NewNodes, #state{nodes = Nodes} = S) ->
    Merge = fun({Node, OldStats}) ->
                    case gb_trees:lookup(Node, NewNodes) of
                        {value, FreshStats} -> {Node, FreshStats};
                        none                -> {Node, OldStats}
                    end
            end,
    S#state{nodes = lists:map(Merge, Nodes)}.
-spec do_tag_exit(pid(), state()) -> state().
%% Drop every tag entry whose server process is Pid; called when a
%% monitored tag server goes down, so a later request restarts it.
do_tag_exit(Pid, S) ->
    Live = lists:filter(fun({_Tag, Server}) -> Server =/= Pid end,
                        gb_trees:to_list(S#state.tags)),
    S#state{tags = gb_trees:from_orddict(Live)}.
-spec do_get_tags(all | filter, [node()]) -> {[node()], [node()], [binary()]};
(safe, [node()]) -> {ok, [binary()]} | too_many_failed_nodes;
(gc, [node()]) -> {ok, [binary()], [node()]} | too_many_failed_nodes.
%% all: ask every node for its tag list; returns the nodes that answered,
%% the nodes that failed, and the de-duplicated union of all tags.
do_get_tags(all, Nodes) ->
disco_profile:timed_run(
fun() ->
{Replies, Failed} =
gen_server:multi_call(Nodes, ddfs_node, get_tags, ?NODE_TIMEOUT),
{OkNodes, Tags} = lists:unzip(Replies),
{OkNodes, Failed, lists:usort(lists:flatten(Tags))}
end, do_get_tags_all);
%% filter: like 'all', but removes tags listed under the +deleted
%% meta-tag, and +deleted itself, from the result.
do_get_tags(filter, Nodes) ->
disco_profile:timed_run(
fun() ->
{OkNodes, Failed, Tags} = do_get_tags(all, Nodes),
case tag_operation(get_tagnames, <<"+deleted">>, ?NODEOP_TIMEOUT) of
{ok, Deleted} ->
TagSet = gb_sets:from_ordset(Tags),
DelSet = gb_sets:insert(<<"+deleted">>, Deleted),
NotDeleted = gb_sets:to_list(gb_sets:subtract(TagSet, DelSet)),
{OkNodes, Failed, NotDeleted};
E ->
E
end
end, do_get_tags_filter);
%% safe: only answer when fewer nodes failed than the minimum tag
%% replica count, so no tag can have been missed entirely.
do_get_tags(safe, Nodes) ->
disco_profile:timed_run(
fun() ->
TagMinK = list_to_integer(disco:get_setting("DDFS_TAG_MIN_REPLICAS")),
case do_get_tags(filter, Nodes) of
{_OkNodes, Failed, Tags} when length(Failed) < TagMinK ->
{ok, Tags};
_ ->
too_many_failed_nodes
end
end, do_get_tags_safe);
% The returned tag list may include +deleted.
%% gc: same failure threshold as 'safe', but only tags under +deleted are
%% filtered (the +deleted meta-tag itself is kept, per the comment
%% above), and the nodes that answered are returned as well.
do_get_tags(gc, Nodes) ->
disco_profile:timed_run(
fun() ->
{OkNodes, Failed, Tags} = do_get_tags(all, Nodes),
TagMinK = list_to_integer(disco:get_setting("DDFS_TAG_MIN_REPLICAS")),
case length(Failed) < TagMinK of
false ->
too_many_failed_nodes;
true ->
case tag_operation(get_tagnames, <<"+deleted">>, ?NODEOP_TIMEOUT) of
{ok, Deleted} ->
TagSet = gb_sets:from_ordset(Tags),
NotDeleted = gb_sets:subtract(TagSet, Deleted),
{ok, gb_sets:to_list(NotDeleted), OkNodes};
E ->
E
end
end
end, do_get_tags_gc).
% The helper processes below are linked to ddfs_master, so an uncaught
% timeout raised by this call inside one of them would crash ddfs_master
% itself; safe_get_read_nodes/0 therefore catches all exceptions.
-spec safe_get_read_nodes() -> {ok, [node()], non_neg_integer()} | error.
%% Like get_read_nodes/0, but never raises: unexpected replies and
%% exceptions are logged and collapsed to 'error', so the linked helper
%% loops (and hence ddfs_master) cannot be taken down by this call.
safe_get_read_nodes() ->
    try get_read_nodes() of
        {ok, _Readable, _BlacklistSize} = Reply ->
            Reply;
        Unexpected ->
            lager:error("unexpected response retrieving readable nodes: ~p",
                        [Unexpected]),
            error
    catch
        Class:Reason ->
            lager:error("error retrieving readable nodes: ~p:~p",
                        [Class, Reason]),
            error
    end.
-spec monitor_diskspace() -> no_return().
%% Linked poller loop: every ?DISKSPACE_INTERVAL ms ask all readable
%% nodes for their diskspace and cast the answers into the master state
%% via update_nodestats/1. Nodes that fail to answer are skipped until
%% the next round.
monitor_diskspace() ->
case safe_get_read_nodes() of
{ok, ReadableNodes, _RBSize} ->
{Space, _F} = gen_server:multi_call(ReadableNodes,
ddfs_node,
get_diskspace,
?NODE_TIMEOUT),
update_nodestats(gb_trees:from_orddict(lists:keysort(1, Space)));
error ->
ok
end,
timer:sleep(?DISKSPACE_INTERVAL),
monitor_diskspace().
-spec refresh_tag_cache_proc() -> no_return().
%% Linked refresher loop: rebuild the tag cache, then wait for either an
%% explicit 'refresh' nudge (sent from the refresh_tag_cache cast) or
%% the ?TAG_CACHE_INTERVAL timeout before rebuilding again.
refresh_tag_cache_proc() ->
case safe_get_read_nodes() of
{ok, ReadableNodes, RBSize} ->
refresh_tag_cache(ReadableNodes, RBSize);
error ->
ok
end,
receive
refresh ->
ok
after ?TAG_CACHE_INTERVAL ->
ok
end,
refresh_tag_cache_proc().
-spec refresh_tag_cache([node()], non_neg_integer()) -> ok.
%% Gather tag names from Nodes and install them as the new tag cache,
%% but only when enough nodes answered: the failed nodes plus the read
%% blacklist (BLSize) must stay below the minimum tag replica count,
%% otherwise the cache could silently be missing tags.
refresh_tag_cache(Nodes, BLSize) ->
TagMinK = list_to_integer(disco:get_setting("DDFS_TAG_MIN_REPLICAS")),
{Replies, Failed} =
gen_server:multi_call(Nodes, ddfs_node, get_tags, ?NODE_TIMEOUT),
if Nodes =/= [], length(Failed) + BLSize < TagMinK ->
{_OkNodes, Tags} = lists:unzip(Replies),
update_tag_cache(gb_sets:from_list(lists:flatten(Tags)));
true -> ok
end.