diff options
author | Simon Cornish <[email protected]> | 2009-12-21 22:48:42 -0800 |
---|---|---|
committer | Björn Gustavsson <[email protected]> | 2010-02-06 09:44:40 +0100 |
commit | 827bb85bafd96ec3037c849ea42878e4f581d22f (patch) | |
tree | 663df2c2751babc414e02cbc83357e535618dcaf | |
parent | c6bf34b7035a6292650babf2eb8af80c19256fab (diff) | |
download | otp-827bb85bafd96ec3037c849ea42878e4f581d22f.tar.gz otp-827bb85bafd96ec3037c849ea42878e4f581d22f.tar.bz2 otp-827bb85bafd96ec3037c849ea42878e4f581d22f.zip |
Implement a non-blocking SCTP connect
This patch adds a new set of functions - gen_sctp:connect_init/* that initiate
an SCTP connection without blocking for the result. The result is delivered
asynchronously as an sctp_assoc_change event.
The new functions have the same API as documented for gen_sctp:connect/* with
the following exceptions:
* Timeout is only used to supervise resolving Addr (the peer address)
* The possible return values are ok | {error, posix()}
The caller application is responsible for receiving the #sctp_assoc_change{}
event and correctly determining the connect it originated from (for example,
by examining the remote host and/or port). The application should have at
least {active, once} or use gen_sctp:recv to retrieve the connect result.
The implementation of gen_sctp:connect suffers from a number of
shortcomings which the user may avoid by using gen_sctp:connect_init and
adding code to receive the connect result.
First, irrespective of the Timeout value given to gen_sctp:connect, the OS
attempts and retries the SCTP INIT according to various kernel parameters. If
the Timeout value is shorter than the entire attempt then the application will
still receive an sctp_assoc_change event after the {error, timeout} is
returned from the initial call. This could be somewhat confusing (either to
the application or the designer!) especially if the status is
comm_up. Subsequent calls to connect before the OS has finished this process
return {error, ealready} which may also be counter-intuitive.
Second, there is a race-condition (documented in comments in inet_sctp.erl)
that can cause the wrong sctp_assoc_change record to be returned to an
application calling gen_sctp:connect. The race seriously affects connection
attempts when using one-to-many sockets.
-rw-r--r-- | lib/kernel/doc/src/gen_sctp.xml | 113 | ||||
-rw-r--r-- | lib/kernel/src/gen_sctp.erl | 36 | ||||
-rw-r--r-- | lib/kernel/src/inet_sctp.erl | 20 | ||||
-rw-r--r-- | lib/kernel/test/gen_sctp_SUITE.erl | 52 |
4 files changed, 202 insertions, 19 deletions
diff --git a/lib/kernel/doc/src/gen_sctp.xml b/lib/kernel/doc/src/gen_sctp.xml index de41178a17..40efba2bb7 100644 --- a/lib/kernel/doc/src/gen_sctp.xml +++ b/lib/kernel/doc/src/gen_sctp.xml @@ -170,10 +170,19 @@ <p>Establishes a new association for the socket <c>Socket</c>, with the peer (SCTP server socket) given by <c>Addr</c> and <c>Port</c>. The <c>Timeout</c>, - is expressed in milliseconds.</p> - <p>A socket can be associated with multiple peers. - <marker id="record-sctp_assoc_change"></marker> + is expressed in milliseconds. A socket can be associated with multiple peers.</p> + <p><b>WARNING:</b>Using a value of <c>Timeout</c> less than + the maximum time taken by the OS to establish an association (around 4.5 minutes + if the default values from RFC 4960 are used) can result in + inconsistent or incorrect return values. This is especially + relevant for associations sharing the same <c>Socket</c> + (i.e. source address and port) since the controlling process + blocks until <c>connect/*</c> returns. + <seealso marker="#connect_init/4">connect_init/*</seealso> + provides an alternative not subject to this limitation.</p> + + <p><marker id="record-sctp_assoc_change"></marker> The result of <c>connect/*</c> is an <c>#sctp_assoc_change{}</c> event which contains, in particular, the new <seealso marker="#type-assoc_id">Association ID:</seealso></p> @@ -233,6 +242,45 @@ </desc> </func> <func> + <name>connect_init(Socket, Addr, Port, Opts) -> ok | {error, posix()}</name> + <fsummary>Same as <c>connect_init(Socket, Addr, Port, Opts, infinity)</c>.</fsummary> + <desc> + <p>Same as <c>connect_init(Socket, Addr, Port, Opts, infinity)</c>.</p> + </desc> + </func> + <func> + <name>connect_init(Socket, Addr, Port, [Opt], Timeout) -> ok | {error, posix()}</name> + <fsummary>Initiate a new association for the socket <c>Socket</c>, with a peer (SCTP server socket)</fsummary> + <type> + <v>Socket = sctp_socket()</v> + <v>Addr = ip_address() | Host</v> + <v>Port = port_number()</v> + <v>Opt = sctp_option()</v> + <v>Timeout = timeout()</v> + <v>Host = atom() | string()</v> + </type> + <desc> + <p>Initiates a new association for the socket <c>Socket</c>, + with the peer (SCTP server socket) given by + <c>Addr</c> and <c>Port</c>.</p> + <p>The fundamental difference between this API + and <c>connect/*</c> is that the return value is that of the + underlying OS connect(2) system call. If <c>ok</c> is returned + then the result of the association establishement is received + by the calling process as + an <seealso marker="#record-sctp_assoc_change"> + #sctp_assoc_change{}</seealso> + event. The calling process must be prepared to receive this, or + poll for it using <c>recv/*</c> depending on the value of the + active option.</p> + <p>The parameters are as described + in <seealso marker="#connect/5">connect/*</seealso>, with the + exception of the <c>Timeout</c> value.</p> + <p>The timer associated with <c>Timeout</c> only supervises + IP resolution of <c>Addr</c></p> + </desc> + </func> + <func> <name>controlling_process(sctp_socket(), pid()) -> ok</name> <fsummary>Assign a new controlling process pid to the socket</fsummary> <desc> @@ -1058,6 +1106,65 @@ gen_sctp:close(S). </pre> <p></p> </item> + <item> + <p>A very simple Erlang SCTP Client which uses the + connect_init API.</p> + <pre> +-module(ex3). + +-export([client/4]). +-include_lib("kernel/include/inet.hrl"). +-include_lib("kernel/include/inet_sctp.hrl"). + +client(Peer1, Port1, Peer2, Port2) + when is_tuple(Peer1), is_integer(Port1), is_tuple(Peer2), is_integer(Port2) -> + {ok,S} = gen_sctp:open(), + SctpInitMsgOpt = {sctp_initmsg,#sctp_initmsg{num_ostreams=5}}, + ActiveOpt = {active, true}, + Opts = [SctpInitMsgOpt, ActiveOpt], + ok = gen_sctp:connect(S, Peer1, Port1, Opts), + ok = gen_sctp:connect(S, Peer2, Port2, Opts), + io:format("Connections initiated~n", []), + client_loop(S, Peer1, Port1, undefined, Peer2, Port2, undefined). + +client_loop(S, Peer1, Port1, AssocId1, Peer2, Port2, AssocId2) -> + receive + {sctp, S, Peer1, Port1, {_Anc, SAC}} + when is_record(SAC, sctp_assoc_change), AssocId1 == undefined -> + io:format("Association 1 connect result: ~p. AssocId: ~p~n", + [SAC#sctp_assoc_change.state, + SAC#sctp_assoc_change.assoc_id]), + client_loop(S, Peer1, Port1, SAC#sctp_assoc_change.assoc_id, + Peer2, Port2, AssocId2); + + {sctp, S, Peer2, Port2, {_Anc, SAC}} + when is_record(SAC, sctp_assoc_change), AssocId2 == undefined -> + io:format("Association 2 connect result: ~p. AssocId: ~p~n", + [SAC#sctp_assoc_change.state, SAC#sctp_assoc_change.assoc_id]), + client_loop(S, Peer1, Port1, AssocId1, Peer2, Port2, + SAC#sctp_assoc_change.assoc_id); + + {sctp, S, Peer1, Port1, Data} -> + io:format("Association 1: received ~p~n", [Data]), + client_loop(S, Peer1, Port1, AssocId1, + Peer2, Port2, AssocId2); + + {sctp, S, Peer2, Port2, Data} -> + io:format("Association 2: received ~p~n", [Data]), + client_loop(S, Peer1, Port1, AssocId1, + Peer2, Port2, AssocId2); + + Other -> + io:format("Other ~p~n", [Other]), + client_loop(S, Peer1, Port1, AssocId1, + Peer2, Port2, AssocId2) + + after 5000 -> + ok + end. +</pre> + <p></p> + </item> </list> </section> diff --git a/lib/kernel/src/gen_sctp.erl b/lib/kernel/src/gen_sctp.erl index fcd1d1564a..0665d2e14d 100644 --- a/lib/kernel/src/gen_sctp.erl +++ b/lib/kernel/src/gen_sctp.erl @@ -27,7 +27,7 @@ -include("inet_sctp.hrl"). -export([open/0,open/1,open/2,close/1]). --export([listen/2,connect/4,connect/5]). +-export([listen/2,connect/4,connect/5,connect_init/4,connect_init/5]). -export([eof/2,abort/2]). -export([send/3,send/4,recv/1,recv/2]). -export([error_string/1]). @@ -80,7 +80,26 @@ listen(S, Flag) -> connect(S, Addr, Port, Opts) -> connect(S, Addr, Port, Opts, infinity). -connect(S, Addr, Port, Opts, Timeout) when is_port(S), is_list(Opts) -> +connect(S, Addr, Port, Opts, Timeout) -> + case do_connect(S, Addr, Port, Opts, Timeout, true) of + badarg -> + erlang:error(badarg, [S,Addr,Port,Opts,Timeout]); + Result -> + Result + end. + +connect_init(S, Addr, Port, Opts) -> + connect_init(S, Addr, Port, Opts, infinity). + +connect_init(S, Addr, Port, Opts, Timeout) -> + case do_connect(S, Addr, Port, Opts, Timeout, false) of + badarg -> + erlang:error(badarg, [S,Addr,Port,Opts,Timeout]); + Result -> + Result + end. + +do_connect(S, Addr, Port, Opts, Timeout, ConnWait) when is_port(S), is_list(Opts) -> case inet_db:lookup_socket(S) of {ok,Mod} -> case Mod:getserv(Port) of @@ -89,21 +108,26 @@ connect(S, Addr, Port, Opts, Timeout) when is_port(S), is_list(Opts) -> Timer -> try Mod:getaddr(Addr, Timer) of {ok,IP} -> - Mod:connect(S, IP, Port, Opts, Timer); + ConnectTimer = if ConnWait == false -> + nowait; + true -> + Timer + end, + Mod:connect(S, IP, Port, Opts, ConnectTimer); Error -> Error after inet:stop_timer(Timer) end catch error:badarg -> - erlang:error(badarg, [S,Addr,Port,Opts,Timeout]) + badarg end; Error -> Error end; Error -> Error end; -connect(S, Addr, Port, Opts, Timeout) -> - erlang:error(badarg, [S,Addr,Port,Opts,Timeout]). +do_connect(_S, _Addr, _Port, _Opts, _Timeout, _ConnWait) -> + badarg. diff --git a/lib/kernel/src/inet_sctp.erl b/lib/kernel/src/inet_sctp.erl index 30c0e85dd9..6fdd4bf789 100644 --- a/lib/kernel/src/inet_sctp.erl +++ b/lib/kernel/src/inet_sctp.erl @@ -64,16 +64,22 @@ close(S) -> listen(S, Flag) -> prim_inet:listen(S, Flag). +%% A non-blocking connect is implemented when the initial call is to +%% gen_sctp:connect_init which passes the value nowait as the Timer connect(S, Addr, Port, Opts, Timer) -> case prim_inet:chgopts(S, Opts) of ok -> case prim_inet:getopt(S, active) of {ok,Active} -> - Timeout = inet:timeout(Timer), + Timeout = if Timer =:= nowait -> + infinity; %% don't start driver timer in inet_drv + true -> + inet:timeout(Timer) + end, case prim_inet:connect(S, Addr, Port, Timeout) of - ok -> + ok when Timer =/= nowait -> connect_get_assoc(S, Addr, Port, Active, Timer); - Err1 -> Err1 + OkOrErr1 -> OkOrErr1 end; Err2 -> Err2 end; @@ -89,10 +95,10 @@ connect(S, Addr, Port, Opts, Timer) -> %% connect_get_assoc/5 below mistakes it for an invalid response %% for a socket in {active,false} or {active,once} modes. %% -%% In {active,true} mode it probably gets right, but it is -%% a blocking connect that is implemented even for {active,true}, -%% and that may be a shortcoming. A non-blocking connect -%% would be nice to have. +%% In {active,true} mode the window of time for the race is smaller, +%% but it is possible and also it is a blocking connect that is +%% implemented even for {active,true}, and that may be a +%% shortcoming. connect_get_assoc(S, Addr, Port, false, Timer) -> case recv(S, inet:timeout(Timer)) of diff --git a/lib/kernel/test/gen_sctp_SUITE.erl b/lib/kernel/test/gen_sctp_SUITE.erl index dd7d5f111a..b19ce4d40b 100644 --- a/lib/kernel/test/gen_sctp_SUITE.erl +++ b/lib/kernel/test/gen_sctp_SUITE.erl @@ -24,10 +24,11 @@ %%-compile(export_all). -export([all/1,init_per_testcase/2,fin_per_testcase/2, - basic/1,xfer_min/1,xfer_active/1,api_open_close/1,api_listen/1]). + basic/1,api_open_close/1,api_listen/1,api_connect_init/1, + xfer_min/1,xfer_active/1]). all(suite) -> - [basic,xfer_min,xfer_active,api_open_close,api_listen]. + [basic,api_open_close,api_listen,api_connect_init,xfer_min,xfer_active]. init_per_testcase(_Func, Config) -> Dog = test_server:timetrap(test_server:seconds(15)), @@ -325,7 +326,7 @@ api_listen(Config) when is_list(Config) -> ?line {ok,{Localhost, Pb,[], #sctp_assoc_change{ - state = comm_lost}}} = + state=comm_lost}}} = gen_sctp:recv(Sa, infinity); {error,#sctp_assoc_change{state=cant_assoc}} -> ok end, @@ -336,3 +337,48 @@ api_listen(Config) when is_list(Config) -> ?line ok = gen_sctp:close(Sa), ?line ok = gen_sctp:close(Sb), ok. + +api_connect_init(doc) -> + "Test the API function connect_init/4"; +api_connect_init(suite) -> + []; +api_connect_init(Config) when is_list(Config) -> + ?line Localhost = {127,0,0,1}, + + ?line {ok,S} = gen_sctp:open(), + ?line {ok,Pb} = inet:port(S), + ?line try gen_sctp:connect_init(S, Localhost, not_allowed_for_port, []) + catch error:badarg -> ok + end, + ?line try gen_sctp:connect_init(S, Localhost, 12345, not_allowed_for_opts) + catch error:badarg -> ok + end, + ?line ok = gen_sctp:close(S), + ?line {error,closed} = gen_sctp:connect_init(S, Localhost, 12345, []), + + ?line {ok,Sb} = gen_sctp:open(Pb), + ?line {ok,Sa} = gen_sctp:open(), + ?line case gen_sctp:connect_init(Sa, localhost, Pb, []) of + {error,econnrefused} -> + ?line {ok,{Localhost, + Pb,[], + #sctp_assoc_change{state=comm_lost}}} = + gen_sctp:recv(Sa, infinity); + ok -> + ?line {ok,{Localhost, + Pb,[], + #sctp_assoc_change{state=cant_assoc}}} = + gen_sctp:recv(Sa, infinity) + end, + ?line ok = gen_sctp:listen(Sb, true), + ?line case gen_sctp:connect_init(Sa, localhost, Pb, []) of + ok -> + ?line {ok,{Localhost, + Pb,[], + #sctp_assoc_change{ + state = comm_up}}} = + gen_sctp:recv(Sa, infinity) + end, + ?line ok = gen_sctp:close(Sa), + ?line ok = gen_sctp:close(Sb), + ok. |