diff options
-rw-r--r-- | lib/ssl/src/tls_sender.erl | 142 |
1 files changed, 87 insertions, 55 deletions
diff --git a/lib/ssl/src/tls_sender.erl b/lib/ssl/src/tls_sender.erl index 11fcc6def0..9cbd45bdc1 100644 --- a/lib/ssl/src/tls_sender.erl +++ b/lib/ssl/src/tls_sender.erl @@ -171,6 +171,10 @@ dist_tls_socket(Pid) -> callback_mode() -> state_functions. + +-define(HANDLE_COMMON, + ?FUNCTION_NAME(Type, Msg, StateData) -> + handle_common(Type, Msg, StateData)). %%-------------------------------------------------------------------- -spec init(Args :: term()) -> gen_statem:init_result(atom()). @@ -212,17 +216,16 @@ init({call, From}, {Pid, #{current_write := WriteState, negotiated_version = Version, renegotiate_at = RenegotiateAt}, {next_state, handshake, StateData, [{reply, From, ok}]}; -init(info, Msg, StateData) -> - handle_info(Msg, ?FUNCTION_NAME, StateData). +init(_, _, _) -> + %% Just in case anything else sneeks through + {keep_state_and_data, [postpone]}. + %%-------------------------------------------------------------------- -spec connection(gen_statem:event_type(), Msg :: term(), StateData :: term()) -> gen_statem:event_handler_result(atom()). %%-------------------------------------------------------------------- -connection({call, From}, renegotiate, - #data{connection_states = #{current_write := Write}} = StateData) -> - {next_state, handshake, StateData, [{reply, From, {ok, Write}}]}; connection({call, From}, {application_data, AppData}, #data{socket_options = #socket_options{packet = Packet}} = StateData) -> @@ -232,8 +235,18 @@ connection({call, From}, {application_data, AppData}, Data -> send_application_data(Data, From, ?FUNCTION_NAME, StateData) end; -connection({call, From}, {set_opts, _} = Call, StateData) -> - handle_call(From, Call, ?FUNCTION_NAME, StateData); +connection({call, From}, {ack_alert, #alert{} = Alert}, StateData0) -> + StateData = send_tls_alert(Alert, StateData0), + {next_state, ?FUNCTION_NAME, StateData, + [{reply,From,ok}]}; +connection({call, From}, renegotiate, + #data{connection_states = #{current_write := Write}} = StateData) -> + {next_state, handshake, StateData, [{reply, From, {ok, Write}}]}; +connection({call, From}, downgrade, #data{connection_states = + #{current_write := Write}} = StateData) -> + {next_state, death_row, StateData, [{reply,From, {ok, Write}}]}; +connection({call, From}, {set_opts, Opts}, StateData) -> + handle_set_opts(From, Opts, StateData); connection({call, From}, dist_get_tls_socket, #data{protocol_cb = Connection, transport_cb = Transport, @@ -243,29 +256,20 @@ connection({call, From}, dist_get_tls_socket, TLSSocket = Connection:socket([Pid, self()], Transport, Socket, Connection, Tracker), {next_state, ?FUNCTION_NAME, StateData, [{reply, From, {ok, TLSSocket}}]}; connection({call, From}, {dist_handshake_complete, _Node, DHandle}, - #data{connection_pid = Pid, - socket_options = #socket_options{packet = Packet}} = - StateData) -> + #data{connection_pid = Pid} = StateData) -> ok = erlang:dist_ctrl_input_handler(DHandle, Pid), ok = ssl_connection:dist_handshake_complete(Pid, DHandle), %% From now on we execute on normal priority process_flag(priority, normal), - {next_state, ?FUNCTION_NAME, StateData#data{dist_handle = DHandle}, - [{reply, From, ok} - | case dist_data(DHandle, Packet) of - [] -> - []; - Data -> - [{next_event, internal, - {application_packets,{self(),undefined},Data}}] - end]}; -connection({call, From}, {ack_alert, #alert{} = Alert}, StateData0) -> - StateData = send_tls_alert(Alert, StateData0), - {next_state, ?FUNCTION_NAME, StateData, - [{reply,From,ok}]}; -connection({call, From}, downgrade, #data{connection_states = - #{current_write := Write}} = StateData) -> - {next_state, death_row, StateData, [{reply,From, {ok, Write}}]}; + {keep_state, StateData#data{dist_handle = DHandle}, + [{reply,From,ok}| + case dist_data(DHandle) of + [] -> + []; + Data -> + [{next_event, internal, + {application_packets,{self(),undefined},Data}}] + end]}; connection(internal, {application_packets, From, Data}, StateData) -> send_application_data(Data, From, ?FUNCTION_NAME, StateData); %% @@ -279,12 +283,9 @@ connection(cast, {new_write, WritesState, Version}, ConnectionStates0#{current_write => WritesState}, negotiated_version = Version}}; %% -connection(info, dist_data, - #data{dist_handle = DHandle, - socket_options = #socket_options{packet = Packet}} = - StateData) -> - {next_state, ?FUNCTION_NAME, StateData, - case dist_data(DHandle, Packet) of +connection(info, dist_data, #data{dist_handle = DHandle}) -> + {keep_state_and_data, + case dist_data(DHandle) of [] -> []; Data -> @@ -293,9 +294,9 @@ connection(info, dist_data, end}; connection(info, tick, StateData) -> consume_ticks(), - {next_state, ?FUNCTION_NAME, StateData, - [{next_event, {call, {self(), undefined}}, - {application_data, <<>>}}]}; + Data = <<0:32>>, % encode_packet(4, <<>>) + From = {self(), undefined}, + send_application_data(Data, From, ?FUNCTION_NAME, StateData); connection(info, {send, From, Ref, Data}, _StateData) -> %% This is for testing only! %% @@ -305,17 +306,20 @@ connection(info, {send, From, Ref, Data}, _StateData) -> {keep_state_and_data, [{next_event, {call, {self(), undefined}}, {application_data, iolist_to_binary(Data)}}]}; -connection(info, Msg, StateData) -> - handle_info(Msg, ?FUNCTION_NAME, StateData). +?HANDLE_COMMON. + %%-------------------------------------------------------------------- -spec handshake(gen_statem:event_type(), Msg :: term(), StateData :: term()) -> gen_statem:event_handler_result(atom()). %%-------------------------------------------------------------------- -handshake({call, From}, {set_opts, _} = Call, StateData) -> - handle_call(From, Call, ?FUNCTION_NAME, StateData); +handshake({call, From}, {set_opts, Opts}, StateData) -> + handle_set_opts(From, Opts, StateData); handshake({call, _}, _, _) -> + %% Postpone all calls to the connection state + {keep_state_and_data, [postpone]}; +handshake(internal, {application_packets,_,_}, _) -> {keep_state_and_data, [postpone]}; handshake(cast, {new_write, WritesState, Version}, #data{connection_states = ConnectionStates0} = StateData) -> @@ -323,10 +327,16 @@ handshake(cast, {new_write, WritesState, Version}, StateData#data{connection_states = ConnectionStates0#{current_write => WritesState}, negotiated_version = Version}}; -handshake(internal, {application_packets,_,_}, _) -> +handshake(info, dist_data, _) -> {keep_state_and_data, [postpone]}; -handshake(info, Msg, StateData) -> - handle_info(Msg, ?FUNCTION_NAME, StateData). +handshake(info, tick, _) -> + %% Ignore - data is sent anyway during handshake + consume_ticks(), + keep_state_and_data; +handshake(info, {send, _, _, _}, _) -> + %% Testing only, OTP distribution test suites... + {keep_state_and_data, [postpone]}; +?HANDLE_COMMON. %%-------------------------------------------------------------------- -spec death_row(gen_statem:event_type(), @@ -361,17 +371,37 @@ code_change(_OldVsn, State, Data, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -handle_call(From, {set_opts, Opts}, StateName, #data{socket_options = SockOpts} = StateData) -> - {next_state, StateName, StateData#data{socket_options = set_opts(SockOpts, Opts)}, [{reply, From, ok}]}. - -handle_info({'DOWN', Monitor, _, _, Reason}, _, - #data{connection_monitor = Monitor, - dist_handle = Handle} = StateData) when Handle =/= undefined-> - {next_state, death_row, StateData, [{state_timeout, 5000, Reason}]}; -handle_info({'DOWN', Monitor, _, _, _}, _, - #data{connection_monitor = Monitor} = StateData) -> + +handle_set_opts( + From, Opts, #data{socket_options = SockOpts} = StateData) -> + {keep_state, StateData#data{socket_options = set_opts(SockOpts, Opts)}, + [{reply, From, ok}]}. + +handle_common( + {call, From}, {set_opts, Opts}, + #data{socket_options = SockOpts} = StateData) -> + {keep_state, StateData#data{socket_options = set_opts(SockOpts, Opts)}, + [{reply, From, ok}]}; +handle_common( + info, {'DOWN', Monitor, _, _, Reason}, + #data{connection_monitor = Monitor, + dist_handle = Handle} = StateData) when Handle =/= undefined -> + {next_state, death_row, StateData, + [{state_timeout, 5000, Reason}]}; +handle_common( + info, {'DOWN', Monitor, _, _, _}, + #data{connection_monitor = Monitor} = StateData) -> {stop, normal, StateData}; -handle_info(_,_,_) -> +handle_common(info, Msg, _) -> + Report = + io_lib:format("TLS sender: Got unexpected info: ~p ~n", [Msg]), + error_logger:info_report(Report), + keep_state_and_data; +handle_common(Type, Msg, _) -> + Report = + io_lib:format( + "TLS sender: Got unexpected event: ~p ~n", [{Type,Msg}]), + error_logger:error_report(Report), keep_state_and_data. send_tls_alert(Alert, #data{negotiated_version = Version, @@ -458,9 +488,9 @@ call(FsmPid, Event) -> {error, closed} end. -%%---------------Erlang distribution -------------------------------------- +%%-------------- Erlang distribution helpers ------------------------------ -dist_data(DHandle, Packet) -> +dist_data(DHandle) -> case erlang:dist_ctrl_get_data(DHandle) of none -> erlang:dist_ctrl_get_data_notification(DHandle), @@ -471,9 +501,11 @@ dist_data(DHandle, Packet) -> %% smaller than 4 GB, and the distribution will %% therefore always have to use {packet,4} Len = iolist_size(Data), - [<<Len:32>>,Data|dist_data(DHandle, Packet)] + [<<Len:32>>,Data|dist_data(DHandle)] end. + +%% Empty the inbox from distribution ticks - do not let them accumulate consume_ticks() -> receive tick -> consume_ticks() |