aboutsummaryrefslogblamecommitdiffstats
path: root/lib/stdlib/src/pg.erl
blob: ee177e4e0b239c634c5df97fc3eecf91c63d2ba3 (plain) (tree)
1
2
3
4

                   
                                                        































                                                                         

                                                     








                                                          


                                                           

















                                                                    


                                       








                                              

                                    









                                                                 

                                     








                                             

                                     



































































                                                                           
%%
%% %CopyrightBegin%
%% 
%% Copyright Ericsson AB 1996-2011. All Rights Reserved.
%% 
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%% 
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%% 
%% %CopyrightEnd%
%%
-module(pg).

%% pg provides a process group facility. Messages
%% can be multicast to all members of the group.

-export([create/1,
	 create/2,
	 standby/2,
	 join/2,
	 send/2,
	 esend/2,
	 members/1,
	 name_to_pid/1,
	 master/1]).


%% Create a brand new, empty process group named PgName whose master
%% process runs on the local node.
%%
%% Returns 'ok' once the master is globally registered under PgName.
%% Returns {error, already_created} when a process is already registered
%% under that name, including the case where another caller wins the
%% registration between the initial check and global:register_name/2.
%% Only the throw raised by check/1 is turned into a return value; any
%% other failure propagates to the caller instead of being returned as
%% an {'EXIT', Reason} tuple.

-spec create(PgName) -> 'ok' | {'error', Reason} when
      PgName :: term(),
      Reason :: 'already_created' | term().

create(PgName) ->
    try
        check(PgName),
        Pid = spawn(pg, master, [PgName]),
        case global:register_name(PgName, Pid) of
            yes ->
                ok;
            no ->
                %% Lost the registration race. The master traps exits, so
                %% only 'kill' is guaranteed to stop it; without this the
                %% unnamed master would linger forever.
                exit(Pid, kill),
                {error, already_created}
        end
    catch
        throw:{error, _} = Error ->
            Error
    end.

%% Create a brand new, empty process group named PgName whose master
%% process runs on Node.
%%
%% Returns 'ok' once the master is globally registered under PgName.
%% Returns {error, already_created} when a process is already registered
%% under that name, including the case where another caller wins the
%% registration between the initial check and global:register_name/2.
%% Only the throw raised by check/1 is turned into a return value; any
%% other failure propagates to the caller instead of being returned as
%% an {'EXIT', Reason} tuple.

-spec create(PgName, Node) -> 'ok' | {'error', Reason} when
      PgName :: term(),
      Node :: node(),
      Reason :: 'already_created' | term().

create(PgName, Node) ->
    try
        check(PgName),
        Pid = spawn(Node, pg, master, [PgName]),
        case global:register_name(PgName, Pid) of
            yes ->
                ok;
            no ->
                %% Lost the registration race. The master traps exits, so
                %% only 'kill' is guaranteed to stop it; without this the
                %% unnamed master would linger forever on Node.
                exit(Pid, kill),
                {error, already_created}
        end
    catch
        throw:{error, _} = Error ->
            Error
    end.

%% Meant to place a standby for the process group manager on Node, so
%% that the group keeps working if the manager's node fails.
%% This implementation ignores both arguments and simply returns 'ok'.

-spec standby(term(), node()) -> 'ok'.

standby(_Group, _StandbyNode) -> ok.

%% Tell process group PgName that Pid is a new member of the group.
%% The call is synchronous: it blocks (without a timeout) until a
%% {_, {members, Members}} message arrives and returns that list. The
%% master replies with the members the group had before Pid was added.

-spec join(PgName, Pid) -> Members when
      PgName :: term(),
      Pid :: pid(),
      Members :: [pid()].

join(PgName, Pid) when is_atom(PgName) ->
    Request = {join, self(), Pid},
    global:send(PgName, Request),
    receive
        {_Master, {members, OldMembers}} -> OldMembers
    end.

%% Multicast Mess to all members of the group.
%% The group is addressed either by its globally registered name (an
%% atom) or directly by the pid of its master. The request is handed to
%% the master asynchronously; 'ok' is returned without waiting for
%% delivery to the members.

-spec send(PgName, Msg) -> 'ok' when
      PgName :: term(),
      Msg :: term().

send(PgName, Mess) when is_atom(PgName) ->
    Request = {send, self(), Mess},
    global:send(PgName, Request),
    ok;
send(Pg, Mess) when is_pid(Pg) ->
    Sender = self(),
    Pg ! {send, Sender, Mess},
    ok.

