aboutsummaryrefslogtreecommitdiffstats
path: root/lib/ssl/src/tls_sender.erl
diff options
context:
space:
mode:
authorIngela Anderton Andin <[email protected]>2018-08-15 16:54:03 +0200
committerIngela Anderton Andin <[email protected]>2018-08-27 15:21:01 +0200
commit0078ebf6c5311edc1a07be71fb7a127a175a60fa (patch)
tree107108b71e7ea255ce4b8df460a9ae201a4801f6 /lib/ssl/src/tls_sender.erl
parentd87ac1c55188f5ba5cdf72384125d94d42118c18 (diff)
downloadotp-0078ebf6c5311edc1a07be71fb7a127a175a60fa.tar.gz
otp-0078ebf6c5311edc1a07be71fb7a127a175a60fa.tar.bz2
otp-0078ebf6c5311edc1a07be71fb7a127a175a60fa.zip
ssl: Adopt distribution over TLS to use new sender process
Diffstat (limited to 'lib/ssl/src/tls_sender.erl')
-rw-r--r--lib/ssl/src/tls_sender.erl94
1 files changed, 87 insertions, 7 deletions
diff --git a/lib/ssl/src/tls_sender.erl b/lib/ssl/src/tls_sender.erl
index 4aeb13284f..2746d89048 100644
--- a/lib/ssl/src/tls_sender.erl
+++ b/lib/ssl/src/tls_sender.erl
@@ -27,8 +27,8 @@
-include("ssl_handshake.hrl").
%% API
--export([start/0, initialize/2, send_data/2, send_alert/2, renegotiate/1,
- update_connection_state/3]).
+-export([start/0, start/1, initialize/2, send_data/2, send_alert/2, renegotiate/1,
+ update_connection_state/3, dist_tls_socket/1, dist_handshake_complete/3]).
%% gen_statem callbacks
-export([callback_mode/0, init/1, terminate/3, code_change/4]).
@@ -38,13 +38,16 @@
-record(data, {connection_pid,
connection_states = #{},
+ role,
socket,
socket_options,
+ tracker,
protocol_cb,
transport_cb,
negotiated_version,
renegotiate_at,
- connection_monitor
+ connection_monitor,
+ dist_handle
}).
%%%===================================================================
@@ -53,9 +56,24 @@
-spec start() -> {ok, Pid :: pid()} |
ignore |
{error, Error :: term()}.
+-spec start(list()) -> {ok, Pid :: pid()} |
+ ignore |
+ {error, Error :: term()}.
+
+%% Description: Start sender process to avoid dead lock that
+%% may happen when a socket is busy (busy port) and the
+%% same process is sending and receiving
+%%--------------------------------------------------------------------
start() ->
gen_statem:start(?MODULE, [], []).
+start(SpawnOpts) ->
+ gen_statem:start_link(?MODULE, [], SpawnOpts).
+%%--------------------------------------------------------------------
+-spec initialize(pid(), map()) -> ok.
+%% Description: So TLS connection process can initialize it sender
+%% process.
+%%--------------------------------------------------------------------
initialize(Pid, InitMsg) ->
gen_statem:call(Pid, {self(), InitMsg}).
@@ -82,6 +100,12 @@ renegotiate(Pid) ->
update_connection_state(Pid, NewState, Version) ->
gen_statem:cast(Pid, {new_write, NewState, Version}).
+dist_handshake_complete(ConnectionPid, Node, DHandle) ->
+ gen_statem:call(ConnectionPid, {dist_handshake_complete, Node, DHandle}).
+
+dist_tls_socket(Pid) ->
+ gen_statem:call(Pid, dist_get_tls_socket).
+
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
@@ -105,8 +129,10 @@ init(_) ->
gen_statem:event_handler_result(atom()).
%%--------------------------------------------------------------------
init({call, From}, {Pid, #{current_write := WriteState,
+ role := Role,
socket := Socket,
socket_options := SockOpts,
+ tracker := Tracker,
protocol_cb := Connection,
transport_cb := Transport,
negotiated_version := Version,
@@ -118,8 +144,10 @@ init({call, From}, {Pid, #{current_write := WriteState,
connection_monitor = Monitor,
connection_states =
ConnectionStates#{current_write => WriteState},
+ role = Role,
socket = Socket,
socket_options = SockOpts,
+ tracker = Tracker,
protocol_cb = Connection,
transport_cb = Transport,
negotiated_version = Version,
@@ -137,13 +165,28 @@ connection({call, From}, renegotiate,
#data{connection_states = #{current_write := Write}} = StateData) ->
{next_state, handshake, StateData, [{reply, From, {ok, Write}}]};
connection({call, From}, {application_data, AppData},
- #data{socket_options = SockOpts} = StateData) ->
+ #data{socket_options = SockOpts} = StateData) ->
case encode_packet(AppData, SockOpts) of
{error, _} = Error ->
{next_state, ?FUNCTION_NAME, StateData, [{reply, From, Error}]};
Data ->
send_application_data(Data, From, ?FUNCTION_NAME, StateData)
end;
+connection({call, From}, dist_get_tls_socket,
+ #data{protocol_cb = Connection,
+ transport_cb = Transport,
+ socket = Socket,
+ connection_pid = Pid,
+ tracker = Tracker} = StateData) ->
+ TLSSocket = Connection:socket([Pid, self()], Transport, Socket, Connection, Tracker),
+ {next_state, ?FUNCTION_NAME, StateData, [{reply, From, {ok, TLSSocket}}]};
+connection({call, From}, {dist_handshake_complete, _Node, DHandle}, #data{connection_pid = Pid} = StateData) ->
+ ok = erlang:dist_ctrl_input_handler(DHandle, Pid),
+ ok = ssl_connection:dist_handshake_complete(Pid, DHandle),
+ %% From now on we execute on normal priority
+ process_flag(priority, normal),
+ Events = dist_data_events(DHandle, []),
+ {next_state, ?FUNCTION_NAME, StateData#data{dist_handle = DHandle}, [{reply, From, ok} | Events]};
connection(cast, #alert{} = Alert, StateData0) ->
StateData = send_tls_alert(Alert, StateData0),
{next_state, ?FUNCTION_NAME, StateData};
@@ -153,6 +196,23 @@ connection(cast, {new_write, WritesState, Version},
StateData#data{connection_states =
ConnectionStates0#{current_write => WritesState},
negotiated_version = Version}};
+connection(info, dist_data, #data{dist_handle = DHandle} = StateData) ->
+ Events = dist_data_events(DHandle, []),
+ {next_state, ?FUNCTION_NAME, StateData, Events};
+connection(info, tick, StateData) ->
+ consume_ticks(),
+ {next_state, ?FUNCTION_NAME, StateData,
+ [{next_event, {call, {self(), undefined}},
+ {application_data, <<>>}}]};
+connection(info, {send, From, Ref, Data}, _StateData) ->
+ %% This is for testing only!
+ %%
+ %% Needed by some OTP distribution
+ %% test suites...
+ From ! {Ref, ok},
+ {keep_state_and_data,
+ [{next_event, {call, {self(), undefined}},
+ {application_data, iolist_to_binary(Data)}}]};
connection(info, Msg, StateData) ->
handle_info(Msg, ?FUNCTION_NAME, StateData).
%%--------------------------------------------------------------------
@@ -209,9 +269,10 @@ send_tls_alert(Alert, #data{negotiated_version = Version,
Connection:send(Transport, Socket, BinMsg),
StateData0#data{connection_states = ConnectionStates}.
-send_application_data(Data, {FromPid, _} = From, StateName,
+send_application_data(Data, From, StateName,
#data{connection_pid = Pid,
socket = Socket,
+ dist_handle = DistHandle,
negotiated_version = Version,
protocol_cb = Connection,
transport_cb = Transport,
@@ -227,9 +288,9 @@ send_application_data(Data, {FromPid, _} = From, StateName,
Connection:encode_data(Data, Version, ConnectionStates0),
StateData = StateData0#data{connection_states = ConnectionStates},
case Connection:send(Transport, Socket, Msgs) of
- ok when FromPid =:= Pid ->
+ ok when DistHandle =/= undefined ->
{next_state, StateName, StateData, []};
- Error when FromPid =:= Pid ->
+ Error when DistHandle =/= undefined ->
ssl_connection:stop({shutdown, Error}, StateData);
ok ->
{next_state, StateName, StateData, [{reply, From, ok}]};
@@ -279,3 +340,22 @@ call(FsmPid, Event) ->
exit:{{shutdown, _},_} ->
{error, closed}
end.
+
+%%---------------Erlang distribution --------------------------------------
+
+dist_data_events(DHandle, Events) ->
+ case erlang:dist_ctrl_get_data(DHandle) of
+ none ->
+ erlang:dist_ctrl_get_data_notification(DHandle),
+ lists:reverse(Events);
+ Data ->
+ Event = {next_event, {call, {self(), undefined}}, {application_data, Data}},
+ dist_data_events(DHandle, [Event | Events])
+ end.
+
+consume_ticks() ->
+ receive tick ->
+ consume_ticks()
+ after 0 ->
+ ok
+ end.