diff options
Diffstat (limited to 'lib/diameter/src/transport')
-rw-r--r-- | lib/diameter/src/transport/.gitignore | 3 | ||||
-rw-r--r-- | lib/diameter/src/transport/Makefile | 141 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_etcp.erl | 311 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_etcp_sup.erl | 64 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 624 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp_sup.erl | 74 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 531 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp_sup.erl | 78 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_transport_sup.erl | 68 | ||||
-rw-r--r-- | lib/diameter/src/transport/modules.mk | 29 |
10 files changed, 1923 insertions, 0 deletions
diff --git a/lib/diameter/src/transport/.gitignore b/lib/diameter/src/transport/.gitignore new file mode 100644 index 0000000000..d9f072e262 --- /dev/null +++ b/lib/diameter/src/transport/.gitignore @@ -0,0 +1,3 @@ + +/depend.mk + diff --git a/lib/diameter/src/transport/Makefile b/lib/diameter/src/transport/Makefile new file mode 100644 index 0000000000..4b53100fd2 --- /dev/null +++ b/lib/diameter/src/transport/Makefile @@ -0,0 +1,141 @@ +# +# %CopyrightBegin% +# +# Copyright Ericsson AB 2010-2011. All Rights Reserved. +# +# The contents of this file are subject to the Erlang Public License, +# Version 1.1, (the "License"); you may not use this file except in +# compliance with the License. You should have received a copy of the +# Erlang Public License along with this software. If not, it can be +# retrieved online at http://www.erlang.org/. +# +# Software distributed under the License is distributed on an "AS IS" +# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +# the License for the specific language governing rights and limitations +# under the License. +# +# %CopyrightEnd% +# +# + +ifneq ($(ERL_TOP),) +include $(ERL_TOP)/make/target.mk +EBIN = ../../ebin +include $(ERL_TOP)/make/$(TARGET)/otp.mk +else +include $(DIAMETER_TOP)/make/target.mk +EBIN = ../../ebin +include $(DIAMETER_TOP)/make/$(TARGET)/rules.mk +endif + + +# ---------------------------------------------------- +# Application version +# ---------------------------------------------------- + +include ../../vsn.mk +VSN=$(DIAMETER_VSN) + +# ---------------------------------------------------- +# Release directory specification +# ---------------------------------------------------- + +RELSYSDIR = $(RELEASE_PATH)/lib/diameter-$(VSN) + +INCDIR = ../../include + +# ---------------------------------------------------- +# Target Specs +# ---------------------------------------------------- + +include modules.mk + +ERL_FILES = \ + $(MODULES:%=%.erl) + +TARGET_FILES = \ + $(MODULES:%=$(EBIN)/%.$(EMULATOR)) + +# ---------------------------------------------------- +# FLAGS +# ---------------------------------------------------- + +ifeq ($(TYPE),debug) +ERL_COMPILE_FLAGS += -Ddebug +endif + +include ../app/diameter.mk + +ERL_COMPILE_FLAGS += \ + $(DIAMETER_ERL_COMPILE_FLAGS) \ + -I$(INCDIR) + +# ---------------------------------------------------- +# Targets +# ---------------------------------------------------- + +debug: + @${MAKE} TYPE=debug opt + +opt: $(TARGET_FILES) + +clean: + rm -f $(TARGET_FILES) + rm -f errs core *~ + rm -f depend.mk + +docs: + +info: + @echo "" + @echo "ERL_FILES = $(ERL_FILES)" + @echo "HRL_FILES = $(HRL_FILES)" + @echo "" + @echo "TARGET_FILES = $(TARGET_FILES)" + @echo "" + +# ---------------------------------------------------- +# Special Build Targets +# ---------------------------------------------------- + +# Invoked from ../app to add modules to the app file. +$(APP_TARGET): force + M=`echo $(MODULES) | sed -e 's/^ *//' -e 's/ *$$//' -e 'y/ /,/'`; \ + echo "/%TRANSPORT_MODULES%/s//$$M/;w;q" | tr ';' '\n' \ + | ed -s $@ + +# ---------------------------------------------------- +# Release Target +# ---------------------------------------------------- +ifneq ($(ERL_TOP),) +include $(ERL_TOP)/make/otp_release_targets.mk +else +include $(DIAMETER_TOP)/make/release_targets.mk +endif + +release_spec: opt + $(INSTALL_DIR) $(RELSYSDIR)/ebin + $(INSTALL_DATA) $(TARGET_FILES) $(RELSYSDIR)/ebin + $(INSTALL_DIR) $(RELSYSDIR)/src/transport + $(INSTALL_DATA) $(ERL_FILES) $(HRL_FILES) $(RELSYSDIR)/src/transport + +release_docs_spec: + +force: + +# ---------------------------------------------------- +# Dependencies +# ---------------------------------------------------- + +depend: depend.mk + +# Generate dependencies makefile. +depend.mk: ../app/depend.sed $(ERL_FILES) Makefile + for f in $(MODULES); do \ + sed -f $< $$f.erl | sed "s@/@/$$f@"; \ + done \ + > $@ + +-include depend.mk + +.PHONY: clean debug depend docs force info opt release_docs_spec release_spec diff --git a/lib/diameter/src/transport/diameter_etcp.erl b/lib/diameter/src/transport/diameter_etcp.erl new file mode 100644 index 0000000000..d925d62545 --- /dev/null +++ b/lib/diameter/src/transport/diameter_etcp.erl @@ -0,0 +1,311 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% This module implements a transport_module that uses Erlang message +%% passing for transport. +%% + +-module(diameter_etcp). + +-behaviour(gen_server). + +%% transport_module interface. +-export([start/3]). + +%% gen_tcp-ish interface used by diameter_tcp. +-export([listen/2, + accept/1, + connect/3, + send/2, + close/1, + setopts/2, + port/1]). + +%% child start +-export([start_link/1]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + terminate/2]). + +%% Server states. + +-record(listener, + {acceptors = [] :: [pid()]}). + +-record(connection, + {parent :: pid(), + peer :: {connect, reference()} %% {connect, MRef} + | accept + | pid()}). + +%% start/3 + +%% 'module' option makes diameter_tcp call here instead of gen_tcp/inet. +start(T, Svc, Opts) + when is_list(Opts) -> + diameter_etcp_sup:start(), + diameter_tcp:start(T, Svc, [{module, ?MODULE} | Opts]). + +%% listen/2 +%% +%% Spawn a process that represents the listening socket. The local +%% port number can be any term, not just an integer. The listener +%% process registers its host/port with diameter_reg and this is the +%% handle with which connect/3 finds the appropriate listening +%% process. + +listen(LPort, Opts) -> + Parent = self(), + diameter_etcp_sup:start_child({listen, Parent, LPort, Opts}). + +%% accept/1 +%% +%% Output: pid() + +accept(LPid) -> + start(fun(Ref, Parent) -> acceptor(LPid, Ref, Parent) end). + +%% connect/3 +%% +%% Output: pid() + +%% RAddr here can either be a 4/8-tuple address or {Node, Addr}. +connect(RAddr, RPort, _Opts) -> + start(fun(Ref, Parent) -> connector(RAddr, RPort, Ref, Parent) end). + +%% send/2 + +send(Pid, Bin) -> + Pid ! {send, Bin}, + ok. + +%% close/1 + +close(Pid) -> + Pid ! close, + monitor(Pid), + receive {'DOWN', _, process, Pid, _} -> ok end. + +%% setopts/2 + +setopts(_, _) -> + ok. + +%% port/1 + +port(_) -> + 3868. %% We have no local port: fake it. + +%% start_link/1 + +start_link(T) -> + gen_server:start_link(?MODULE, T, []). + +%% --------------------------------------------------------------------------- +%% # init/1 +%% --------------------------------------------------------------------------- + +%% Maintain a list of acceptor pids as the process state. Each accept +%% adds a pid to the list, each connect removes one. +init({listen, Parent, LPort, Opts}) -> + monitor(Parent), + {ip, LAddr} = lists:keyfind(ip, 1, Opts), + true = diameter_reg:add_new({?MODULE, listener, LAddr, LPort}), + {ok, #listener{}}; + +init({connect, Fun, Ref, Parent}) -> + {ok, #connection{parent = Parent, + peer = Fun(Ref, Parent)}}. + +%% --------------------------------------------------------------------------- +%% # handle_call/3 +%% --------------------------------------------------------------------------- + +handle_call(_, _, State) -> + {reply, nok, State}. + +%% --------------------------------------------------------------------------- +%% # handle_cast/2 +%% --------------------------------------------------------------------------- + +handle_cast(_, State) -> + {noreply, State}. + +%% --------------------------------------------------------------------------- +%% # handle_info/2 +%% --------------------------------------------------------------------------- + +handle_info(T, #listener{acceptors = L} = S) -> + {noreply, S#listener{acceptors = l(T,L)}}; + +handle_info(T, State) -> + {noreply, transition(T, State)}. + +%% --------------------------------------------------------------------------- +%% # code_change/3 +%% --------------------------------------------------------------------------- + +code_change(_, State, _) -> + {ok, State}. + +%% --------------------------------------------------------------------------- +%% # terminate/2 +%% --------------------------------------------------------------------------- + +terminate(_, _) -> + ok. + +%% --------------------------------------------------------------------------- + +monitor(Pid) -> + erlang:monitor(process, Pid). + +putr(Key, Val) -> + put({?MODULE, Key}, Val). + +eraser(Key) -> + erase({?MODULE, Key}). + +%% l/2 + +l({'DOWN', _, process, _, _} = T, _) -> + x(T); + +%% New accepting process. +l({accept, APid}, As) -> + As ++ [APid]; + +%% Peer wants to connect but we have no acceptor ... +l({connect, Peer}, [] = As) -> + Peer ! {refused, self()}, + As; + +%% ... or we do. +l({connect, Peer}, [APid | Rest]) -> + Peer ! {accepted, APid}, + Rest. + +x(T) -> + exit({shutdown, T}). + +%% start/1 + +start(Fun) -> + Ref = make_ref(), + {ok, Pid} + = T + = diameter_etcp_sup:start_child({connect, Fun, Ref, self()}), + MRef = monitor(Pid), + receive + {ok, Ref} -> + T; + {'DOWN', MRef, process, _, Reason} -> + {error, Reason} + end. + +%% acceptor/3 + +acceptor(LPid, Ref, Parent) -> + LPid ! {accept, self()}, %% announce that we're accepting + putr(ref, {ok, Ref}), + monitor(Parent), + monitor(LPid), + accept. + +%% connector/4 + +connector(RAddr, RPort, Ref, Parent) -> + c(match(RAddr, RPort), Ref, Parent). + +c([], _, _) -> + x(refused); + +c([{_,LPid}], Ref, Parent) -> + LPid ! {connect, self()}, + putr(ref, {ok, Ref}), + monitor(Parent), + {connect, monitor(LPid)}. + +match({Node, RAddr}, RPort) -> + rpc:call(Node, diameter_reg, match, [{?MODULE, listener, RAddr, RPort}]); + +match(RAddr, RPort) -> + match({node(), RAddr}, RPort). + +%% transition/2 + +%% Unexpected parent or peer death. +transition({'DOWN', _, process, _, _} = T, S) -> + element(2,S) ! {tcp_error, self(), T}, + x(T); + +%% Connector is receiving acceptor pid from listener. +transition({accepted, Peer}, #connection{parent = Parent, + peer = {connect, MRef}}) -> + monitor(Peer), + erlang:demonitor(MRef, [flush]), + Peer ! {connect, self()}, + Parent ! {ok, _} = eraser(ref), + #connection{parent = Parent, + peer = Peer}; + +%% Connector is receiving connection refusal from listener. +transition({refused, _} = T, #connection{peer = {connect, _}}) -> + x(T); + +%% Acceptor is receiving peer connect. +transition({connect, Peer}, #connection{parent = Parent, + peer = accept}) -> + monitor(Peer), + Parent ! {ok, _} = eraser(ref), + #connection{parent = Parent, + peer = Peer}; + +%% Incoming message. +transition({recv, Bin}, #connection{parent = Parent} = S) -> + Parent ! {tcp, self(), Bin}, + S; + +%% Outgoing message. +transition({send, Bin}, #connection{peer = Peer} = S) -> + Peer ! {recv, Bin}, + S; + +%% diameter_etcp:close/1 call when a peer is connected ... +transition(close = T, #connection{peer = Peer}) + when is_pid(Peer) -> + Peer ! {close, self()}, + x(T); + +%% ... or not. +transition(close = T, #connection{}) -> + x(T); + +%% Peer is closing the connection. +transition({close, Peer} = T, #connection{parent = Parent, + peer = Peer}) + when is_pid(Peer) -> + Parent ! {tcp_closed, self()}, + x(T). diff --git a/lib/diameter/src/transport/diameter_etcp_sup.erl b/lib/diameter/src/transport/diameter_etcp_sup.erl new file mode 100644 index 0000000000..bd089cf041 --- /dev/null +++ b/lib/diameter/src/transport/diameter_etcp_sup.erl @@ -0,0 +1,64 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +-module(diameter_etcp_sup). + +-behaviour(supervisor). + +%% interface +-export([start/0, + start_child/1]). + +%% internal exports +-export([start_link/1, + init/1]). + +%% start/0 +%% +%% Start the etcp top supervisor. + +start() -> + diameter_transport_sup:start_child(?MODULE, ?MODULE). + +%% start_child/1 +%% +%% Start a worker under the etcp supervisor. + +start_child(T) -> + supervisor:start_child(?MODULE, [T]). + +%% start_link/1 +%% +%% Callback from diameter_transport_sup as a result of start/0. +%% Starts a child supervisor under the transport supervisor. + +start_link(?MODULE) -> + SupName = {local, ?MODULE}, + supervisor:start_link(SupName, ?MODULE, []). + +init([]) -> + Mod = diameter_etcp, + Flags = {simple_one_for_one, 0, 1}, + ChildSpec = {Mod, + {Mod, start_link, []}, + temporary, + 1000, + worker, + [Mod]}, + {ok, {Flags, [ChildSpec]}}. diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl new file mode 100644 index 0000000000..92aa8488a0 --- /dev/null +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -0,0 +1,624 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +-module(diameter_sctp). + +-behaviour(gen_server). + +%% interface +-export([start/3]). + +%% child start from supervisor +-export([start_link/1]). + +%% child start from here +-export([init/1]). + +%% gen_server callbacks +-export([handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + terminate/2]). + +-include_lib("kernel/include/inet_sctp.hrl"). +-include_lib("diameter/include/diameter.hrl"). + +-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). + +%% The default port for a listener. +-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 + +%% How long a listener with no associations lives before offing +%% itself. +-define(LISTENER_TIMEOUT, 30000). + +%% How long to wait for a transport process to attach after +%% association establishment. +-define(ACCEPT_TIMEOUT, 5000). + +-type uint() :: non_neg_integer(). + +%% Accepting/connecting transport process state. +-record(transport, + {parent :: pid(), + mode :: {accept, pid()} + | {connect, {list(inet:ip_address()), uint(), list()}} + %% {RAs, RP, Errors} + | connect, + socket :: gen_sctp:sctp_socket(), + assoc_id :: gen_sctp:assoc_id(), %% association identifier + peer :: {[inet:ip_address()], uint()}, %% {RAs, RP} + streams :: {uint(), uint()}, %% {InStream, OutStream} counts + os = 0 :: uint()}). %% next output stream + +%% Listener process state. +-record(listener, + {ref :: reference(), + socket :: gen_sctp:sctp_socket(), + count = 0 :: uint(), + tmap = ets:new(?MODULE, []) :: ets:tid(), + %% {MRef, Pid|AssocId}, {AssocId, Pid} + pending = {0, ets:new(?MODULE, [ordered_set])}, + tref :: reference()}). +%% Field tmap is used to map an incoming message or event to the +%% relevent transport process. Field pending implements a queue of +%% transport processes to which an association has been assigned (at +%% comm_up and written into tmap) but for which diameter hasn't yet +%% spawned a transport process: a short-lived state of affairs as a +%% new transport is spawned as a consequence of a peer being taken up, +%% transport processes being spawned by the listener on demand. In +%% case diameter starts a transport before comm_up on a new +%% association, pending is set to an improper list with the spawned +%% transport as head and the queue as tail. + +%% --------------------------------------------------------------------------- +%% # start/3 +%% --------------------------------------------------------------------------- + +start(T, #diameter_service{capabilities = Caps}, Opts) + when is_list(Opts) -> + diameter_sctp_sup:start(), %% start supervisors on demand + Addrs = Caps#diameter_caps.host_ip_address, + s(T, Addrs, lists:map(fun ip/1, Opts)). + +ip({ifaddr, A}) -> + {ip, A}; +ip(T) -> + T. + +%% A listener spawns transports either as a consequence of this call +%% when there is not yet an association to associate with it, or at +%% comm_up on a new association in which case the call retrieves a +%% transport from the pending queue. +s({accept, Ref} = A, Addrs, Opts) -> + {LPid, LAs} = listener(Ref, {Opts, Addrs}), + try gen_server:call(LPid, {A, self()}, infinity) of + {ok, TPid} -> {ok, TPid, LAs} + catch + exit: Reason -> {error, Reason} + end; +%% This implementation is due to there being no accept call in +%% gen_sctp in order to be able to accept a new association only +%% *after* an accepting transport has been spawned. + +s({connect = C, _}, Addrs, Opts) -> + diameter_sctp_sup:start_child({C, self(), Opts, Addrs}). + +%% start_link/1 + +start_link(T) -> + proc_lib:start_link(?MODULE, + init, + [T], + infinity, + diameter_lib:spawn_opts(server, [])). + +%% --------------------------------------------------------------------------- +%% # init/1 +%% --------------------------------------------------------------------------- + +init(T) -> + gen_server:enter_loop(?MODULE, [], i(T)). + +%% i/1 + +%% A process owning a listening socket. +i({listen, Ref, {Opts, Addrs}}) -> + {LAs, Sock} = AS = open(Addrs, Opts, ?DEFAULT_PORT), + proc_lib:init_ack({ok, self(), LAs}), + ok = gen_sctp:listen(Sock, true), + true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), + start_timer(#listener{ref = Ref, + socket = Sock}); + +%% A connecting transport. +i({connect, Pid, Opts, Addrs}) -> + {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]), + RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As], + [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], + {LAs, Sock} = open(Addrs, Rest, 0), + proc_lib:init_ack({ok, self(), LAs}), + erlang:monitor(process, Pid), + #transport{parent = Pid, + mode = {connect, connect(Sock, RAs, RP, [])}, + socket = Sock}; + +%% An accepting transport spawned by diameter. +i({accept, Pid, LPid, Sock}) -> + proc_lib:init_ack({ok, self()}), + erlang:monitor(process, Pid), + erlang:monitor(process, LPid), + #transport{parent = Pid, + mode = {accept, LPid}, + socket = Sock}; + +%% An accepting transport spawned at association establishment. +i({accept, Ref, LPid, Sock, Id}) -> + proc_lib:init_ack({ok, self()}), + MRef = erlang:monitor(process, LPid), + %% Wait for a signal that the transport has been started before + %% processing other messages. + receive + {Ref, Pid} -> %% transport started + #transport{parent = Pid, + mode = {accept, LPid}, + socket = Sock}; + {'DOWN', MRef, process, _, _} = T -> %% listener down + close(Sock, Id), + x(T) + after ?ACCEPT_TIMEOUT -> + close(Sock, Id), + x(timeout) + end. + +%% close/2 + +close(Sock, Id) -> + gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Id}). +%% Having to pass a record here is hokey. + +%% listener/2 + +listener(LRef, T) -> + l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T). + +%% Existing process with the listening socket ... +l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> + {LAs, _Sock} = AS, + {LPid, LAs}; + +%% ... or not: start one. +l([], LRef, T) -> + {ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}), + {LPid, LAs}. + +%% open/3 + +open(Addrs, Opts, PortNr) -> + {LAs, Os} = addrs(Addrs, Opts), + {LAs, case gen_sctp:open(gen_opts(portnr(Os, PortNr))) of + {ok, Sock} -> + Sock; + {error, Reason} -> + x({open, Reason}) + end}. + +addrs(Addrs, Opts) -> + case proplists:split(Opts, [ip]) of + {[[]], _} -> + {Addrs, Opts ++ [{ip, A} || A <- Addrs]}; + {[As], Os} -> + LAs = [diameter_lib:ipaddr(A) || {ip, A} <- As], + {LAs, Os ++ [{ip, A} || A <- LAs]} + end. + +portnr(Opts, PortNr) -> + case proplists:get_value(port, Opts) of + undefined -> + [{port, PortNr} | Opts]; + _ -> + Opts + end. + +%% x/1 + +x(Reason) -> + exit({shutdown, Reason}). + +%% gen_opts/1 + +gen_opts(Opts) -> + {L,_} = proplists:split(Opts, [binary, list, mode, active, sctp_events]), + [[],[],[],[],[]] == L orelse ?ERROR({reserved_options, Opts}), + [binary, {active, once} | Opts]. + +%% --------------------------------------------------------------------------- +%% # handle_call/3 +%% --------------------------------------------------------------------------- + +handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref, + count = N} + = S) -> + {TPid, NewS} = accept(Pid, S), + {reply, {ok, TPid}, NewS#listener{count = N+1}}; + +handle_call(_, _, State) -> + {reply, nok, State}. + +%% --------------------------------------------------------------------------- +%% # handle_cast/2 +%% --------------------------------------------------------------------------- + +handle_cast(_, State) -> + {noreply, State}. + +%% --------------------------------------------------------------------------- +%% # handle_info/2 +%% --------------------------------------------------------------------------- + +handle_info(T, #transport{} = S) -> + {noreply, #transport{} = t(T,S)}; + +handle_info(T, #listener{} = S) -> + {noreply, #listener{} = l(T,S)}. + +%% --------------------------------------------------------------------------- +%% # code_change/3 +%% --------------------------------------------------------------------------- + +code_change(_, State, _) -> + {ok, State}. + +%% --------------------------------------------------------------------------- +%% # terminate/2 +%% --------------------------------------------------------------------------- + +terminate(_, #transport{assoc_id = undefined}) -> + ok; + +terminate(_, #transport{socket = Sock, + mode = {accept, _}, + assoc_id = Id}) -> + close(Sock, Id); + +terminate(_, #transport{socket = Sock}) -> + gen_sctp:close(Sock); + +terminate(_, #listener{socket = Sock}) -> + gen_sctp:close(Sock). + +%% --------------------------------------------------------------------------- + +%% start_timer/1 + +start_timer(#listener{count = 0} = S) -> + S#listener{tref = erlang:start_timer(?LISTENER_TIMEOUT, self(), close)}; +start_timer(S) -> + S. + +%% l/2 +%% +%% Transition listener state. + +%% Incoming message from SCTP. +l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) -> + setopts(Sock), + case find(Data, S) of + {TPid, NewS} -> + TPid ! Msg, + NewS; + false -> + S + end; + +%% Transport is asking message to be sent. See send/3 for why the send +%% isn't directly from the transport. +l({send, AssocId, StreamId, Bin}, #listener{socket = Sock} = S) -> + send(Sock, AssocId, StreamId, Bin), + S; + +%% Accepting transport has died. One that's awaiting an association ... +l({'DOWN', MRef, process, TPid, _}, #listener{pending = [TPid | Q], + tmap = T, + count = N} + = S) -> + ets:delete(T, MRef), + ets:delete(T, TPid), + start_timer(S#listener{count = N-1, + pending = Q}); + +%% ... ditto and a new transport has already been started ... +l({'DOWN', _, process, _, _} = T, #listener{pending = [TPid | Q]} + = S) -> + #listener{pending = NQ} + = NewS + = l(T, S#listener{pending = Q}), + NewS#listener{pending = [TPid | NQ]}; + +%% ... or not. +l({'DOWN', MRef, process, TPid, _}, #listener{socket = Sock, + tmap = T, + count = N, + pending = {P,Q}} + = S) -> + [{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId + ets:delete(T, MRef), + ets:delete(T, Id), + Id == TPid orelse close(Sock, Id), + case ets:lookup(Q, TPid) of + [{TPid, _}] -> %% transport in the pending queue ... + ets:delete(Q, TPid), + S#listener{pending = {P-1, Q}}; + [] -> %% ... or not + start_timer(S#listener{count = N-1}) + end; + +%% Timeout after the last accepting process has died. +l({timeout, TRef, close = T}, #listener{tref = TRef, + count = 0}) -> + x(T); +l({timeout, _, close}, #listener{} = S) -> + S. + +%% t/2 +%% +%% Transition transport state. + +t(T,S) -> + case transition(T,S) of + ok -> + S; + #transport{} = NS -> + NS; + stop -> + x(T) + end. + +%% transition/2 + +%% Incoming message. +transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock, + mode = {accept, _}} + = S) -> + recv(Data, S); + +transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> + setopts(Sock), + recv(Data, S); + +%% Outgoing message. +transition({diameter, {send, Msg}}, S) -> + send(Msg, S); + +%% Request to close the transport connection. +transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> + stop; + +%% Listener process has died. +transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) -> + stop; + +%% Parent process has died. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> + stop. + +%% Crash on anything unexpected. + +%% accept/2 +%% +%% Start a new transport process or use one that's already been +%% started as a consequence of association establishment. + +%% No pending associations: spawn a new transport. +accept(Pid, #listener{socket = Sock, + tmap = T, + pending = {0,_} = Q} + = S) -> + Arg = {accept, Pid, self(), Sock}, + {ok, TPid} = diameter_sctp_sup:start_child(Arg), + MRef = erlang:monitor(process, TPid), + ets:insert(T, [{MRef, TPid}, {TPid, MRef}]), + {TPid, S#listener{pending = [TPid | Q]}}; +%% Placing the transport in the pending field makes it available to +%% the next association. The stack starts a new accepting transport +%% only after this one brings the connection up (or dies). + +%% Accepting transport has died. This can happen if a new transport is +%% started before the DOWN has arrived. +accept(Pid, #listener{pending = [TPid | {0,_} = Q]} = S) -> + false = is_process_alive(TPid), %% assert + accept(Pid, S#listener{pending = Q}); + +%% Pending associations: attach to the first in the queue. +accept(Pid, #listener{ref = Ref, pending = {N,Q}} = S) -> + TPid = ets:first(Q), + TPid ! {Ref, Pid}, + ets:delete(Q, TPid), + {TPid, S#listener{pending = {N-1, Q}}}. + +%% send/2 + +%% Outbound Diameter message on a specified stream ... +send(#diameter_packet{bin = Bin, transport_data = {stream, SId}}, S) -> + send(SId, Bin, S), + S; + +%% ... or not: rotate through all steams. +send(Bin, #transport{streams = {_, OS}, + os = N} + = S) + when is_binary(Bin) -> + send(N, Bin, S), + S#transport{os = (N + 1) rem OS}. + +%% send/3 + +%% Messages have to be sent from the controlling process, which is +%% probably a bug. Sending from here causes an inet_reply, Sock, +%% Status} message to be sent to the controlling process while +%% gen_sctp:send/4 here hangs. +send(StreamId, Bin, #transport{assoc_id = AId, + mode = {accept, LPid}}) -> + LPid ! {send, AId, StreamId, Bin}; + +send(StreamId, Bin, #transport{socket = Sock, + assoc_id = AId}) -> + send(Sock, AId, StreamId, Bin). + +%% send/4 + +send(Sock, AssocId, Stream, Bin) -> + case gen_sctp:send(Sock, AssocId, Stream, Bin) of + ok -> + ok; + {error, Reason} -> + x({send, Reason}) + end. + +%% recv/2 + +%% Association established ... +recv({[], #sctp_assoc_change{state = comm_up, + outbound_streams = OS, + inbound_streams = IS, + assoc_id = Id}}, + #transport{assoc_id = undefined} + = S) -> + up(S#transport{assoc_id = Id, + streams = {IS, OS}}); + +%% ... or not: try the next address. +recv({[], #sctp_assoc_change{} = E}, + #transport{assoc_id = undefined, + socket = Sock, + mode = {connect = C, {[RA|RAs], RP, Es}}} + = S) -> + S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}}; + +%% Lost association after establishment. +recv({[], #sctp_assoc_change{}}, _) -> + stop; + +%% Inbound Diameter message. +recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid}) + when is_binary(Bin) -> + diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id}, + bin = Bin}), + ok; + +recv({[], #sctp_shutdown_event{assoc_id = Id}}, + #transport{assoc_id = Id}) -> + stop. + +%% up/1 + +up(#transport{parent = Pid, + mode = {connect = C, {[RA | _], RP, _}}} + = S) -> + diameter_peer:up(Pid, {RA,RP}), + S#transport{mode = C}; + +up(#transport{parent = Pid, + mode = {accept, _}} + = S) -> + diameter_peer:up(Pid), + S. + +%% find/2 + +find({[#sctp_sndrcvinfo{assoc_id = Id}], _} + = Data, + #listener{tmap = T} + = S) -> + f(ets:lookup(T, Id), Data, S); + +find({_, Rec} = Data, #listener{tmap = T} = S) -> + f(ets:lookup(T, assoc_id(Rec)), Data, S). + +%% New association and a transport waiting for one: use it. +f([], + {_, #sctp_assoc_change{state = comm_up, + assoc_id = Id}}, + #listener{tmap = T, + pending = [TPid | {_,_} = Q]} + = S) -> + [{TPid, MRef}] = ets:lookup(T, TPid), + ets:insert(T, [{MRef, Id}, {Id, TPid}]), + ets:delete(T, TPid), + {TPid, S#listener{pending = Q}}; + +%% New association and no transport start yet: spawn one and place it +%% in the queue. +f([], + {_, #sctp_assoc_change{state = comm_up, + assoc_id = Id}}, + #listener{ref = Ref, + socket = Sock, + tmap = T, + pending = {N,Q}} + = S) -> + Arg = {accept, Ref, self(), Sock, Id}, + {ok, TPid} = diameter_sctp_sup:start_child(Arg), + MRef = erlang:monitor(process, TPid), + ets:insert(T, [{MRef, Id}, {Id, TPid}]), + ets:insert(Q, {TPid, now()}), + {TPid, S#listener{pending = {N+1, Q}}}; + +%% Known association ... +f([{_, TPid}], _, S) -> + {TPid, S}; + +%% ... or not: discard. +f([], _, _) -> + false. + +%% assoc_id/1 + +assoc_id(#sctp_shutdown_event{assoc_id = Id}) -> %% undocumented + Id; +assoc_id(#sctp_assoc_change{assoc_id = Id}) -> + Id; +assoc_id(#sctp_sndrcvinfo{assoc_id = Id}) -> + Id; +assoc_id(#sctp_paddr_change{assoc_id = Id}) -> + Id; +assoc_id(#sctp_adaptation_event{assoc_id = Id}) -> + Id. + +%% connect/4 + +connect(_, [], _, Reasons) -> + x({connect, lists:reverse(Reasons)}); + +connect(Sock, [Addr | AT] = As, Port, Reasons) -> + case gen_sctp:connect_init(Sock, Addr, Port, []) of + ok -> + {As, Port, Reasons}; + {error, _} = E -> + connect(Sock, AT, Port, [{Addr, E} | Reasons]) + end. + +%% setopts/1 + +setopts(Sock) -> + case inet:setopts(Sock, [{active, once}]) of + ok -> ok; + X -> x({setopts, Sock, X}) %% possibly on peer disconnect + end. diff --git a/lib/diameter/src/transport/diameter_sctp_sup.erl b/lib/diameter/src/transport/diameter_sctp_sup.erl new file mode 100644 index 0000000000..3bdae02d68 --- /dev/null +++ b/lib/diameter/src/transport/diameter_sctp_sup.erl @@ -0,0 +1,74 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +-module(diameter_sctp_sup). + +-behaviour(supervisor). + +%% interface +-export([start/0, + start_child/1]). + +%% internal exports +-export([start_link/1, + init/1]). + +%% Start multiple supervisors only because a child can't start another +%% child before supervisor:start_child/2 has returned. +-define(TRANSPORT_SUP, diameter_sctp_transport_sup). +-define(LISTENER_SUP, diameter_sctp_listener_sup). + +%% start/0 +%% +%% Start the TCP-specific supervisors. + +start() -> + diameter_transport_sup:start_child(?TRANSPORT_SUP, ?MODULE), + diameter_transport_sup:start_child(?LISTENER_SUP, ?MODULE). + +%% start_child/1 +%% +%% Start a worker under one of the child supervisors. + +start_child(T) -> + SupRef = case element(1,T) of + connect -> ?TRANSPORT_SUP; + accept -> ?TRANSPORT_SUP; + listen -> ?LISTENER_SUP + end, + supervisor:start_child(SupRef, [T]). + +%% start_link/1 +%% +%% Callback from diameter_transport_sup as a result of start/0. +%% Starts a child supervisor under the transport supervisor. + +start_link(Name) -> + supervisor:start_link({local, Name}, ?MODULE, []). + +init([]) -> + Mod = diameter_sctp, + Flags = {simple_one_for_one, 0, 1}, + ChildSpec = {Mod, + {Mod, start_link, []}, + temporary, + 1000, + worker, + [Mod]}, + {ok, {Flags, [ChildSpec]}}. diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl new file mode 100644 index 0000000000..653c114471 --- /dev/null +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -0,0 +1,531 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +-module(diameter_tcp). + +-behaviour(gen_server). + +%% interface +-export([start/3]). + +%% child start from supervisor +-export([start_link/1]). + +%% child start from here +-export([init/1]). + +%% gen_server callbacks +-export([handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + terminate/2]). + +-include_lib("diameter/include/diameter.hrl"). + +-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). + +-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 +-define(LISTENER_TIMEOUT, 30000). +-define(FRAGMENT_TIMEOUT, 1000). + +%% The same gen_server implementation supports three different kinds +%% of processes: an actual transport process, one that will club it to +%% death should the parent die before a connection is established, and +%% a process owning the listening port. + +%% Listener process state. +-record(listener, {socket :: inet:socket(), + count = 1 :: non_neg_integer(), + tref :: reference()}). + +%% Monitor process state. +-record(monitor, + {parent :: pid(), + transport = self() :: pid()}). + +-type tref() :: reference(). %% timer reference +-type length() :: 0..16#FFFFFF. %% message length from Diameter header +-type size() :: non_neg_integer(). %% accumulated binary size +-type frag() :: {length(), size(), binary(), list(binary())} + | binary(). + +%% Accepting/connecting transport process state. +-record(transport, + {socket :: inet:socket(), %% accept or connect socket + parent :: pid(), %% of process that started us + module :: module(), %% gen_tcp-like module + frag = <<>> :: binary() | {tref(), frag()}}). %% message fragment + +%% The usual transport using gen_tcp can be replaced by anything +%% sufficiently gen_tcp-like by passing a 'module' option as the first +%% (for simplicity) transport option. The transport_module diameter_etcp +%% uses this to set itself as the module to call, its start/3 just +%% calling start/3 here with the option set. + +%% --------------------------------------------------------------------------- +%% # start/3 +%% --------------------------------------------------------------------------- + +start({T, Ref}, #diameter_service{capabilities = Caps}, Opts) -> + diameter_tcp_sup:start(), %% start tcp supervisors on demand + {Mod, Rest} = split(Opts), + Addrs = Caps#diameter_caps.host_ip_address, + Arg = {T, Ref, Mod, self(), Rest, Addrs}, + diameter_tcp_sup:start_child(Arg). + +split([{module, M} | Opts]) -> + {M, Opts}; +split(Opts) -> + {gen_tcp, Opts}. + +%% start_link/1 + +start_link(T) -> + proc_lib:start_link(?MODULE, + init, + [T], + infinity, + diameter_lib:spawn_opts(server, [])). + +%% --------------------------------------------------------------------------- +%% # init/1 +%% --------------------------------------------------------------------------- + +init(T) -> + gen_server:enter_loop(?MODULE, [], i(T)). + +%% i/1 + +%% A transport process. +i({T, Ref, Mod, Pid, Opts, Addrs}) + when T == accept; + T == connect -> + erlang:monitor(process, Pid), + %% Since accept/connect might block indefinitely, spawn a process + %% that does nothing but kill us with the parent until call + %% returns. + {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), + Sock = i(T, Ref, Mod, Pid, Opts, Addrs), + MPid ! {stop, self()}, %% tell the monitor to die + setopts(Mod, Sock), + #transport{parent = Pid, + module = Mod, + socket = Sock}; + +%% A monitor process to kill the transport if the parent dies. +i(#monitor{parent = Pid, transport = TPid} = S) -> + proc_lib:init_ack({ok, self()}), + erlang:monitor(process, Pid), + erlang:monitor(process, TPid), + S; +%% In principle a link between the transport and killer processes +%% could do the same thing: have the accepting/connecting process be +%% killed when the killer process dies as a consequence of parent +%% death. However, a link can be unlinked and this is exactly what +%% gen_tcp seems to so. Links should be left to supervisors. + +i({listen, LRef, APid, {Mod, Opts, Addrs}}) -> + {[LA, LP], Rest} = proplists:split(Opts, [ip, port]), + LAddr = get_addr(LA, Addrs), + LPort = get_port(LP), + {ok, LSock} = Mod:listen(LPort, gen_opts(LAddr, Rest)), + proc_lib:init_ack({ok, self(), {LAddr, LSock}}), + erlang:monitor(process, APid), + true = diameter_reg:add_new({?MODULE, listener, {LRef, {LAddr, LSock}}}), + start_timer(#listener{socket = LSock}). + +%% i/6 + +i(accept, Ref, Mod, Pid, Opts, Addrs) -> + {LAddr, LSock} = listener(Ref, {Mod, Opts, Addrs}), + proc_lib:init_ack({ok, self(), [LAddr]}), + Sock = ok(accept(Mod, LSock)), + diameter_peer:up(Pid), + Sock; + +i(connect, _, Mod, Pid, Opts, Addrs) -> + {[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]), + LAddr = get_addr(LA, Addrs), + RAddr = get_addr(RA, []), + RPort = get_port(RP), + proc_lib:init_ack({ok, self(), [LAddr]}), + Sock = ok(connect(Mod, RAddr, RPort, gen_opts(LAddr, Rest))), + diameter_peer:up(Pid, {RAddr, RPort}), + Sock. + +ok({ok, T}) -> + T; +ok(No) -> + x(No). + +x(Reason) -> + exit({shutdown, Reason}). + +%% listener/2 + +listener(LRef, T) -> + l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T). + +%% Existing process with the listening socket ... +l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> + LPid ! {accept, self()}, + AS; + +%% ... or not: start one. +l([], LRef, T) -> + {ok, _, AS} = diameter_tcp_sup:start_child({listen, LRef, self(), T}), + AS. + +%% get_addr/2 + +get_addr(As, Def) -> + diameter_lib:ipaddr(addr(As, Def)). + +%% Take the first address from the service if several are unspecified. +addr([], [Addr | _]) -> + Addr; +addr([{_, Addr}], _) -> + Addr; +addr(As, Addrs) -> + ?ERROR({invalid_addrs, As, Addrs}). + +%% get_port/1 + +get_port([{_, Port}]) -> + Port; +get_port([]) -> + ?DEFAULT_PORT; +get_port(Ps) -> + ?ERROR({invalid_ports, Ps}). + +%% gen_opts/2 + +gen_opts(LAddr, Opts) -> + {L,_} = proplists:split(Opts, [binary, packet, active]), + [[],[],[]] == L orelse ?ERROR({reserved_options, Opts}), + [binary, + {packet, 0}, + {active, once}, + {ip, LAddr} + | Opts]. + +%% --------------------------------------------------------------------------- +%% # handle_call/3 +%% --------------------------------------------------------------------------- + +handle_call(_, _, State) -> + {reply, nok, State}. + +%% --------------------------------------------------------------------------- +%% # handle_cast/2 +%% --------------------------------------------------------------------------- + +handle_cast(_, State) -> + {noreply, State}. + +%% --------------------------------------------------------------------------- +%% # handle_info/2 +%% --------------------------------------------------------------------------- + +handle_info(T, #transport{} = S) -> + {noreply, #transport{} = t(T,S)}; + +handle_info(T, #listener{} = S) -> + {noreply, #listener{} = l(T,S)}; + +handle_info(T, #monitor{} = S) -> + m(T,S), + x(T). + +%% --------------------------------------------------------------------------- +%% # code_change/3 +%% --------------------------------------------------------------------------- + +code_change(_, State, _) -> + {ok, State}. + +%% --------------------------------------------------------------------------- +%% # terminate/2 +%% --------------------------------------------------------------------------- + +terminate(_, _) -> + ok. + +%% --------------------------------------------------------------------------- + +%% start_timer/1 + +start_timer(#listener{count = 0} = S) -> + S#listener{tref = erlang:start_timer(?LISTENER_TIMEOUT, self(), close)}; +start_timer(S) -> + S. + +%% m/2 +%% +%% Transition monitor state. + +%% Transport is telling us to die. +m({stop, TPid}, #monitor{transport = TPid}) -> + ok; + +%% Transport has died. +m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) -> + ok; + +%% Transport parent has died. +m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, + transport = TPid}) -> + exit(TPid, {shutdown, parent}). + +%% l/2 +%% +%% Transition listener state. + +%% Another accept transport is attaching. +l({accept, TPid}, #listener{count = N} = S) -> + erlang:monitor(process, TPid), + S#listener{count = N+1}; + +%% Accepting process has died. +l({'DOWN', _, process, _, _}, #listener{count = N} = S) -> + start_timer(S#listener{count = N-1}); + +%% Timeout after the last accepting process has died. +l({timeout, TRef, close = T}, #listener{tref = TRef, + count = 0}) -> + x(T); +l({timeout, _, close}, #listener{} = S) -> + S. + +%% t/2 +%% +%% Transition transport state. + +t(T,S) -> + case transition(T,S) of + ok -> + S; + #transport{} = NS -> + NS; + {stop, Reason} -> + x(Reason); + stop -> + x(T) + end. + +%% transition/2 + +%% Incoming message. +transition({tcp, Sock, Data}, #transport{socket = Sock, + module = M} + = S) -> + setopts(M, Sock), + recv(Data, S); + +transition({tcp_closed, Sock}, #transport{socket = Sock}) -> + stop; + +transition({tcp_error, Sock, _Reason} = T, #transport{socket = Sock} = S) -> + ?ERROR({T,S}); + +%% Outgoing message. +transition({diameter, {send, Bin}}, #transport{socket = Sock, + module = M}) -> + case send(M, Sock, Bin) of + ok -> + ok; + {error, Reason} -> + {stop, {send, Reason}} + end; + +%% Request to close the transport connection. +transition({diameter, {close, Pid}}, #transport{parent = Pid, + socket = Sock, + module = M}) -> + M:close(Sock), + stop; + +%% Timeout for reception of outstanding packets. +transition({timeout, TRef, flush}, S) -> + flush(TRef, S); + +%% Request for the local port number. +transition({resolve_port, RPid}, #transport{socket = Sock, + module = M}) + when is_pid(RPid) -> + RPid ! lport(M, Sock), + ok; + +%% Parent process has died. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> + stop. + +%% Crash on anything unexpected. + +%% recv/2 +%% +%% Reassemble fragmented messages and extract multple message sent +%% using Nagle. + +recv(Bin, #transport{parent = Pid, frag = Head} = S) -> + S#transport{frag = recv(Pid, Head, Bin)}. + +%% recv/3 + +%% No previous fragment. +recv(Pid, <<>>, Bin) -> + rcv(Pid, Bin); + +recv(Pid, {TRef, Head}, Bin) -> + erlang:cancel_timer(TRef), + rcv(Pid, Head, Bin). + +%% rcv/3 + +%% Not even the first four bytes of the header. +rcv(Pid, Head, Bin) + when is_binary(Head) -> + rcv(Pid, <<Head/binary, Bin/binary>>); + +%% Or enough to know how many bytes to extract. +rcv(Pid, {Len, N, Head, Acc}, Bin) -> + rcv(Pid, Len, N + size(Bin), Head, [Bin | Acc]). + +%% rcv/5 + +%% Extract a message for which we have all bytes. +rcv(Pid, Len, N, Head, Acc) + when Len =< N -> + rcv(Pid, rcv1(Pid, Len, bin(Head, Acc))); + +%% Wait for more packets. +rcv(_, Len, N, Head, Acc) -> + {start_timer(), {Len, N, Head, Acc}}. + +%% rcv/2 + +%% Nothing left. +rcv(_, <<>> = Bin) -> + Bin; + +%% Well, this isn't good. Chances are things will go south from here +%% but if we're lucky then the bytes we have extend to an intended +%% message boundary and we can recover by simply discarding them, +%% which is the result of receiving them. +rcv(Pid, <<_:1/binary, Len:24, _/binary>> = Bin) + when Len < 20 -> + diameter_peer:recv(Pid, Bin), + <<>>; + +%% Enough bytes to extract a message. +rcv(Pid, <<_:1/binary, Len:24, _/binary>> = Bin) + when Len =< size(Bin) -> + rcv(Pid, rcv1(Pid, Len, Bin)); + +%% Or not: wait for more packets. +rcv(_, <<_:1/binary, Len:24, _/binary>> = Head) -> + {start_timer(), {Len, size(Head), Head, []}}; + +%% Not even 4 bytes yet. +rcv(_, Head) -> + {start_timer(), Head}. + +%% rcv1/3 + +rcv1(Pid, Len, Bin) -> + <<Msg:Len/binary, Rest/binary>> = Bin, + diameter_peer:recv(Pid, Msg), + Rest. + +%% bin/[12] + +bin(Head, Acc) -> + list_to_binary([Head | lists:reverse(Acc)]). + +bin({_, _, Head, Acc}) -> + bin(Head, Acc); +bin(Bin) + when is_binary(Bin) -> + Bin. + +%% start_timer/0 + +%% An erroneously large message length may leave us with a fragment +%% that lingers if the peer doesn't have anything more to send. Start +%% a timer to force reception if an incoming message doesn't arrive +%% first. This won't stop a peer from sending a large bogus value and +%% following it up however but such a state of affairs can only go on +%% for so long since an unanswered DWR will eventually be the result. +%% +%% An erroneously small message length causes problems as well but +%% since all messages with length problems are discarded this should +%% also eventually lead to watchdog failover. + +start_timer() -> + erlang:start_timer(?FRAGMENT_TIMEOUT, self(), flush). + +flush(TRef, #transport{parent = Pid, frag = {TRef, Head}} = S) -> + diameter_peer:recv(Pid, bin(Head)), + S#transport{frag = <<>>}; +flush(_, S) -> + S. + +%% accept/2 + +accept(gen_tcp, LSock) -> + gen_tcp:accept(LSock); +accept(Mod, LSock) -> + Mod:accept(LSock). + +%% connect/4 + +connect(gen_tcp, Host, Port, Opts) -> + gen_tcp:connect(Host, Port, Opts); +connect(Mod, Host, Port, Opts) -> + Mod:connect(Host, Port, Opts). + +%% send/3 + +send(gen_tcp, Sock, Bin) -> + gen_tcp:send(Sock, Bin); +send(M, Sock, Bin) -> + M:send(Sock, Bin). + +%% setopts/3 + +setopts(gen_tcp, Sock, Opts) -> + inet:setopts(Sock, Opts); +setopts(M, Sock, Opts) -> + M:setopts(Sock, Opts). + +%% setopts/2 + +setopts(M, Sock) -> + case setopts(M, Sock, [{active, once}]) of + ok -> ok; + X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect + end. + +%% lport/2 + +lport(gen_tcp, Sock) -> + inet:port(Sock); +lport(M, Sock) -> + M:port(Sock). diff --git a/lib/diameter/src/transport/diameter_tcp_sup.erl b/lib/diameter/src/transport/diameter_tcp_sup.erl new file mode 100644 index 0000000000..1016fa2d9b --- /dev/null +++ b/lib/diameter/src/transport/diameter_tcp_sup.erl @@ -0,0 +1,78 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +-module(diameter_tcp_sup). + +-behaviour(supervisor). + +%% interface +-export([start/0, + start_child/1]). + +%% internal exports +-export([start_link/1, + init/1]). + +%% Start multiple supervisors only because a child can't start another +%% child before supervisor:start_child/2 has returned, although two is +%% really sufficient (listeners and monitors can be under the same). +-define(TRANSPORT_SUP, diameter_tcp_transport_sup). +-define(LISTENER_SUP, diameter_tcp_listener_sup). +-define(MONITOR_SUP, diameter_tcp_monitor_sup). + +%% start/0 +%% +%% Start the TCP-specific supervisors. + +start() -> + diameter_transport_sup:start_child(?TRANSPORT_SUP, ?MODULE), + diameter_transport_sup:start_child(?LISTENER_SUP, ?MODULE), + diameter_transport_sup:start_child(?MONITOR_SUP, ?MODULE). + +%% start_child/1 +%% +%% Start a worker under one of the child supervisors. + +start_child(T) -> + SupRef = case element(1,T) of + accept -> ?TRANSPORT_SUP; + connect -> ?TRANSPORT_SUP; + listen -> ?LISTENER_SUP; + monitor -> ?MONITOR_SUP + end, + supervisor:start_child(SupRef, [T]). + +%% start_link/1 +%% +%% Callback from diameter_transport_sup as a result of start/0. +%% Starts a child supervisor under the transport supervisor. + +start_link(Name) -> + supervisor:start_link({local, Name}, ?MODULE, []). + +init([]) -> + Mod = diameter_tcp, + Flags = {simple_one_for_one, 0, 1}, + ChildSpec = {Mod, + {Mod, start_link, []}, + temporary, + 1000, + worker, + [Mod]}, + {ok, {Flags, [ChildSpec]}}. diff --git a/lib/diameter/src/transport/diameter_transport_sup.erl b/lib/diameter/src/transport/diameter_transport_sup.erl new file mode 100644 index 0000000000..6457ab78b0 --- /dev/null +++ b/lib/diameter/src/transport/diameter_transport_sup.erl @@ -0,0 +1,68 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% Top supervisor for transport processes. +%% + +-module(diameter_transport_sup). + +-behaviour(supervisor). + +-export([start_link/0, %% supervisor start + start_child/2]). + +%% supervisor callback +-export([init/1]). + +%% --------------------------------------------------------------------------- + +%% start_link/0 +%% +%% Start the transport top supervisor. This is started as a child at +%% at application start, from diameter_sup.erl. Protocol-specific +%% supervisors are started as children of this supervisor dynamically +%% by calling start_child/2. (Eg. diameter_tcp_sup:start/0, which +%% is called from diameter_tcp:start/3 to start supervisors the +%% first time a TCP transport process is started.) + +start_link() -> + SupName = {local, ?MODULE}, + supervisor:start_link(SupName, ?MODULE, []). + +%% start_child/2 +%% +%% Start a protocol-specific supervisor under the top supervisor. + +start_child(Name, Module) -> + Spec = {Name, + {Module, start_link, [Name]}, + permanent, + 1000, + supervisor, + [Module]}, + supervisor:start_child(?MODULE, Spec). + +%% --------------------------------------------------------------------------- + +%% Top supervisor callback. +init([]) -> + Flags = {one_for_one, 0, 1}, + Workers = [], %% Each protocol starts its supervisor on demand. + {ok, {Flags, Workers}}. diff --git a/lib/diameter/src/transport/modules.mk b/lib/diameter/src/transport/modules.mk new file mode 100644 index 0000000000..a0dc3cf2c0 --- /dev/null +++ b/lib/diameter/src/transport/modules.mk @@ -0,0 +1,29 @@ +#-*-makefile-*- ; force emacs to enter makefile-mode + +# %CopyrightBegin% +# +# Copyright Ericsson AB 2010-2011. All Rights Reserved. +# +# The contents of this file are subject to the Erlang Public License, +# Version 1.1, (the "License"); you may not use this file except in +# compliance with the License. You should have received a copy of the +# Erlang Public License along with this software. If not, it can be +# retrieved online at http://www.erlang.org/. +# +# Software distributed under the License is distributed on an "AS IS" +# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +# the License for the specific language governing rights and limitations +# under the License. +# +# %CopyrightEnd% + +MODULES = \ + diameter_etcp \ + diameter_etcp_sup \ + diameter_tcp \ + diameter_tcp_sup \ + diameter_sctp \ + diameter_sctp_sup \ + diameter_transport_sup + +HRL_FILES = |