From fd3c106841e481aea0dc5ebf9b9fee1d34da5638 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Thu, 22 Nov 2018 12:17:37 +0100 Subject: Improve dist send throughput --- lib/ssl/src/tls_sender.erl | 86 ++++++++++++++++++++++++++++++---------------- 1 file changed, 57 insertions(+), 29 deletions(-) (limited to 'lib') diff --git a/lib/ssl/src/tls_sender.erl b/lib/ssl/src/tls_sender.erl index a245ee2465..107a0dcfd4 100644 --- a/lib/ssl/src/tls_sender.erl +++ b/lib/ssl/src/tls_sender.erl @@ -200,8 +200,9 @@ connection({call, From}, renegotiate, #data{connection_states = #{current_write := Write}} = StateData) -> {next_state, handshake, StateData, [{reply, From, {ok, Write}}]}; connection({call, From}, {application_data, AppData}, - #data{socket_options = SockOpts} = StateData) -> - case encode_packet(AppData, SockOpts) of + #data{socket_options = #socket_options{packet = Packet}} = + StateData) -> + case encode_packet(Packet, AppData) of {error, _} = Error -> {next_state, ?FUNCTION_NAME, StateData, [{reply, From, Error}]}; Data -> @@ -217,13 +218,26 @@ connection({call, From}, dist_get_tls_socket, tracker = Tracker} = StateData) -> TLSSocket = Connection:socket([Pid, self()], Transport, Socket, Connection, Tracker), {next_state, ?FUNCTION_NAME, StateData, [{reply, From, {ok, TLSSocket}}]}; -connection({call, From}, {dist_handshake_complete, _Node, DHandle}, #data{connection_pid = Pid} = StateData) -> +connection({call, From}, {dist_handshake_complete, _Node, DHandle}, + #data{connection_pid = Pid, + socket_options = #socket_options{packet = Packet}} = + StateData) -> ok = erlang:dist_ctrl_input_handler(DHandle, Pid), ok = ssl_connection:dist_handshake_complete(Pid, DHandle), %% From now on we execute on normal priority process_flag(priority, normal), - Events = dist_data_events(DHandle, []), - {next_state, ?FUNCTION_NAME, StateData#data{dist_handle = DHandle}, [{reply, From, ok} | Events]}; + {next_state, ?FUNCTION_NAME, StateData#data{dist_handle = DHandle}, + [{reply, From, ok} + | case dist_data(DHandle, Packet) of + [] -> + []; + Data -> + [{next_event, internal, + {application_packets,{self(),undefined},Data}}] + end]}; +connection(internal, {application_packets, From, Data}, StateData) -> + send_application_data(Data, From, ?FUNCTION_NAME, StateData); +%% connection(cast, {ack_alert, #alert{} = Alert}, #data{connection_pid = Pid} =StateData0) -> StateData = send_tls_alert(Alert, StateData0), Pid ! {self(), ack_alert}, @@ -237,9 +251,19 @@ connection(cast, {new_write, WritesState, Version}, StateData#data{connection_states = ConnectionStates0#{current_write => WritesState}, negotiated_version = Version}}; -connection(info, dist_data, #data{dist_handle = DHandle} = StateData) -> - Events = dist_data_events(DHandle, []), - {next_state, ?FUNCTION_NAME, StateData, Events}; +%% +connection(info, dist_data, + #data{dist_handle = DHandle, + socket_options = #socket_options{packet = Packet}} = + StateData) -> + {next_state, ?FUNCTION_NAME, StateData, + case dist_data(DHandle, Packet) of + [] -> + []; + Data -> + [{next_event, internal, + {application_packets,{self(),undefined},Data}}] + end}; connection(info, tick, StateData) -> consume_ticks(), {next_state, ?FUNCTION_NAME, StateData, @@ -272,6 +296,8 @@ handshake(cast, {new_write, WritesState, Version}, StateData#data{connection_states = ConnectionStates0#{current_write => WritesState}, negotiated_version = Version}}; +handshake(internal, {application_packets,_,_}, _) -> + {keep_state_and_data, [postpone]}; handshake(info, Msg, StateData) -> handle_info(Msg, ?FUNCTION_NAME, StateData). @@ -342,12 +368,13 @@ send_application_data(Data, From, StateName, renegotiate_at = RenegotiateAt} = StateData0) -> case time_to_renegotiate(Data, ConnectionStates0, RenegotiateAt) of true -> - ssl_connection:internal_renegotiation(Pid, ConnectionStates0), + ssl_connection:internal_renegotiation(Pid, ConnectionStates0), {next_state, handshake, StateData0, - [{next_event, {call, From}, {application_data, Data}}]}; + [{next_event, internal, {application_packets, From, Data}}]}; false -> {Msgs, ConnectionStates} = - Connection:encode_data(Data, Version, ConnectionStates0), + Connection:encode_data( + iolist_to_binary(Data), Version, ConnectionStates0), StateData = StateData0#data{connection_states = ConnectionStates}, case Connection:send(Transport, Socket, Msgs) of ok when DistHandle =/= undefined -> @@ -361,21 +388,18 @@ send_application_data(Data, From, StateName, end end. -encode_packet(Data, #socket_options{packet=Packet}) -> +-compile({inline, encode_packet/2}). +encode_packet(Packet, Data) -> + Len = iolist_size(Data), case Packet of - 1 -> encode_size_packet(Data, 8, (1 bsl 8) - 1); - 2 -> encode_size_packet(Data, 16, (1 bsl 16) - 1); - 4 -> encode_size_packet(Data, 32, (1 bsl 32) - 1); - _ -> Data - end. - -encode_size_packet(Bin, Size, Max) -> - Len = erlang:byte_size(Bin), - case Len > Max of - true -> - {error, {badarg, {packet_to_large, Len, Max}}}; - false -> - <> + 1 when Len < (1 bsl 8) -> [<>,Data]; + 2 when Len < (1 bsl 16) -> [<>,Data]; + 4 when Len < (1 bsl 32) -> [<>,Data]; + N when N =:= 1; N =:= 2; N =:= 4 -> + {error, + {badarg, {packet_to_large, Len, (1 bsl (Packet bsl 3)) - 1}}}; + _ -> + Data end. set_opts(SocketOptions, [{packet, N}]) -> @@ -409,14 +433,18 @@ call(FsmPid, Event) -> %%---------------Erlang distribution -------------------------------------- -dist_data_events(DHandle, Events) -> +dist_data(DHandle, Packet) -> case erlang:dist_ctrl_get_data(DHandle) of none -> erlang:dist_ctrl_get_data_notification(DHandle), - lists:reverse(Events); + []; Data -> - Event = {next_event, {call, {self(), undefined}}, {application_data, Data}}, - dist_data_events(DHandle, [Event | Events]) + %% This is encode_packet(4, Data) without Len check + %% since the emulator will always deliver a Data + %% smaller than 4 GB, and the distribution will + %% therefore always have to use {packet,4} + Len = iolist_size(Data), + [<>,Data|dist_data(DHandle, Packet)] end. consume_ticks() -> -- cgit v1.2.3 From 573220e4231bd6a7e4741dbd83848c2b03de048b Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Thu, 22 Nov 2018 19:36:28 +0100 Subject: Optimize split_bin --- lib/ssl/src/tls_record.erl | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) (limited to 'lib') diff --git a/lib/ssl/src/tls_record.erl b/lib/ssl/src/tls_record.erl index ce7edc9dcd..cf0690f2a5 100644 --- a/lib/ssl/src/tls_record.erl +++ b/lib/ssl/src/tls_record.erl @@ -113,7 +113,7 @@ encode_handshake(Frag, Version, ConnectionStates) -> case iolist_size(Frag) of N when N > ?MAX_PLAIN_TEXT_LENGTH -> - Data = split_bin(iolist_to_binary(Frag), ?MAX_PLAIN_TEXT_LENGTH, Version, BCA, BeastMitigation), + Data = split_bin(iolist_to_binary(Frag), Version, BCA, BeastMitigation), encode_iolist(?HANDSHAKE, Data, Version, ConnectionStates); _ -> encode_plain_text(?HANDSHAKE, Version, Frag, ConnectionStates) @@ -150,7 +150,7 @@ encode_data(Frag, Version, security_parameters := #security_parameters{bulk_cipher_algorithm = BCA}}} = ConnectionStates) -> - Data = split_bin(Frag, ?MAX_PLAIN_TEXT_LENGTH, Version, BCA, BeastMitigation), + Data = split_bin(Frag, Version, BCA, BeastMitigation), encode_iolist(?APPLICATION_DATA, Data, Version, ConnectionStates). %%==================================================================== @@ -485,27 +485,26 @@ start_additional_data(Type, {MajVer, MinVer}, %% 1/n-1 splitting countermeasure Rizzo/Duong-Beast, RC4 chiphers are %% not vulnerable to this attack. -split_bin(<>, ChunkSize, Version, BCA, one_n_minus_one) when +split_bin(<>, Version, BCA, one_n_minus_one) when BCA =/= ?RC4 andalso ({3, 1} == Version orelse {3, 0} == Version) -> - do_split_bin(Rest, ChunkSize, [[FirstByte]]); + [[FirstByte]|do_split_bin(Rest)]; %% 0/n splitting countermeasure for clients that are incompatible with 1/n-1 %% splitting. -split_bin(Bin, ChunkSize, Version, BCA, zero_n) when +split_bin(Bin, Version, BCA, zero_n) when BCA =/= ?RC4 andalso ({3, 1} == Version orelse {3, 0} == Version) -> - do_split_bin(Bin, ChunkSize, [[<<>>]]); -split_bin(Bin, ChunkSize, _, _, _) -> - do_split_bin(Bin, ChunkSize, []). + [<<>>|do_split_bin(Bin)]; +split_bin(Bin, _, _, _) -> + do_split_bin(Bin). -do_split_bin(<<>>, _, Acc) -> - lists:reverse(Acc); -do_split_bin(Bin, ChunkSize, Acc) -> +do_split_bin(<<>>) -> []; +do_split_bin(Bin) -> case Bin of - <> -> - do_split_bin(Rest, ChunkSize, [Chunk | Acc]); + <> -> + [Chunk|do_split_bin(Rest)]; _ -> - lists:reverse(Acc, [Bin]) + [Bin] end. %%-------------------------------------------------------------------- lowest_list_protocol_version(Ver, []) -> -- cgit v1.2.3 From 54f871fe6e093d0eaf0cebb9e9b228177b30d15a Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Fri, 16 Nov 2018 11:40:03 +0100 Subject: Fix compiler warnings --- lib/ssl/test/ssl_test_lib.erl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'lib') diff --git a/lib/ssl/test/ssl_test_lib.erl b/lib/ssl/test/ssl_test_lib.erl index 8a2f0824fb..a8d62d6c4e 100644 --- a/lib/ssl/test/ssl_test_lib.erl +++ b/lib/ssl/test/ssl_test_lib.erl @@ -26,6 +26,7 @@ %% Note: This directive should only be used in test suites. -compile(export_all). +-compile(nowarn_export_all). -record(sslsocket, { fd = nil, pid = nil}). -define(SLEEP, 1000). @@ -1706,10 +1707,10 @@ openssl_dsa_support() -> true; "LibreSSL" ++ _ -> false; - "OpenSSL 1.1" ++ Rest -> + "OpenSSL 1.1" ++ _Rest -> false; "OpenSSL 1.0.1" ++ Rest -> - hd(Rest) >= s; + hd(Rest) >= $s; _ -> true end. @@ -1746,8 +1747,6 @@ openssl_sane_client_cert() -> false; "LibreSSL 2.0" ++ _ -> false; - "LibreSSL 2.0" ++ _ -> - false; "OpenSSL 1.0.1s-freebsd" -> false; "OpenSSL 1.0.0" ++ _ -> -- cgit v1.2.3 From a1bdaba31c8f925f70fd87d625c1760f271f46ad Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Fri, 16 Nov 2018 16:58:44 +0100 Subject: Implement print on other node --- lib/tools/src/eprof.erl | 52 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 36 insertions(+), 16 deletions(-) (limited to 'lib') diff --git a/lib/tools/src/eprof.erl b/lib/tools/src/eprof.erl index 535ddbcd04..86e3d3a8b8 100644 --- a/lib/tools/src/eprof.erl +++ b/lib/tools/src/eprof.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1996-2017. All Rights Reserved. +%% Copyright Ericsson AB 1996-2018. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -26,11 +26,11 @@ -export([start/0, stop/0, - dump/0, + dump/0, dump_data/0, start_profiling/1, start_profiling/2, start_profiling/3, profile/1, profile/2, profile/3, profile/4, profile/5, stop_profiling/0, - analyze/0, analyze/1, analyze/2, + analyze/0, analyze/1, analyze/2, analyze/4, log/1]). %% Internal exports @@ -117,6 +117,9 @@ profile(Rootset, M, F, A, Pattern, Options) -> dump() -> gen_server:call(?MODULE, dump, infinity). +dump_data() -> + gen_server:call(?MODULE, dump_data, infinity). + log(File) -> gen_server:call(?MODULE, {logfile, File}, infinity). @@ -151,22 +154,18 @@ init([]) -> %% analyze -handle_call({analyze, _, _}, _, #state{ bpd = #bpd{ p = {0,nil}, us = 0, n = 0} = Bpd } = S) when is_record(Bpd, bpd) -> +handle_call( + {analyze, _, _}, _, + #state{ bpd = #bpd{ p = {0,nil}, us = 0, n = 0 } } = S) -> {reply, nothing_to_analyze, S}; -handle_call({analyze, procs, Opts}, _, #state{ bpd = #bpd{ p = Ps, us = Tus} = Bpd, fd = Fd} = S) when is_record(Bpd, bpd) -> - lists:foreach(fun - ({Pid, Mfas}) -> - {Pn, Pus} = sum_bp_total_n_us(Mfas), - format(Fd, "~n****** Process ~w -- ~s % of profiled time *** ~n", [Pid, s("~.2f", [100.0*divide(Pus,Tus)])]), - print_bp_mfa(Mfas, {Pn,Pus}, Fd, Opts), - ok - end, gb_trees:to_list(Ps)), - {reply, ok, S}; +handle_call({analyze, procs, Opts}, _, #state{ bpd = Bpd, fd = Fd } = S) + when is_record(Bpd, bpd) -> + {reply, analyze(Fd, procs, Opts, Bpd), S}; -handle_call({analyze, total, Opts}, _, #state{ bpd = #bpd{ mfa = Mfas, n = Tn, us = Tus} = Bpd, fd = Fd} = S) when is_record(Bpd, bpd) -> - print_bp_mfa(Mfas, {Tn, Tus}, Fd, Opts), - {reply, ok, S}; +handle_call({analyze, total, Opts}, _, #state{ bpd = Bpd, fd = Fd } = S) + when is_record(Bpd, bpd) -> + {reply, analyze(Fd, total, Opts, Bpd), S}; handle_call({analyze, Type, _Opts}, _, S) -> {reply, {error, {undefined, Type}}, S}; @@ -260,6 +259,10 @@ handle_call({logfile, File}, _From, #state{ fd = OldFd } = S) -> handle_call(dump, _From, #state{ bpd = Bpd } = S) when is_record(Bpd, bpd) -> {reply, gb_trees:to_list(Bpd#bpd.p), S}; +handle_call(dump_data, _, #state{ bpd = #bpd{} = Bpd } = S) + when is_record(Bpd, bpd) -> + {reply, Bpd, S}; + handle_call(stop, _FromTag, S) -> {stop, normal, stopped, S}. @@ -438,6 +441,23 @@ collect_bpdfp(Mfa, Tree, Data) -> {PTno + Ni, PTuso + Time, Ti1} end, {0,0, Tree}, Data). + + +analyze(Fd, procs, Opts, #bpd{ p = Ps, us = Tus }) -> + lists:foreach( + fun + ({Pid, Mfas}) -> + {Pn, Pus} = sum_bp_total_n_us(Mfas), + format( + Fd, + "~n****** Process ~w -- ~s % of profiled time *** ~n", + [Pid, s("~.2f", [100.0*divide(Pus, Tus)])]), + print_bp_mfa(Mfas, {Pn,Pus}, Fd, Opts), + ok + end, gb_trees:to_list(Ps)); +analyze(Fd, total, Opts, #bpd{ mfa = Mfas, n = Tn, us = Tus } ) -> + print_bp_mfa(Mfas, {Tn, Tus}, Fd, Opts). + %% manipulators sort_mfa(Bpfs, mfa) when is_list(Bpfs) -> lists:sort(fun -- cgit v1.2.3 From 645d0bbe9eff5620e5f9daf61b0658bcdadd0665 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Fri, 16 Nov 2018 11:56:13 +0100 Subject: Improve benchmark --- lib/ssl/test/ssl_dist_bench_SUITE.erl | 224 +++++++++++++++++++++++++++++++--- 1 file changed, 205 insertions(+), 19 deletions(-) (limited to 'lib') diff --git a/lib/ssl/test/ssl_dist_bench_SUITE.erl b/lib/ssl/test/ssl_dist_bench_SUITE.erl index 3c7904cf24..7409b69639 100644 --- a/lib/ssl/test/ssl_dist_bench_SUITE.erl +++ b/lib/ssl/test/ssl_dist_bench_SUITE.erl @@ -32,6 +32,8 @@ -export( [setup/1, roundtrip/1, + throughput_0/1, + throughput_64/1, throughput_1024/1, throughput_4096/1, throughput_16384/1, @@ -55,7 +57,9 @@ groups() -> {setup, [{repeat, 1}], [setup]}, {roundtrip, [{repeat, 1}], [roundtrip]}, {throughput, [{repeat, 1}], - [throughput_1024, + [throughput_0, + throughput_64, + throughput_1024, throughput_4096, throughput_16384, throughput_65536, @@ -247,8 +251,9 @@ setup(A, B, Prefix, HA, HB) -> [] = ssl_apply(HB, erlang, nodes, []), {SetupTime, CycleTime} = ssl_apply(HA, fun () -> setup_runner(A, B, Rounds) end), - [] = ssl_apply(HA, erlang, nodes, []), - [] = ssl_apply(HB, erlang, nodes, []), + ok = ssl_apply(HB, fun () -> setup_wait_nodedown(A, 10000) end), + %% [] = ssl_apply(HA, erlang, nodes, []), + %% [] = ssl_apply(HB, erlang, nodes, []), SetupSpeed = round((Rounds*1000000*1000) / SetupTime), CycleSpeed = round((Rounds*1000000*1000) / CycleTime), _ = report(Prefix++" Setup", SetupSpeed, "setups/1000s"), @@ -275,6 +280,22 @@ setup_loop(A, B, T, N) -> setup_loop(A, B, Time + T, N - 1) end. +setup_wait_nodedown(A, Time) -> + ok = net_kernel:monitor_nodes(true), + case nodes() of + [] -> + ok; + [A] -> + receive + {nodedown,A} -> + ok; + Unexpected -> + {error,{unexpected,Unexpected}} + after Time -> + {error,timeout} + end + end. + %%---------------- %% Roundtrip speed @@ -334,6 +355,18 @@ roundtrip_client(Pid, Mon, StartTime, N) -> %%----------------- %% Throughput speed +throughput_0(Config) -> + run_nodepair_test( + fun (A, B, Prefix, HA, HB) -> + throughput(A, B, Prefix, HA, HB, 500000, 0) + end, Config). + +throughput_64(Config) -> + run_nodepair_test( + fun (A, B, Prefix, HA, HB) -> + throughput(A, B, Prefix, HA, HB, 500000, 64) + end, Config). + throughput_1024(Config) -> run_nodepair_test( fun (A, B, Prefix, HA, HB) -> @@ -373,45 +406,198 @@ throughput_1048576(Config) -> throughput(A, B, Prefix, HA, HB, Packets, Size) -> [] = ssl_apply(HA, erlang, nodes, []), [] = ssl_apply(HB, erlang, nodes, []), - Time = + #{time := Time, + dist_stats := DistStats, + client_msacc_stats := ClientMsaccStats, + client_prof := ClientProf, + server_msacc_stats := ServerMsaccStats, + server_prof := ServerProf} = ssl_apply(HA, fun () -> throughput_runner(A, B, Packets, Size) end), [B] = ssl_apply(HA, erlang, nodes, []), [A] = ssl_apply(HB, erlang, nodes, []), - Speed = round((Packets*Size*1000000) / (1024*Time)), + ClientMsaccStats =:= undefined orelse + msacc:print(ClientMsaccStats), + io:format("DistStats: ~p~n", [DistStats]), + Overhead = + 50 % Distribution protocol headers (empirical) (TLS+=54) + + byte_size(erlang:term_to_binary([0|<<>>])), % Benchmark overhead + Bytes = Packets * (Size + Overhead), + io:format("~w bytes, ~.4g s~n", [Bytes,Time/1000000]), + ClientMsaccStats =:= undefined orelse + io:format( + "Sender core usage ratio: ~.4g ns/byte~n", + [msacc:stats(system_runtime, ClientMsaccStats)*1000/Bytes]), + ServerMsaccStats =:= undefined orelse + begin + io:format( + "Receiver core usage ratio: ~.4g ns/byte~n", + [msacc:stats(system_runtime, ServerMsaccStats)*1000/Bytes]), + msacc:print(ServerMsaccStats) + end, + io:format("******* ClientProf:~n", []), prof_print(ClientProf), + io:format("******* ServerProf:~n", []), prof_print(ServerProf), + Speed = round((Bytes * 1000000) / (1024 * Time)), report(Prefix++" Throughput_"++integer_to_list(Size), Speed, "kB/s"). %% Runs on node A and spawns a server on node B throughput_runner(A, B, Rounds, Size) -> Payload = payload(Size), - ClientPid = self(), [A] = rpc:call(B, erlang, nodes, []), + ClientPid = self(), ServerPid = erlang:spawn( B, fun () -> throughput_server(ClientPid, Rounds) end), ServerMon = erlang:monitor(process, ServerPid), - microseconds( - throughput_client( - ServerPid, ServerMon, Payload, start_time(), Rounds)). + msacc:available() andalso + begin + msacc:stop(), + msacc:reset(), + msacc:start(), + ok + end, + prof_start(), + {Time,ServerMsaccStats,ServerProf} = + throughput_client(ServerPid, ServerMon, Payload, Rounds), + prof_stop(), + ClientMsaccStats = + case msacc:available() of + true -> + MStats = msacc:stats(), + msacc:stop(), + MStats; + false -> + undefined + end, + ClientProf = prof_end(), + [{_Node,Socket}] = dig_dist_node_sockets(), + DistStats = inet:getstat(Socket), + #{time => microseconds(Time), + dist_stats => DistStats, + client_msacc_stats => ClientMsaccStats, + client_prof => ClientProf, + server_msacc_stats => ServerMsaccStats, + server_prof => ServerProf}. + +dig_dist_node_sockets() -> + [case DistCtrl of + {_Node,Socket} = NodeSocket when is_port(Socket) -> + NodeSocket; + {Node,DistCtrlPid} when is_pid(DistCtrlPid) -> + [{links,DistCtrlLinks}] = process_info(DistCtrlPid, [links]), + case [S || S <- DistCtrlLinks, is_port(S)] of + [Socket] -> + {Node,Socket}; + [] -> + [{monitors,[{process,DistSenderPid}]}] = + process_info(DistCtrlPid, [monitors]), + [{links,DistSenderLinks}] = + process_info(DistSenderPid, [links]), + [Socket] = [S || S <- DistSenderLinks, is_port(S)], + {Node,Socket} + end + end || DistCtrl <- erlang:system_info(dist_ctrl)]. + -throughput_server(_Pid, 0) -> - ok; throughput_server(Pid, N) -> + msacc:available() andalso + begin + msacc:stop(), + msacc:reset(), + msacc:start(), + ok + end, + prof_start(), + throughput_server_loop(Pid, N). + +throughput_server_loop(_Pid, 0) -> + prof_stop(), + MsaccStats = + case msacc:available() of + true -> + msacc:stop(), + MStats = msacc:stats(), + msacc:reset(), + MStats; + false -> + undefined + end, + Prof = prof_end(), + exit({ok,MsaccStats,Prof}); +throughput_server_loop(Pid, N) -> receive - [N|_] -> - throughput_server(Pid, N-1) + {Pid, N, _} -> + throughput_server_loop(Pid, N-1) end. -throughput_client(_Pid, Mon, _Payload, StartTime, 0) -> +throughput_client(Pid, Mon, Payload, N) -> + throughput_client_loop(Pid, Mon, Payload, N, start_time()). + +throughput_client_loop(_Pid, Mon, _Payload, 0, StartTime) -> receive - {'DOWN', Mon, _, _, normal} -> - elapsed_time(StartTime); + {'DOWN', Mon, _, _, {ok,MsaccStats,Prof}} -> + {elapsed_time(StartTime),MsaccStats,Prof}; {'DOWN', Mon, _, _, Other} -> exit(Other) end; -throughput_client(Pid, Mon, Payload, StartTime, N) -> - Pid ! [N|Payload], - throughput_client(Pid, Mon, Payload, StartTime, N - 1). +throughput_client_loop(Pid, Mon, Payload, N, StartTime) -> + Pid ! {self(), N, Payload}, + throughput_client_loop(Pid, Mon, Payload, N - 1, StartTime). + + +-define(prof, none). % none | cprof | eprof + +-if(?prof =:= cprof). +prof_start() -> + cprof:stop(), + cprof:start(), + ok. +-elif(?prof =:= eprof). +prof_start() -> + {ok,_} = eprof:start(), + profiling = eprof:start_profiling(processes()), + ok. +-elif(?prof =:= none). +prof_start() -> + ok. +-endif. + +-if(?prof =:= cprof). +prof_stop() -> + cprof:pause(), + ok. +-elif(?prof =:= eprof). +prof_stop() -> + _ = eprof:stop_profiling(), + ok. +-elif(?prof =:= none). +prof_stop() -> + ok. +-endif. + +-if(?prof =:= cprof). +prof_end() -> + Prof = cprof:analyse(), + cprof:stop(), + Prof. +-elif(?prof =:= eprof). +prof_end() -> + eprof:dump_data(). +-elif(?prof =:= none). +prof_end() -> + []. +-endif. + +-if(?prof =:= cprof). +prof_print(Prof) -> + io:format("~p.~n", [Prof]). +-elif(?prof =:= eprof). +prof_print(Dump) -> + eprof:analyze(undefined, total, [], Dump). +-elif(?prof =:= none). +prof_print([]) -> + ok. +-endif. %%%------------------------------------------------------------------- %%% Test cases helpers -- cgit v1.2.3 From cadbaea28debb5fb761597afb7f3ff85367f2a0c Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Wed, 28 Nov 2018 09:31:05 +0100 Subject: Handle tls_sender exit properly --- lib/ssl/src/ssl_connection.erl | 4 ++-- lib/ssl/src/tls_connection.erl | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) (limited to 'lib') diff --git a/lib/ssl/src/ssl_connection.erl b/lib/ssl/src/ssl_connection.erl index acd9f14f7b..cd0a1d858d 100644 --- a/lib/ssl/src/ssl_connection.erl +++ b/lib/ssl/src/ssl_connection.erl @@ -51,8 +51,8 @@ %% Alert and close handling -export([handle_own_alert/4, handle_alert/3, - handle_normal_shutdown/3, stop/2, stop_and_reply/3 - ]). + handle_normal_shutdown/3, stop/2, stop_and_reply/3, + handle_trusted_certs_db/1]). %% Data handling -export([read_application_data/2, internal_renegotiation/2]). diff --git a/lib/ssl/src/tls_connection.erl b/lib/ssl/src/tls_connection.erl index 4dfb50967d..a4fc45e834 100644 --- a/lib/ssl/src/tls_connection.erl +++ b/lib/ssl/src/tls_connection.erl @@ -658,6 +658,12 @@ downgrade(Type, Event, State) -> callback_mode() -> state_functions. + +terminate( + {shutdown, sender_died, Reason}, _StateName, + #state{socket = Socket, transport_cb = Transport} = State) -> + ssl_connection:handle_trusted_certs_db(State), + close(Reason, Socket, Transport, undefined, undefined); terminate(Reason, StateName, State) -> catch ssl_connection:terminate(Reason, StateName, State), ensure_sender_terminate(Reason, State). -- cgit v1.2.3 From b96f872bba8cbcd87d1e1f4a5caffdb362f45111 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Fri, 30 Nov 2018 14:52:19 +0100 Subject: Handle dead sender at terminate --- lib/ssl/src/tls_connection.erl | 12 +++++------- lib/ssl/src/tls_sender.erl | 10 +++++----- 2 files changed, 10 insertions(+), 12 deletions(-) (limited to 'lib') diff --git a/lib/ssl/src/tls_connection.erl b/lib/ssl/src/tls_connection.erl index a4fc45e834..87e7809817 100644 --- a/lib/ssl/src/tls_connection.erl +++ b/lib/ssl/src/tls_connection.erl @@ -367,13 +367,11 @@ send_alert_in_connection(#alert{description = ?CLOSE_NOTIFY} = Alert, State) -> send_alert_in_connection(Alert, #state{protocol_specific = #{sender := Sender}}) -> tls_sender:send_alert(Sender, Alert). -send_sync_alert(Alert, #state{protocol_specific = #{sender := Sender}}= State) -> - tls_sender:send_and_ack_alert(Sender, Alert), - receive - {Sender, ack_alert} -> - ok - after ?DEFAULT_TIMEOUT -> - %% Sender is blocked terminate anyway +send_sync_alert( + Alert, #state{protocol_specific = #{sender := Sender}} = State) -> + try tls_sender:send_and_ack_alert(Sender, Alert) + catch + _:_ -> throw({stop, {shutdown, own_alert}, State}) end. diff --git a/lib/ssl/src/tls_sender.erl b/lib/ssl/src/tls_sender.erl index 107a0dcfd4..ee2b7be4f4 100644 --- a/lib/ssl/src/tls_sender.erl +++ b/lib/ssl/src/tls_sender.erl @@ -102,7 +102,7 @@ send_alert(Pid, Alert) -> %% in the connection state and recive an ack. %%-------------------------------------------------------------------- send_and_ack_alert(Pid, Alert) -> - gen_statem:cast(Pid, {ack_alert, Alert}). + gen_statem:call(Pid, {ack_alert, Alert}, ?DEFAULT_TIMEOUT). %%-------------------------------------------------------------------- -spec setopts(pid(), [{packet, integer() | atom()}]) -> ok | {error, term()}. %% Description: Send application data @@ -235,13 +235,13 @@ connection({call, From}, {dist_handshake_complete, _Node, DHandle}, [{next_event, internal, {application_packets,{self(),undefined},Data}}] end]}; +connection({call, From}, {ack_alert, #alert{} = Alert}, StateData0) -> + StateData = send_tls_alert(Alert, StateData0), + {next_state, ?FUNCTION_NAME, StateData, + [{reply,From,ok}]}; connection(internal, {application_packets, From, Data}, StateData) -> send_application_data(Data, From, ?FUNCTION_NAME, StateData); %% -connection(cast, {ack_alert, #alert{} = Alert}, #data{connection_pid = Pid} =StateData0) -> - StateData = send_tls_alert(Alert, StateData0), - Pid ! {self(), ack_alert}, - {next_state, ?FUNCTION_NAME, StateData}; connection(cast, #alert{} = Alert, StateData0) -> StateData = send_tls_alert(Alert, StateData0), {next_state, ?FUNCTION_NAME, StateData}; -- cgit v1.2.3 From 5d2e52ff81558471499986386d6018cdb4e8ccc8 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Mon, 3 Dec 2018 14:26:43 +0100 Subject: Handle socket close in state downgrade --- lib/ssl/src/ssl_connection.erl | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'lib') diff --git a/lib/ssl/src/ssl_connection.erl b/lib/ssl/src/ssl_connection.erl index cd0a1d858d..9b16c3c9ed 100644 --- a/lib/ssl/src/ssl_connection.erl +++ b/lib/ssl/src/ssl_connection.erl @@ -1092,6 +1092,12 @@ downgrade(internal, #alert{description = ?CLOSE_NOTIFY}, downgrade(timeout, downgrade, #state{downgrade = {_, From}} = State, _) -> gen_statem:reply(From, {error, timeout}), stop(normal, State); +downgrade( + info, {CloseTag, Socket}, + #state{socket = Socket, close_tag = CloseTag, downgrade = {_, From}} = + State, _) -> + gen_statem:reply(From, {error, CloseTag}), + stop(normal, State); downgrade(Type, Event, State, Connection) -> handle_common_event(Type, Event, ?FUNCTION_NAME, State, Connection). -- cgit v1.2.3