aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_etcp.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/transport/diameter_etcp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_etcp.erl311
1 files changed, 311 insertions, 0 deletions
diff --git a/lib/diameter/src/transport/diameter_etcp.erl b/lib/diameter/src/transport/diameter_etcp.erl
new file mode 100644
index 0000000000..d925d62545
--- /dev/null
+++ b/lib/diameter/src/transport/diameter_etcp.erl
@@ -0,0 +1,311 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% This module implements a transport_module that uses Erlang message
+%% passing for transport.
+%%
+
+-module(diameter_etcp).
+
+-behaviour(gen_server).
+
+%% transport_module interface.
+-export([start/3]).
+
+%% gen_tcp-ish interface used by diameter_tcp.
+-export([listen/2,
+ accept/1,
+ connect/3,
+ send/2,
+ close/1,
+ setopts/2,
+ port/1]).
+
+%% child start
+-export([start_link/1]).
+
+%% gen_server callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3,
+ terminate/2]).
+
+%% Server states.
+
+-record(listener,
+ {acceptors = [] :: [pid()]}).
+
+-record(connection,
+ {parent :: pid(),
+ peer :: {connect, reference()} %% {connect, MRef}
+ | accept
+ | pid()}).
+
+%% start/3
+
+%% 'module' option makes diameter_tcp call here instead of gen_tcp/inet.
+start(T, Svc, Opts)
+ when is_list(Opts) ->
+ diameter_etcp_sup:start(),
+ diameter_tcp:start(T, Svc, [{module, ?MODULE} | Opts]).
+
+%% listen/2
+%%
+%% Spawn a process that represents the listening socket. The local
+%% port number can be any term, not just an integer. The listener
+%% process registers its host/port with diameter_reg and this is the
+%% handle with which connect/3 finds the appropriate listening
+%% process.
+
+listen(LPort, Opts) ->
+ Parent = self(),
+ diameter_etcp_sup:start_child({listen, Parent, LPort, Opts}).
+
+%% accept/1
+%%
+%% Output: pid()
+
+accept(LPid) ->
+ start(fun(Ref, Parent) -> acceptor(LPid, Ref, Parent) end).
+
+%% connect/3
+%%
+%% Output: pid()
+
+%% RAddr here can either be a 4/8-tuple address or {Node, Addr}.
+connect(RAddr, RPort, _Opts) ->
+ start(fun(Ref, Parent) -> connector(RAddr, RPort, Ref, Parent) end).
+
+%% send/2
+
+send(Pid, Bin) ->
+ Pid ! {send, Bin},
+ ok.
+
+%% close/1
+
+close(Pid) ->
+ Pid ! close,
+ monitor(Pid),
+ receive {'DOWN', _, process, Pid, _} -> ok end.
+
+%% setopts/2
+
+setopts(_, _) ->
+ ok.
+
+%% port/1
+
+port(_) ->
+ 3868. %% We have no local port: fake it.
+
+%% start_link/1
+
+start_link(T) ->
+ gen_server:start_link(?MODULE, T, []).
+
+%% ---------------------------------------------------------------------------
+%% # init/1
+%% ---------------------------------------------------------------------------
+
+%% Maintain a list of acceptor pids as the process state. Each accept
+%% adds a pid to the list, each connect removes one.
+init({listen, Parent, LPort, Opts}) ->
+ monitor(Parent),
+ {ip, LAddr} = lists:keyfind(ip, 1, Opts),
+ true = diameter_reg:add_new({?MODULE, listener, LAddr, LPort}),
+ {ok, #listener{}};
+
+init({connect, Fun, Ref, Parent}) ->
+ {ok, #connection{parent = Parent,
+ peer = Fun(Ref, Parent)}}.
+
+%% ---------------------------------------------------------------------------
+%% # handle_call/3
+%% ---------------------------------------------------------------------------
+
+handle_call(_, _, State) ->
+ {reply, nok, State}.
+
+%% ---------------------------------------------------------------------------
+%% # handle_cast/2
+%% ---------------------------------------------------------------------------
+
+handle_cast(_, State) ->
+ {noreply, State}.
+
+%% ---------------------------------------------------------------------------
+%% # handle_info/2
+%% ---------------------------------------------------------------------------
+
+handle_info(T, #listener{acceptors = L} = S) ->
+ {noreply, S#listener{acceptors = l(T,L)}};
+
+handle_info(T, State) ->
+ {noreply, transition(T, State)}.
+
+%% ---------------------------------------------------------------------------
+%% # code_change/3
+%% ---------------------------------------------------------------------------
+
+code_change(_, State, _) ->
+ {ok, State}.
+
+%% ---------------------------------------------------------------------------
+%% # terminate/2
+%% ---------------------------------------------------------------------------
+
+terminate(_, _) ->
+ ok.
+
+%% ---------------------------------------------------------------------------
+
+monitor(Pid) ->
+ erlang:monitor(process, Pid).
+
+putr(Key, Val) ->
+ put({?MODULE, Key}, Val).
+
+eraser(Key) ->
+ erase({?MODULE, Key}).
+
+%% l/2
+
+l({'DOWN', _, process, _, _} = T, _) ->
+ x(T);
+
+%% New accepting process.
+l({accept, APid}, As) ->
+ As ++ [APid];
+
+%% Peer wants to connect but we have no acceptor ...
+l({connect, Peer}, [] = As) ->
+ Peer ! {refused, self()},
+ As;
+
+%% ... or we do.
+l({connect, Peer}, [APid | Rest]) ->
+ Peer ! {accepted, APid},
+ Rest.
+
+x(T) ->
+ exit({shutdown, T}).
+
+%% start/1
+
+start(Fun) ->
+ Ref = make_ref(),
+ {ok, Pid}
+ = T
+ = diameter_etcp_sup:start_child({connect, Fun, Ref, self()}),
+ MRef = monitor(Pid),
+ receive
+ {ok, Ref} ->
+ T;
+ {'DOWN', MRef, process, _, Reason} ->
+ {error, Reason}
+ end.
+
+%% acceptor/3
+
+acceptor(LPid, Ref, Parent) ->
+ LPid ! {accept, self()}, %% announce that we're accepting
+ putr(ref, {ok, Ref}),
+ monitor(Parent),
+ monitor(LPid),
+ accept.
+
+%% connector/4
+
+connector(RAddr, RPort, Ref, Parent) ->
+ c(match(RAddr, RPort), Ref, Parent).
+
+c([], _, _) ->
+ x(refused);
+
+c([{_,LPid}], Ref, Parent) ->
+ LPid ! {connect, self()},
+ putr(ref, {ok, Ref}),
+ monitor(Parent),
+ {connect, monitor(LPid)}.
+
+match({Node, RAddr}, RPort) ->
+ rpc:call(Node, diameter_reg, match, [{?MODULE, listener, RAddr, RPort}]);
+
+match(RAddr, RPort) ->
+ match({node(), RAddr}, RPort).
+
+%% transition/2
+
+%% Unexpected parent or peer death.
+transition({'DOWN', _, process, _, _} = T, S) ->
+ element(2,S) ! {tcp_error, self(), T},
+ x(T);
+
+%% Connector is receiving acceptor pid from listener.
+transition({accepted, Peer}, #connection{parent = Parent,
+ peer = {connect, MRef}}) ->
+ monitor(Peer),
+ erlang:demonitor(MRef, [flush]),
+ Peer ! {connect, self()},
+ Parent ! {ok, _} = eraser(ref),
+ #connection{parent = Parent,
+ peer = Peer};
+
+%% Connector is receiving connection refusal from listener.
+transition({refused, _} = T, #connection{peer = {connect, _}}) ->
+ x(T);
+
+%% Acceptor is receiving peer connect.
+transition({connect, Peer}, #connection{parent = Parent,
+ peer = accept}) ->
+ monitor(Peer),
+ Parent ! {ok, _} = eraser(ref),
+ #connection{parent = Parent,
+ peer = Peer};
+
+%% Incoming message.
+transition({recv, Bin}, #connection{parent = Parent} = S) ->
+ Parent ! {tcp, self(), Bin},
+ S;
+
+%% Outgoing message.
+transition({send, Bin}, #connection{peer = Peer} = S) ->
+ Peer ! {recv, Bin},
+ S;
+
+%% diameter_etcp:close/1 call when a peer is connected ...
+transition(close = T, #connection{peer = Peer})
+ when is_pid(Peer) ->
+ Peer ! {close, self()},
+ x(T);
+
+%% ... or not.
+transition(close = T, #connection{}) ->
+ x(T);
+
+%% Peer is closing the connection.
+transition({close, Peer} = T, #connection{parent = Parent,
+ peer = Peer})
+ when is_pid(Peer) ->
+ Parent ! {tcp_closed, self()},
+ x(T).