%% Multicast Mess to all members of the group except the calling process
%% (relevant when the caller is itself a member).
%% The group is addressed either by its globally registered name (an
%% atom) or directly by the pid of its master. The request is handed to
%% the master asynchronously; 'ok' is returned without waiting for
%% delivery to the members.

-spec esend(PgName, Msg) -> 'ok' when
      PgName :: term(),
      Msg :: term().

esend(PgName, Mess) when is_atom(PgName) ->
    Request = {esend, self(), Mess},
    global:send(PgName, Request),
    ok;
esend(Pg, Mess) when is_pid(Pg) ->
    Sender = self(),
    Pg ! {esend, Sender, Mess},
    ok.

%% Return the members of the group.
%% The group is addressed either by its globally registered name (an
%% atom) or directly by the pid of its master. The call blocks (without
%% a timeout) until a {_, {members, Members}} reply arrives.

-spec members(PgName) -> Members when
      PgName :: term(),
      Members :: [pid()].

members(PgName) when is_atom(PgName) ->
    global:send(PgName, {self(), members}),
    receive
        {_P, {members, Members}} ->
            Members
    end;
members(Pg) when is_pid(Pg) ->
    %% The reply address must be the caller's pid, i.e. self(). Sending
    %% the bare atom 'self' makes the master try to reply to a registered
    %% process named 'self', which fails and leaves this caller waiting
    %% forever.
    Pg ! {self(), members},
    receive
        {_P, {members, Members}} ->
            Members
    end.

%% Look up the pid that is globally registered under PgName.
%% Returns 'undefined' when no process is registered under that name.

-spec name_to_pid(atom()) -> pid() | 'undefined'.

name_to_pid(PgName) when is_atom(PgName) ->
    MasterOrUndefined = global:whereis_name(PgName),
    MasterOrUndefined.

%% Entry point of the group master process (spawned by create/1,2).
%% Exit trapping is switched on so that the termination of a linked
%% member arrives as an 'EXIT' message, then the server loop is entered
%% with an empty member list. Never returns.

-spec master(term()) -> no_return().

master(PgName) ->
    NoMembers = [],
    process_flag(trap_exit, true),
    master_loop(PgName, NoMembers).

%% Server loop of the group master. Members is the current member list,
%% most recently joined first. Handled messages:
%%   {send, From, Message}  - forward Message to every member
%%   {esend, From, Message} - forward Message to every member but From
%%   {join, From, Pid}      - link to Pid, announce it to the current
%%                            members, reply to From with the members
%%                            before the join, then add Pid
%%   {From, members}        - reply to From with the member list
%%   {'EXIT', From, _}      - a linked process terminated; if it was a
%%                            member, remove it and notify the rest
master_loop(PgName, Members) ->
    receive
        {send, From, Message} ->
            send_all(Members, {pg_message, From, PgName, Message}),
            master_loop(PgName, Members);
        {esend, From, Message} ->
            %% Drop every occurrence of the sender: a process that joined
            %% more than once must still not get its own message back.
            Others = [M || M <- Members, M =/= From],
            send_all(Others, {pg_message, From, PgName, Message}),
            master_loop(PgName, Members);
        {join, From, Pid} ->
            link(Pid),
            send_all(Members, {new_member, PgName, Pid}),
            From ! {self(), {members, Members}},
            master_loop(PgName, [Pid | Members]);
        {From, members} ->
            From ! {self(), {members, Members}},
            master_loop(PgName, Members);
        {'EXIT', From, _Reason} ->
            case lists:member(From, Members) of
                true ->
                    %% Remove all occurrences; deleting only the first
                    %% one would keep a dead pid in the group if it had
                    %% joined more than once.
                    Survivors = [M || M <- Members, M =/= From],
                    send_all(Survivors, {crashed_member, PgName, From}),
                    master_loop(PgName, Survivors);
                false ->
                    master_loop(PgName, Members)
            end
    end.

%% Send Msg to every process in Recipients, in list order. Returns 'ok'.
send_all(Recipients, Msg) ->
    lists:foreach(fun(Recipient) -> Recipient ! Msg end, Recipients).

%% Check whether the process group already exists.
%% Returns 'ok' when nothing is globally registered under PgName and
%% throws {error, already_created} when a pid is registered there.

check(PgName) ->
    Registered = global:whereis_name(PgName),
    case Registered of
        undefined ->
            ok;
        Existing when is_pid(Existing) ->
            throw({error, already_created})
    end.