%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 2010-2011. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. %% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. %% %% %CopyrightEnd% %% %% %% This module implements a transport_module that uses Erlang message %% passing for transport. %% -module(diameter_etcp). -behaviour(gen_server). %% transport_module interface. -export([start/3]). %% gen_tcp-ish interface used by diameter_tcp. -export([listen/2, accept/1, connect/3, send/2, close/1, setopts/2, port/1]). %% child start -export([start_link/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). %% Server states. -record(listener, {acceptors = [] :: [pid()]}). -record(connection, {parent :: pid(), peer :: {connect, reference()} %% {connect, MRef} | accept | pid()}). %% start/3 %% 'module' option makes diameter_tcp call here instead of gen_tcp/inet. start(T, Svc, Opts) when is_list(Opts) -> diameter_etcp_sup:start(), diameter_tcp:start(T, Svc, [{module, ?MODULE} | Opts]). %% listen/2 %% %% Spawn a process that represents the listening socket. The local %% port number can be any term, not just an integer. The listener %% process registers its host/port with diameter_reg and this is the %% handle with which connect/3 finds the appropriate listening %% process. listen(LPort, Opts) -> Parent = self(), diameter_etcp_sup:start_child({listen, Parent, LPort, Opts}). %% accept/1 %% %% Output: pid() accept(LPid) -> start(fun(Ref, Parent) -> acceptor(LPid, Ref, Parent) end). %% connect/3 %% %% Output: pid() %% RAddr here can either be a 4/8-tuple address or {Node, Addr}. connect(RAddr, RPort, _Opts) -> start(fun(Ref, Parent) -> connector(RAddr, RPort, Ref, Parent) end). %% send/2 send(Pid, Bin) -> Pid ! {send, Bin}, ok. %% close/1 close(Pid) -> Pid ! close, monitor(Pid), receive {'DOWN', _, process, Pid, _} -> ok end. %% setopts/2 setopts(_, _) -> ok. %% port/1 port(_) -> 3868. %% We have no local port: fake it. %% start_link/1 start_link(T) -> gen_server:start_link(?MODULE, T, []). %% --------------------------------------------------------------------------- %% # init/1 %% --------------------------------------------------------------------------- %% Maintain a list of acceptor pids as the process state. Each accept %% adds a pid to the list, each connect removes one. init({listen, Parent, LPort, Opts}) -> monitor(Parent), {ip, LAddr} = lists:keyfind(ip, 1, Opts), true = diameter_reg:add_new({?MODULE, listener, LAddr, LPort}), {ok, #listener{}}; init({connect, Fun, Ref, Parent}) -> {ok, #connection{parent = Parent, peer = Fun(Ref, Parent)}}. %% --------------------------------------------------------------------------- %% # handle_call/3 %% --------------------------------------------------------------------------- handle_call(_, _, State) -> {reply, nok, State}. %% --------------------------------------------------------------------------- %% # handle_cast/2 %% --------------------------------------------------------------------------- handle_cast(_, State) -> {noreply, State}. %% --------------------------------------------------------------------------- %% # handle_info/2 %% --------------------------------------------------------------------------- handle_info(T, #listener{acceptors = L} = S) -> {noreply, S#listener{acceptors = l(T,L)}}; handle_info(T, State) -> {noreply, transition(T, State)}. %% --------------------------------------------------------------------------- %% # code_change/3 %% --------------------------------------------------------------------------- code_change(_, State, _) -> {ok, State}. %% --------------------------------------------------------------------------- %% # terminate/2 %% --------------------------------------------------------------------------- terminate(_, _) -> ok. %% --------------------------------------------------------------------------- monitor(Pid) -> erlang:monitor(process, Pid). putr(Key, Val) -> put({?MODULE, Key}, Val). eraser(Key) -> erase({?MODULE, Key}). %% l/2 l({'DOWN', _, process, _, _} = T, _) -> x(T); %% New accepting process. l({accept, APid}, As) -> As ++ [APid]; %% Peer wants to connect but we have no acceptor ... l({connect, Peer}, [] = As) -> Peer ! {refused, self()}, As; %% ... or we do. l({connect, Peer}, [APid | Rest]) -> Peer ! {accepted, APid}, Rest. x(T) -> exit({shutdown, T}). %% start/1 start(Fun) -> Ref = make_ref(), {ok, Pid} = T = diameter_etcp_sup:start_child({connect, Fun, Ref, self()}), MRef = monitor(Pid), receive {ok, Ref} -> T; {'DOWN', MRef, process, _, Reason} -> {error, Reason} end. %% acceptor/3 acceptor(LPid, Ref, Parent) -> LPid ! {accept, self()}, %% announce that we're accepting putr(ref, {ok, Ref}), monitor(Parent), monitor(LPid), accept. %% connector/4 connector(RAddr, RPort, Ref, Parent) -> c(match(RAddr, RPort), Ref, Parent). c([], _, _) -> x(refused); c([{_,LPid}], Ref, Parent) -> LPid ! {connect, self()}, putr(ref, {ok, Ref}), monitor(Parent), {connect, monitor(LPid)}. match({Node, RAddr}, RPort) -> rpc:call(Node, diameter_reg, match, [{?MODULE, listener, RAddr, RPort}]); match(RAddr, RPort) -> match({node(), RAddr}, RPort). %% transition/2 %% Unexpected parent or peer death. transition({'DOWN', _, process, _, _} = T, S) -> element(2,S) ! {tcp_error, self(), T}, x(T); %% Connector is receiving acceptor pid from listener. transition({accepted, Peer}, #connection{parent = Parent, peer = {connect, MRef}}) -> monitor(Peer), erlang:demonitor(MRef, [flush]), Peer ! {connect, self()}, Parent ! {ok, _} = eraser(ref), #connection{parent = Parent, peer = Peer}; %% Connector is receiving connection refusal from listener. transition({refused, _} = T, #connection{peer = {connect, _}}) -> x(T); %% Acceptor is receiving peer connect. transition({connect, Peer}, #connection{parent = Parent, peer = accept}) -> monitor(Peer), Parent ! {ok, _} = eraser(ref), #connection{parent = Parent, peer = Peer}; %% Incoming message. transition({recv, Bin}, #connection{parent = Parent} = S) -> Parent ! {tcp, self(), Bin}, S; %% Outgoing message. transition({send, Bin}, #connection{peer = Peer} = S) -> Peer ! {recv, Bin}, S; %% diameter_etcp:close/1 call when a peer is connected ... transition(close = T, #connection{peer = Peer}) when is_pid(Peer) -> Peer ! {close, self()}, x(T); %% ... or not. transition(close = T, #connection{}) -> x(T); %% Peer is closing the connection. transition({close, Peer} = T, #connection{parent = Parent, peer = Peer}) when is_pid(Peer) -> Parent ! {tcp_closed, self()}, x(T).