diff options
Diffstat (limited to 'lib/diameter/src/transport/diameter_etcp.erl')
-rw-r--r-- | lib/diameter/src/transport/diameter_etcp.erl | 311 |
1 files changed, 311 insertions, 0 deletions
diff --git a/lib/diameter/src/transport/diameter_etcp.erl b/lib/diameter/src/transport/diameter_etcp.erl new file mode 100644 index 0000000000..d925d62545 --- /dev/null +++ b/lib/diameter/src/transport/diameter_etcp.erl @@ -0,0 +1,311 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% This module implements a transport_module that uses Erlang message +%% passing for transport. +%% + +-module(diameter_etcp). + +-behaviour(gen_server). + +%% transport_module interface. +-export([start/3]). + +%% gen_tcp-ish interface used by diameter_tcp. +-export([listen/2, + accept/1, + connect/3, + send/2, + close/1, + setopts/2, + port/1]). + +%% child start +-export([start_link/1]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + terminate/2]). + +%% Server states. + +-record(listener, + {acceptors = [] :: [pid()]}). + +-record(connection, + {parent :: pid(), + peer :: {connect, reference()} %% {connect, MRef} + | accept + | pid()}). + +%% start/3 + +%% 'module' option makes diameter_tcp call here instead of gen_tcp/inet. +start(T, Svc, Opts) + when is_list(Opts) -> + diameter_etcp_sup:start(), + diameter_tcp:start(T, Svc, [{module, ?MODULE} | Opts]). + +%% listen/2 +%% +%% Spawn a process that represents the listening socket. The local +%% port number can be any term, not just an integer. The listener +%% process registers its host/port with diameter_reg and this is the +%% handle with which connect/3 finds the appropriate listening +%% process. + +listen(LPort, Opts) -> + Parent = self(), + diameter_etcp_sup:start_child({listen, Parent, LPort, Opts}). + +%% accept/1 +%% +%% Output: pid() + +accept(LPid) -> + start(fun(Ref, Parent) -> acceptor(LPid, Ref, Parent) end). + +%% connect/3 +%% +%% Output: pid() + +%% RAddr here can either be a 4/8-tuple address or {Node, Addr}. +connect(RAddr, RPort, _Opts) -> + start(fun(Ref, Parent) -> connector(RAddr, RPort, Ref, Parent) end). + +%% send/2 + +send(Pid, Bin) -> + Pid ! {send, Bin}, + ok. + +%% close/1 + +close(Pid) -> + Pid ! close, + monitor(Pid), + receive {'DOWN', _, process, Pid, _} -> ok end. + +%% setopts/2 + +setopts(_, _) -> + ok. + +%% port/1 + +port(_) -> + 3868. %% We have no local port: fake it. + +%% start_link/1 + +start_link(T) -> + gen_server:start_link(?MODULE, T, []). + +%% --------------------------------------------------------------------------- +%% # init/1 +%% --------------------------------------------------------------------------- + +%% Maintain a list of acceptor pids as the process state. Each accept +%% adds a pid to the list, each connect removes one. +init({listen, Parent, LPort, Opts}) -> + monitor(Parent), + {ip, LAddr} = lists:keyfind(ip, 1, Opts), + true = diameter_reg:add_new({?MODULE, listener, LAddr, LPort}), + {ok, #listener{}}; + +init({connect, Fun, Ref, Parent}) -> + {ok, #connection{parent = Parent, + peer = Fun(Ref, Parent)}}. + +%% --------------------------------------------------------------------------- +%% # handle_call/3 +%% --------------------------------------------------------------------------- + +handle_call(_, _, State) -> + {reply, nok, State}. + +%% --------------------------------------------------------------------------- +%% # handle_cast/2 +%% --------------------------------------------------------------------------- + +handle_cast(_, State) -> + {noreply, State}. + +%% --------------------------------------------------------------------------- +%% # handle_info/2 +%% --------------------------------------------------------------------------- + +handle_info(T, #listener{acceptors = L} = S) -> + {noreply, S#listener{acceptors = l(T,L)}}; + +handle_info(T, State) -> + {noreply, transition(T, State)}. + +%% --------------------------------------------------------------------------- +%% # code_change/3 +%% --------------------------------------------------------------------------- + +code_change(_, State, _) -> + {ok, State}. + +%% --------------------------------------------------------------------------- +%% # terminate/2 +%% --------------------------------------------------------------------------- + +terminate(_, _) -> + ok. + +%% --------------------------------------------------------------------------- + +monitor(Pid) -> + erlang:monitor(process, Pid). + +putr(Key, Val) -> + put({?MODULE, Key}, Val). + +eraser(Key) -> + erase({?MODULE, Key}). + +%% l/2 + +l({'DOWN', _, process, _, _} = T, _) -> + x(T); + +%% New accepting process. +l({accept, APid}, As) -> + As ++ [APid]; + +%% Peer wants to connect but we have no acceptor ... +l({connect, Peer}, [] = As) -> + Peer ! {refused, self()}, + As; + +%% ... or we do. +l({connect, Peer}, [APid | Rest]) -> + Peer ! {accepted, APid}, + Rest. + +x(T) -> + exit({shutdown, T}). + +%% start/1 + +start(Fun) -> + Ref = make_ref(), + {ok, Pid} + = T + = diameter_etcp_sup:start_child({connect, Fun, Ref, self()}), + MRef = monitor(Pid), + receive + {ok, Ref} -> + T; + {'DOWN', MRef, process, _, Reason} -> + {error, Reason} + end. + +%% acceptor/3 + +acceptor(LPid, Ref, Parent) -> + LPid ! {accept, self()}, %% announce that we're accepting + putr(ref, {ok, Ref}), + monitor(Parent), + monitor(LPid), + accept. + +%% connector/4 + +connector(RAddr, RPort, Ref, Parent) -> + c(match(RAddr, RPort), Ref, Parent). + +c([], _, _) -> + x(refused); + +c([{_,LPid}], Ref, Parent) -> + LPid ! {connect, self()}, + putr(ref, {ok, Ref}), + monitor(Parent), + {connect, monitor(LPid)}. + +match({Node, RAddr}, RPort) -> + rpc:call(Node, diameter_reg, match, [{?MODULE, listener, RAddr, RPort}]); + +match(RAddr, RPort) -> + match({node(), RAddr}, RPort). + +%% transition/2 + +%% Unexpected parent or peer death. +transition({'DOWN', _, process, _, _} = T, S) -> + element(2,S) ! {tcp_error, self(), T}, + x(T); + +%% Connector is receiving acceptor pid from listener. +transition({accepted, Peer}, #connection{parent = Parent, + peer = {connect, MRef}}) -> + monitor(Peer), + erlang:demonitor(MRef, [flush]), + Peer ! {connect, self()}, + Parent ! {ok, _} = eraser(ref), + #connection{parent = Parent, + peer = Peer}; + +%% Connector is receiving connection refusal from listener. +transition({refused, _} = T, #connection{peer = {connect, _}}) -> + x(T); + +%% Acceptor is receiving peer connect. +transition({connect, Peer}, #connection{parent = Parent, + peer = accept}) -> + monitor(Peer), + Parent ! {ok, _} = eraser(ref), + #connection{parent = Parent, + peer = Peer}; + +%% Incoming message. +transition({recv, Bin}, #connection{parent = Parent} = S) -> + Parent ! {tcp, self(), Bin}, + S; + +%% Outgoing message. +transition({send, Bin}, #connection{peer = Peer} = S) -> + Peer ! {recv, Bin}, + S; + +%% diameter_etcp:close/1 call when a peer is connected ... +transition(close = T, #connection{peer = Peer}) + when is_pid(Peer) -> + Peer ! {close, self()}, + x(T); + +%% ... or not. +transition(close = T, #connection{}) -> + x(T); + +%% Peer is closing the connection. +transition({close, Peer} = T, #connection{parent = Parent, + peer = Peer}) + when is_pid(Peer) -> + Parent ! {tcp_closed, self()}, + x(T). |