From 0078ebf6c5311edc1a07be71fb7a127a175a60fa Mon Sep 17 00:00:00 2001 From: Ingela Anderton Andin Date: Wed, 15 Aug 2018 16:54:03 +0200 Subject: ssl: Adopt distribution over TLS to use new sender process --- lib/ssl/src/tls_sender.erl | 94 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 87 insertions(+), 7 deletions(-) (limited to 'lib/ssl/src/tls_sender.erl') diff --git a/lib/ssl/src/tls_sender.erl b/lib/ssl/src/tls_sender.erl index 4aeb13284f..2746d89048 100644 --- a/lib/ssl/src/tls_sender.erl +++ b/lib/ssl/src/tls_sender.erl @@ -27,8 +27,8 @@ -include("ssl_handshake.hrl"). %% API --export([start/0, initialize/2, send_data/2, send_alert/2, renegotiate/1, - update_connection_state/3]). +-export([start/0, start/1, initialize/2, send_data/2, send_alert/2, renegotiate/1, + update_connection_state/3, dist_tls_socket/1, dist_handshake_complete/3]). %% gen_statem callbacks -export([callback_mode/0, init/1, terminate/3, code_change/4]). @@ -38,13 +38,16 @@ -record(data, {connection_pid, connection_states = #{}, + role, socket, socket_options, + tracker, protocol_cb, transport_cb, negotiated_version, renegotiate_at, - connection_monitor + connection_monitor, + dist_handle }). %%%=================================================================== @@ -53,9 +56,24 @@ -spec start() -> {ok, Pid :: pid()} | ignore | {error, Error :: term()}. +-spec start(list()) -> {ok, Pid :: pid()} | + ignore | + {error, Error :: term()}. + +%% Description: Start sender process to avoid dead lock that +%% may happen when a socket is busy (busy port) and the +%% same process is sending and receiving +%%-------------------------------------------------------------------- start() -> gen_statem:start(?MODULE, [], []). +start(SpawnOpts) -> + gen_statem:start_link(?MODULE, [], SpawnOpts). +%%-------------------------------------------------------------------- +-spec initialize(pid(), map()) -> ok. +%% Description: So TLS connection process can initialize it sender +%% process. +%%-------------------------------------------------------------------- initialize(Pid, InitMsg) -> gen_statem:call(Pid, {self(), InitMsg}). @@ -82,6 +100,12 @@ renegotiate(Pid) -> update_connection_state(Pid, NewState, Version) -> gen_statem:cast(Pid, {new_write, NewState, Version}). +dist_handshake_complete(ConnectionPid, Node, DHandle) -> + gen_statem:call(ConnectionPid, {dist_handshake_complete, Node, DHandle}). + +dist_tls_socket(Pid) -> + gen_statem:call(Pid, dist_get_tls_socket). + %%%=================================================================== %%% gen_statem callbacks %%%=================================================================== @@ -105,8 +129,10 @@ init(_) -> gen_statem:event_handler_result(atom()). %%-------------------------------------------------------------------- init({call, From}, {Pid, #{current_write := WriteState, + role := Role, socket := Socket, socket_options := SockOpts, + tracker := Tracker, protocol_cb := Connection, transport_cb := Transport, negotiated_version := Version, @@ -118,8 +144,10 @@ init({call, From}, {Pid, #{current_write := WriteState, connection_monitor = Monitor, connection_states = ConnectionStates#{current_write => WriteState}, + role = Role, socket = Socket, socket_options = SockOpts, + tracker = Tracker, protocol_cb = Connection, transport_cb = Transport, negotiated_version = Version, @@ -137,13 +165,28 @@ connection({call, From}, renegotiate, #data{connection_states = #{current_write := Write}} = StateData) -> {next_state, handshake, StateData, [{reply, From, {ok, Write}}]}; connection({call, From}, {application_data, AppData}, - #data{socket_options = SockOpts} = StateData) -> + #data{socket_options = SockOpts} = StateData) -> case encode_packet(AppData, SockOpts) of {error, _} = Error -> {next_state, ?FUNCTION_NAME, StateData, [{reply, From, Error}]}; Data -> send_application_data(Data, From, ?FUNCTION_NAME, StateData) end; +connection({call, From}, dist_get_tls_socket, + #data{protocol_cb = Connection, + transport_cb = Transport, + socket = Socket, + connection_pid = Pid, + tracker = Tracker} = StateData) -> + TLSSocket = Connection:socket([Pid, self()], Transport, Socket, Connection, Tracker), + {next_state, ?FUNCTION_NAME, StateData, [{reply, From, {ok, TLSSocket}}]}; +connection({call, From}, {dist_handshake_complete, _Node, DHandle}, #data{connection_pid = Pid} = StateData) -> + ok = erlang:dist_ctrl_input_handler(DHandle, Pid), + ok = ssl_connection:dist_handshake_complete(Pid, DHandle), + %% From now on we execute on normal priority + process_flag(priority, normal), + Events = dist_data_events(DHandle, []), + {next_state, ?FUNCTION_NAME, StateData#data{dist_handle = DHandle}, [{reply, From, ok} | Events]}; connection(cast, #alert{} = Alert, StateData0) -> StateData = send_tls_alert(Alert, StateData0), {next_state, ?FUNCTION_NAME, StateData}; @@ -153,6 +196,23 @@ connection(cast, {new_write, WritesState, Version}, StateData#data{connection_states = ConnectionStates0#{current_write => WritesState}, negotiated_version = Version}}; +connection(info, dist_data, #data{dist_handle = DHandle} = StateData) -> + Events = dist_data_events(DHandle, []), + {next_state, ?FUNCTION_NAME, StateData, Events}; +connection(info, tick, StateData) -> + consume_ticks(), + {next_state, ?FUNCTION_NAME, StateData, + [{next_event, {call, {self(), undefined}}, + {application_data, <<>>}}]}; +connection(info, {send, From, Ref, Data}, _StateData) -> + %% This is for testing only! + %% + %% Needed by some OTP distribution + %% test suites... + From ! {Ref, ok}, + {keep_state_and_data, + [{next_event, {call, {self(), undefined}}, + {application_data, iolist_to_binary(Data)}}]}; connection(info, Msg, StateData) -> handle_info(Msg, ?FUNCTION_NAME, StateData). %%-------------------------------------------------------------------- @@ -209,9 +269,10 @@ send_tls_alert(Alert, #data{negotiated_version = Version, Connection:send(Transport, Socket, BinMsg), StateData0#data{connection_states = ConnectionStates}. -send_application_data(Data, {FromPid, _} = From, StateName, +send_application_data(Data, From, StateName, #data{connection_pid = Pid, socket = Socket, + dist_handle = DistHandle, negotiated_version = Version, protocol_cb = Connection, transport_cb = Transport, @@ -227,9 +288,9 @@ send_application_data(Data, {FromPid, _} = From, StateName, Connection:encode_data(Data, Version, ConnectionStates0), StateData = StateData0#data{connection_states = ConnectionStates}, case Connection:send(Transport, Socket, Msgs) of - ok when FromPid =:= Pid -> + ok when DistHandle =/= undefined -> {next_state, StateName, StateData, []}; - Error when FromPid =:= Pid -> + Error when DistHandle =/= undefined -> ssl_connection:stop({shutdown, Error}, StateData); ok -> {next_state, StateName, StateData, [{reply, From, ok}]}; @@ -279,3 +340,22 @@ call(FsmPid, Event) -> exit:{{shutdown, _},_} -> {error, closed} end. + +%%---------------Erlang distribution -------------------------------------- + +dist_data_events(DHandle, Events) -> + case erlang:dist_ctrl_get_data(DHandle) of + none -> + erlang:dist_ctrl_get_data_notification(DHandle), + lists:reverse(Events); + Data -> + Event = {next_event, {call, {self(), undefined}}, {application_data, Data}}, + dist_data_events(DHandle, [Event | Events]) + end. + +consume_ticks() -> + receive tick -> + consume_ticks() + after 0 -> + ok + end. -- cgit v1.2.3