aboutsummaryrefslogtreecommitdiffstats
path: root/lib/stdlib/src/pg.erl
diff options
context:
space:
mode:
authorErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
committerErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
commit84adefa331c4159d432d22840663c38f155cd4c1 (patch)
treebff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/stdlib/src/pg.erl
downloadotp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz
otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2
otp-84adefa331c4159d432d22840663c38f155cd4c1.zip
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/stdlib/src/pg.erl')
-rw-r--r--lib/stdlib/src/pg.erl172
1 files changed, 172 insertions, 0 deletions
diff --git a/lib/stdlib/src/pg.erl b/lib/stdlib/src/pg.erl
new file mode 100644
index 0000000000..503654e706
--- /dev/null
+++ b/lib/stdlib/src/pg.erl
@@ -0,0 +1,172 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(pg).
+
+%% pg provides a process group facility. Messages
+%% can be multicasted to all members in the group
+
+-export([create/1,
+ create/2,
+ standby/2,
+ join/2,
+ send/2,
+ esend/2,
+ members/1,
+ name_to_pid/1,
+ master/1]).
+
+
+%% Create a brand new empty process group with the master residing
+%% at the local node
+
+-spec create(term()) -> 'ok' | {'error', term()}.
+
+create(PgName) ->
+ catch begin check(PgName),
+ Pid = spawn(pg,master,[PgName]),
+ global:register_name(PgName,Pid),
+ ok end.
+
+%% Create a brand new empty process group with the master
+%% residing at Node
+
+-spec create(term(), node()) -> 'ok' | {'error', term()}.
+
+create(PgName, Node) ->
+ catch begin check(PgName),
+ Pid = spawn(Node,pg,master,[PgName]),
+ global:register_name(PgName,Pid),
+ ok end.
+
+%% Have a process on Node that will act as a standby for the process
+%% group manager. So if the node where the manager runs fails, the
+%% process group will continue to function.
+
+-spec standby(term(), node()) -> 'ok'.
+
+standby(_PgName, _Node) ->
+ ok.
+
+%% Tell process group PgName that Pid is a new member of the group
+%% synchronously return a list of all old members in the group
+
+-spec join(atom(), pid()) -> [pid()].
+
+join(PgName, Pid) when is_atom(PgName) ->
+ global:send(PgName, {join,self(),Pid}),
+ receive
+ {_P,{members,Members}} ->
+ Members
+ end.
+
+%% Multi cast Mess to all members in the group
+
+-spec send(atom() | pid(), term()) -> 'ok'.
+
+send(PgName, Mess) when is_atom(PgName) ->
+ global:send(PgName, {send, self(), Mess}),
+ ok;
+send(Pg, Mess) when is_pid(Pg) ->
+ Pg ! {send,self(),Mess},
+ ok.
+
+%% multi cast a message to all members in the group but ourselves
+%% If we are a member
+
+-spec esend(atom() | pid(), term()) -> 'ok'.
+
+esend(PgName, Mess) when is_atom(PgName) ->
+ global:send(PgName, {esend,self(),Mess}),
+ ok;
+esend(Pg, Mess) when is_pid(Pg) ->
+ Pg ! {esend,self(),Mess},
+ ok.
+
+%% Return the members of the group
+
+-spec members(atom() | pid()) -> [pid()].
+
+members(PgName) when is_atom(PgName) ->
+ global:send(PgName, {self() ,members}),
+ receive
+ {_P,{members,Members}} ->
+ Members
+ end;
+members(Pg) when is_pid(Pg) ->
+ Pg ! {self,members},
+ receive
+ {_P,{members,Members}} ->
+ Members
+ end.
+
+-spec name_to_pid(atom()) -> pid() | 'undefined'.
+
+name_to_pid(PgName) when is_atom(PgName) ->
+ global:whereis_name(PgName).
+
+-spec master(term()) -> no_return().
+
+master(PgName) ->
+ process_flag(trap_exit, true),
+ master_loop(PgName, []).
+
+master_loop(PgName,Members) ->
+ receive
+ {send,From,Message} ->
+ send_all(Members,{pg_message,From,PgName,Message}),
+ master_loop(PgName,Members);
+ {esend,From,Message} ->
+ send_all(lists:delete(From,Members),
+ {pg_message,From,PgName,Message}),
+ master_loop(PgName,Members);
+ {join,From,Pid} ->
+ link(Pid),
+ send_all(Members,{new_member,PgName,Pid}),
+ From ! {self(),{members,Members}},
+ master_loop(PgName,[Pid|Members]);
+ {From,members} ->
+ From ! {self(),{members,Members}},
+ master_loop(PgName,Members);
+ {'EXIT',From,_} ->
+ L =
+ case lists:member(From,Members) of
+ true ->
+ NewMembers = lists:delete(From,Members),
+ send_all(NewMembers, {crashed_member,PgName,From}),
+ NewMembers;
+ false ->
+ Members
+ end,
+ master_loop(PgName,L)
+ end.
+
+send_all([], _) -> ok;
+send_all([P|Ps], M) ->
+ P ! M,
+ send_all(Ps, M).
+
+%% Check if the process group already exists
+
+check(PgName) ->
+ case global:whereis_name(PgName) of
+ Pid when is_pid(Pid) ->
+ throw({error,already_created});
+ undefined ->
+ ok
+ end.