aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2011-05-18 18:58:01 +0200
committerAnders Svensson <[email protected]>2011-05-18 18:58:01 +0200
commit1756ed583f24ba2206a0f573635a6fa3cdea5c54 (patch)
treec762a6e700f09fb580f8c9945fd6886ecc2c9923 /lib/diameter/src/transport
parente993da4426a76bb172290a10999267d3023120d5 (diff)
parent3c15ff32e89e401b4dde2b8acc9699be2614b996 (diff)
downloadotp-1756ed583f24ba2206a0f573635a6fa3cdea5c54.tar.gz
otp-1756ed583f24ba2206a0f573635a6fa3cdea5c54.tar.bz2
otp-1756ed583f24ba2206a0f573635a6fa3cdea5c54.zip
Merge branch 'anders/diameter_import/OTP-9321' into dev
* anders/diameter_import/OTP-9321: Initial commit of the diameter application.
Diffstat (limited to 'lib/diameter/src/transport')
-rw-r--r--lib/diameter/src/transport/.gitignore3
-rw-r--r--lib/diameter/src/transport/Makefile141
-rw-r--r--lib/diameter/src/transport/diameter_etcp.erl311
-rw-r--r--lib/diameter/src/transport/diameter_etcp_sup.erl64
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl624
-rw-r--r--lib/diameter/src/transport/diameter_sctp_sup.erl74
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl531
-rw-r--r--lib/diameter/src/transport/diameter_tcp_sup.erl78
-rw-r--r--lib/diameter/src/transport/diameter_transport_sup.erl68
-rw-r--r--lib/diameter/src/transport/modules.mk29
10 files changed, 1923 insertions, 0 deletions
diff --git a/lib/diameter/src/transport/.gitignore b/lib/diameter/src/transport/.gitignore
new file mode 100644
index 0000000000..d9f072e262
--- /dev/null
+++ b/lib/diameter/src/transport/.gitignore
@@ -0,0 +1,3 @@
+
+/depend.mk
+
diff --git a/lib/diameter/src/transport/Makefile b/lib/diameter/src/transport/Makefile
new file mode 100644
index 0000000000..5dc1772796
--- /dev/null
+++ b/lib/diameter/src/transport/Makefile
@@ -0,0 +1,141 @@
+#
+# %CopyrightBegin%
+#
+# Copyright Ericsson AB 2010-2011. All Rights Reserved.
+#
+# The contents of this file are subject to the Erlang Public License,
+# Version 1.1, (the "License"); you may not use this file except in
+# compliance with the License. You should have received a copy of the
+# Erlang Public License along with this software. If not, it can be
+# retrieved online at http://www.erlang.org/.
+#
+# Software distributed under the License is distributed on an "AS IS"
+# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+# the License for the specific language governing rights and limitations
+# under the License.
+#
+# %CopyrightEnd%
+#
+#
+
+ifneq ($(ERL_TOP),)
+include $(ERL_TOP)/make/target.mk
+EBIN = ../../ebin
+include $(ERL_TOP)/make/$(TARGET)/otp.mk
+else
+include $(DIAMETER_TOP)/make/target.mk
+EBIN = ../../ebin
+include $(DIAMETER_TOP)/make/$(TARGET)/rules.mk
+endif
+
+
+# ----------------------------------------------------
+# Application version
+# ----------------------------------------------------
+
+include ../../vsn.mk
+VSN=$(DIAMETER_VSN)
+
+# ----------------------------------------------------
+# Release directory specification
+# ----------------------------------------------------
+
+RELSYSDIR = $(RELEASE_PATH)/lib/diameter-$(VSN)
+
+INCDIR = ../../include
+
+# ----------------------------------------------------
+# Target Specs
+# ----------------------------------------------------
+
+include modules.mk
+
+ERL_FILES = \
+ $(MODULES:%=%.erl)
+
+TARGET_FILES = \
+ $(MODULES:%=$(EBIN)/%.$(EMULATOR))
+
+# ----------------------------------------------------
+# FLAGS
+# ----------------------------------------------------
+
+ifeq ($(TYPE),debug)
+ERL_COMPILE_FLAGS += -Ddebug
+endif
+
+include ../app/diameter.mk
+
+ERL_COMPILE_FLAGS += \
+ $(DIAMETER_ERL_COMPILE_FLAGS) \
+ -I$(INCDIR)
+
+# ----------------------------------------------------
+# Targets
+# ----------------------------------------------------
+
+debug:
+ @${MAKE} TYPE=debug opt
+
+opt: $(TARGET_FILES)
+
+clean:
+ rm -f $(TARGET_FILES)
+ rm -f errs core *~
+ rm -f depend.mk
+
+docs:
+
+info:
+ @echo ""
+ @echo "ERL_FILES = $(ERL_FILES)"
+ @echo "HRL_FILES = $(HRL_FILES)"
+ @echo ""
+ @echo "TARGET_FILES = $(TARGET_FILES)"
+ @echo ""
+
+# ----------------------------------------------------
+# Special Build Targets
+# ----------------------------------------------------
+
+# Invoked from ../app to add modules to the app file.
+$(APP_TARGET): force
+ M=`echo $(MODULES) | sed -e 's/^ *//' -e 's/ *$$//' -e 'y/ /,/'`; \
+ echo "/%TRANSPORT_MODULES%/s//$$M/;w;q" | tr ';' '\n' \
+ | ed -s $@
+
+# ----------------------------------------------------
+# Release Target
+# ----------------------------------------------------
+ifneq ($(ERL_TOP),)
+include $(ERL_TOP)/make/otp_release_targets.mk
+else
+include $(DIAMETER_TOP)/make/release_targets.mk
+endif
+
+release_spec: opt
+ $(INSTALL_DIR) $(RELSYSDIR)/ebin
+ $(INSTALL_DATA) $(TARGET_FILES) $(RELSYSDIR)/ebin
+ $(INSTALL_DIR) $(RELSYSDIR)/src/transport
+ $(INSTALL_DATA) $(ERL_FILES) $(HRL_FILES) $(RELSYSDIR)/src/transport
+
+release_docs_spec:
+
+force:
+
+# ----------------------------------------------------
+# Dependencies
+# ----------------------------------------------------
+
+depend: depend.mk
+
+# Generate dependencies makefile.
+depend.mk: ../app/depend.sed $(ERL_FILES) modules.mk Makefile
+ for f in $(MODULES); do \
+ sed -f $< $$f.erl | sed "s@/@/$$f@"; \
+ done \
+ > $@
+
+-include depend.mk
+
+.PHONY: clean debug depend docs force info opt release_docs_spec release_spec
diff --git a/lib/diameter/src/transport/diameter_etcp.erl b/lib/diameter/src/transport/diameter_etcp.erl
new file mode 100644
index 0000000000..d925d62545
--- /dev/null
+++ b/lib/diameter/src/transport/diameter_etcp.erl
@@ -0,0 +1,311 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% This module implements a transport_module that uses Erlang message
+%% passing for transport.
+%%
+
+-module(diameter_etcp).
+
+-behaviour(gen_server).
+
+%% transport_module interface.
+-export([start/3]).
+
+%% gen_tcp-ish interface used by diameter_tcp.
+-export([listen/2,
+ accept/1,
+ connect/3,
+ send/2,
+ close/1,
+ setopts/2,
+ port/1]).
+
+%% child start
+-export([start_link/1]).
+
+%% gen_server callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3,
+ terminate/2]).
+
+%% Server states.
+
+-record(listener,
+ {acceptors = [] :: [pid()]}).
+
+-record(connection,
+ {parent :: pid(),
+ peer :: {connect, reference()} %% {connect, MRef}
+ | accept
+ | pid()}).
+
+%% start/3
+
+%% 'module' option makes diameter_tcp call here instead of gen_tcp/inet.
+start(T, Svc, Opts)
+ when is_list(Opts) ->
+ diameter_etcp_sup:start(),
+ diameter_tcp:start(T, Svc, [{module, ?MODULE} | Opts]).
+
+%% listen/2
+%%
+%% Spawn a process that represents the listening socket. The local
+%% port number can be any term, not just an integer. The listener
+%% process registers its host/port with diameter_reg and this is the
+%% handle with which connect/3 finds the appropriate listening
+%% process.
+
+listen(LPort, Opts) ->
+ Parent = self(),
+ diameter_etcp_sup:start_child({listen, Parent, LPort, Opts}).
+
+%% accept/1
+%%
+%% Output: pid()
+
+accept(LPid) ->
+ start(fun(Ref, Parent) -> acceptor(LPid, Ref, Parent) end).
+
+%% connect/3
+%%
+%% Output: pid()
+
+%% RAddr here can either be a 4/8-tuple address or {Node, Addr}.
+connect(RAddr, RPort, _Opts) ->
+ start(fun(Ref, Parent) -> connector(RAddr, RPort, Ref, Parent) end).
+
+%% send/2
+
+send(Pid, Bin) ->
+ Pid ! {send, Bin},
+ ok.
+
+%% close/1
+
+close(Pid) ->
+ Pid ! close,
+ monitor(Pid),
+ receive {'DOWN', _, process, Pid, _} -> ok end.
+
+%% setopts/2
+
+setopts(_, _) ->
+ ok.
+
+%% port/1
+
+port(_) ->
+ 3868. %% We have no local port: fake it.
+
+%% start_link/1
+
+start_link(T) ->
+ gen_server:start_link(?MODULE, T, []).
+
+%% ---------------------------------------------------------------------------
+%% # init/1
+%% ---------------------------------------------------------------------------
+
+%% Maintain a list of acceptor pids as the process state. Each accept
+%% adds a pid to the list, each connect removes one.
+init({listen, Parent, LPort, Opts}) ->
+ monitor(Parent),
+ {ip, LAddr} = lists:keyfind(ip, 1, Opts),
+ true = diameter_reg:add_new({?MODULE, listener, LAddr, LPort}),
+ {ok, #listener{}};
+
+init({connect, Fun, Ref, Parent}) ->
+ {ok, #connection{parent = Parent,
+ peer = Fun(Ref, Parent)}}.
+
+%% ---------------------------------------------------------------------------
+%% # handle_call/3
+%% ---------------------------------------------------------------------------
+
+handle_call(_, _, State) ->
+ {reply, nok, State}.
+
+%% ---------------------------------------------------------------------------
+%% # handle_cast/2
+%% ---------------------------------------------------------------------------
+
+handle_cast(_, State) ->
+ {noreply, State}.
+
+%% ---------------------------------------------------------------------------
+%% # handle_info/2
+%% ---------------------------------------------------------------------------
+
+handle_info(T, #listener{acceptors = L} = S) ->
+ {noreply, S#listener{acceptors = l(T,L)}};
+
+handle_info(T, State) ->
+ {noreply, transition(T, State)}.
+
+%% ---------------------------------------------------------------------------
+%% # code_change/3
+%% ---------------------------------------------------------------------------
+
+code_change(_, State, _) ->
+ {ok, State}.
+
+%% ---------------------------------------------------------------------------
+%% # terminate/2
+%% ---------------------------------------------------------------------------
+
+terminate(_, _) ->
+ ok.
+
+%% ---------------------------------------------------------------------------
+
+monitor(Pid) ->
+ erlang:monitor(process, Pid).
+
+putr(Key, Val) ->
+ put({?MODULE, Key}, Val).
+
+eraser(Key) ->
+ erase({?MODULE, Key}).
+
+%% l/2
+
+l({'DOWN', _, process, _, _} = T, _) ->
+ x(T);
+
+%% New accepting process.
+l({accept, APid}, As) ->
+ As ++ [APid];
+
+%% Peer wants to connect but we have no acceptor ...
+l({connect, Peer}, [] = As) ->
+ Peer ! {refused, self()},
+ As;
+
+%% ... or we do.
+l({connect, Peer}, [APid | Rest]) ->
+ Peer ! {accepted, APid},
+ Rest.
+
+x(T) ->
+ exit({shutdown, T}).
+
+%% start/1
+
+start(Fun) ->
+ Ref = make_ref(),
+ {ok, Pid}
+ = T
+ = diameter_etcp_sup:start_child({connect, Fun, Ref, self()}),
+ MRef = monitor(Pid),
+ receive
+ {ok, Ref} ->
+ T;
+ {'DOWN', MRef, process, _, Reason} ->
+ {error, Reason}
+ end.
+
+%% acceptor/3
+
+acceptor(LPid, Ref, Parent) ->
+ LPid ! {accept, self()}, %% announce that we're accepting
+ putr(ref, {ok, Ref}),
+ monitor(Parent),
+ monitor(LPid),
+ accept.
+
+%% connector/4
+
+connector(RAddr, RPort, Ref, Parent) ->
+ c(match(RAddr, RPort), Ref, Parent).
+
+c([], _, _) ->
+ x(refused);
+
+c([{_,LPid}], Ref, Parent) ->
+ LPid ! {connect, self()},
+ putr(ref, {ok, Ref}),
+ monitor(Parent),
+ {connect, monitor(LPid)}.
+
+match({Node, RAddr}, RPort) ->
+ rpc:call(Node, diameter_reg, match, [{?MODULE, listener, RAddr, RPort}]);
+
+match(RAddr, RPort) ->
+ match({node(), RAddr}, RPort).
+
+%% transition/2
+
+%% Unexpected parent or peer death.
+transition({'DOWN', _, process, _, _} = T, S) ->
+ element(2,S) ! {tcp_error, self(), T},
+ x(T);
+
+%% Connector is receiving acceptor pid from listener.
+transition({accepted, Peer}, #connection{parent = Parent,
+ peer = {connect, MRef}}) ->
+ monitor(Peer),
+ erlang:demonitor(MRef, [flush]),
+ Peer ! {connect, self()},
+ Parent ! {ok, _} = eraser(ref),
+ #connection{parent = Parent,
+ peer = Peer};
+
+%% Connector is receiving connection refusal from listener.
+transition({refused, _} = T, #connection{peer = {connect, _}}) ->
+ x(T);
+
+%% Acceptor is receiving peer connect.
+transition({connect, Peer}, #connection{parent = Parent,
+ peer = accept}) ->
+ monitor(Peer),
+ Parent ! {ok, _} = eraser(ref),
+ #connection{parent = Parent,
+ peer = Peer};
+
+%% Incoming message.
+transition({recv, Bin}, #connection{parent = Parent} = S) ->
+ Parent ! {tcp, self(), Bin},
+ S;
+
+%% Outgoing message.
+transition({send, Bin}, #connection{peer = Peer} = S) ->
+ Peer ! {recv, Bin},
+ S;
+
+%% diameter_etcp:close/1 call when a peer is connected ...
+transition(close = T, #connection{peer = Peer})
+ when is_pid(Peer) ->
+ Peer ! {close, self()},
+ x(T);
+
+%% ... or not.
+transition(close = T, #connection{}) ->
+ x(T);
+
+%% Peer is closing the connection.
+transition({close, Peer} = T, #connection{parent = Parent,
+ peer = Peer})
+ when is_pid(Peer) ->
+ Parent ! {tcp_closed, self()},
+ x(T).
diff --git a/lib/diameter/src/transport/diameter_etcp_sup.erl b/lib/diameter/src/transport/diameter_etcp_sup.erl
new file mode 100644
index 0000000000..bd089cf041
--- /dev/null
+++ b/lib/diameter/src/transport/diameter_etcp_sup.erl
@@ -0,0 +1,64 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(diameter_etcp_sup).
+
+-behaviour(supervisor).
+
+%% interface
+-export([start/0,
+ start_child/1]).
+
+%% internal exports
+-export([start_link/1,
+ init/1]).
+
+%% start/0
+%%
+%% Start the etcp top supervisor.
+
+start() ->
+ diameter_transport_sup:start_child(?MODULE, ?MODULE).
+
+%% start_child/1
+%%
+%% Start a worker under the etcp supervisor.
+
+start_child(T) ->
+ supervisor:start_child(?MODULE, [T]).
+
+%% start_link/1
+%%
+%% Callback from diameter_transport_sup as a result of start/0.
+%% Starts a child supervisor under the transport supervisor.
+
+start_link(?MODULE) ->
+ SupName = {local, ?MODULE},
+ supervisor:start_link(SupName, ?MODULE, []).
+
+init([]) ->
+ Mod = diameter_etcp,
+ Flags = {simple_one_for_one, 0, 1},
+ ChildSpec = {Mod,
+ {Mod, start_link, []},
+ temporary,
+ 1000,
+ worker,
+ [Mod]},
+ {ok, {Flags, [ChildSpec]}}.
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
new file mode 100644
index 0000000000..92aa8488a0
--- /dev/null
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -0,0 +1,624 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(diameter_sctp).
+
+-behaviour(gen_server).
+
+%% interface
+-export([start/3]).
+
+%% child start from supervisor
+-export([start_link/1]).
+
+%% child start from here
+-export([init/1]).
+
+%% gen_server callbacks
+-export([handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3,
+ terminate/2]).
+
+-include_lib("kernel/include/inet_sctp.hrl").
+-include_lib("diameter/include/diameter.hrl").
+
+-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
+
+%% The default port for a listener.
+-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1
+
+%% How long a listener with no associations lives before offing
+%% itself.
+-define(LISTENER_TIMEOUT, 30000).
+
+%% How long to wait for a transport process to attach after
+%% association establishment.
+-define(ACCEPT_TIMEOUT, 5000).
+
+-type uint() :: non_neg_integer().
+
+%% Accepting/connecting transport process state.
+-record(transport,
+ {parent :: pid(),
+ mode :: {accept, pid()}
+ | {connect, {list(inet:ip_address()), uint(), list()}}
+ %% {RAs, RP, Errors}
+ | connect,
+ socket :: gen_sctp:sctp_socket(),
+ assoc_id :: gen_sctp:assoc_id(), %% association identifier
+ peer :: {[inet:ip_address()], uint()}, %% {RAs, RP}
+ streams :: {uint(), uint()}, %% {InStream, OutStream} counts
+ os = 0 :: uint()}). %% next output stream
+
+%% Listener process state.
+-record(listener,
+ {ref :: reference(),
+ socket :: gen_sctp:sctp_socket(),
+ count = 0 :: uint(),
+ tmap = ets:new(?MODULE, []) :: ets:tid(),
+ %% {MRef, Pid|AssocId}, {AssocId, Pid}
+ pending = {0, ets:new(?MODULE, [ordered_set])},
+ tref :: reference()}).
+%% Field tmap is used to map an incoming message or event to the
+%% relevent transport process. Field pending implements a queue of
+%% transport processes to which an association has been assigned (at
+%% comm_up and written into tmap) but for which diameter hasn't yet
+%% spawned a transport process: a short-lived state of affairs as a
+%% new transport is spawned as a consequence of a peer being taken up,
+%% transport processes being spawned by the listener on demand. In
+%% case diameter starts a transport before comm_up on a new
+%% association, pending is set to an improper list with the spawned
+%% transport as head and the queue as tail.
+
+%% ---------------------------------------------------------------------------
+%% # start/3
+%% ---------------------------------------------------------------------------
+
+start(T, #diameter_service{capabilities = Caps}, Opts)
+ when is_list(Opts) ->
+ diameter_sctp_sup:start(), %% start supervisors on demand
+ Addrs = Caps#diameter_caps.host_ip_address,
+ s(T, Addrs, lists:map(fun ip/1, Opts)).
+
+ip({ifaddr, A}) ->
+ {ip, A};
+ip(T) ->
+ T.
+
+%% A listener spawns transports either as a consequence of this call
+%% when there is not yet an association to associate with it, or at
+%% comm_up on a new association in which case the call retrieves a
+%% transport from the pending queue.
+s({accept, Ref} = A, Addrs, Opts) ->
+ {LPid, LAs} = listener(Ref, {Opts, Addrs}),
+ try gen_server:call(LPid, {A, self()}, infinity) of
+ {ok, TPid} -> {ok, TPid, LAs}
+ catch
+ exit: Reason -> {error, Reason}
+ end;
+%% This implementation is due to there being no accept call in
+%% gen_sctp in order to be able to accept a new association only
+%% *after* an accepting transport has been spawned.
+
+s({connect = C, _}, Addrs, Opts) ->
+ diameter_sctp_sup:start_child({C, self(), Opts, Addrs}).
+
+%% start_link/1
+
+start_link(T) ->
+ proc_lib:start_link(?MODULE,
+ init,
+ [T],
+ infinity,
+ diameter_lib:spawn_opts(server, [])).
+
+%% ---------------------------------------------------------------------------
+%% # init/1
+%% ---------------------------------------------------------------------------
+
+init(T) ->
+ gen_server:enter_loop(?MODULE, [], i(T)).
+
+%% i/1
+
+%% A process owning a listening socket.
+i({listen, Ref, {Opts, Addrs}}) ->
+ {LAs, Sock} = AS = open(Addrs, Opts, ?DEFAULT_PORT),
+ proc_lib:init_ack({ok, self(), LAs}),
+ ok = gen_sctp:listen(Sock, true),
+ true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}),
+ start_timer(#listener{ref = Ref,
+ socket = Sock});
+
+%% A connecting transport.
+i({connect, Pid, Opts, Addrs}) ->
+ {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]),
+ RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As],
+ [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
+ {LAs, Sock} = open(Addrs, Rest, 0),
+ proc_lib:init_ack({ok, self(), LAs}),
+ erlang:monitor(process, Pid),
+ #transport{parent = Pid,
+ mode = {connect, connect(Sock, RAs, RP, [])},
+ socket = Sock};
+
+%% An accepting transport spawned by diameter.
+i({accept, Pid, LPid, Sock}) ->
+ proc_lib:init_ack({ok, self()}),
+ erlang:monitor(process, Pid),
+ erlang:monitor(process, LPid),
+ #transport{parent = Pid,
+ mode = {accept, LPid},
+ socket = Sock};
+
+%% An accepting transport spawned at association establishment.
+i({accept, Ref, LPid, Sock, Id}) ->
+ proc_lib:init_ack({ok, self()}),
+ MRef = erlang:monitor(process, LPid),
+ %% Wait for a signal that the transport has been started before
+ %% processing other messages.
+ receive
+ {Ref, Pid} -> %% transport started
+ #transport{parent = Pid,
+ mode = {accept, LPid},
+ socket = Sock};
+ {'DOWN', MRef, process, _, _} = T -> %% listener down
+ close(Sock, Id),
+ x(T)
+ after ?ACCEPT_TIMEOUT ->
+ close(Sock, Id),
+ x(timeout)
+ end.
+
+%% close/2
+
+close(Sock, Id) ->
+ gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Id}).
+%% Having to pass a record here is hokey.
+
+%% listener/2
+
+listener(LRef, T) ->
+ l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T).
+
+%% Existing process with the listening socket ...
+l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
+ {LAs, _Sock} = AS,
+ {LPid, LAs};
+
+%% ... or not: start one.
+l([], LRef, T) ->
+ {ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}),
+ {LPid, LAs}.
+
+%% open/3
+
+open(Addrs, Opts, PortNr) ->
+ {LAs, Os} = addrs(Addrs, Opts),
+ {LAs, case gen_sctp:open(gen_opts(portnr(Os, PortNr))) of
+ {ok, Sock} ->
+ Sock;
+ {error, Reason} ->
+ x({open, Reason})
+ end}.
+
+addrs(Addrs, Opts) ->
+ case proplists:split(Opts, [ip]) of
+ {[[]], _} ->
+ {Addrs, Opts ++ [{ip, A} || A <- Addrs]};
+ {[As], Os} ->
+ LAs = [diameter_lib:ipaddr(A) || {ip, A} <- As],
+ {LAs, Os ++ [{ip, A} || A <- LAs]}
+ end.
+
+portnr(Opts, PortNr) ->
+ case proplists:get_value(port, Opts) of
+ undefined ->
+ [{port, PortNr} | Opts];
+ _ ->
+ Opts
+ end.
+
+%% x/1
+
+x(Reason) ->
+ exit({shutdown, Reason}).
+
+%% gen_opts/1
+
+gen_opts(Opts) ->
+ {L,_} = proplists:split(Opts, [binary, list, mode, active, sctp_events]),
+ [[],[],[],[],[]] == L orelse ?ERROR({reserved_options, Opts}),
+ [binary, {active, once} | Opts].
+
+%% ---------------------------------------------------------------------------
+%% # handle_call/3
+%% ---------------------------------------------------------------------------
+
+handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref,
+ count = N}
+ = S) ->
+ {TPid, NewS} = accept(Pid, S),
+ {reply, {ok, TPid}, NewS#listener{count = N+1}};
+
+handle_call(_, _, State) ->
+ {reply, nok, State}.
+
+%% ---------------------------------------------------------------------------
+%% # handle_cast/2
+%% ---------------------------------------------------------------------------
+
+handle_cast(_, State) ->
+ {noreply, State}.
+
+%% ---------------------------------------------------------------------------
+%% # handle_info/2
+%% ---------------------------------------------------------------------------
+
+handle_info(T, #transport{} = S) ->
+ {noreply, #transport{} = t(T,S)};
+
+handle_info(T, #listener{} = S) ->
+ {noreply, #listener{} = l(T,S)}.
+
+%% ---------------------------------------------------------------------------
+%% # code_change/3
+%% ---------------------------------------------------------------------------
+
+code_change(_, State, _) ->
+ {ok, State}.
+
+%% ---------------------------------------------------------------------------
+%% # terminate/2
+%% ---------------------------------------------------------------------------
+
+terminate(_, #transport{assoc_id = undefined}) ->
+ ok;
+
+terminate(_, #transport{socket = Sock,
+ mode = {accept, _},
+ assoc_id = Id}) ->
+ close(Sock, Id);
+
+terminate(_, #transport{socket = Sock}) ->
+ gen_sctp:close(Sock);
+
+terminate(_, #listener{socket = Sock}) ->
+ gen_sctp:close(Sock).
+
+%% ---------------------------------------------------------------------------
+
+%% start_timer/1
+
+start_timer(#listener{count = 0} = S) ->
+ S#listener{tref = erlang:start_timer(?LISTENER_TIMEOUT, self(), close)};
+start_timer(S) ->
+ S.
+
+%% l/2
+%%
+%% Transition listener state.
+
+%% Incoming message from SCTP.
+l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) ->
+ setopts(Sock),
+ case find(Data, S) of
+ {TPid, NewS} ->
+ TPid ! Msg,
+ NewS;
+ false ->
+ S
+ end;
+
+%% Transport is asking message to be sent. See send/3 for why the send
+%% isn't directly from the transport.
+l({send, AssocId, StreamId, Bin}, #listener{socket = Sock} = S) ->
+ send(Sock, AssocId, StreamId, Bin),
+ S;
+
+%% Accepting transport has died. One that's awaiting an association ...
+l({'DOWN', MRef, process, TPid, _}, #listener{pending = [TPid | Q],
+ tmap = T,
+ count = N}
+ = S) ->
+ ets:delete(T, MRef),
+ ets:delete(T, TPid),
+ start_timer(S#listener{count = N-1,
+ pending = Q});
+
+%% ... ditto and a new transport has already been started ...
+l({'DOWN', _, process, _, _} = T, #listener{pending = [TPid | Q]}
+ = S) ->
+ #listener{pending = NQ}
+ = NewS
+ = l(T, S#listener{pending = Q}),
+ NewS#listener{pending = [TPid | NQ]};
+
+%% ... or not.
+l({'DOWN', MRef, process, TPid, _}, #listener{socket = Sock,
+ tmap = T,
+ count = N,
+ pending = {P,Q}}
+ = S) ->
+ [{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId
+ ets:delete(T, MRef),
+ ets:delete(T, Id),
+ Id == TPid orelse close(Sock, Id),
+ case ets:lookup(Q, TPid) of
+ [{TPid, _}] -> %% transport in the pending queue ...
+ ets:delete(Q, TPid),
+ S#listener{pending = {P-1, Q}};
+ [] -> %% ... or not
+ start_timer(S#listener{count = N-1})
+ end;
+
+%% Timeout after the last accepting process has died.
+l({timeout, TRef, close = T}, #listener{tref = TRef,
+ count = 0}) ->
+ x(T);
+l({timeout, _, close}, #listener{} = S) ->
+ S.
+
+%% t/2
+%%
+%% Transition transport state.
+
+t(T,S) ->
+ case transition(T,S) of
+ ok ->
+ S;
+ #transport{} = NS ->
+ NS;
+ stop ->
+ x(T)
+ end.
+
+%% transition/2
+
+%% Incoming message.
+transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock,
+ mode = {accept, _}}
+ = S) ->
+ recv(Data, S);
+
+transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
+ setopts(Sock),
+ recv(Data, S);
+
+%% Outgoing message.
+transition({diameter, {send, Msg}}, S) ->
+ send(Msg, S);
+
+%% Request to close the transport connection.
+transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
+ stop;
+
+%% Listener process has died.
+transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) ->
+ stop;
+
+%% Parent process has died.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+ stop.
+
+%% Crash on anything unexpected.
+
+%% accept/2
+%%
+%% Start a new transport process or use one that's already been
+%% started as a consequence of association establishment.
+
+%% No pending associations: spawn a new transport.
+accept(Pid, #listener{socket = Sock,
+ tmap = T,
+ pending = {0,_} = Q}
+ = S) ->
+ Arg = {accept, Pid, self(), Sock},
+ {ok, TPid} = diameter_sctp_sup:start_child(Arg),
+ MRef = erlang:monitor(process, TPid),
+ ets:insert(T, [{MRef, TPid}, {TPid, MRef}]),
+ {TPid, S#listener{pending = [TPid | Q]}};
+%% Placing the transport in the pending field makes it available to
+%% the next association. The stack starts a new accepting transport
+%% only after this one brings the connection up (or dies).
+
+%% Accepting transport has died. This can happen if a new transport is
+%% started before the DOWN has arrived.
+accept(Pid, #listener{pending = [TPid | {0,_} = Q]} = S) ->
+ false = is_process_alive(TPid), %% assert
+ accept(Pid, S#listener{pending = Q});
+
+%% Pending associations: attach to the first in the queue.
+accept(Pid, #listener{ref = Ref, pending = {N,Q}} = S) ->
+ TPid = ets:first(Q),
+ TPid ! {Ref, Pid},
+ ets:delete(Q, TPid),
+ {TPid, S#listener{pending = {N-1, Q}}}.
+
+%% send/2
+
+%% Outbound Diameter message on a specified stream ...
+send(#diameter_packet{bin = Bin, transport_data = {stream, SId}}, S) ->
+ send(SId, Bin, S),
+ S;
+
+%% ... or not: rotate through all steams.
+send(Bin, #transport{streams = {_, OS},
+ os = N}
+ = S)
+ when is_binary(Bin) ->
+ send(N, Bin, S),
+ S#transport{os = (N + 1) rem OS}.
+
+%% send/3
+
+%% Messages have to be sent from the controlling process, which is
+%% probably a bug. Sending from here causes an inet_reply, Sock,
+%% Status} message to be sent to the controlling process while
+%% gen_sctp:send/4 here hangs.
+send(StreamId, Bin, #transport{assoc_id = AId,
+ mode = {accept, LPid}}) ->
+ LPid ! {send, AId, StreamId, Bin};
+
+send(StreamId, Bin, #transport{socket = Sock,
+ assoc_id = AId}) ->
+ send(Sock, AId, StreamId, Bin).
+
+%% send/4
+
+send(Sock, AssocId, Stream, Bin) ->
+ case gen_sctp:send(Sock, AssocId, Stream, Bin) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ x({send, Reason})
+ end.
+
+%% recv/2
+
+%% Association established ...
+recv({[], #sctp_assoc_change{state = comm_up,
+ outbound_streams = OS,
+ inbound_streams = IS,
+ assoc_id = Id}},
+ #transport{assoc_id = undefined}
+ = S) ->
+ up(S#transport{assoc_id = Id,
+ streams = {IS, OS}});
+
+%% ... or not: try the next address.
+recv({[], #sctp_assoc_change{} = E},
+ #transport{assoc_id = undefined,
+ socket = Sock,
+ mode = {connect = C, {[RA|RAs], RP, Es}}}
+ = S) ->
+ S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}};
+
+%% Lost association after establishment.
+recv({[], #sctp_assoc_change{}}, _) ->
+ stop;
+
+%% Inbound Diameter message.
+recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid})
+ when is_binary(Bin) ->
+ diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id},
+ bin = Bin}),
+ ok;
+
+recv({[], #sctp_shutdown_event{assoc_id = Id}},
+ #transport{assoc_id = Id}) ->
+ stop.
+
+%% up/1
+
+up(#transport{parent = Pid,
+ mode = {connect = C, {[RA | _], RP, _}}}
+ = S) ->
+ diameter_peer:up(Pid, {RA,RP}),
+ S#transport{mode = C};
+
+up(#transport{parent = Pid,
+ mode = {accept, _}}
+ = S) ->
+ diameter_peer:up(Pid),
+ S.
+
+%% find/2
+
+find({[#sctp_sndrcvinfo{assoc_id = Id}], _}
+ = Data,
+ #listener{tmap = T}
+ = S) ->
+ f(ets:lookup(T, Id), Data, S);
+
+find({_, Rec} = Data, #listener{tmap = T} = S) ->
+ f(ets:lookup(T, assoc_id(Rec)), Data, S).
+
+%% New association and a transport waiting for one: use it.
+f([],
+ {_, #sctp_assoc_change{state = comm_up,
+ assoc_id = Id}},
+ #listener{tmap = T,
+ pending = [TPid | {_,_} = Q]}
+ = S) ->
+ [{TPid, MRef}] = ets:lookup(T, TPid),
+ ets:insert(T, [{MRef, Id}, {Id, TPid}]),
+ ets:delete(T, TPid),
+ {TPid, S#listener{pending = Q}};
+
+%% New association and no transport start yet: spawn one and place it
+%% in the queue.
+f([],
+ {_, #sctp_assoc_change{state = comm_up,
+ assoc_id = Id}},
+ #listener{ref = Ref,
+ socket = Sock,
+ tmap = T,
+ pending = {N,Q}}
+ = S) ->
+ Arg = {accept, Ref, self(), Sock, Id},
+ {ok, TPid} = diameter_sctp_sup:start_child(Arg),
+ MRef = erlang:monitor(process, TPid),
+ ets:insert(T, [{MRef, Id}, {Id, TPid}]),
+ ets:insert(Q, {TPid, now()}),
+ {TPid, S#listener{pending = {N+1, Q}}};
+
+%% Known association ...
+f([{_, TPid}], _, S) ->
+ {TPid, S};
+
+%% ... or not: discard.
+f([], _, _) ->
+ false.
+
+%% assoc_id/1
+
+assoc_id(#sctp_shutdown_event{assoc_id = Id}) -> %% undocumented
+ Id;
+assoc_id(#sctp_assoc_change{assoc_id = Id}) ->
+ Id;
+assoc_id(#sctp_sndrcvinfo{assoc_id = Id}) ->
+ Id;
+assoc_id(#sctp_paddr_change{assoc_id = Id}) ->
+ Id;
+assoc_id(#sctp_adaptation_event{assoc_id = Id}) ->
+ Id.
+
+%% connect/4
+
+connect(_, [], _, Reasons) ->
+ x({connect, lists:reverse(Reasons)});
+
+connect(Sock, [Addr | AT] = As, Port, Reasons) ->
+ case gen_sctp:connect_init(Sock, Addr, Port, []) of
+ ok ->
+ {As, Port, Reasons};
+ {error, _} = E ->
+ connect(Sock, AT, Port, [{Addr, E} | Reasons])
+ end.
+
+%% setopts/1
+
+setopts(Sock) ->
+ case inet:setopts(Sock, [{active, once}]) of
+ ok -> ok;
+ X -> x({setopts, Sock, X}) %% possibly on peer disconnect
+ end.
diff --git a/lib/diameter/src/transport/diameter_sctp_sup.erl b/lib/diameter/src/transport/diameter_sctp_sup.erl
new file mode 100644
index 0000000000..3bdae02d68
--- /dev/null
+++ b/lib/diameter/src/transport/diameter_sctp_sup.erl
@@ -0,0 +1,74 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(diameter_sctp_sup).
+
+-behaviour(supervisor).
+
+%% interface
+-export([start/0,
+ start_child/1]).
+
+%% internal exports
+-export([start_link/1,
+ init/1]).
+
+%% Start multiple supervisors only because a child can't start another
+%% child before supervisor:start_child/2 has returned.
+-define(TRANSPORT_SUP, diameter_sctp_transport_sup).
+-define(LISTENER_SUP, diameter_sctp_listener_sup).
+
+%% start/0
+%%
+%% Start the TCP-specific supervisors.
+
+start() ->
+ diameter_transport_sup:start_child(?TRANSPORT_SUP, ?MODULE),
+ diameter_transport_sup:start_child(?LISTENER_SUP, ?MODULE).
+
+%% start_child/1
+%%
+%% Start a worker under one of the child supervisors.
+
+start_child(T) ->
+ SupRef = case element(1,T) of
+ connect -> ?TRANSPORT_SUP;
+ accept -> ?TRANSPORT_SUP;
+ listen -> ?LISTENER_SUP
+ end,
+ supervisor:start_child(SupRef, [T]).
+
+%% start_link/1
+%%
+%% Callback from diameter_transport_sup as a result of start/0.
+%% Starts a child supervisor under the transport supervisor.
+
+start_link(Name) ->
+ supervisor:start_link({local, Name}, ?MODULE, []).
+
+init([]) ->
+ Mod = diameter_sctp,
+ Flags = {simple_one_for_one, 0, 1},
+ ChildSpec = {Mod,
+ {Mod, start_link, []},
+ temporary,
+ 1000,
+ worker,
+ [Mod]},
+ {ok, {Flags, [ChildSpec]}}.
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
new file mode 100644
index 0000000000..653c114471
--- /dev/null
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -0,0 +1,531 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(diameter_tcp).
+
+-behaviour(gen_server).
+
+%% interface
+-export([start/3]).
+
+%% child start from supervisor
+-export([start_link/1]).
+
+%% child start from here
+-export([init/1]).
+
+%% gen_server callbacks
+-export([handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3,
+ terminate/2]).
+
+-include_lib("diameter/include/diameter.hrl").
+
+-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
+
+-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1
+-define(LISTENER_TIMEOUT, 30000).
+-define(FRAGMENT_TIMEOUT, 1000).
+
+%% The same gen_server implementation supports three different kinds
+%% of processes: an actual transport process, one that will club it to
+%% death should the parent die before a connection is established, and
+%% a process owning the listening port.
+
+%% Listener process state.
+-record(listener, {socket :: inet:socket(),
+ count = 1 :: non_neg_integer(),
+ tref :: reference()}).
+
+%% Monitor process state.
+-record(monitor,
+ {parent :: pid(),
+ transport = self() :: pid()}).
+
+-type tref() :: reference(). %% timer reference
+-type length() :: 0..16#FFFFFF. %% message length from Diameter header
+-type size() :: non_neg_integer(). %% accumulated binary size
+-type frag() :: {length(), size(), binary(), list(binary())}
+ | binary().
+
+%% Accepting/connecting transport process state.
+-record(transport,
+ {socket :: inet:socket(), %% accept or connect socket
+ parent :: pid(), %% of process that started us
+ module :: module(), %% gen_tcp-like module
+ frag = <<>> :: binary() | {tref(), frag()}}). %% message fragment
+
+%% The usual transport using gen_tcp can be replaced by anything
+%% sufficiently gen_tcp-like by passing a 'module' option as the first
+%% (for simplicity) transport option. The transport_module diameter_etcp
+%% uses this to set itself as the module to call, its start/3 just
+%% calling start/3 here with the option set.
+
+%% ---------------------------------------------------------------------------
+%% # start/3
+%% ---------------------------------------------------------------------------
+
+start({T, Ref}, #diameter_service{capabilities = Caps}, Opts) ->
+ diameter_tcp_sup:start(), %% start tcp supervisors on demand
+ {Mod, Rest} = split(Opts),
+ Addrs = Caps#diameter_caps.host_ip_address,
+ Arg = {T, Ref, Mod, self(), Rest, Addrs},
+ diameter_tcp_sup:start_child(Arg).
+
+split([{module, M} | Opts]) ->
+ {M, Opts};
+split(Opts) ->
+ {gen_tcp, Opts}.
+
+%% start_link/1
+
+start_link(T) ->
+ proc_lib:start_link(?MODULE,
+ init,
+ [T],
+ infinity,
+ diameter_lib:spawn_opts(server, [])).
+
+%% ---------------------------------------------------------------------------
+%% # init/1
+%% ---------------------------------------------------------------------------
+
+init(T) ->
+ gen_server:enter_loop(?MODULE, [], i(T)).
+
+%% i/1
+
+%% A transport process.
+i({T, Ref, Mod, Pid, Opts, Addrs})
+ when T == accept;
+ T == connect ->
+ erlang:monitor(process, Pid),
+ %% Since accept/connect might block indefinitely, spawn a process
+ %% that does nothing but kill us with the parent until call
+ %% returns.
+ {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
+ Sock = i(T, Ref, Mod, Pid, Opts, Addrs),
+ MPid ! {stop, self()}, %% tell the monitor to die
+ setopts(Mod, Sock),
+ #transport{parent = Pid,
+ module = Mod,
+ socket = Sock};
+
+%% A monitor process to kill the transport if the parent dies.
+i(#monitor{parent = Pid, transport = TPid} = S) ->
+ proc_lib:init_ack({ok, self()}),
+ erlang:monitor(process, Pid),
+ erlang:monitor(process, TPid),
+ S;
+%% In principle a link between the transport and killer processes
+%% could do the same thing: have the accepting/connecting process be
+%% killed when the killer process dies as a consequence of parent
+%% death. However, a link can be unlinked and this is exactly what
+%% gen_tcp seems to so. Links should be left to supervisors.
+
+i({listen, LRef, APid, {Mod, Opts, Addrs}}) ->
+ {[LA, LP], Rest} = proplists:split(Opts, [ip, port]),
+ LAddr = get_addr(LA, Addrs),
+ LPort = get_port(LP),
+ {ok, LSock} = Mod:listen(LPort, gen_opts(LAddr, Rest)),
+ proc_lib:init_ack({ok, self(), {LAddr, LSock}}),
+ erlang:monitor(process, APid),
+ true = diameter_reg:add_new({?MODULE, listener, {LRef, {LAddr, LSock}}}),
+ start_timer(#listener{socket = LSock}).
+
+%% i/6
+
+i(accept, Ref, Mod, Pid, Opts, Addrs) ->
+ {LAddr, LSock} = listener(Ref, {Mod, Opts, Addrs}),
+ proc_lib:init_ack({ok, self(), [LAddr]}),
+ Sock = ok(accept(Mod, LSock)),
+ diameter_peer:up(Pid),
+ Sock;
+
+i(connect, _, Mod, Pid, Opts, Addrs) ->
+ {[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]),
+ LAddr = get_addr(LA, Addrs),
+ RAddr = get_addr(RA, []),
+ RPort = get_port(RP),
+ proc_lib:init_ack({ok, self(), [LAddr]}),
+ Sock = ok(connect(Mod, RAddr, RPort, gen_opts(LAddr, Rest))),
+ diameter_peer:up(Pid, {RAddr, RPort}),
+ Sock.
+
+ok({ok, T}) ->
+ T;
+ok(No) ->
+ x(No).
+
+x(Reason) ->
+ exit({shutdown, Reason}).
+
+%% listener/2
+
+listener(LRef, T) ->
+ l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T).
+
+%% Existing process with the listening socket ...
+l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
+ LPid ! {accept, self()},
+ AS;
+
+%% ... or not: start one.
+l([], LRef, T) ->
+ {ok, _, AS} = diameter_tcp_sup:start_child({listen, LRef, self(), T}),
+ AS.
+
+%% get_addr/2
+
+get_addr(As, Def) ->
+ diameter_lib:ipaddr(addr(As, Def)).
+
+%% Take the first address from the service if several are unspecified.
+addr([], [Addr | _]) ->
+ Addr;
+addr([{_, Addr}], _) ->
+ Addr;
+addr(As, Addrs) ->
+ ?ERROR({invalid_addrs, As, Addrs}).
+
+%% get_port/1
+
+get_port([{_, Port}]) ->
+ Port;
+get_port([]) ->
+ ?DEFAULT_PORT;
+get_port(Ps) ->
+ ?ERROR({invalid_ports, Ps}).
+
+%% gen_opts/2
+
+gen_opts(LAddr, Opts) ->
+ {L,_} = proplists:split(Opts, [binary, packet, active]),
+ [[],[],[]] == L orelse ?ERROR({reserved_options, Opts}),
+ [binary,
+ {packet, 0},
+ {active, once},
+ {ip, LAddr}
+ | Opts].
+
+%% ---------------------------------------------------------------------------
+%% # handle_call/3
+%% ---------------------------------------------------------------------------
+
+handle_call(_, _, State) ->
+ {reply, nok, State}.
+
+%% ---------------------------------------------------------------------------
+%% # handle_cast/2
+%% ---------------------------------------------------------------------------
+
+handle_cast(_, State) ->
+ {noreply, State}.
+
+%% ---------------------------------------------------------------------------
+%% # handle_info/2
+%% ---------------------------------------------------------------------------
+
+handle_info(T, #transport{} = S) ->
+ {noreply, #transport{} = t(T,S)};
+
+handle_info(T, #listener{} = S) ->
+ {noreply, #listener{} = l(T,S)};
+
+handle_info(T, #monitor{} = S) ->
+ m(T,S),
+ x(T).
+
+%% ---------------------------------------------------------------------------
+%% # code_change/3
+%% ---------------------------------------------------------------------------
+
+code_change(_, State, _) ->
+ {ok, State}.
+
+%% ---------------------------------------------------------------------------
+%% # terminate/2
+%% ---------------------------------------------------------------------------
+
+terminate(_, _) ->
+ ok.
+
+%% ---------------------------------------------------------------------------
+
+%% start_timer/1
+
+start_timer(#listener{count = 0} = S) ->
+ S#listener{tref = erlang:start_timer(?LISTENER_TIMEOUT, self(), close)};
+start_timer(S) ->
+ S.
+
+%% m/2
+%%
+%% Transition monitor state.
+
+%% Transport is telling us to die.
+m({stop, TPid}, #monitor{transport = TPid}) ->
+ ok;
+
+%% Transport has died.
+m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) ->
+ ok;
+
+%% Transport parent has died.
+m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid,
+ transport = TPid}) ->
+ exit(TPid, {shutdown, parent}).
+
+%% l/2
+%%
+%% Transition listener state.
+
+%% Another accept transport is attaching.
+l({accept, TPid}, #listener{count = N} = S) ->
+ erlang:monitor(process, TPid),
+ S#listener{count = N+1};
+
+%% Accepting process has died.
+l({'DOWN', _, process, _, _}, #listener{count = N} = S) ->
+ start_timer(S#listener{count = N-1});
+
+%% Timeout after the last accepting process has died.
+l({timeout, TRef, close = T}, #listener{tref = TRef,
+ count = 0}) ->
+ x(T);
+l({timeout, _, close}, #listener{} = S) ->
+ S.
+
+%% t/2
+%%
+%% Transition transport state.
+
+t(T,S) ->
+ case transition(T,S) of
+ ok ->
+ S;
+ #transport{} = NS ->
+ NS;
+ {stop, Reason} ->
+ x(Reason);
+ stop ->
+ x(T)
+ end.
+
+%% transition/2
+
+%% Incoming message.
+transition({tcp, Sock, Data}, #transport{socket = Sock,
+ module = M}
+ = S) ->
+ setopts(M, Sock),
+ recv(Data, S);
+
+transition({tcp_closed, Sock}, #transport{socket = Sock}) ->
+ stop;
+
+transition({tcp_error, Sock, _Reason} = T, #transport{socket = Sock} = S) ->
+ ?ERROR({T,S});
+
+%% Outgoing message.
+transition({diameter, {send, Bin}}, #transport{socket = Sock,
+ module = M}) ->
+ case send(M, Sock, Bin) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ {stop, {send, Reason}}
+ end;
+
+%% Request to close the transport connection.
+transition({diameter, {close, Pid}}, #transport{parent = Pid,
+ socket = Sock,
+ module = M}) ->
+ M:close(Sock),
+ stop;
+
+%% Timeout for reception of outstanding packets.
+transition({timeout, TRef, flush}, S) ->
+ flush(TRef, S);
+
+%% Request for the local port number.
+transition({resolve_port, RPid}, #transport{socket = Sock,
+ module = M})
+ when is_pid(RPid) ->
+ RPid ! lport(M, Sock),
+ ok;
+
+%% Parent process has died.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+ stop.
+
+%% Crash on anything unexpected.
+
+%% recv/2
+%%
+%% Reassemble fragmented messages and extract multple message sent
+%% using Nagle.
+
+recv(Bin, #transport{parent = Pid, frag = Head} = S) ->
+ S#transport{frag = recv(Pid, Head, Bin)}.
+
+%% recv/3
+
+%% No previous fragment.
+recv(Pid, <<>>, Bin) ->
+ rcv(Pid, Bin);
+
+recv(Pid, {TRef, Head}, Bin) ->
+ erlang:cancel_timer(TRef),
+ rcv(Pid, Head, Bin).
+
+%% rcv/3
+
+%% Not even the first four bytes of the header.
+rcv(Pid, Head, Bin)
+ when is_binary(Head) ->
+ rcv(Pid, <<Head/binary, Bin/binary>>);
+
+%% Or enough to know how many bytes to extract.
+rcv(Pid, {Len, N, Head, Acc}, Bin) ->
+ rcv(Pid, Len, N + size(Bin), Head, [Bin | Acc]).
+
+%% rcv/5
+
+%% Extract a message for which we have all bytes.
+rcv(Pid, Len, N, Head, Acc)
+ when Len =< N ->
+ rcv(Pid, rcv1(Pid, Len, bin(Head, Acc)));
+
+%% Wait for more packets.
+rcv(_, Len, N, Head, Acc) ->
+ {start_timer(), {Len, N, Head, Acc}}.
+
+%% rcv/2
+
+%% Nothing left.
+rcv(_, <<>> = Bin) ->
+ Bin;
+
+%% Well, this isn't good. Chances are things will go south from here
+%% but if we're lucky then the bytes we have extend to an intended
+%% message boundary and we can recover by simply discarding them,
+%% which is the result of receiving them.
+rcv(Pid, <<_:1/binary, Len:24, _/binary>> = Bin)
+ when Len < 20 ->
+ diameter_peer:recv(Pid, Bin),
+ <<>>;
+
+%% Enough bytes to extract a message.
+rcv(Pid, <<_:1/binary, Len:24, _/binary>> = Bin)
+ when Len =< size(Bin) ->
+ rcv(Pid, rcv1(Pid, Len, Bin));
+
+%% Or not: wait for more packets.
+rcv(_, <<_:1/binary, Len:24, _/binary>> = Head) ->
+ {start_timer(), {Len, size(Head), Head, []}};
+
+%% Not even 4 bytes yet.
+rcv(_, Head) ->
+ {start_timer(), Head}.
+
+%% rcv1/3
+
+rcv1(Pid, Len, Bin) ->
+ <<Msg:Len/binary, Rest/binary>> = Bin,
+ diameter_peer:recv(Pid, Msg),
+ Rest.
+
+%% bin/[12]
+
+bin(Head, Acc) ->
+ list_to_binary([Head | lists:reverse(Acc)]).
+
+bin({_, _, Head, Acc}) ->
+ bin(Head, Acc);
+bin(Bin)
+ when is_binary(Bin) ->
+ Bin.
+
+%% start_timer/0
+
+%% An erroneously large message length may leave us with a fragment
+%% that lingers if the peer doesn't have anything more to send. Start
+%% a timer to force reception if an incoming message doesn't arrive
+%% first. This won't stop a peer from sending a large bogus value and
+%% following it up however but such a state of affairs can only go on
+%% for so long since an unanswered DWR will eventually be the result.
+%%
+%% An erroneously small message length causes problems as well but
+%% since all messages with length problems are discarded this should
+%% also eventually lead to watchdog failover.
+
+start_timer() ->
+ erlang:start_timer(?FRAGMENT_TIMEOUT, self(), flush).
+
+flush(TRef, #transport{parent = Pid, frag = {TRef, Head}} = S) ->
+ diameter_peer:recv(Pid, bin(Head)),
+ S#transport{frag = <<>>};
+flush(_, S) ->
+ S.
+
+%% accept/2
+
+accept(gen_tcp, LSock) ->
+ gen_tcp:accept(LSock);
+accept(Mod, LSock) ->
+ Mod:accept(LSock).
+
+%% connect/4
+
+connect(gen_tcp, Host, Port, Opts) ->
+ gen_tcp:connect(Host, Port, Opts);
+connect(Mod, Host, Port, Opts) ->
+ Mod:connect(Host, Port, Opts).
+
+%% send/3
+
+send(gen_tcp, Sock, Bin) ->
+ gen_tcp:send(Sock, Bin);
+send(M, Sock, Bin) ->
+ M:send(Sock, Bin).
+
+%% setopts/3
+
+setopts(gen_tcp, Sock, Opts) ->
+ inet:setopts(Sock, Opts);
+setopts(M, Sock, Opts) ->
+ M:setopts(Sock, Opts).
+
+%% setopts/2
+
+setopts(M, Sock) ->
+ case setopts(M, Sock, [{active, once}]) of
+ ok -> ok;
+ X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect
+ end.
+
+%% lport/2
+
+lport(gen_tcp, Sock) ->
+ inet:port(Sock);
+lport(M, Sock) ->
+ M:port(Sock).
diff --git a/lib/diameter/src/transport/diameter_tcp_sup.erl b/lib/diameter/src/transport/diameter_tcp_sup.erl
new file mode 100644
index 0000000000..1016fa2d9b
--- /dev/null
+++ b/lib/diameter/src/transport/diameter_tcp_sup.erl
@@ -0,0 +1,78 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(diameter_tcp_sup).
+
+-behaviour(supervisor).
+
+%% interface
+-export([start/0,
+ start_child/1]).
+
+%% internal exports
+-export([start_link/1,
+ init/1]).
+
+%% Start multiple supervisors only because a child can't start another
+%% child before supervisor:start_child/2 has returned, although two is
+%% really sufficient (listeners and monitors can be under the same).
+-define(TRANSPORT_SUP, diameter_tcp_transport_sup).
+-define(LISTENER_SUP, diameter_tcp_listener_sup).
+-define(MONITOR_SUP, diameter_tcp_monitor_sup).
+
+%% start/0
+%%
+%% Start the TCP-specific supervisors.
+
+start() ->
+ diameter_transport_sup:start_child(?TRANSPORT_SUP, ?MODULE),
+ diameter_transport_sup:start_child(?LISTENER_SUP, ?MODULE),
+ diameter_transport_sup:start_child(?MONITOR_SUP, ?MODULE).
+
+%% start_child/1
+%%
+%% Start a worker under one of the child supervisors.
+
+start_child(T) ->
+ SupRef = case element(1,T) of
+ accept -> ?TRANSPORT_SUP;
+ connect -> ?TRANSPORT_SUP;
+ listen -> ?LISTENER_SUP;
+ monitor -> ?MONITOR_SUP
+ end,
+ supervisor:start_child(SupRef, [T]).
+
+%% start_link/1
+%%
+%% Callback from diameter_transport_sup as a result of start/0.
+%% Starts a child supervisor under the transport supervisor.
+
+start_link(Name) ->
+ supervisor:start_link({local, Name}, ?MODULE, []).
+
+init([]) ->
+ Mod = diameter_tcp,
+ Flags = {simple_one_for_one, 0, 1},
+ ChildSpec = {Mod,
+ {Mod, start_link, []},
+ temporary,
+ 1000,
+ worker,
+ [Mod]},
+ {ok, {Flags, [ChildSpec]}}.
diff --git a/lib/diameter/src/transport/diameter_transport_sup.erl b/lib/diameter/src/transport/diameter_transport_sup.erl
new file mode 100644
index 0000000000..6457ab78b0
--- /dev/null
+++ b/lib/diameter/src/transport/diameter_transport_sup.erl
@@ -0,0 +1,68 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% Top supervisor for transport processes.
+%%
+
+-module(diameter_transport_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0, %% supervisor start
+ start_child/2]).
+
+%% supervisor callback
+-export([init/1]).
+
+%% ---------------------------------------------------------------------------
+
+%% start_link/0
+%%
+%% Start the transport top supervisor. This is started as a child at
+%% at application start, from diameter_sup.erl. Protocol-specific
+%% supervisors are started as children of this supervisor dynamically
+%% by calling start_child/2. (Eg. diameter_tcp_sup:start/0, which
+%% is called from diameter_tcp:start/3 to start supervisors the
+%% first time a TCP transport process is started.)
+
+start_link() ->
+ SupName = {local, ?MODULE},
+ supervisor:start_link(SupName, ?MODULE, []).
+
+%% start_child/2
+%%
+%% Start a protocol-specific supervisor under the top supervisor.
+
+start_child(Name, Module) ->
+ Spec = {Name,
+ {Module, start_link, [Name]},
+ permanent,
+ 1000,
+ supervisor,
+ [Module]},
+ supervisor:start_child(?MODULE, Spec).
+
+%% ---------------------------------------------------------------------------
+
+%% Top supervisor callback.
+init([]) ->
+ Flags = {one_for_one, 0, 1},
+ Workers = [], %% Each protocol starts its supervisor on demand.
+ {ok, {Flags, Workers}}.
diff --git a/lib/diameter/src/transport/modules.mk b/lib/diameter/src/transport/modules.mk
new file mode 100644
index 0000000000..a0dc3cf2c0
--- /dev/null
+++ b/lib/diameter/src/transport/modules.mk
@@ -0,0 +1,29 @@
+#-*-makefile-*- ; force emacs to enter makefile-mode
+
+# %CopyrightBegin%
+#
+# Copyright Ericsson AB 2010-2011. All Rights Reserved.
+#
+# The contents of this file are subject to the Erlang Public License,
+# Version 1.1, (the "License"); you may not use this file except in
+# compliance with the License. You should have received a copy of the
+# Erlang Public License along with this software. If not, it can be
+# retrieved online at http://www.erlang.org/.
+#
+# Software distributed under the License is distributed on an "AS IS"
+# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+# the License for the specific language governing rights and limitations
+# under the License.
+#
+# %CopyrightEnd%
+
+MODULES = \
+ diameter_etcp \
+ diameter_etcp_sup \
+ diameter_tcp \
+ diameter_tcp_sup \
+ diameter_sctp \
+ diameter_sctp_sup \
+ diameter_transport_sup
+
+HRL_FILES =