aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRaimo Niskanen <[email protected]>2017-09-21 16:18:37 +0200
committerRaimo Niskanen <[email protected]>2017-09-26 17:15:19 +0200
commit6e28a7909c665cc316d657dda02a2b8655ecc5da (patch)
tree123de045952f7bc11476d88134771c04e808e248
parent275da9e0e7f876ec7c9b9fe3405f1ca40fdbbd17 (diff)
downloadotp-6e28a7909c665cc316d657dda02a2b8655ecc5da.tar.gz
otp-6e28a7909c665cc316d657dda02a2b8655ecc5da.tar.bz2
otp-6e28a7909c665cc316d657dda02a2b8655ecc5da.zip
Remove ssl_tls_dist_ctrl process
-rw-r--r--lib/ssl/src/dtls_connection.erl1
-rw-r--r--lib/ssl/src/inet_tls_dist.erl329
-rw-r--r--lib/ssl/src/ssl_connection.erl242
-rw-r--r--lib/ssl/src/tls_connection.erl10
4 files changed, 454 insertions, 128 deletions
diff --git a/lib/ssl/src/dtls_connection.erl b/lib/ssl/src/dtls_connection.erl
index ff3e69bae5..56ed232b2d 100644
--- a/lib/ssl/src/dtls_connection.erl
+++ b/lib/ssl/src/dtls_connection.erl
@@ -541,6 +541,7 @@ handle_info(new_cookie_secret, StateName,
previous_cookie_secret => Secret}}};
handle_info(Msg, StateName, State) ->
ssl_connection:handle_info(Msg, StateName, State).
+%%% ssl_connection:StateName(info, Msg, State, ?MODULE).
handle_call(Event, From, StateName, State) ->
diff --git a/lib/ssl/src/inet_tls_dist.erl b/lib/ssl/src/inet_tls_dist.erl
index ef2a608b3c..72cf73e79c 100644
--- a/lib/ssl/src/inet_tls_dist.erl
+++ b/lib/ssl/src/inet_tls_dist.erl
@@ -31,18 +31,33 @@
-export([split_node/1, nodelay/0]).
+-export([dbg/0]). % Debug
+
-include_lib("kernel/include/net_address.hrl").
-include_lib("kernel/include/dist.hrl").
-include_lib("kernel/include/dist_util.hrl").
-%% -undef(trace).
-%% -define(trace(Fmt,Args),
-%% erlang:display(
-%% [erlang:convert_time_unit(
-%% erlang:monotonic_time()
-%% - erlang:system_info(start_time), native, microsecond),
-%% node(),
-%% lists:flatten(io_lib:format(Fmt, Args))])).
+-include("ssl_api.hrl").
+
+%%%-undef(trace).
+%%%%%-define(trace, true).
+%%%-ifdef(trace).
+%%%trace(Module, FunctionName, Line, Info, Value) ->
+%%% erlang:display(
+%%% [{erlang:convert_time_unit(
+%%% erlang:monotonic_time()
+%%% - erlang:system_info(start_time), native, microsecond),
+%%% node(), self()},
+%%% {Module, FunctionName, Line}, Info, Value]),
+%%% Value.
+%%%-else.
+%%%trace(_Module, _FunctionName, _Line, _Info, Value) -> Value.
+%%%-endif.
+%%%-undef(trace).
+%%%-define(
+%%% trace(Info, Body),
+%%% trace(?MODULE, ?FUNCTION_NAME, ?LINE, (Info), begin Body end)).
+trace(Term) -> Term.
%% -------------------------------------------------------------------------
@@ -76,6 +91,130 @@ is_node_name(Node) ->
%% -------------------------------------------------------------------------
+hs_data_common(#sslsocket{pid = DistCtrl} = SslSocket) ->
+ #hs_data{
+ f_send =
+ fun (Ctrl, Packet) when Ctrl == DistCtrl ->
+ f_send(SslSocket, Packet)
+ end,
+ f_recv =
+ fun (Ctrl, Length, Timeout) when Ctrl == DistCtrl ->
+ f_recv(SslSocket, Length, Timeout)
+ end,
+ f_setopts_pre_nodeup =
+ fun (Ctrl) when Ctrl == DistCtrl ->
+ f_setopts_pre_nodeup(SslSocket)
+ end,
+ f_setopts_post_nodeup =
+ fun (Ctrl) when Ctrl == DistCtrl ->
+%%% sys:trace(Ctrl, true),
+ f_setopts_post_nodeup(SslSocket)
+ end,
+ f_getll =
+ fun (Ctrl) when Ctrl == DistCtrl ->
+ f_getll(DistCtrl)
+ end,
+ f_address =
+ fun (Ctrl, Node) when Ctrl == DistCtrl ->
+ f_address(SslSocket, Node)
+ end,
+ mf_tick =
+ fun (Ctrl) when Ctrl == DistCtrl ->
+ mf_tick(DistCtrl)
+ end,
+ mf_getstat =
+ fun (Ctrl) when Ctrl == DistCtrl ->
+ mf_getstat(SslSocket)
+ end,
+ mf_setopts =
+ fun (Ctrl, Opts) when Ctrl == DistCtrl ->
+ mf_setopts(SslSocket, Opts)
+ end,
+ mf_getopts =
+ fun (Ctrl, Opts) when Ctrl == DistCtrl ->
+ mf_getopts(SslSocket, Opts)
+ end,
+ f_handshake_complete =
+ fun (Ctrl, Node, DHandle) when Ctrl == DistCtrl ->
+ f_handshake_complete(DistCtrl, Node, DHandle)
+ end}.
+
+f_send(SslSocket, Packet) ->
+ ssl:send(SslSocket, Packet).
+
+f_recv(SslSocket, Length, Timeout) ->
+ case ssl:recv(SslSocket, Length, Timeout) of
+ {ok, Bin} when is_binary(Bin) ->
+ {ok, binary_to_list(Bin)};
+ Other ->
+ Other
+ end.
+
+f_setopts_pre_nodeup(_SslSocket) ->
+ ok.
+
+f_setopts_post_nodeup(_SslSocket) ->
+ ok.
+
+f_getll(DistCtrl) ->
+ {ok, DistCtrl}.
+
+f_address(SslSocket, Node) ->
+ case ssl:peername(SslSocket) of
+ {ok, Address} ->
+ case split_node(Node) of
+ false ->
+ {error, no_node};
+ Host ->
+ #net_address{
+ address=Address, host=Host,
+ protocol=tls, family=inet}
+ end
+ end.
+
+mf_tick(DistCtrl) ->
+ DistCtrl ! tick,
+ ok.
+
+mf_getstat(SslSocket) ->
+ case ssl:getstat(
+ SslSocket, [recv_cnt, send_cnt, send_pend]) of
+ {ok, Stat} ->
+ split_stat(Stat,0,0,0);
+ Error ->
+ Error
+ end.
+
+mf_setopts(SslSocket, Opts) ->
+ case setopts_filter(Opts) of
+ [] ->
+ ssl:setopts(SslSocket, Opts);
+ Opts1 ->
+ {error, {badopts,Opts1}}
+ end.
+
+mf_getopts(SslSocket, Opts) ->
+ ssl:getopts(SslSocket, Opts).
+
+f_handshake_complete(DistCtrl, Node, DHandle) ->
+ ssl_connection:handshake_complete(DistCtrl, Node, DHandle).
+
+
+setopts_filter(Opts) ->
+ [Opt || {K,_} = Opt <- Opts,
+ K =:= active orelse K =:= deliver orelse K =:= packet].
+
+split_stat([{recv_cnt, R}|Stat], _, W, P) ->
+ split_stat(Stat, R, W, P);
+split_stat([{send_cnt, W}|Stat], R, _, P) ->
+ split_stat(Stat, R, W, P);
+split_stat([{send_pend, P}|Stat], R, W, _) ->
+ split_stat(Stat, R, W, P);
+split_stat([], R, W, P) ->
+ {ok, R, W, P}.
+
+%% -------------------------------------------------------------------------
+
listen(Name) ->
gen_listen(inet_tcp, Name).
@@ -95,40 +234,33 @@ accept(Listen) ->
gen_accept(Driver, Listen) ->
Kernel = self(),
- spawn_opt(
- fun () ->
- accept_loop(Driver, Listen, Kernel)
- end,
- [link, {priority, max}]).
+ monitor_pid(
+ spawn_opt(
+ fun () ->
+ accept_loop(Driver, Listen, Kernel)
+ end,
+ [link, {priority, max}])).
accept_loop(Driver, Listen, Kernel) ->
- ?trace("~p~n",[{?MODULE, accept_loop, self()}]),
case Driver:accept(Listen) of
{ok, Socket} ->
Opts = get_ssl_options(server),
wait_for_code_server(),
case ssl:ssl_accept(
Socket, [{active, false}, {packet, 4}] ++ Opts) of
- {ok, SslSocket} ->
- DistCtrl = ssl_tls_dist_ctrl:start(SslSocket),
- ?trace("~p~n",
- [{?MODULE, accept_loop, accepted,
- SslSocket, DistCtrl, self()}]),
- ok = ssl:controlling_process(SslSocket, DistCtrl),
- Kernel !
- {accept, self(), DistCtrl, Driver:family(), tls},
+ {ok, #sslsocket{pid = DistCtrl} = SslSocket} ->
+ monitor_pid(DistCtrl),
+ trace(
+ Kernel !
+ {accept, self(), DistCtrl,
+ Driver:family(), tls}),
receive
{Kernel, controller, Pid} ->
- ?trace("~p~n",
- [{?MODULE, accept_loop,
- controller, self()}]),
- ssl_tls_dist_ctrl:set_supervisor(DistCtrl, Pid),
- Pid ! {self(), controller};
+ ok = ssl:controlling_process(SslSocket, Pid),
+ trace(
+ Pid ! {self(), controller});
{Kernel, unsupported_protocol} ->
- ?trace("~p~n",
- [{?MODULE, accept_loop,
- unsupported_protocol, self()}]),
- exit(unsupported_protocol)
+ exit(trace(unsupported_protocol))
end,
accept_loop(Driver, Listen, Kernel);
{error, {options, _}} = Error ->
@@ -137,12 +269,14 @@ accept_loop(Driver, Listen, Kernel) ->
error_logger:error_msg(
"Cannot accept TLS distribution connection: ~s~n",
[ssl:format_error(Error)]),
- gen_tcp:close(Socket);
- _ ->
- gen_tcp:close(Socket)
+ _ = trace(Error),
+ gen_tcp:close(Socket);
+ Other ->
+ _ = trace(Other),
+ gen_tcp:close(Socket)
end;
Error ->
- exit(Error)
+ exit(trace(Error))
end,
accept_loop(Driver, Listen, Kernel).
@@ -184,21 +318,23 @@ accept_connection(AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) ->
gen_accept_connection(
Driver, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) ->
Kernel = self(),
- spawn_opt(
- fun() ->
- do_accept(
- Driver, Kernel, AcceptPid, DistCtrl,
- MyNode, Allowed, SetupTime)
- end,
- [link, {priority, max}]).
+ monitor_pid(
+ spawn_opt(
+ fun() ->
+ do_accept(
+ Driver, Kernel, AcceptPid, DistCtrl,
+ MyNode, Allowed, SetupTime)
+ end,
+ [link, {priority, max}])).
do_accept(Driver, Kernel, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) ->
+ SslSocket = ssl_connection:get_sslsocket(DistCtrl),
receive
{AcceptPid, controller} ->
Timer = dist_util:start_timer(SetupTime),
- case check_ip(Driver, DistCtrl) of
+ case check_ip(Driver, SslSocket) of
true ->
- HSData0 = ssl_tls_dist_ctrl:hs_data_common(DistCtrl),
+ HSData0 = hs_data_common(SslSocket),
HSData =
HSData0#hs_data{
kernel_pid = Kernel,
@@ -207,12 +343,12 @@ do_accept(Driver, Kernel, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) ->
timer = Timer,
this_flags = 0,
allowed = Allowed},
- dist_util:handshake_other_started(HSData);
+ dist_util:handshake_other_started(trace(HSData));
{false,IP} ->
error_logger:error_msg(
"** Connection attempt from "
"disallowed IP ~w ** ~n", [IP]),
- ?shutdown(no_node)
+ ?shutdown(trace(no_node))
end
end.
@@ -223,42 +359,33 @@ setup(Node, Type, MyNode, LongOrShortNames, SetupTime) ->
gen_setup(Driver, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
Kernel = self(),
- spawn_opt(
- fun() ->
- do_setup(
- Driver, Kernel, Node, Type,
- MyNode, LongOrShortNames, SetupTime)
- end,
- [link, {priority, max}]).
+ monitor_pid(
+ spawn_opt(
+ fun() ->
+ do_setup(
+ Driver, Kernel, Node, Type,
+ MyNode, LongOrShortNames, SetupTime)
+ end,
+ [link, {priority, max}])).
do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
[Name, Address] = splitnode(Driver, Node, LongOrShortNames),
case Driver:getaddr(Address) of
{ok, Ip} ->
- Timer = dist_util:start_timer(SetupTime),
+ Timer = trace(dist_util:start_timer(SetupTime)),
ErlEpmd = net_kernel:epmd_module(),
case ErlEpmd:port_please(Name, Ip) of
{port, TcpPort, Version} ->
- ?trace("port_please(~p) -> version ~p~n",
- [Node,Version]),
- Opts = connect_options(get_ssl_options(client)),
+ Opts = trace(connect_options(get_ssl_options(client))),
dist_util:reset_timer(Timer),
case ssl:connect(
Ip, TcpPort,
[binary, {active, false}, {packet, 4},
Driver:family(), nodelay()] ++ Opts) of
- {ok, SslSocket} ->
- ?trace("~p~n",
- [{?MODULE, do_setup,
- ssl_socket, SslSocket}]),
- DistCtrl = ssl_tls_dist_ctrl:start(SslSocket),
- ssl_tls_dist_ctrl:set_supervisor(
- DistCtrl, self()),
- ok =
- ssl:controlling_process(
- SslSocket, DistCtrl),
- HSData0 =
- ssl_tls_dist_ctrl:hs_data_common(DistCtrl),
+ {ok, #sslsocket{pid = DistCtrl} = SslSocket} ->
+ monitor_pid(DistCtrl),
+ ok = ssl:controlling_process(SslSocket, self()),
+ HSData0 = hs_data_common(SslSocket),
HSData =
HSData0#hs_data{
kernel_pid = Kernel,
@@ -269,44 +396,37 @@ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
this_flags = 0,
other_version = Version,
request_type = Type},
- ?trace("~p~n",
- [{?MODULE, do_setup,
- handshake_we_started, HSData}]),
- dist_util:handshake_we_started(HSData);
+ dist_util:handshake_we_started(trace(HSData));
Other ->
%% Other Node may have closed since
%% port_please !
- ?trace("other node (~p) "
- "closed since port_please.~n",
- [Node]),
- ?shutdown2(Node,
- {shutdown, {connect_failed, Other}})
+ ?shutdown2(
+ Node,
+ trace({shutdown, {connect_failed, Other}}))
end;
Other ->
- ?trace("port_please (~p) "
- "failed.~n", [Node]),
- ?shutdown2(Node, {shutdown, {port_please_failed, Other}})
+ ?shutdown2(
+ Node,
+ trace({shutdown, {port_please_failed, Other}}))
end;
Other ->
- ?trace("~w:getaddr(~p) "
- "failed (~p).~n", [Driver, Address, Other]),
- ?shutdown2(Node, {shutdown, {getaddr_failed, Other}})
+ ?shutdown2(Node, trace({shutdown, {getaddr_failed, Other}}))
end.
close(Socket) ->
gen_close(inet, Socket).
gen_close(Driver, Socket) ->
- Driver:close(Socket).
+ trace(Driver:close(Socket)).
%% ------------------------------------------------------------
%% Do only accept new connection attempts from nodes at our
%% own LAN, if the check_ip environment parameter is true.
%% ------------------------------------------------------------
-check_ip(Driver, DistCtrl) ->
+check_ip(Driver, SslSocket) ->
case application:get_env(check_ip) of
{ok, true} ->
- case get_ifs(DistCtrl) of
+ case get_ifs(SslSocket) of
{ok, IFs, IP} ->
check_ip(Driver, IFs, IP);
_ ->
@@ -324,8 +444,7 @@ check_ip(Driver, [{OwnIP, _, Netmask}|IFs], PeerIP) ->
check_ip(_Driver, [], PeerIP) ->
{false, PeerIP}.
-get_ifs(DistCtrl) ->
- Socket = ssl_tls_dist_ctrl:get_socket(DistCtrl),
+get_ifs(#sslsocket{fd = {gen_tcp, Socket, _}}) ->
case inet:peername(Socket) of
{ok, {IP, _}} ->
%% XXX this is seriously broken for IPv6
@@ -513,3 +632,35 @@ verify_fun(Value) ->
_ ->
error(malformed_ssl_dist_opt, [Value])
end.
+
+%% -------------------------------------------------------------------------
+
+%% Keep an eye on distribution Pid:s we know of
+monitor_pid(Pid) ->
+ spawn(
+ fun () ->
+ MRef = erlang:monitor(process, Pid),
+ receive
+ {'DOWN', MRef, _, _, normal} ->
+ error_logger:error_report(
+ [dist_proc_died,
+ {reason, normal},
+ {pid, Pid}]);
+ {'DOWN', MRef, _, _, Reason} ->
+ error_logger:info_report(
+ [dist_proc_died,
+ {reason, Reason},
+ {pid, Pid}])
+ end
+ end),
+ Pid.
+
+dbg() ->
+ dbg:stop(),
+ dbg:tracer(),
+ dbg:p(all, c),
+ dbg:tpl(?MODULE, cx),
+ dbg:tpl(erlang, dist_ctrl_get_data_notification, cx),
+ dbg:tpl(erlang, dist_ctrl_get_data, cx),
+ dbg:tpl(erlang, dist_ctrl_put_data, cx),
+ ok.
diff --git a/lib/ssl/src/ssl_connection.erl b/lib/ssl/src/ssl_connection.erl
index 5d48325719..008e7e1704 100644
--- a/lib/ssl/src/ssl_connection.erl
+++ b/lib/ssl/src/ssl_connection.erl
@@ -44,12 +44,14 @@
-export([send/2, recv/3, close/2, shutdown/2,
new_user/2, get_opts/2, set_opts/2,
peer_certificate/1, renegotiation/1, negotiated_protocol/1, prf/5,
+ get_sslsocket/1, handshake_complete/3,
connection_information/2, handle_common_event/5
]).
%% General gen_statem state functions with extra callback argument
%% to determine if it is an SSL/TLS or DTLS gen_statem machine
--export([init/4, hello/4, abbreviated/4, certify/4, cipher/4, connection/4, downgrade/4]).
+-export([init/4, hello/4, abbreviated/4, certify/4, cipher/4,
+ connection/4, death_row/4, downgrade/4]).
%% gen_statem callbacks
-export([terminate/3, format_status/2]).
@@ -262,6 +264,13 @@ peer_certificate(ConnectionPid) ->
renegotiation(ConnectionPid) ->
call(ConnectionPid, renegotiate).
+
+get_sslsocket(ConnectionPid) ->
+ call(ConnectionPid, get_sslsocket).
+
+handshake_complete(ConnectionPid, Node, DHandle) ->
+ call(ConnectionPid, {handshake_complete, Node, DHandle}).
+
%%--------------------------------------------------------------------
-spec prf(pid(), binary() | 'master_secret', binary(),
[binary() | ssl:prf_random()], non_neg_integer()) ->
@@ -359,6 +368,12 @@ init({call, From}, {start, {Opts, EmOpts}, Timeout},
socket_options = SockOpts} = State0, Connection) ->
try
SslOpts = ssl:handle_options(Opts, OrigSSLOptions),
+ case SslOpts of
+ #ssl_options{erl_dist = true} ->
+ process_flag(priority, max);
+ _ ->
+ ok
+ end,
State = ssl_config(SslOpts, Role, State0),
init({call, From}, {start, Timeout},
State#state{ssl_options = SslOpts, socket_options = new_emulated(EmOpts, SockOpts)}, Connection)
@@ -748,7 +763,7 @@ cipher(Type, Msg, State, Connection) ->
#state{}, tls_connection | dtls_connection) ->
gen_statem:state_function_result().
%%--------------------------------------------------------------------
-connection({call, From}, {application_data, Data},
+connection({call, {FromPid, _} = From}, {application_data, Data},
#state{protocol_cb = Connection} = State, Connection) ->
%% We should look into having a worker process to do this to
%% parallize send and receive decoding and not block the receiver
@@ -756,7 +771,13 @@ connection({call, From}, {application_data, Data},
try
write_application_data(Data, From, State)
catch throw:Error ->
- hibernate_after(connection, State, [{reply, From, Error}])
+ case self() of
+ FromPid ->
+ {stop, {shutdown, Error}};
+ _ ->
+ hibernate_after(
+ connection, State, [{reply, From, Error}])
+ end
end;
connection({call, RecvFrom}, {recv, N, Timeout},
#state{protocol_cb = Connection, socket_options =
@@ -784,6 +805,28 @@ connection({call, From}, negotiated_protocol,
#state{negotiated_protocol = SelectedProtocol} = State, _) ->
hibernate_after(connection, State,
[{reply, From, {ok, SelectedProtocol}}]);
+connection(
+ {call, From}, {handshake_complete, _Node, DHandle},
+ #state{
+ ssl_options = #ssl_options{erl_dist = true},
+ socket_options = SockOpts,
+ protocol_specific = ProtocolSpecific} = State,
+ Connection) ->
+ %% From now on we execute on normal priority
+ process_flag(priority, normal),
+ try erlang:dist_ctrl_get_data_notification(DHandle) of
+ _ ->
+ NewState =
+ State#state{
+ socket_options =
+ SockOpts#socket_options{active = true},
+ protocol_specific =
+ ProtocolSpecific#{d_handle => DHandle}},
+ {Record, NewerState} = Connection:next_record_if_active(NewState),
+ Connection:next_event(connection, Record, NewerState, [{reply, From, ok}])
+ catch _:Reason ->
+ death_row(State, Reason)
+ end;
connection({call, From}, Msg, State, Connection) ->
handle_call(Msg, From, connection, State, Connection);
connection(info, Msg, State, _) ->
@@ -794,6 +837,30 @@ connection(Type, Msg, State, Connection) ->
handle_common_event(Type, Msg, connection, State, Connection).
%%--------------------------------------------------------------------
+-spec death_row(gen_statem:event_type(), term(),
+ #state{}, tls_connection | dtls_connection) ->
+ gen_statem:state_function_result().
+%%--------------------------------------------------------------------
+%% We just wait for the owner to die which triggers the monitor,
+%% or the socket may die too
+death_row(
+ info, {'DOWN', MonitorRef, _, _, Reason},
+ #state{user_application={MonitorRef,_Pid} = State},
+ _) ->
+ {stop, {shutdown, Reason}, State};
+death_row(
+ info, {'EXIT', Socket, Reason}, #state{socket = Socket} = State, _) ->
+ {stop, {shutdown, Reason}, State};
+death_row(state_timeout, Reason, _State, _Connection) ->
+ {stop, {shutdown,Reason}};
+death_row(_Type, _Msg, State, _Connection) ->
+ {keep_state, State, [postpone]}.
+
+%% State entry function
+death_row(State, Reason) ->
+ {next_state, death_row, State, [{state_timeout, 5000, Reason}]}.
+
+%%--------------------------------------------------------------------
-spec downgrade(gen_statem:event_type(), term(),
#state{}, tls_connection | dtls_connection) ->
gen_statem:state_function_result().
@@ -804,10 +871,10 @@ downgrade(internal, #alert{description = ?CLOSE_NOTIFY},
tls_socket:setopts(Transport, Socket, [{active, false}, {packet, 0}, {mode, binary}]),
Transport:controlling_process(Socket, Pid),
gen_statem:reply(From, {ok, Socket}),
- {stop, normal, State};
+ stop_normal(State);
downgrade(timeout, downgrade, #state{downgrade = {_, From}} = State, _) ->
gen_statem:reply(From, {error, timeout}),
- {stop, normal, State};
+ stop_normal(State);
downgrade(Type, Event, State, Connection) ->
handle_common_event(Type, Event, downgrade, State, Connection).
@@ -877,7 +944,7 @@ handle_call({shutdown, How0}, From, _,
#state{transport_cb = Transport,
negotiated_version = Version,
connection_states = ConnectionStates,
- socket = Socket}, Connection) ->
+ socket = Socket} = State, Connection) ->
case How0 of
How when How == write; How == both ->
Alert = ?ALERT_REC(?WARNING, ?CLOSE_NOTIFY),
@@ -893,7 +960,7 @@ handle_call({shutdown, How0}, From, _,
{keep_state_and_data, [{reply, From, ok}]};
Error ->
gen_statem:reply(From, {error, Error}),
- {stop, normal}
+ stop_normal(State)
end;
handle_call({recv, _N, _Timeout}, From, _,
#state{socket_options =
@@ -928,6 +995,15 @@ handle_call({set_opts, Opts0}, From, StateName,
handle_call(renegotiate, From, StateName, _, _) when StateName =/= connection ->
{keep_state_and_data, [{reply, From, {error, already_renegotiating}}]};
+
+handle_call(
+ get_sslsocket, From, _StateName,
+ #state{transport_cb = Transport, socket = Socket, tracker = Tracker},
+ Connection) ->
+ SslSocket =
+ Connection:socket(self(), Transport, Socket, Connection, Tracker),
+ {keep_state_and_data, [{reply, From, SslSocket}]};
+
handle_call({prf, Secret, Label, Seed, WantedLength}, From, _,
#state{connection_states = ConnectionStates,
negotiated_version = Version}, _) ->
@@ -964,18 +1040,19 @@ handle_info({ErrorTag, Socket, econnaborted}, StateName,
tracker = Tracker} = State) when StateName =/= connection ->
alert_user(Transport, Tracker,Socket,
StartFrom, ?ALERT_REC(?FATAL, ?CLOSE_NOTIFY), Role, Connection),
- {stop, normal, State};
+ stop_normal(State);
handle_info({ErrorTag, Socket, Reason}, StateName, #state{socket = Socket,
error_tag = ErrorTag} = State) ->
Report = io_lib:format("SSL: Socket error: ~p ~n", [Reason]),
- error_logger:info_report(Report),
+ error_logger:error_report(Report),
handle_normal_shutdown(?ALERT_REC(?FATAL, ?CLOSE_NOTIFY), StateName, State),
- {stop, normal, State};
+ stop_normal(State);
-handle_info({'DOWN', MonitorRef, _, _, _}, _,
- State = #state{user_application={MonitorRef,_Pid}}) ->
- {stop, normal, State};
+handle_info(
+ {'DOWN', MonitorRef, _, _, _}, _,
+ #state{user_application={MonitorRef,_Pid}} = State) ->
+ stop_normal(State);
%%% So that terminate will be run when supervisor issues shutdown
handle_info({'EXIT', _Sup, shutdown}, _StateName, State) ->
@@ -983,6 +1060,8 @@ handle_info({'EXIT', _Sup, shutdown}, _StateName, State) ->
handle_info({'EXIT', Socket, normal}, _StateName, #state{socket = Socket} = State) ->
%% Handle as transport close"
{stop, {shutdown, transport_closed}, State};
+handle_info({'EXIT', Socket, Reason}, _StateName, #state{socket = Socket} = State) ->
+ {stop, {shutdown, Reason}, State};
handle_info(allow_renegotiate, StateName, State) ->
{next_state, StateName, State#state{allow_renegotiate = true}};
@@ -1001,11 +1080,62 @@ handle_info({cancel_start_or_recv, RecvFrom}, StateName,
handle_info({cancel_start_or_recv, _RecvFrom}, StateName, State) ->
{next_state, StateName, State#state{timer = undefined}};
+handle_info(
+ dist_data = Msg,
+ connection,
+ #state{
+ ssl_options = #ssl_options{erl_dist = true},
+ protocol_specific = #{d_handle := DHandle}} = State) ->
+ eat_msgs(Msg),
+ try send_dist_data(connection, State, DHandle, [])
+ catch _:Reason ->
+ death_row(State, Reason)
+ end;
+handle_info(
+ tick = Msg,
+ connection,
+ #state{
+ ssl_options = #ssl_options{erl_dist = true},
+ protocol_specific = #{d_handle := _}}) ->
+ eat_msgs(Msg),
+ {keep_state_and_data,
+ [{next_event, {call, {self(), undefined}}, {application_data, <<>>}}]};
+
handle_info(Msg, StateName, #state{socket = Socket, error_tag = Tag} = State) ->
Report = io_lib:format("SSL: Got unexpected info: ~p ~n", [{Msg, Tag, Socket}]),
error_logger:info_report(Report),
{next_state, StateName, State}.
+send_dist_data(StateName, State, DHandle, Acc) ->
+ case erlang:dist_ctrl_get_data(DHandle) of
+ none ->
+ erlang:dist_ctrl_get_data_notification(DHandle),
+ hibernate_after(StateName, State, lists:reverse(Acc));
+ Data ->
+ send_dist_data(
+ StateName, State, DHandle,
+ [{next_event, {call, {self(), undefined}}, {application_data, Data}}
+ |Acc])
+ end.
+
+%% Overload mitigation
+eat_msgs(Msg) ->
+ receive Msg -> eat_msgs(Msg)
+ after 0 -> ok
+ end.
+
+%% When running with erl_dist the stop reason 'normal'
+%% would be too silent and prevent cleanup
+stop_normal(State) ->
+ Reason =
+ case State of
+ #state{ssl_options = #ssl_options{erl_dist = true}} ->
+ {shutdown, normal};
+ _ ->
+ normal
+ end,
+ {stop, Reason, State}.
+
%%--------------------------------------------------------------------
%% gen_statem callbacks
%%--------------------------------------------------------------------
@@ -1080,7 +1210,7 @@ format_status(terminate, [_, StateName, State]) ->
%%--------------------------------------------------------------------
%%%
%%--------------------------------------------------------------------
-write_application_data(Data0, From,
+write_application_data(Data0, {FromPid, _} = From,
#state{socket = Socket,
negotiated_version = Version,
protocol_cb = Connection,
@@ -1095,10 +1225,19 @@ write_application_data(Data0, From,
Connection:renegotiate(State#state{renegotiation = {true, internal}},
[{next_event, {call, From}, {application_data, Data0}}]);
false ->
- {Msgs, ConnectionStates} = Connection:encode_data(Data, Version, ConnectionStates0),
- Result = Connection:send(Transport, Socket, Msgs),
- ssl_connection:hibernate_after(connection, State#state{connection_states = ConnectionStates},
- [{reply, From, Result}])
+ {Msgs, ConnectionStates} =
+ Connection:encode_data(Data, Version, ConnectionStates0),
+ NewState = State#state{connection_states = ConnectionStates},
+ case Connection:send(Transport, Socket, Msgs) of
+ ok when FromPid =:= self() ->
+ hibernate_after(connection, NewState, []);
+ Error when FromPid =:= self() ->
+ {stop, {shutdown, Error}, NewState};
+ ok ->
+ hibernate_after(connection, NewState, [{reply, From, ok}]);
+ Result ->
+ hibernate_after(connection, NewState, [{reply, From, Result}])
+ end
end.
read_application_data(Data, #state{user_application = {_Mon, Pid},
@@ -1118,30 +1257,57 @@ read_application_data(Data, #state{user_application = {_Mon, Pid},
end,
case get_data(SOpts, BytesToRead, Buffer1) of
{ok, ClientData, Buffer} -> % Send data
- SocketOpt = deliver_app_data(Transport, Socket, SOpts,
- ClientData, Pid, RecvFrom, Tracker, Connection),
- cancel_timer(Timer),
- State = State0#state{user_data_buffer = Buffer,
- start_or_recv_from = undefined,
- timer = undefined,
- bytes_to_read = undefined,
- socket_options = SocketOpt
- },
- if
- SocketOpt#socket_options.active =:= false; Buffer =:= <<>> ->
- %% Passive mode, wait for active once or recv
- %% Active and empty, get more data
- Connection:next_record_if_active(State);
- true -> %% We have more data
- read_application_data(<<>>, State)
- end;
+ case State0 of
+ #state{
+ ssl_options = #ssl_options{erl_dist = true},
+ protocol_specific = #{d_handle := DHandle}} ->
+ State =
+ State0#state{
+ user_data_buffer = Buffer,
+ bytes_to_read = undefined},
+ try erlang:dist_ctrl_put_data(DHandle, ClientData) of
+ _
+ when SOpts#socket_options.active =:= false;
+ Buffer =:= <<>> ->
+ %% Passive mode, wait for active once or recv
+ %% Active and empty, get more data
+ Connection:next_record_if_active(State);
+ _ -> %% We have more data
+ read_application_data(<<>>, State)
+ catch _:Reason ->
+ death_row(State, Reason)
+ end;
+ _ ->
+ SocketOpt =
+ deliver_app_data(
+ Transport, Socket, SOpts,
+ ClientData, Pid, RecvFrom, Tracker, Connection),
+ cancel_timer(Timer),
+ State =
+ State0#state{
+ user_data_buffer = Buffer,
+ start_or_recv_from = undefined,
+ timer = undefined,
+ bytes_to_read = undefined,
+ socket_options = SocketOpt
+ },
+ if
+ SocketOpt#socket_options.active =:= false;
+ Buffer =:= <<>> ->
+ %% Passive mode, wait for active once or recv
+ %% Active and empty, get more data
+ Connection:next_record_if_active(State);
+ true -> %% We have more data
+ read_application_data(<<>>, State)
+ end
+ end;
{more, Buffer} -> % no reply, we need more data
Connection:next_record(State0#state{user_data_buffer = Buffer});
{passive, Buffer} ->
Connection:next_record_if_active(State0#state{user_data_buffer = Buffer});
{error,_Reason} -> %% Invalid packet in packet mode
deliver_packet_error(Transport, Socket, SOpts, Buffer1, Pid, RecvFrom, Tracker, Connection),
- {stop, normal, State0}
+ stop_normal(State0)
end.
%%--------------------------------------------------------------------
%%%
@@ -1151,12 +1317,12 @@ handle_alert(#alert{level = ?FATAL} = Alert, StateName,
protocol_cb = Connection,
ssl_options = SslOpts, start_or_recv_from = From, host = Host,
port = Port, session = Session, user_application = {_Mon, Pid},
- role = Role, socket_options = Opts, tracker = Tracker}) ->
+ role = Role, socket_options = Opts, tracker = Tracker} = State) ->
invalidate_session(Role, Host, Port, Session),
log_alert(SslOpts#ssl_options.log_alert, Role, Connection:protocol_name(),
StateName, Alert#alert{role = opposite_role(Role)}),
alert_user(Transport, Tracker, Socket, StateName, Opts, Pid, From, Alert, Role, Connection),
- {stop, normal};
+ stop_normal(State);
handle_alert(#alert{level = ?WARNING, description = ?CLOSE_NOTIFY} = Alert,
StateName, State) ->
diff --git a/lib/ssl/src/tls_connection.erl b/lib/ssl/src/tls_connection.erl
index e3ffbea3d3..1c506fe951 100644
--- a/lib/ssl/src/tls_connection.erl
+++ b/lib/ssl/src/tls_connection.erl
@@ -65,7 +65,7 @@
%% gen_statem state functions
-export([init/3, error/3, downgrade/3, %% Initiation and take down states
hello/3, certify/3, cipher/3, abbreviated/3, %% Handshake states
- connection/3]).
+ connection/3, death_row/3]).
%% gen_statem callbacks
-export([callback_mode/0, terminate/3, code_change/4, format_status/2]).
@@ -378,6 +378,13 @@ connection(Type, Event, State) ->
ssl_connection:connection(Type, Event, State, ?MODULE).
%%--------------------------------------------------------------------
+-spec death_row(gen_statem:event_type(), term(), #state{}) ->
+ gen_statem:state_function_result().
+%%--------------------------------------------------------------------
+death_row(Type, Event, State) ->
+ ssl_connection:death_row(Type, Event, State, ?MODULE).
+
+%%--------------------------------------------------------------------
-spec downgrade(gen_statem:event_type(), term(), #state{}) ->
gen_statem:state_function_result().
%%--------------------------------------------------------------------
@@ -435,6 +442,7 @@ handle_info({CloseTag, Socket}, StateName,
end;
handle_info(Msg, StateName, State) ->
ssl_connection:handle_info(Msg, StateName, State).
+%%% ssl_connection:StateName(info, Msg, State, ?MODULE).
handle_common_event(internal, #alert{} = Alert, StateName,
#state{negotiated_version = Version} = State) ->