From fd3c106841e481aea0dc5ebf9b9fee1d34da5638 Mon Sep 17 00:00:00 2001
From: Raimo Niskanen <raimo@erlang.org>
Date: Thu, 22 Nov 2018 12:17:37 +0100
Subject: Improve dist send throughput

---
 lib/ssl/src/tls_sender.erl | 86 ++++++++++++++++++++++++++++++----------------
 1 file changed, 57 insertions(+), 29 deletions(-)

diff --git a/lib/ssl/src/tls_sender.erl b/lib/ssl/src/tls_sender.erl
index a245ee2465..107a0dcfd4 100644
--- a/lib/ssl/src/tls_sender.erl
+++ b/lib/ssl/src/tls_sender.erl
@@ -200,8 +200,9 @@ connection({call, From}, renegotiate,
            #data{connection_states = #{current_write := Write}} = StateData) ->
     {next_state, handshake, StateData, [{reply, From, {ok, Write}}]};
 connection({call, From}, {application_data, AppData}, 
-           #data{socket_options = SockOpts} = StateData) ->                   
-    case encode_packet(AppData, SockOpts) of
+           #data{socket_options = #socket_options{packet = Packet}} =
+               StateData) ->
+    case encode_packet(Packet, AppData) of
         {error, _} = Error ->
             {next_state, ?FUNCTION_NAME, StateData, [{reply, From, Error}]};
         Data ->
@@ -217,13 +218,26 @@ connection({call, From}, dist_get_tls_socket,
                  tracker = Tracker} = StateData) ->
     TLSSocket = Connection:socket([Pid, self()], Transport, Socket, Connection, Tracker),
     {next_state, ?FUNCTION_NAME, StateData, [{reply, From, {ok, TLSSocket}}]};
