aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRaimo Niskanen <[email protected]>2011-09-22 11:11:57 +0200
committerRaimo Niskanen <[email protected]>2011-11-17 15:42:13 +0100
commitc501ca26c01d222af4b05b1732059a7a9684b1c3 (patch)
treeac14b875586936118233ea40c7702020b5636187
parent4ef92bf4416c988a459a361e71e402c286e889b5 (diff)
downloadotp-c501ca26c01d222af4b05b1732059a7a9684b1c3.tar.gz
otp-c501ca26c01d222af4b05b1732059a7a9684b1c3.tar.bz2
otp-c501ca26c01d222af4b05b1732059a7a9684b1c3.zip
kernel: Rewrite SCTP test socket handler
The socket handler needs more flexibility regarding which events are expected to be received.
-rw-r--r--lib/kernel/test/gen_sctp_SUITE.erl603
1 files changed, 360 insertions, 243 deletions
diff --git a/lib/kernel/test/gen_sctp_SUITE.erl b/lib/kernel/test/gen_sctp_SUITE.erl
index 8011c22ce5..07434df8da 100644
--- a/lib/kernel/test/gen_sctp_SUITE.erl
+++ b/lib/kernel/test/gen_sctp_SUITE.erl
@@ -110,7 +110,7 @@ xfer_min(Config) when is_list(Config) ->
assoc_id=SaAssocId}=SaAssocChange} =
gen_sctp:connect(Sa, Loopback, Pb, []),
?line {SbAssocId,SaOutboundStreams,SaInboundStreams} =
- case recv_event(ok(gen_sctp:recv(Sb, infinity))) of
+ case recv_event(log_ok(gen_sctp:recv(Sb, infinity))) of
{Loopback,Pa,
#sctp_assoc_change{state=comm_up,
error=0,
@@ -129,12 +129,12 @@ xfer_min(Config) when is_list(Config) ->
outbound_streams=SbOutboundStreams,
inbound_streams=SbInboundStreams,
assoc_id=AssocId}} =
- ?line recv_event(ok(gen_sctp:recv(Sb, infinity))),
+ ?line recv_event(log_ok(gen_sctp:recv(Sb, infinity))),
{AssocId,SbInboundStreams,SbOutboundStreams}
end,
?line ok = gen_sctp:send(Sa, SaAssocId, 0, Data),
- ?line case ok(gen_sctp:recv(Sb, infinity)) of
+ ?line case log_ok(gen_sctp:recv(Sb, infinity)) of
{Loopback,
Pa,
[#sctp_sndrcvinfo{stream=Stream,
@@ -154,7 +154,7 @@ xfer_min(Config) when is_list(Config) ->
Data}} = gen_sctp:recv(Sb, infinity)
end,
?line ok = gen_sctp:send(Sb, SbAssocId, 0, Data),
- ?line case ok(gen_sctp:recv(Sa, infinity)) of
+ ?line case log_ok(gen_sctp:recv(Sa, infinity)) of
{Loopback,Pb,
[#sctp_sndrcvinfo{stream=Stream,
assoc_id=SaAssocId}],
@@ -172,22 +172,22 @@ xfer_min(Config) when is_list(Config) ->
[#sctp_sndrcvinfo{stream=Stream,
assoc_id=SaAssocId}],
Data} =
- ok(gen_sctp:recv(Sa, infinity))
+ log_ok(gen_sctp:recv(Sa, infinity))
end,
%%
?line ok = gen_sctp:eof(Sa, SaAssocChange),
?line {Loopback,Pa,#sctp_shutdown_event{assoc_id=SbAssocId}} =
- recv_event(ok(gen_sctp:recv(Sb, infinity))),
+ recv_event(log_ok(gen_sctp:recv(Sb, infinity))),
?line {Loopback,Pb,
#sctp_assoc_change{state=shutdown_comp,
error=0,
assoc_id=SaAssocId}} =
- recv_event(ok(gen_sctp:recv(Sa, infinity))),
+ recv_event(log_ok(gen_sctp:recv(Sa, infinity))),
?line {Loopback,Pa,
#sctp_assoc_change{state=shutdown_comp,
error=0,
assoc_id=SbAssocId}} =
- recv_event(ok(gen_sctp:recv(Sb, infinity))),
+ recv_event(log_ok(gen_sctp:recv(Sb, infinity))),
?line ok = gen_sctp:close(Sa),
?line ok = gen_sctp:close(Sb),
@@ -338,11 +338,11 @@ def_sndrcvinfo(Config) when is_list(Config) ->
?line Data = <<"What goes up, must come down.">>,
%%
?line S1 =
- ok(gen_sctp:open(
+ log_ok(gen_sctp:open(
0, [{sctp_default_send_param,#sctp_sndrcvinfo{ppid=17}}])),
?LOGVAR(S1),
?line P1 =
- ok(inet:port(S1)),
+ log_ok(inet:port(S1)),
?LOGVAR(P1),
?line #sctp_sndrcvinfo{ppid=17, context=0, timetolive=0, assoc_id=0} =
getopt(S1, sctp_default_send_param),
@@ -350,10 +350,10 @@ def_sndrcvinfo(Config) when is_list(Config) ->
gen_sctp:listen(S1, true),
%%
?line S2 =
- ok(gen_sctp:open()),
+ log_ok(gen_sctp:open()),
?LOGVAR(S2),
?line P2 =
- ok(inet:port(S2)),
+ log_ok(inet:port(S2)),
?LOGVAR(P2),
?line #sctp_sndrcvinfo{ppid=0, context=0, timetolive=0, assoc_id=0} =
getopt(S2, sctp_default_send_param),
@@ -362,9 +362,9 @@ def_sndrcvinfo(Config) when is_list(Config) ->
state=comm_up,
error=0,
assoc_id=S2AssocId} = S2AssocChange =
- ok(gen_sctp:connect(S2, Loopback, P1, [])),
+ log_ok(gen_sctp:connect(S2, Loopback, P1, [])),
?LOGVAR(S2AssocChange),
- ?line case recv_event(ok(gen_sctp:recv(S1))) of
+ ?line case recv_event(log_ok(gen_sctp:recv(S1))) of
{Loopback,P2,
#sctp_assoc_change{
state=comm_up,
@@ -382,7 +382,7 @@ def_sndrcvinfo(Config) when is_list(Config) ->
state=comm_up,
error=0,
assoc_id=S1AssocId}} =
- recv_event(ok(gen_sctp:recv(S1)))
+ recv_event(log_ok(gen_sctp:recv(S1)))
end,
?line #sctp_sndrcvinfo{
@@ -396,7 +396,7 @@ def_sndrcvinfo(Config) when is_list(Config) ->
%%
?line ok =
gen_sctp:send(S1, S1AssocId, 1, <<"1: ",Data/binary>>),
- ?line case ok(gen_sctp:recv(S2)) of
+ ?line case log_ok(gen_sctp:recv(S2)) of
{Loopback,P1,
[#sctp_sndrcvinfo{
stream=1, ppid=17, context=0, assoc_id=S2AssocId}],
@@ -412,7 +412,7 @@ def_sndrcvinfo(Config) when is_list(Config) ->
[#sctp_sndrcvinfo{
stream=1, ppid=17, context=0, assoc_id=S2AssocId}],
<<"1: ",Data/binary>>} =
- ok(gen_sctp:recv(S2))
+ log_ok(gen_sctp:recv(S2))
end,
%%
?line ok =
@@ -432,7 +432,7 @@ def_sndrcvinfo(Config) when is_list(Config) ->
%%
?line ok =
gen_sctp:send(S1, S1AssocId, 0, <<"2: ",Data/binary>>),
- ?line case ok(gen_sctp:recv(S2)) of
+ ?line case log_ok(gen_sctp:recv(S2)) of
{Loopback,P1,
[#sctp_sndrcvinfo{
stream=0, ppid=19, context=0, assoc_id=S2AssocId}],
@@ -440,7 +440,7 @@ def_sndrcvinfo(Config) when is_list(Config) ->
end,
?line ok =
gen_sctp:send(S2, S2AssocChange, 1, <<"3: ",Data/binary>>),
- ?line case ok(gen_sctp:recv(S1)) of
+ ?line case log_ok(gen_sctp:recv(S1)) of
{Loopback,P2,
[#sctp_sndrcvinfo{
stream=1, ppid=0, context=0, assoc_id=S1AssocId}],
@@ -451,7 +451,7 @@ def_sndrcvinfo(Config) when is_list(Config) ->
addr={Loopback,_}, state=addr_available,
error=0, assoc_id=S1AssocId}} =
recv_event(Event2),
- ?line case ok(gen_sctp:recv(S1)) of
+ ?line case log_ok(gen_sctp:recv(S1)) of
{Loopback,P2,
[#sctp_sndrcvinfo{
stream=1, ppid=0, context=0,
@@ -467,7 +467,7 @@ def_sndrcvinfo(Config) when is_list(Config) ->
#sctp_sndrcvinfo{stream=0, ppid=20, assoc_id=S2AssocId},
<<"4: ",Data/binary>>)
end),
- ?line case ok(do_from_other_process(fun() -> gen_sctp:recv(S1) end)) of
+ ?line case log_ok(do_from_other_process(fun() -> gen_sctp:recv(S1) end)) of
{Loopback,P2,
[#sctp_sndrcvinfo{
stream=0, ppid=20, context=0, assoc_id=S1AssocId}],
@@ -496,9 +496,9 @@ getopt(S, Opt, Param) ->
setopt(S, Opt, Val) ->
inet:setopts(S, [{Opt,Val}]).
-ok({ok,X}) ->
- io:format("OK[~w]: ~p~n", [self(),X]),
- X.
+log_ok(X) -> log(ok(X)).
+
+ok({ok,X}) -> X.
log(X) ->
io:format("LOG[~w]: ~p~n", [self(),X]),
@@ -640,16 +640,16 @@ api_connect_init(Config) when is_list(Config) ->
?line case gen_sctp:connect_init(Sa, localhost, Pb, []) of
{error,econnrefused} ->
?line {Localhost,Pb,#sctp_assoc_change{state=comm_lost}} =
- recv_event(ok(gen_sctp:recv(Sa, infinity)));
+ recv_event(log_ok(gen_sctp:recv(Sa, infinity)));
ok ->
?line {Localhost,Pb,#sctp_assoc_change{state=cant_assoc}} =
- recv_event(ok(gen_sctp:recv(Sa, infinity)))
+ recv_event(log_ok(gen_sctp:recv(Sa, infinity)))
end,
?line ok = gen_sctp:listen(Sb, true),
?line case gen_sctp:connect_init(Sa, localhost, Pb, []) of
ok ->
?line {Localhost,Pb,#sctp_assoc_change{state=comm_up}} =
- recv_event(ok(gen_sctp:recv(Sa, infinity)))
+ recv_event(log_ok(gen_sctp:recv(Sa, infinity)))
end,
?line ok = gen_sctp:close(Sa),
?line ok = gen_sctp:close(Sb),
@@ -699,7 +699,7 @@ api_opts(Config) when is_list(Config) ->
end.
implicit_inet6(Config) when is_list(Config) ->
- ?line Hostname = ok(inet:gethostname()),
+ ?line Hostname = log_ok(inet:gethostname()),
?line
case gen_sctp:open(0, [inet6]) of
{ok,S1} ->
@@ -712,16 +712,16 @@ implicit_inet6(Config) when is_list(Config) ->
?line ok = gen_sctp:close(S1),
%%
?line Localhost =
- ok(inet:getaddr("localhost", inet6)),
+ log_ok(inet:getaddr("localhost", inet6)),
?line io:format("~s ~p~n", ["localhost",Localhost]),
?line S2 =
- ok(gen_sctp:open(0, [{ip,Localhost}])),
+ log_ok(gen_sctp:open(0, [{ip,Localhost}])),
?line implicit_inet6(S2, Localhost),
?line ok = gen_sctp:close(S2),
%%
?line io:format("~s ~p~n", [Hostname,Host]),
?line S3 =
- ok(gen_sctp:open(0, [{ifaddr,Host}])),
+ log_ok(gen_sctp:open(0, [{ifaddr,Host}])),
?line implicit_inet6(S3, Host),
?line ok = gen_sctp:close(S1);
{error,eafnosupport} ->
@@ -734,25 +734,25 @@ implicit_inet6(Config) when is_list(Config) ->
implicit_inet6(S1, Addr) ->
?line ok = gen_sctp:listen(S1, true),
- ?line P1 = ok(inet:port(S1)),
- ?line S2 = ok(gen_sctp:open(0, [inet6])),
- ?line P2 = ok(inet:port(S2)),
+ ?line P1 = log_ok(inet:port(S1)),
+ ?line S2 = log_ok(gen_sctp:open(0, [inet6])),
+ ?line P2 = log_ok(inet:port(S2)),
?line #sctp_assoc_change{state=comm_up} =
- ok(gen_sctp:connect(S2, Addr, P1, [])),
- ?line case recv_event(ok(gen_sctp:recv(S1))) of
+ log_ok(gen_sctp:connect(S2, Addr, P1, [])),
+ ?line case recv_event(log_ok(gen_sctp:recv(S1))) of
{Addr,P2,#sctp_assoc_change{state=comm_up}} ->
ok;
{Addr,P2,#sctp_paddr_change{state=addr_confirmed,
addr={Addr,P2},
error=0}} ->
{Addr,P2,#sctp_assoc_change{state=comm_up}} =
- recv_event(ok(gen_sctp:recv(S1)))
+ recv_event(log_ok(gen_sctp:recv(S1)))
end,
- ?line case ok(inet:sockname(S1)) of
+ ?line case log_ok(inet:sockname(S1)) of
{Addr,P1} -> ok;
{{0,0,0,0,0,0,0,0},P1} -> ok
end,
- ?line case ok(inet:sockname(S2)) of
+ ?line case log_ok(inet:sockname(S2)) of
{Addr,P2} -> ok;
{{0,0,0,0,0,0,0,0},P2} -> ok
end,
@@ -792,7 +792,7 @@ xfer_stream_min(Config) when is_list(Config) ->
assoc_id=SaAssocId}} =
gen_sctp:connect(Sa, Loopback, Pb, []),
?line {SbOutboundStreams,SbInboundStreams,SbAssocId} =
- case recv_event(ok(gen_sctp:recv(Sb, infinity))) of
+ case recv_event(log_ok(gen_sctp:recv(Sb, infinity))) of
{Loopback,Pa,
#sctp_assoc_change{state=comm_up,
error=0,
@@ -811,7 +811,7 @@ xfer_stream_min(Config) when is_list(Config) ->
outbound_streams=OS,
inbound_streams=IS,
assoc_id=AI}} =
- recv_event(ok(gen_sctp:recv(Sb, infinity))),
+ recv_event(log_ok(gen_sctp:recv(Sb, infinity))),
{OS,IS,AI}
end,
?line SaOutboundStreams = SbInboundStreams,
@@ -838,7 +838,7 @@ xfer_stream_min(Config) when is_list(Config) ->
?line ok =
do_from_other_process(
fun () -> gen_sctp:send(Sb, SbAssocId, 0, Data) end),
- ?line case ok(gen_sctp:recv(Sa, infinity)) of
+ ?line case log_ok(gen_sctp:recv(Sa, infinity)) of
{Loopback,Pb,
[#sctp_sndrcvinfo{stream=Stream,
assoc_id=SaAssocId}],
@@ -854,17 +854,17 @@ xfer_stream_min(Config) when is_list(Config) ->
[#sctp_sndrcvinfo{stream=Stream,
assoc_id=SaAssocId}],
Data} =
- ok(gen_sctp:recv(Sa, infinity))
+ log_ok(gen_sctp:recv(Sa, infinity))
end,
?line ok = gen_sctp:close(Sa),
?line {Loopback,Pa,
#sctp_shutdown_event{assoc_id=SbAssocId}} =
- recv_event(ok(gen_sctp:recv(Sb, infinity))),
+ recv_event(log_ok(gen_sctp:recv(Sb, infinity))),
?line {Loopback,Pa,
#sctp_assoc_change{state=shutdown_comp,
error=0,
assoc_id=SbAssocId}} =
- recv_event(ok(gen_sctp:recv(Sb, infinity))),
+ recv_event(log_ok(gen_sctp:recv(Sb, infinity))),
?line ok = gen_sctp:close(Sb),
?line receive
@@ -911,42 +911,97 @@ peeloff(Config) when is_list(Config) ->
?line Addr = {127,0,0,1},
?line Stream = 0,
?line Timeout = 333,
- ?line S1 = socket_start(Addr, Timeout),
- ?line P1 = socket_call(S1, port),
- ?line Socket1 = socket_call(S1, socket),
- ?line ok = socket_call(S1, {listen,true}),
- ?line S2 = socket_start(Addr, Timeout),
- ?line P2 = socket_call(S2, port),
- ?line Socket2 = socket_call(S2, socket),
+ ?line S1 = socket_open([{ifaddr,Addr}], Timeout),
+ ?line ?LOGVAR(S1),
+ ?line P1 = socket_call(S1, get_port),
+ ?line ?LOGVAR(P1),
+ ?line Socket1 = socket_call(S1, get_socket),
+ ?line ?LOGVAR(Socket1),
+ ?line socket_call(S1, {listen,true}),
+ ?line S2 = socket_open([{ifaddr,Addr}], Timeout),
+ ?line ?LOGVAR(S2),
+ ?line P2 = socket_call(S2, get_port),
+ ?line ?LOGVAR(P2),
+ ?line Socket2 = socket_call(S2, get_socket),
+ ?line ?LOGVAR(Socket2),
%%
- ?line H_a = socket_req(S1, recv_assoc),
- ?line {S2Ai,Sa,Sb} = socket_call(S2, {connect,Addr,P1,[]}),
- ?line {S1Ai,Sb,Sa,Addr,P2} = socket_resp(H_a),
+ ?line socket_call(S2, {connect_init,Addr,P1,[]}),
+ ?line S2Ai =
+ receive
+ {S2,{Addr,P1,
+ #sctp_assoc_change{
+ state=comm_up,
+ assoc_id=AssocId2}}} -> AssocId2
+ after Timeout ->
+ socket_bailout([S1,S2])
+ end,
+ ?line ?LOGVAR(S2Ai),
+ ?line S1Ai =
+ receive
+ {S1,{Addr,P2,
+ #sctp_assoc_change{
+ state=comm_up,
+ assoc_id=AssocId1}}} -> AssocId1
+ after Timeout ->
+ socket_bailout([S1,S2])
+ end,
+ ?line ?LOGVAR(S1Ai),
%%
- ?line H_b = socket_req(S1, recv),
- ?line ok = socket_call(S2, {send,S2Ai,Stream,<<"Data H_b">>}),
- ?line {Addr,P2,S1Ai,Stream,<<"Data H_b">>} = socket_resp(H_b),
- ?line H_c = socket_req(S1, {recv,Socket2}),
- ?line ok =
- socket_call(S2, {send,Socket1,S1Ai,Stream,<<"Data H_c">>}),
- ?line {Addr,P1,S2Ai,Stream,<<"Data H_c">>} = socket_resp(H_c),
+ ?line socket_call(S2, {send,S2Ai,Stream,<<"Number one">>}),
+ ?line
+ receive
+ {S1,{Addr,P2,S1Ai,Stream,<<"Number one">>}} -> ok
+ after Timeout ->
+ socket_bailout([S1,S2])
+ end,
+ ?line socket_call(S2, {send,Socket1,S1Ai,Stream,<<"Number two">>}),
+ ?line
+ receive
+ {S2,{Addr,P1,S2Ai,Stream,<<"Number two">>}} -> ok
+ after Timeout ->
+ socket_bailout([S1,S2])
+ end,
%%
?line S3 = socket_peeloff(Socket1, S1Ai, Timeout),
- ?line P3 = socket_call(S3, port),
- ?line Socket3 = socket_call(S3, socket),
+ ?line ?LOGVAR(S3),
+ ?line P3 = socket_call(S3, get_port),
+ ?line ?LOGVAR(P3),
?line S3Ai = S1Ai,
+ ?line ?LOGVAR(S3Ai),
%%
- ?line H_d = socket_req(S2, recv),
- ?line ok = socket_call(S3, {send,S3Ai,Stream,<<"Data H_d">>}),
- ?line {Addr,P3,S2Ai,Stream,<<"Data H_d">>} = socket_resp(H_d),
- ?line ok = socket_call(S3, {send,Socket2,S2Ai,Stream,<<"Data S2">>}),
- ?line {Addr,P2,S3Ai,Stream,<<"Data S2">>} = socket_call(S2, {recv,Socket3}),
+ ?line socket_call(S3, {send,S3Ai,Stream,<<"Number three">>}),
+ ?line
+ receive
+ {S2,{Addr,P3,S2Ai,Stream,<<"Number three">>}} -> ok
+ after Timeout ->
+ socket_bailout([S1,S2,S3])
+ end,
+ ?line socket_call(S3, {send,Socket2,S2Ai,Stream,<<"Number four">>}),
+ ?line
+ receive
+ {S3,{Addr,P2,S3Ai,Stream,<<"Number four">>}} -> ok
+ after Timeout ->
+ socket_bailout([S1,S2,S3])
+ end,
%%
?line inet:i(sctp),
- ?line ok = socket_stop(S1),
- ?line ok = socket_stop(S2),
- ?line {Addr,P2,#sctp_shutdown_event{assoc_id=S1Ai}} =
- recv_event(ok(socket_stop(S3))),
+ ?line socket_close_verbose(S1),
+ ?line socket_close_verbose(S2),
+ ?line
+ receive
+ {S3,{Addr,P2,#sctp_shutdown_event{assoc_id=S3Ai}}} -> ok
+ after Timeout ->
+ socket_bailout([S3])
+ end,
+ ?line
+ receive
+ {S3,{Addr,P2,#sctp_assoc_change{state=shutdown_comp,
+ assoc_id=S3Ai}}} -> ok
+ after Timeout ->
+ socket_bailout([S3])
+ end,
+ ?line socket_close_verbose(S3),
+ ?line [] = flush(),
ok.
@@ -960,28 +1015,65 @@ buffers(Config) when is_list(Config) ->
?line Data = mk_data(Limit),
?line Addr = {127,0,0,1},
?line Stream = 1,
- ?line Timeout = 333,
- ?line S1 = socket_start(Addr, Timeout),
- ?line P1 = socket_call(S1, port),
+ ?line Timeout = 1333,
+ ?line S1 = socket_open([{ip,Addr}], Timeout),
+ ?line ?LOGVAR(S1),
+ ?line P1 = socket_call(S1, get_port),
+ ?line ?LOGVAR(P1),
?line ok = socket_call(S1, {listen,true}),
- ?line S2 = socket_start(Addr, Timeout),
- ?line P2 = socket_call(S2, port),
+ ?line S2 = socket_open([{ip,Addr}], Timeout),
+ ?line ?LOGVAR(S2),
+ ?line P2 = socket_call(S2, get_port),
+ ?line ?LOGVAR(P2),
%%
- ?line H_a = socket_req(S1, recv_assoc),
- ?line {S2Ai,Sa,Sb} = socket_call(S2, {connect,Addr,P1,[]}),
- ?line {S1Ai,Sb,Sa,Addr,P2} = socket_resp(H_a),
+ ?line socket_call(S2, {connect_init,Addr,P1,[]}),
+ ?line S2Ai =
+ receive
+ {S2,{Addr,P1,
+ #sctp_assoc_change{
+ state=comm_up,
+ assoc_id=AssocId2}}} -> AssocId2
+ after Timeout ->
+ socket_bailout([S1,S2])
+ end,
+ ?line S1Ai =
+ receive
+ {S1,{Addr,P2,
+ #sctp_assoc_change{
+ state=comm_up,
+ assoc_id=AssocId1}}} -> AssocId1
+ after Timeout ->
+ socket_bailout([S1,S2])
+ end,
%%
- ?line ok = socket_call(S1, {setopts,[{recbuf,Limit}]}),
+ ?line socket_call(S1, {setopts,[{recbuf,Limit}]}),
?line case socket_call(S1, {getopts,[recbuf]}) of
- {ok,[{recbuf,RB1}]} when RB1 >= Limit -> ok
+ [{recbuf,RB1}] when RB1 >= Limit -> ok
end,
- ?line H_b = socket_req(S1, recv),
- ?line ok = socket_call(S2, {send,S2Ai,Stream,Data}),
- ?line {Addr,P2,S1Ai,Stream,Data} = socket_resp(H_b),
+ ?line socket_call(S2, {send,S2Ai,Stream,Data}),
+ ?line
+ receive
+ {S1,{Addr,P2,S1Ai,Stream,Data}} -> ok
+ after Timeout ->
+ socket_bailout([S1,S2])
+ end,
%%
- ?line ok = socket_stop(S1),
- ?line {Addr,P1,#sctp_shutdown_event{assoc_id=S2Ai}} =
- recv_event(ok(socket_stop(S2))),
+ ?line socket_close_verbose(S1),
+ ?line
+ receive
+ {S2,{Addr,P1,#sctp_shutdown_event{assoc_id=S2Ai}}} -> ok
+ after Timeout ->
+ socket_bailout([S2])
+ end,
+ ?line
+ receive
+ {S2,{Addr,P1,#sctp_assoc_change{state=shutdown_comp,
+ assoc_id=S2Ai}}} -> ok
+ after Timeout ->
+ socket_bailout([S2])
+ end,
+ ?line socket_close_verbose(S2),
+ ?line [] = flush(),
ok.
mk_data(Words) ->
@@ -995,38 +1087,101 @@ mk_data(N, Words, Bin) ->
%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% socket gen_server ultra light
+socket_open(SocketOpts, Timeout) ->
+ Opts = [{type,seqpacket},{active,once},binary|SocketOpts],
+ Starter =
+ fun () ->
+ {ok,Socket} =
+ gen_sctp:open(Opts),
+ Socket
+ end,
+ s_start(Starter, Timeout).
+
socket_peeloff(Socket, AssocId, Timeout) ->
+ Opts = [{active,once},binary],
Starter =
fun () ->
{ok,NewSocket} =
gen_sctp:peeloff(Socket, AssocId),
+ ok = inet:setopts(NewSocket, Opts),
NewSocket
end,
- socket_starter(Starter, Timeout).
+ s_start(Starter, Timeout).
+
+socket_close_verbose(S) ->
+ History = socket_history(socket_close(S)),
+ io:format("socket_close ~p:~n ~p.~n", [S,History]),
+ History.
+
+socket_close(S) ->
+ s_req(S, close).
+
+socket_call(S, Request) ->
+ s_req(S, {Request}).
+
+%% socket_get(S, Key) ->
+%% s_req(S, {get,Key}).
+
+socket_bailout([S|Ss]) ->
+ History = socket_history(socket_close(S)),
+ io:format("bailout ~p:~n ~p.~n", [S,History]),
+ socket_bailout(Ss);
+socket_bailout([]) ->
+ io:format("flush: ~p.~n", [flush()]),
+ test_server:fail(socket_bailout).
+
+socket_history({State,Flush}) ->
+ {lists:keysort(
+ 2,
+ lists:flatten(
+ [[{Key,Val} || Val <- Vals]
+ || {Key,Vals} <- gb_trees:to_list(State)])),
+ Flush}.
+
+s_handler(Socket) ->
+ fun ({listen,Listen}) ->
+ ok = gen_sctp:listen(Socket, Listen);
+ (get_port) ->
+ ok(inet:port(Socket));
+ (get_socket) ->
+ Socket;
+ ({connect_init,ConAddr,ConPort,ConOpts}) ->
+ ok = gen_sctp:connect_init(Socket, ConAddr, ConPort, ConOpts);
+ ({send,AssocId,Stream,Data}) ->
+ ok = gen_sctp:send(Socket, AssocId, Stream, Data);
+ ({send,OtherSocket,AssocId,Stream,Data}) ->
+ ok = gen_sctp:send(OtherSocket, AssocId, Stream, Data);
+ ({setopts,Opts}) ->
+ ok = inet:setopts(Socket, Opts);
+ ({getopts,Optnames}) ->
+ ok(inet:getopts(Socket, Optnames))
+ end.
-socket_start(Addr, Timeout) ->
- Starter =
- fun () ->
- {ok,Socket} =
- gen_sctp:open([{type,seqpacket},{ifaddr,Addr}]),
- Socket
- end,
- socket_starter(Starter, Timeout).
+s_req(S, Req) ->
+ Mref = erlang:monitor(process, S),
+ S ! {self(),Mref,Req},
+ receive
+ {'DOWN',Mref,_,_,Error} ->
+ exit(Error);
+ {S,Mref,Reply} ->
+ erlang:demonitor(Mref),
+ receive {'DOWN',Mref,_,_,_} -> ok after 0 -> ok end,
+ Reply
+ end.
-socket_starter(Starter, Timeout) ->
+s_start(Starter, Timeout) ->
Parent = self(),
Owner =
spawn_link(
fun () ->
- socket_starter(Starter(), Timeout, Parent)
+ s_start(Starter(), Timeout, Parent)
end),
- io:format("Started socket ~w.~n", [Owner]),
Owner.
-socket_starter(Socket, Timeout, Parent) ->
+s_start(Socket, Timeout, Parent) ->
+ Handler = s_handler(Socket),
try
- Handler = socket_handler(Socket, Timeout),
- socket_loop(Socket, Timeout, Parent, Handler)
+ s_loop(Socket, Timeout, Parent, Handler, gb_trees:empty())
catch
Class:Reason ->
Stacktrace = erlang:get_stacktrace(),
@@ -1035,150 +1190,112 @@ socket_starter(Socket, Timeout, Parent) ->
erlang:raise(Class, Reason, Stacktrace)
end.
-socket_loop(Socket, Timeout, Parent, Handler) ->
+s_loop(Socket, Timeout, Parent, Handler, State) ->
receive
- {Parent,Ref} -> % socket_stop()
- Result =
- case log(gen_sctp:recv(Socket, Timeout)) of
- {error,timeout} -> ok;
- R -> R
- end,
+ {Parent,Ref,close} -> % socket_close()
+ erlang:send_after(Timeout, self(), {Parent,Ref,exit}),
+ s_loop(Socket, Timeout, Parent, Handler, State);
+ {Parent,Ref,exit} ->
ok = gen_sctp:close(Socket),
- Parent ! {self(),Ref, Result};
- {Parent,Ref,Msg} ->
- Parent ! {self(),Ref,Handler(Msg)},
- socket_loop(Socket, Timeout, Parent, Handler)
+ Key = exit,
+ Val = {now(),Socket},
+ NewState = gb_push(Key, Val, State),
+ Parent ! {self(),Ref,{NewState,flush()}};
+ {Parent,Ref,{Msg}} ->
+ Result = Handler(Msg),
+ Key = req,
+ Val = {now(),{Msg,Result}},
+ NewState = gb_push(Key, Val, State),
+ Parent ! {self(),Ref,Result},
+ s_loop(Socket, Timeout, Parent, Handler, NewState);
+ %% {Parent,Ref,{get,Key}} ->
+ %% Parent ! {self(),Ref,gb_get(Key, State)},
+ %% s_loop(Socket, Timeout, Parent, Handler, State);
+ {sctp,Socket,Addr,Port,
+ {[#sctp_sndrcvinfo{stream=Stream,assoc_id=AssocId}=SRI],Data}}
+ when not is_tuple(Data) ->
+ case gb_get({assoc_change,AssocId}, State) of
+ [{_,{Addr,Port,
+ #sctp_assoc_change{
+ state=comm_up,
+ inbound_streams=Is}}}|_]
+ when 0 =< Stream, Stream < Is-> ok;
+ [] -> ok
+ end,
+ Key = {msg,AssocId,Stream},
+ Val = {now(),{Addr,Port,SRI,Data}},
+ NewState = gb_push(Key, Val, State),
+ Parent ! {self(),{Addr,Port,AssocId,Stream,Data}},
+ again(Socket),
+ s_loop(Socket, Timeout, Parent, Handler, NewState);
+ {sctp,Socket,Addr,Port,
+ {SRI,#sctp_assoc_change{assoc_id=AssocId,state=St}=SAC}} ->
+ case SRI of
+ [#sctp_sndrcvinfo{assoc_id=AssocId,stream=0}] -> ok;
+ [] -> ok
+ end,
+ Key = {assoc_change,AssocId},
+ Val = {now(),{Addr,Port,SAC}},
+ case {gb_get(Key, State),St} of
+ {[],_} -> ok;
+ {[{_,{Addr,Port,#sctp_assoc_change{state=comm_up}}}|_],_}
+ when St =:= comm_lost; St =:= shutdown_comp -> ok
+ end,
+ NewState = gb_push(Key, Val, State),
+ Parent ! {self(),{Addr,Port,SAC}},
+ again(Socket),
+ s_loop(Socket, Timeout, Parent, Handler, NewState);
+ {sctp,Socket,Addr,Port,
+ {SRI,#sctp_paddr_change{assoc_id=AssocId,
+ addr={_,Port},
+ state=St}=SPC}} ->
+ case SRI of
+ [#sctp_sndrcvinfo{assoc_id=AssocId,stream=0}] -> ok;
+ [] -> ok
+ end,
+ case {gb_get({assoc_change,AssocId}, State),St} of
+ {[],_} when St =:= addr_confirmed -> ok
+ end,
+ Key = {paddr_change,AssocId},
+ Val = {now(),{Addr,Port,SPC}},
+ NewState = gb_push(Key, Val, State),
+ again(Socket),
+ s_loop(Socket, Timeout, Parent, Handler, NewState);
+ {sctp,Socket,Addr,Port,
+ {SRI,#sctp_shutdown_event{assoc_id=AssocId}=SSE}} ->
+ case SRI of
+ [#sctp_sndrcvinfo{assoc_id=AssocId,stream=0}] -> ok;
+ [] -> ok
+ end,
+ case gb_get({assoc_change,AssocId}, State) of
+ [{_,{Addr,Port,#sctp_assoc_change{state=comm_up}}}|_] -> ok;
+ [] -> ok
+ end,
+ Key = {shutdown_event,AssocId},
+ Val = {now(),{Addr,Port}},
+ NewState = gb_push(Key, Val, State),
+ Parent ! {self(), {Addr,Port,SSE}},
+ again(Socket),
+ s_loop(Socket, Timeout, Parent, Handler, NewState);
+ Unexpected ->
+ erlang:error({unexpected,Unexpected})
end.
-socket_handler(Socket, Timeout) ->
- fun ({listen,Listen}) ->
- gen_sctp:listen(Socket, Listen);
- (port) ->
- ok(inet:port(Socket));
- (socket) ->
- Socket;
- (recv_assoc) ->
- case recv_event(ok(gen_sctp:recv(Socket, Timeout))) of
- {AssocAddr,AssocPort,
- #sctp_paddr_change{state=addr_confirmed,
- addr={_,AssocPort},
- error=0,
- assoc_id=AssocId}} ->
- {AssocAddr,AssocPort,
- #sctp_assoc_change{state=comm_up,
- error=0,
- outbound_streams=Os,
- inbound_streams=Is,
- assoc_id=AssocId}} =
- recv_event(ok(gen_sctp:recv(Socket, infinity))),
- {AssocId,Os,Is,AssocAddr,AssocPort};
- {AssocAddr,AssocPort,
- #sctp_assoc_change{state=comm_up,
- error=0,
- outbound_streams=Os,
- inbound_streams=Is,
- assoc_id=AssocId}} ->
- {AssocId,Os,Is,AssocAddr,AssocPort}
- end;
- %% {AssocAddr,AssocPort,[],
- %% #sctp_assoc_change{state=comm_up,
- %% error=0,
- %% outbound_streams=Os,
- %% inbound_streams=Is,
- %% assoc_id=AssocId}} =
- %% ok(gen_sctp:recv(Socket, infinity)),
- %% case log(gen_sctp:recv(Socket, Timeout)) of
- %% {ok,AssocAddr,AssocPort,[],
- %% #sctp_paddr_change{addr = {AssocAddr,AssocPort},
- %% state = addr_available,
- %% error = 0,
- %% assoc_id = AssocId}} -> ok;
- %% {error,timeout} -> ok
- %% end,
- %% {AssocId,Os,Is,AssocAddr,AssocPort};
- ({connect,ConAddr,ConPort,ConOpts}) ->
- ok = gen_sctp:connect_init(Socket, ConAddr, ConPort, ConOpts),
- case recv_event(ok(gen_sctp:recv(Socket, Timeout))) of
- {ConAddr,ConPort,
- #sctp_paddr_change{state=addr_confirmed,
- addr={_,ConPort},
- error=0,
- assoc_id=AssocId}} ->
- {ConAddr,ConPort,
- #sctp_assoc_change{state=comm_up,
- error=0,
- outbound_streams=Os,
- inbound_streams=Is,
- assoc_id=AssocId}} =
- recv_event(ok(gen_sctp:recv(Socket, infinity))),
- {AssocId,Os,Is};
- {ConAddr,ConPort,
- #sctp_assoc_change{state=comm_up,
- error=0,
- outbound_streams=Os,
- inbound_streams=Is,
- assoc_id=AssocId}} ->
- {AssocId,Os,Is}
- end;
- %% #sctp_assoc_change{state=comm_up,
- %% error=0,
- %% outbound_streams=Os,
- %% inbound_streams=Is,
- %% assoc_id=AssocId} =
- %% ok(gen_sctp:connect(Socket, ConAddr, ConPort, ConOpts)),
- %% case log(gen_sctp:recv(Socket, Timeout)) of
- %% {ok,ConAddr,ConPort,[],
- %% #sctp_paddr_change{addr = {ConAddr,ConPort},
- %% state = addr_available,
- %% error = 0,
- %% assoc_id = AssocId}} -> ok;
- %% {error,timeout} -> ok
- %% end,
- %% {AssocId,Os,Is};
- ({send,AssocId,Stream,Data}) ->
- gen_sctp:send(Socket, AssocId, Stream, Data);
- ({send,S,AssocId,Stream,Data}) ->
- gen_sctp:send(S, AssocId, Stream, Data);
- (recv) ->
- {Addr,Port,
- [#sctp_sndrcvinfo{stream=Stream,assoc_id=AssocId}],Data} =
- ok(gen_sctp:recv(Socket, infinity)),
- {Addr,Port,AssocId,Stream,Data};
- ({recv,S}) ->
- {Addr,Port,
- [#sctp_sndrcvinfo{stream=Stream,assoc_id=AssocId}],Data} =
- ok(gen_sctp:recv(S, infinity)),
- {Addr,Port,AssocId,Stream,Data};
- ({setopts,Opts}) ->
- inet:setopts(Socket, Opts);
- ({getopts,Optnames}) ->
- inet:getopts(Socket, Optnames)
- end.
+again(Socket) ->
+ inet:setopts(Socket, [{active,once}]).
-socket_stop(Handler) ->
- Mref = erlang:monitor(process, Handler),
- Handler ! {self(),Mref},
- receive
- {Handler,Mref,Result} ->
- receive {'DOWN',Mref,_,_,_} -> Result end;
- {'DOWN',Mref,_,_,Error} ->
- exit(Error)
+gb_push(Key, Val, GBT) ->
+ case gb_trees:lookup(Key, GBT) of
+ none ->
+ gb_trees:insert(Key, [Val], GBT);
+ {value,V} ->
+ gb_trees:update(Key, [Val|V], GBT)
end.
-socket_call(Handler, Request) ->
- socket_resp(socket_req(Handler, Request)).
-
-socket_req(Handler, Request) ->
- Mref = erlang:monitor(process, Handler),
- Handler ! {self(),Mref,Request},
- {Handler,Mref}.
-
-socket_resp({Handler,Mref}) ->
- receive
- {'DOWN',Mref,_,_,Error} ->
- exit(Error);
- {Handler,Mref,Reply} ->
- erlang:demonitor(Mref),
- receive {'DOWN',Mref,_,_,_} -> ok after 0 -> ok end,
- Reply
+gb_get(Key, GBT) ->
+ case gb_trees:lookup(Key, GBT) of
+ none ->
+ [];
+ {value,V} ->
+ V
end.