diff options
-rw-r--r-- | lib/kernel/test/gen_sctp_SUITE.erl | 603 |
1 files changed, 360 insertions, 243 deletions
diff --git a/lib/kernel/test/gen_sctp_SUITE.erl b/lib/kernel/test/gen_sctp_SUITE.erl index 8011c22ce5..07434df8da 100644 --- a/lib/kernel/test/gen_sctp_SUITE.erl +++ b/lib/kernel/test/gen_sctp_SUITE.erl @@ -110,7 +110,7 @@ xfer_min(Config) when is_list(Config) -> assoc_id=SaAssocId}=SaAssocChange} = gen_sctp:connect(Sa, Loopback, Pb, []), ?line {SbAssocId,SaOutboundStreams,SaInboundStreams} = - case recv_event(ok(gen_sctp:recv(Sb, infinity))) of + case recv_event(log_ok(gen_sctp:recv(Sb, infinity))) of {Loopback,Pa, #sctp_assoc_change{state=comm_up, error=0, @@ -129,12 +129,12 @@ xfer_min(Config) when is_list(Config) -> outbound_streams=SbOutboundStreams, inbound_streams=SbInboundStreams, assoc_id=AssocId}} = - ?line recv_event(ok(gen_sctp:recv(Sb, infinity))), + ?line recv_event(log_ok(gen_sctp:recv(Sb, infinity))), {AssocId,SbInboundStreams,SbOutboundStreams} end, ?line ok = gen_sctp:send(Sa, SaAssocId, 0, Data), - ?line case ok(gen_sctp:recv(Sb, infinity)) of + ?line case log_ok(gen_sctp:recv(Sb, infinity)) of {Loopback, Pa, [#sctp_sndrcvinfo{stream=Stream, @@ -154,7 +154,7 @@ xfer_min(Config) when is_list(Config) -> Data}} = gen_sctp:recv(Sb, infinity) end, ?line ok = gen_sctp:send(Sb, SbAssocId, 0, Data), - ?line case ok(gen_sctp:recv(Sa, infinity)) of + ?line case log_ok(gen_sctp:recv(Sa, infinity)) of {Loopback,Pb, [#sctp_sndrcvinfo{stream=Stream, assoc_id=SaAssocId}], @@ -172,22 +172,22 @@ xfer_min(Config) when is_list(Config) -> [#sctp_sndrcvinfo{stream=Stream, assoc_id=SaAssocId}], Data} = - ok(gen_sctp:recv(Sa, infinity)) + log_ok(gen_sctp:recv(Sa, infinity)) end, %% ?line ok = gen_sctp:eof(Sa, SaAssocChange), ?line {Loopback,Pa,#sctp_shutdown_event{assoc_id=SbAssocId}} = - recv_event(ok(gen_sctp:recv(Sb, infinity))), + recv_event(log_ok(gen_sctp:recv(Sb, infinity))), ?line {Loopback,Pb, #sctp_assoc_change{state=shutdown_comp, error=0, assoc_id=SaAssocId}} = - recv_event(ok(gen_sctp:recv(Sa, infinity))), + recv_event(log_ok(gen_sctp:recv(Sa, infinity))), ?line {Loopback,Pa, #sctp_assoc_change{state=shutdown_comp, error=0, assoc_id=SbAssocId}} = - recv_event(ok(gen_sctp:recv(Sb, infinity))), + recv_event(log_ok(gen_sctp:recv(Sb, infinity))), ?line ok = gen_sctp:close(Sa), ?line ok = gen_sctp:close(Sb), @@ -338,11 +338,11 @@ def_sndrcvinfo(Config) when is_list(Config) -> ?line Data = <<"What goes up, must come down.">>, %% ?line S1 = - ok(gen_sctp:open( + log_ok(gen_sctp:open( 0, [{sctp_default_send_param,#sctp_sndrcvinfo{ppid=17}}])), ?LOGVAR(S1), ?line P1 = - ok(inet:port(S1)), + log_ok(inet:port(S1)), ?LOGVAR(P1), ?line #sctp_sndrcvinfo{ppid=17, context=0, timetolive=0, assoc_id=0} = getopt(S1, sctp_default_send_param), @@ -350,10 +350,10 @@ def_sndrcvinfo(Config) when is_list(Config) -> gen_sctp:listen(S1, true), %% ?line S2 = - ok(gen_sctp:open()), + log_ok(gen_sctp:open()), ?LOGVAR(S2), ?line P2 = - ok(inet:port(S2)), + log_ok(inet:port(S2)), ?LOGVAR(P2), ?line #sctp_sndrcvinfo{ppid=0, context=0, timetolive=0, assoc_id=0} = getopt(S2, sctp_default_send_param), @@ -362,9 +362,9 @@ def_sndrcvinfo(Config) when is_list(Config) -> state=comm_up, error=0, assoc_id=S2AssocId} = S2AssocChange = - ok(gen_sctp:connect(S2, Loopback, P1, [])), + log_ok(gen_sctp:connect(S2, Loopback, P1, [])), ?LOGVAR(S2AssocChange), - ?line case recv_event(ok(gen_sctp:recv(S1))) of + ?line case recv_event(log_ok(gen_sctp:recv(S1))) of {Loopback,P2, #sctp_assoc_change{ state=comm_up, @@ -382,7 +382,7 @@ def_sndrcvinfo(Config) when is_list(Config) -> state=comm_up, error=0, assoc_id=S1AssocId}} = - recv_event(ok(gen_sctp:recv(S1))) + recv_event(log_ok(gen_sctp:recv(S1))) end, ?line #sctp_sndrcvinfo{ @@ -396,7 +396,7 @@ def_sndrcvinfo(Config) when is_list(Config) -> %% ?line ok = gen_sctp:send(S1, S1AssocId, 1, <<"1: ",Data/binary>>), - ?line case ok(gen_sctp:recv(S2)) of + ?line case log_ok(gen_sctp:recv(S2)) of {Loopback,P1, [#sctp_sndrcvinfo{ stream=1, ppid=17, context=0, assoc_id=S2AssocId}], @@ -412,7 +412,7 @@ def_sndrcvinfo(Config) when is_list(Config) -> [#sctp_sndrcvinfo{ stream=1, ppid=17, context=0, assoc_id=S2AssocId}], <<"1: ",Data/binary>>} = - ok(gen_sctp:recv(S2)) + log_ok(gen_sctp:recv(S2)) end, %% ?line ok = @@ -432,7 +432,7 @@ def_sndrcvinfo(Config) when is_list(Config) -> %% ?line ok = gen_sctp:send(S1, S1AssocId, 0, <<"2: ",Data/binary>>), - ?line case ok(gen_sctp:recv(S2)) of + ?line case log_ok(gen_sctp:recv(S2)) of {Loopback,P1, [#sctp_sndrcvinfo{ stream=0, ppid=19, context=0, assoc_id=S2AssocId}], @@ -440,7 +440,7 @@ def_sndrcvinfo(Config) when is_list(Config) -> end, ?line ok = gen_sctp:send(S2, S2AssocChange, 1, <<"3: ",Data/binary>>), - ?line case ok(gen_sctp:recv(S1)) of + ?line case log_ok(gen_sctp:recv(S1)) of {Loopback,P2, [#sctp_sndrcvinfo{ stream=1, ppid=0, context=0, assoc_id=S1AssocId}], @@ -451,7 +451,7 @@ def_sndrcvinfo(Config) when is_list(Config) -> addr={Loopback,_}, state=addr_available, error=0, assoc_id=S1AssocId}} = recv_event(Event2), - ?line case ok(gen_sctp:recv(S1)) of + ?line case log_ok(gen_sctp:recv(S1)) of {Loopback,P2, [#sctp_sndrcvinfo{ stream=1, ppid=0, context=0, @@ -467,7 +467,7 @@ def_sndrcvinfo(Config) when is_list(Config) -> #sctp_sndrcvinfo{stream=0, ppid=20, assoc_id=S2AssocId}, <<"4: ",Data/binary>>) end), - ?line case ok(do_from_other_process(fun() -> gen_sctp:recv(S1) end)) of + ?line case log_ok(do_from_other_process(fun() -> gen_sctp:recv(S1) end)) of {Loopback,P2, [#sctp_sndrcvinfo{ stream=0, ppid=20, context=0, assoc_id=S1AssocId}], @@ -496,9 +496,9 @@ getopt(S, Opt, Param) -> setopt(S, Opt, Val) -> inet:setopts(S, [{Opt,Val}]). -ok({ok,X}) -> - io:format("OK[~w]: ~p~n", [self(),X]), - X. +log_ok(X) -> log(ok(X)). + +ok({ok,X}) -> X. log(X) -> io:format("LOG[~w]: ~p~n", [self(),X]), @@ -640,16 +640,16 @@ api_connect_init(Config) when is_list(Config) -> ?line case gen_sctp:connect_init(Sa, localhost, Pb, []) of {error,econnrefused} -> ?line {Localhost,Pb,#sctp_assoc_change{state=comm_lost}} = - recv_event(ok(gen_sctp:recv(Sa, infinity))); + recv_event(log_ok(gen_sctp:recv(Sa, infinity))); ok -> ?line {Localhost,Pb,#sctp_assoc_change{state=cant_assoc}} = - recv_event(ok(gen_sctp:recv(Sa, infinity))) + recv_event(log_ok(gen_sctp:recv(Sa, infinity))) end, ?line ok = gen_sctp:listen(Sb, true), ?line case gen_sctp:connect_init(Sa, localhost, Pb, []) of ok -> ?line {Localhost,Pb,#sctp_assoc_change{state=comm_up}} = - recv_event(ok(gen_sctp:recv(Sa, infinity))) + recv_event(log_ok(gen_sctp:recv(Sa, infinity))) end, ?line ok = gen_sctp:close(Sa), ?line ok = gen_sctp:close(Sb), @@ -699,7 +699,7 @@ api_opts(Config) when is_list(Config) -> end. implicit_inet6(Config) when is_list(Config) -> - ?line Hostname = ok(inet:gethostname()), + ?line Hostname = log_ok(inet:gethostname()), ?line case gen_sctp:open(0, [inet6]) of {ok,S1} -> @@ -712,16 +712,16 @@ implicit_inet6(Config) when is_list(Config) -> ?line ok = gen_sctp:close(S1), %% ?line Localhost = - ok(inet:getaddr("localhost", inet6)), + log_ok(inet:getaddr("localhost", inet6)), ?line io:format("~s ~p~n", ["localhost",Localhost]), ?line S2 = - ok(gen_sctp:open(0, [{ip,Localhost}])), + log_ok(gen_sctp:open(0, [{ip,Localhost}])), ?line implicit_inet6(S2, Localhost), ?line ok = gen_sctp:close(S2), %% ?line io:format("~s ~p~n", [Hostname,Host]), ?line S3 = - ok(gen_sctp:open(0, [{ifaddr,Host}])), + log_ok(gen_sctp:open(0, [{ifaddr,Host}])), ?line implicit_inet6(S3, Host), ?line ok = gen_sctp:close(S1); {error,eafnosupport} -> @@ -734,25 +734,25 @@ implicit_inet6(Config) when is_list(Config) -> implicit_inet6(S1, Addr) -> ?line ok = gen_sctp:listen(S1, true), - ?line P1 = ok(inet:port(S1)), - ?line S2 = ok(gen_sctp:open(0, [inet6])), - ?line P2 = ok(inet:port(S2)), + ?line P1 = log_ok(inet:port(S1)), + ?line S2 = log_ok(gen_sctp:open(0, [inet6])), + ?line P2 = log_ok(inet:port(S2)), ?line #sctp_assoc_change{state=comm_up} = - ok(gen_sctp:connect(S2, Addr, P1, [])), - ?line case recv_event(ok(gen_sctp:recv(S1))) of + log_ok(gen_sctp:connect(S2, Addr, P1, [])), + ?line case recv_event(log_ok(gen_sctp:recv(S1))) of {Addr,P2,#sctp_assoc_change{state=comm_up}} -> ok; {Addr,P2,#sctp_paddr_change{state=addr_confirmed, addr={Addr,P2}, error=0}} -> {Addr,P2,#sctp_assoc_change{state=comm_up}} = - recv_event(ok(gen_sctp:recv(S1))) + recv_event(log_ok(gen_sctp:recv(S1))) end, - ?line case ok(inet:sockname(S1)) of + ?line case log_ok(inet:sockname(S1)) of {Addr,P1} -> ok; {{0,0,0,0,0,0,0,0},P1} -> ok end, - ?line case ok(inet:sockname(S2)) of + ?line case log_ok(inet:sockname(S2)) of {Addr,P2} -> ok; {{0,0,0,0,0,0,0,0},P2} -> ok end, @@ -792,7 +792,7 @@ xfer_stream_min(Config) when is_list(Config) -> assoc_id=SaAssocId}} = gen_sctp:connect(Sa, Loopback, Pb, []), ?line {SbOutboundStreams,SbInboundStreams,SbAssocId} = - case recv_event(ok(gen_sctp:recv(Sb, infinity))) of + case recv_event(log_ok(gen_sctp:recv(Sb, infinity))) of {Loopback,Pa, #sctp_assoc_change{state=comm_up, error=0, @@ -811,7 +811,7 @@ xfer_stream_min(Config) when is_list(Config) -> outbound_streams=OS, inbound_streams=IS, assoc_id=AI}} = - recv_event(ok(gen_sctp:recv(Sb, infinity))), + recv_event(log_ok(gen_sctp:recv(Sb, infinity))), {OS,IS,AI} end, ?line SaOutboundStreams = SbInboundStreams, @@ -838,7 +838,7 @@ xfer_stream_min(Config) when is_list(Config) -> ?line ok = do_from_other_process( fun () -> gen_sctp:send(Sb, SbAssocId, 0, Data) end), - ?line case ok(gen_sctp:recv(Sa, infinity)) of + ?line case log_ok(gen_sctp:recv(Sa, infinity)) of {Loopback,Pb, [#sctp_sndrcvinfo{stream=Stream, assoc_id=SaAssocId}], @@ -854,17 +854,17 @@ xfer_stream_min(Config) when is_list(Config) -> [#sctp_sndrcvinfo{stream=Stream, assoc_id=SaAssocId}], Data} = - ok(gen_sctp:recv(Sa, infinity)) + log_ok(gen_sctp:recv(Sa, infinity)) end, ?line ok = gen_sctp:close(Sa), ?line {Loopback,Pa, #sctp_shutdown_event{assoc_id=SbAssocId}} = - recv_event(ok(gen_sctp:recv(Sb, infinity))), + recv_event(log_ok(gen_sctp:recv(Sb, infinity))), ?line {Loopback,Pa, #sctp_assoc_change{state=shutdown_comp, error=0, assoc_id=SbAssocId}} = - recv_event(ok(gen_sctp:recv(Sb, infinity))), + recv_event(log_ok(gen_sctp:recv(Sb, infinity))), ?line ok = gen_sctp:close(Sb), ?line receive @@ -911,42 +911,97 @@ peeloff(Config) when is_list(Config) -> ?line Addr = {127,0,0,1}, ?line Stream = 0, ?line Timeout = 333, - ?line S1 = socket_start(Addr, Timeout), - ?line P1 = socket_call(S1, port), - ?line Socket1 = socket_call(S1, socket), - ?line ok = socket_call(S1, {listen,true}), - ?line S2 = socket_start(Addr, Timeout), - ?line P2 = socket_call(S2, port), - ?line Socket2 = socket_call(S2, socket), + ?line S1 = socket_open([{ifaddr,Addr}], Timeout), + ?line ?LOGVAR(S1), + ?line P1 = socket_call(S1, get_port), + ?line ?LOGVAR(P1), + ?line Socket1 = socket_call(S1, get_socket), + ?line ?LOGVAR(Socket1), + ?line socket_call(S1, {listen,true}), + ?line S2 = socket_open([{ifaddr,Addr}], Timeout), + ?line ?LOGVAR(S2), + ?line P2 = socket_call(S2, get_port), + ?line ?LOGVAR(P2), + ?line Socket2 = socket_call(S2, get_socket), + ?line ?LOGVAR(Socket2), %% - ?line H_a = socket_req(S1, recv_assoc), - ?line {S2Ai,Sa,Sb} = socket_call(S2, {connect,Addr,P1,[]}), - ?line {S1Ai,Sb,Sa,Addr,P2} = socket_resp(H_a), + ?line socket_call(S2, {connect_init,Addr,P1,[]}), + ?line S2Ai = + receive + {S2,{Addr,P1, + #sctp_assoc_change{ + state=comm_up, + assoc_id=AssocId2}}} -> AssocId2 + after Timeout -> + socket_bailout([S1,S2]) + end, + ?line ?LOGVAR(S2Ai), + ?line S1Ai = + receive + {S1,{Addr,P2, + #sctp_assoc_change{ + state=comm_up, + assoc_id=AssocId1}}} -> AssocId1 + after Timeout -> + socket_bailout([S1,S2]) + end, + ?line ?LOGVAR(S1Ai), %% - ?line H_b = socket_req(S1, recv), - ?line ok = socket_call(S2, {send,S2Ai,Stream,<<"Data H_b">>}), - ?line {Addr,P2,S1Ai,Stream,<<"Data H_b">>} = socket_resp(H_b), - ?line H_c = socket_req(S1, {recv,Socket2}), - ?line ok = - socket_call(S2, {send,Socket1,S1Ai,Stream,<<"Data H_c">>}), - ?line {Addr,P1,S2Ai,Stream,<<"Data H_c">>} = socket_resp(H_c), + ?line socket_call(S2, {send,S2Ai,Stream,<<"Number one">>}), + ?line + receive + {S1,{Addr,P2,S1Ai,Stream,<<"Number one">>}} -> ok + after Timeout -> + socket_bailout([S1,S2]) + end, + ?line socket_call(S2, {send,Socket1,S1Ai,Stream,<<"Number two">>}), + ?line + receive + {S2,{Addr,P1,S2Ai,Stream,<<"Number two">>}} -> ok + after Timeout -> + socket_bailout([S1,S2]) + end, %% ?line S3 = socket_peeloff(Socket1, S1Ai, Timeout), - ?line P3 = socket_call(S3, port), - ?line Socket3 = socket_call(S3, socket), + ?line ?LOGVAR(S3), + ?line P3 = socket_call(S3, get_port), + ?line ?LOGVAR(P3), ?line S3Ai = S1Ai, + ?line ?LOGVAR(S3Ai), %% - ?line H_d = socket_req(S2, recv), - ?line ok = socket_call(S3, {send,S3Ai,Stream,<<"Data H_d">>}), - ?line {Addr,P3,S2Ai,Stream,<<"Data H_d">>} = socket_resp(H_d), - ?line ok = socket_call(S3, {send,Socket2,S2Ai,Stream,<<"Data S2">>}), - ?line {Addr,P2,S3Ai,Stream,<<"Data S2">>} = socket_call(S2, {recv,Socket3}), + ?line socket_call(S3, {send,S3Ai,Stream,<<"Number three">>}), + ?line + receive + {S2,{Addr,P3,S2Ai,Stream,<<"Number three">>}} -> ok + after Timeout -> + socket_bailout([S1,S2,S3]) + end, + ?line socket_call(S3, {send,Socket2,S2Ai,Stream,<<"Number four">>}), + ?line + receive + {S3,{Addr,P2,S3Ai,Stream,<<"Number four">>}} -> ok + after Timeout -> + socket_bailout([S1,S2,S3]) + end, %% ?line inet:i(sctp), - ?line ok = socket_stop(S1), - ?line ok = socket_stop(S2), - ?line {Addr,P2,#sctp_shutdown_event{assoc_id=S1Ai}} = - recv_event(ok(socket_stop(S3))), + ?line socket_close_verbose(S1), + ?line socket_close_verbose(S2), + ?line + receive + {S3,{Addr,P2,#sctp_shutdown_event{assoc_id=S3Ai}}} -> ok + after Timeout -> + socket_bailout([S3]) + end, + ?line + receive + {S3,{Addr,P2,#sctp_assoc_change{state=shutdown_comp, + assoc_id=S3Ai}}} -> ok + after Timeout -> + socket_bailout([S3]) + end, + ?line socket_close_verbose(S3), + ?line [] = flush(), ok. @@ -960,28 +1015,65 @@ buffers(Config) when is_list(Config) -> ?line Data = mk_data(Limit), ?line Addr = {127,0,0,1}, ?line Stream = 1, - ?line Timeout = 333, - ?line S1 = socket_start(Addr, Timeout), - ?line P1 = socket_call(S1, port), + ?line Timeout = 1333, + ?line S1 = socket_open([{ip,Addr}], Timeout), + ?line ?LOGVAR(S1), + ?line P1 = socket_call(S1, get_port), + ?line ?LOGVAR(P1), ?line ok = socket_call(S1, {listen,true}), - ?line S2 = socket_start(Addr, Timeout), - ?line P2 = socket_call(S2, port), + ?line S2 = socket_open([{ip,Addr}], Timeout), + ?line ?LOGVAR(S2), + ?line P2 = socket_call(S2, get_port), + ?line ?LOGVAR(P2), %% - ?line H_a = socket_req(S1, recv_assoc), - ?line {S2Ai,Sa,Sb} = socket_call(S2, {connect,Addr,P1,[]}), - ?line {S1Ai,Sb,Sa,Addr,P2} = socket_resp(H_a), + ?line socket_call(S2, {connect_init,Addr,P1,[]}), + ?line S2Ai = + receive + {S2,{Addr,P1, + #sctp_assoc_change{ + state=comm_up, + assoc_id=AssocId2}}} -> AssocId2 + after Timeout -> + socket_bailout([S1,S2]) + end, + ?line S1Ai = + receive + {S1,{Addr,P2, + #sctp_assoc_change{ + state=comm_up, + assoc_id=AssocId1}}} -> AssocId1 + after Timeout -> + socket_bailout([S1,S2]) + end, %% - ?line ok = socket_call(S1, {setopts,[{recbuf,Limit}]}), + ?line socket_call(S1, {setopts,[{recbuf,Limit}]}), ?line case socket_call(S1, {getopts,[recbuf]}) of - {ok,[{recbuf,RB1}]} when RB1 >= Limit -> ok + [{recbuf,RB1}] when RB1 >= Limit -> ok end, - ?line H_b = socket_req(S1, recv), - ?line ok = socket_call(S2, {send,S2Ai,Stream,Data}), - ?line {Addr,P2,S1Ai,Stream,Data} = socket_resp(H_b), + ?line socket_call(S2, {send,S2Ai,Stream,Data}), + ?line + receive + {S1,{Addr,P2,S1Ai,Stream,Data}} -> ok + after Timeout -> + socket_bailout([S1,S2]) + end, %% - ?line ok = socket_stop(S1), - ?line {Addr,P1,#sctp_shutdown_event{assoc_id=S2Ai}} = - recv_event(ok(socket_stop(S2))), + ?line socket_close_verbose(S1), + ?line + receive + {S2,{Addr,P1,#sctp_shutdown_event{assoc_id=S2Ai}}} -> ok + after Timeout -> + socket_bailout([S2]) + end, + ?line + receive + {S2,{Addr,P1,#sctp_assoc_change{state=shutdown_comp, + assoc_id=S2Ai}}} -> ok + after Timeout -> + socket_bailout([S2]) + end, + ?line socket_close_verbose(S2), + ?line [] = flush(), ok. mk_data(Words) -> @@ -995,38 +1087,101 @@ mk_data(N, Words, Bin) -> %%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% socket gen_server ultra light +socket_open(SocketOpts, Timeout) -> + Opts = [{type,seqpacket},{active,once},binary|SocketOpts], + Starter = + fun () -> + {ok,Socket} = + gen_sctp:open(Opts), + Socket + end, + s_start(Starter, Timeout). + socket_peeloff(Socket, AssocId, Timeout) -> + Opts = [{active,once},binary], Starter = fun () -> {ok,NewSocket} = gen_sctp:peeloff(Socket, AssocId), + ok = inet:setopts(NewSocket, Opts), NewSocket end, - socket_starter(Starter, Timeout). + s_start(Starter, Timeout). + +socket_close_verbose(S) -> + History = socket_history(socket_close(S)), + io:format("socket_close ~p:~n ~p.~n", [S,History]), + History. + +socket_close(S) -> + s_req(S, close). + +socket_call(S, Request) -> + s_req(S, {Request}). + +%% socket_get(S, Key) -> +%% s_req(S, {get,Key}). + +socket_bailout([S|Ss]) -> + History = socket_history(socket_close(S)), + io:format("bailout ~p:~n ~p.~n", [S,History]), + socket_bailout(Ss); +socket_bailout([]) -> + io:format("flush: ~p.~n", [flush()]), + test_server:fail(socket_bailout). + +socket_history({State,Flush}) -> + {lists:keysort( + 2, + lists:flatten( + [[{Key,Val} || Val <- Vals] + || {Key,Vals} <- gb_trees:to_list(State)])), + Flush}. + +s_handler(Socket) -> + fun ({listen,Listen}) -> + ok = gen_sctp:listen(Socket, Listen); + (get_port) -> + ok(inet:port(Socket)); + (get_socket) -> + Socket; + ({connect_init,ConAddr,ConPort,ConOpts}) -> + ok = gen_sctp:connect_init(Socket, ConAddr, ConPort, ConOpts); + ({send,AssocId,Stream,Data}) -> + ok = gen_sctp:send(Socket, AssocId, Stream, Data); + ({send,OtherSocket,AssocId,Stream,Data}) -> + ok = gen_sctp:send(OtherSocket, AssocId, Stream, Data); + ({setopts,Opts}) -> + ok = inet:setopts(Socket, Opts); + ({getopts,Optnames}) -> + ok(inet:getopts(Socket, Optnames)) + end. -socket_start(Addr, Timeout) -> - Starter = - fun () -> - {ok,Socket} = - gen_sctp:open([{type,seqpacket},{ifaddr,Addr}]), - Socket - end, - socket_starter(Starter, Timeout). +s_req(S, Req) -> + Mref = erlang:monitor(process, S), + S ! {self(),Mref,Req}, + receive + {'DOWN',Mref,_,_,Error} -> + exit(Error); + {S,Mref,Reply} -> + erlang:demonitor(Mref), + receive {'DOWN',Mref,_,_,_} -> ok after 0 -> ok end, + Reply + end. -socket_starter(Starter, Timeout) -> +s_start(Starter, Timeout) -> Parent = self(), Owner = spawn_link( fun () -> - socket_starter(Starter(), Timeout, Parent) + s_start(Starter(), Timeout, Parent) end), - io:format("Started socket ~w.~n", [Owner]), Owner. -socket_starter(Socket, Timeout, Parent) -> +s_start(Socket, Timeout, Parent) -> + Handler = s_handler(Socket), try - Handler = socket_handler(Socket, Timeout), - socket_loop(Socket, Timeout, Parent, Handler) + s_loop(Socket, Timeout, Parent, Handler, gb_trees:empty()) catch Class:Reason -> Stacktrace = erlang:get_stacktrace(), @@ -1035,150 +1190,112 @@ socket_starter(Socket, Timeout, Parent) -> erlang:raise(Class, Reason, Stacktrace) end. -socket_loop(Socket, Timeout, Parent, Handler) -> +s_loop(Socket, Timeout, Parent, Handler, State) -> receive - {Parent,Ref} -> % socket_stop() - Result = - case log(gen_sctp:recv(Socket, Timeout)) of - {error,timeout} -> ok; - R -> R - end, + {Parent,Ref,close} -> % socket_close() + erlang:send_after(Timeout, self(), {Parent,Ref,exit}), + s_loop(Socket, Timeout, Parent, Handler, State); + {Parent,Ref,exit} -> ok = gen_sctp:close(Socket), - Parent ! {self(),Ref, Result}; - {Parent,Ref,Msg} -> - Parent ! {self(),Ref,Handler(Msg)}, - socket_loop(Socket, Timeout, Parent, Handler) + Key = exit, + Val = {now(),Socket}, + NewState = gb_push(Key, Val, State), + Parent ! {self(),Ref,{NewState,flush()}}; + {Parent,Ref,{Msg}} -> + Result = Handler(Msg), + Key = req, + Val = {now(),{Msg,Result}}, + NewState = gb_push(Key, Val, State), + Parent ! {self(),Ref,Result}, + s_loop(Socket, Timeout, Parent, Handler, NewState); + %% {Parent,Ref,{get,Key}} -> + %% Parent ! {self(),Ref,gb_get(Key, State)}, + %% s_loop(Socket, Timeout, Parent, Handler, State); + {sctp,Socket,Addr,Port, + {[#sctp_sndrcvinfo{stream=Stream,assoc_id=AssocId}=SRI],Data}} + when not is_tuple(Data) -> + case gb_get({assoc_change,AssocId}, State) of + [{_,{Addr,Port, + #sctp_assoc_change{ + state=comm_up, + inbound_streams=Is}}}|_] + when 0 =< Stream, Stream < Is-> ok; + [] -> ok + end, + Key = {msg,AssocId,Stream}, + Val = {now(),{Addr,Port,SRI,Data}}, + NewState = gb_push(Key, Val, State), + Parent ! {self(),{Addr,Port,AssocId,Stream,Data}}, + again(Socket), + s_loop(Socket, Timeout, Parent, Handler, NewState); + {sctp,Socket,Addr,Port, + {SRI,#sctp_assoc_change{assoc_id=AssocId,state=St}=SAC}} -> + case SRI of + [#sctp_sndrcvinfo{assoc_id=AssocId,stream=0}] -> ok; + [] -> ok + end, + Key = {assoc_change,AssocId}, + Val = {now(),{Addr,Port,SAC}}, + case {gb_get(Key, State),St} of + {[],_} -> ok; + {[{_,{Addr,Port,#sctp_assoc_change{state=comm_up}}}|_],_} + when St =:= comm_lost; St =:= shutdown_comp -> ok + end, + NewState = gb_push(Key, Val, State), + Parent ! {self(),{Addr,Port,SAC}}, + again(Socket), + s_loop(Socket, Timeout, Parent, Handler, NewState); + {sctp,Socket,Addr,Port, + {SRI,#sctp_paddr_change{assoc_id=AssocId, + addr={_,Port}, + state=St}=SPC}} -> + case SRI of + [#sctp_sndrcvinfo{assoc_id=AssocId,stream=0}] -> ok; + [] -> ok + end, + case {gb_get({assoc_change,AssocId}, State),St} of + {[],_} when St =:= addr_confirmed -> ok + end, + Key = {paddr_change,AssocId}, + Val = {now(),{Addr,Port,SPC}}, + NewState = gb_push(Key, Val, State), + again(Socket), + s_loop(Socket, Timeout, Parent, Handler, NewState); + {sctp,Socket,Addr,Port, + {SRI,#sctp_shutdown_event{assoc_id=AssocId}=SSE}} -> + case SRI of + [#sctp_sndrcvinfo{assoc_id=AssocId,stream=0}] -> ok; + [] -> ok + end, + case gb_get({assoc_change,AssocId}, State) of + [{_,{Addr,Port,#sctp_assoc_change{state=comm_up}}}|_] -> ok; + [] -> ok + end, + Key = {shutdown_event,AssocId}, + Val = {now(),{Addr,Port}}, + NewState = gb_push(Key, Val, State), + Parent ! {self(), {Addr,Port,SSE}}, + again(Socket), + s_loop(Socket, Timeout, Parent, Handler, NewState); + Unexpected -> + erlang:error({unexpected,Unexpected}) end. -socket_handler(Socket, Timeout) -> - fun ({listen,Listen}) -> - gen_sctp:listen(Socket, Listen); - (port) -> - ok(inet:port(Socket)); - (socket) -> - Socket; - (recv_assoc) -> - case recv_event(ok(gen_sctp:recv(Socket, Timeout))) of - {AssocAddr,AssocPort, - #sctp_paddr_change{state=addr_confirmed, - addr={_,AssocPort}, - error=0, - assoc_id=AssocId}} -> - {AssocAddr,AssocPort, - #sctp_assoc_change{state=comm_up, - error=0, - outbound_streams=Os, - inbound_streams=Is, - assoc_id=AssocId}} = - recv_event(ok(gen_sctp:recv(Socket, infinity))), - {AssocId,Os,Is,AssocAddr,AssocPort}; - {AssocAddr,AssocPort, - #sctp_assoc_change{state=comm_up, - error=0, - outbound_streams=Os, - inbound_streams=Is, - assoc_id=AssocId}} -> - {AssocId,Os,Is,AssocAddr,AssocPort} - end; - %% {AssocAddr,AssocPort,[], - %% #sctp_assoc_change{state=comm_up, - %% error=0, - %% outbound_streams=Os, - %% inbound_streams=Is, - %% assoc_id=AssocId}} = - %% ok(gen_sctp:recv(Socket, infinity)), - %% case log(gen_sctp:recv(Socket, Timeout)) of - %% {ok,AssocAddr,AssocPort,[], - %% #sctp_paddr_change{addr = {AssocAddr,AssocPort}, - %% state = addr_available, - %% error = 0, - %% assoc_id = AssocId}} -> ok; - %% {error,timeout} -> ok - %% end, - %% {AssocId,Os,Is,AssocAddr,AssocPort}; - ({connect,ConAddr,ConPort,ConOpts}) -> - ok = gen_sctp:connect_init(Socket, ConAddr, ConPort, ConOpts), - case recv_event(ok(gen_sctp:recv(Socket, Timeout))) of - {ConAddr,ConPort, - #sctp_paddr_change{state=addr_confirmed, - addr={_,ConPort}, - error=0, - assoc_id=AssocId}} -> - {ConAddr,ConPort, - #sctp_assoc_change{state=comm_up, - error=0, - outbound_streams=Os, - inbound_streams=Is, - assoc_id=AssocId}} = - recv_event(ok(gen_sctp:recv(Socket, infinity))), - {AssocId,Os,Is}; - {ConAddr,ConPort, - #sctp_assoc_change{state=comm_up, - error=0, - outbound_streams=Os, - inbound_streams=Is, - assoc_id=AssocId}} -> - {AssocId,Os,Is} - end; - %% #sctp_assoc_change{state=comm_up, - %% error=0, - %% outbound_streams=Os, - %% inbound_streams=Is, - %% assoc_id=AssocId} = - %% ok(gen_sctp:connect(Socket, ConAddr, ConPort, ConOpts)), - %% case log(gen_sctp:recv(Socket, Timeout)) of - %% {ok,ConAddr,ConPort,[], - %% #sctp_paddr_change{addr = {ConAddr,ConPort}, - %% state = addr_available, - %% error = 0, - %% assoc_id = AssocId}} -> ok; - %% {error,timeout} -> ok - %% end, - %% {AssocId,Os,Is}; - ({send,AssocId,Stream,Data}) -> - gen_sctp:send(Socket, AssocId, Stream, Data); - ({send,S,AssocId,Stream,Data}) -> - gen_sctp:send(S, AssocId, Stream, Data); - (recv) -> - {Addr,Port, - [#sctp_sndrcvinfo{stream=Stream,assoc_id=AssocId}],Data} = - ok(gen_sctp:recv(Socket, infinity)), - {Addr,Port,AssocId,Stream,Data}; - ({recv,S}) -> - {Addr,Port, - [#sctp_sndrcvinfo{stream=Stream,assoc_id=AssocId}],Data} = - ok(gen_sctp:recv(S, infinity)), - {Addr,Port,AssocId,Stream,Data}; - ({setopts,Opts}) -> - inet:setopts(Socket, Opts); - ({getopts,Optnames}) -> - inet:getopts(Socket, Optnames) - end. +again(Socket) -> + inet:setopts(Socket, [{active,once}]). -socket_stop(Handler) -> - Mref = erlang:monitor(process, Handler), - Handler ! {self(),Mref}, - receive - {Handler,Mref,Result} -> - receive {'DOWN',Mref,_,_,_} -> Result end; - {'DOWN',Mref,_,_,Error} -> - exit(Error) +gb_push(Key, Val, GBT) -> + case gb_trees:lookup(Key, GBT) of + none -> + gb_trees:insert(Key, [Val], GBT); + {value,V} -> + gb_trees:update(Key, [Val|V], GBT) end. -socket_call(Handler, Request) -> - socket_resp(socket_req(Handler, Request)). - -socket_req(Handler, Request) -> - Mref = erlang:monitor(process, Handler), - Handler ! {self(),Mref,Request}, - {Handler,Mref}. - -socket_resp({Handler,Mref}) -> - receive - {'DOWN',Mref,_,_,Error} -> - exit(Error); - {Handler,Mref,Reply} -> - erlang:demonitor(Mref), - receive {'DOWN',Mref,_,_,_} -> ok after 0 -> ok end, - Reply +gb_get(Key, GBT) -> + case gb_trees:lookup(Key, GBT) of + none -> + []; + {value,V} -> + V end. |