-connection({call, From}, {dist_handshake_complete, _Node, DHandle}, #data{connection_pid = Pid} = StateData) ->
+connection({call, From}, {dist_handshake_complete, _Node, DHandle},
+           #data{connection_pid = Pid,
+                 socket_options = #socket_options{packet = Packet}} =
+               StateData) ->
     ok = erlang:dist_ctrl_input_handler(DHandle, Pid),
     ok = ssl_connection:dist_handshake_complete(Pid, DHandle),
     %% From now on we execute on normal priority
     process_flag(priority, normal),
-    Events = dist_data_events(DHandle, []),
-    {next_state, ?FUNCTION_NAME, StateData#data{dist_handle = DHandle}, [{reply, From, ok} | Events]};
+    {next_state, ?FUNCTION_NAME, StateData#data{dist_handle = DHandle},
+     [{reply, From, ok}
+      | case dist_data(DHandle, Packet) of
+            [] ->
+                [];
+            Data ->
+                [{next_event, internal,
+                  {application_packets,{self(),undefined},Data}}]
+        end]};
+connection(internal, {application_packets, From, Data}, StateData) ->
+    send_application_data(Data, From, ?FUNCTION_NAME, StateData);
+%%
 connection(cast, {ack_alert, #alert{} = Alert}, #data{connection_pid = Pid} =StateData0) ->
     StateData = send_tls_alert(Alert, StateData0),
     Pid ! {self(), ack_alert},
@@ -237,9 +251,19 @@ connection(cast, {new_write, WritesState, Version},
      StateData#data{connection_states = 
                         ConnectionStates0#{current_write => WritesState},
                     negotiated_version = Version}};
-connection(info, dist_data, #data{dist_handle = DHandle} = StateData) ->  
-    Events = dist_data_events(DHandle, []),
-    {next_state, ?FUNCTION_NAME, StateData, Events};
+%%
+connection(info, dist_data,
+           #data{dist_handle = DHandle,
+                 socket_options = #socket_options{packet = Packet}} =
+               StateData) ->
+    {next_state, ?FUNCTION_NAME, StateData,
+      case dist_data(DHandle, Packet) of
+          [] ->
+              [];
+          Data ->
+              [{next_event, internal,
+               {application_packets,{self(),undefined},Data}}]
+      end};
 connection(info, tick, StateData) ->  
     consume_ticks(),
     {next_state, ?FUNCTION_NAME, StateData, 
@@ -272,6 +296,8 @@ handshake(cast, {new_write, WritesState, Version},
      StateData#data{connection_states = 
                         ConnectionStates0#{current_write => WritesState},
                    negotiated_version = Version}};
+handshake(internal, {application_packets,_,_}, _) ->
+    {keep_state_and_data, [postpone]};
 handshake(info, Msg, StateData) -> 
     handle_info(Msg, ?FUNCTION_NAME, StateData).
 
@@ -342,12 +368,13 @@ send_application_data(Data, From, StateName,
                              renegotiate_at = RenegotiateAt} = StateData0) ->
     case time_to_renegotiate(Data, ConnectionStates0, RenegotiateAt) of
 	true ->
-	    ssl_connection:internal_renegotiation(Pid, ConnectionStates0), 
+	    ssl_connection:internal_renegotiation(Pid, ConnectionStates0),
             {next_state, handshake, StateData0, 
-             [{next_event, {call, From}, {application_data, Data}}]};
+             [{next_event, internal, {application_packets, From, Data}}]};
 	false ->
 	    {Msgs, ConnectionStates} =
-                Connection:encode_data(Data, Version, ConnectionStates0),
+                Connection:encode_data(
+                  iolist_to_binary(Data), Version, ConnectionStates0),
             StateData = StateData0#data{connection_states = ConnectionStates},
 	    case Connection:send(Transport, Socket, Msgs) of
                 ok when DistHandle =/=  undefined ->
@@ -361,21 +388,18 @@ send_application_data(Data, From, StateName,
             end
     end.
 
-encode_packet(Data, #socket_options{packet=Packet}) ->
+-compile({inline, encode_packet/2}).
+encode_packet(Packet, Data) ->
+    Len = iolist_size(Data),
     case Packet of
-	1 -> encode_size_packet(Data, 8,  (1 bsl 8) - 1);
-	2 -> encode_size_packet(Data, 16, (1 bsl 16) - 1);
-	4 -> encode_size_packet(Data, 32, (1 bsl 32) - 1);
-	_ -> Data
-    end.
-
-encode_size_packet(Bin, Size, Max) ->
-    Len = erlang:byte_size(Bin),
-    case Len > Max of
-	true  -> 
-            {error, {badarg, {packet_to_large, Len, Max}}};
-        false -> 
-            <<Len:Size, Bin/binary>>
+        1 when Len < (1 bsl 8) ->  [<<Len:8>>,Data];
+        2 when Len < (1 bsl 16) -> [<<Len:16>>,Data];
+        4 when Len < (1 bsl 32) -> [<<Len:32>>,Data];
+        N when N =:= 1; N =:= 2; N =:= 4 ->
+            {error,
+             {badarg, {packet_to_large, Len, (1 bsl (Packet bsl 3)) - 1}}};
+        _ ->
+            Data
     end.
 
 set_opts(SocketOptions, [{packet, N}]) ->
@@ -409,14 +433,18 @@ call(FsmPid, Event) ->
 
 %%---------------Erlang distribution --------------------------------------
 
-dist_data_events(DHandle, Events) ->
+dist_data(DHandle, Packet) ->
     case erlang:dist_ctrl_get_data(DHandle) of
         none ->
             erlang:dist_ctrl_get_data_notification(DHandle),
-            lists:reverse(Events);
+            [];
         Data ->
-            Event = {next_event, {call, {self(), undefined}}, {application_data, Data}},
-            dist_data_events(DHandle, [Event | Events])
+            %% This is encode_packet(4, Data) without Len check
+            %% since the emulator will always deliver a Data
+            %% smaller than 4 GB, and the distribution will
+            %% therefore always have to use {packet,4}
+            Len = iolist_size(Data),
+            [<<Len:32>>,Data|dist_data(DHandle, Packet)]
     end.
 
 consume_ticks() ->
-- 
cgit v1.2.3