aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRaimo Niskanen <[email protected]>2017-09-08 17:01:45 +0200
committerRaimo Niskanen <[email protected]>2017-09-15 11:54:00 +0200
commitc2180b0e570baf90cbf567381212bb223fc73d37 (patch)
tree936fc0ef40b7248372c24db8cf731b0bdc0f2e94
parentd6be9f66c293e225c24b43d83640a4e857fa5269 (diff)
downloadotp-c2180b0e570baf90cbf567381212bb223fc73d37.tar.gz
otp-c2180b0e570baf90cbf567381212bb223fc73d37.tar.bz2
otp-c2180b0e570baf90cbf567381212bb223fc73d37.zip
Rewrite dist ctrl from port to process
-rw-r--r--lib/ssl/src/Makefile4
-rw-r--r--lib/ssl/src/inet6_tls_dist.erl7
-rw-r--r--lib/ssl/src/inet_tls_dist.erl500
-rw-r--r--lib/ssl/src/ssl.app.src2
-rw-r--r--lib/ssl/src/ssl_dist_sup.erl14
-rw-r--r--lib/ssl/src/ssl_tls_dist_ctrl.erl380
6 files changed, 751 insertions, 156 deletions
diff --git a/lib/ssl/src/Makefile b/lib/ssl/src/Makefile
index 2e7df9792e..f6fe23b584 100644
--- a/lib/ssl/src/Makefile
+++ b/lib/ssl/src/Makefile
@@ -1,7 +1,7 @@
#
# %CopyrightBegin%
#
-# Copyright Ericsson AB 1999-2016. All Rights Reserved.
+# Copyright Ericsson AB 1999-2017. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -88,7 +88,7 @@ MODULES= \
ssl_v3 \
tls_v1 \
dtls_v1 \
- ssl_tls_dist_proxy
+ ssl_tls_dist_ctrl
INTERNAL_HRL_FILES = \
ssl_alert.hrl ssl_cipher.hrl \
diff --git a/lib/ssl/src/inet6_tls_dist.erl b/lib/ssl/src/inet6_tls_dist.erl
index ffd7296f93..96ce4d493a 100644
--- a/lib/ssl/src/inet6_tls_dist.erl
+++ b/lib/ssl/src/inet6_tls_dist.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2015. All Rights Reserved.
+%% Copyright Ericsson AB 2015-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -21,7 +21,8 @@
%%
-module(inet6_tls_dist).
--export([childspecs/0, listen/1, accept/1, accept_connection/5,
+-export([childspecs/0]).
+-export([listen/1, accept/1, accept_connection/5,
setup/5, close/1, select/1]).
childspecs() ->
@@ -43,4 +44,4 @@ setup(Node, Type, MyNode, LongOrShortNames,SetupTime) ->
inet_tls_dist:gen_setup(inet6_tcp, Node, Type, MyNode, LongOrShortNames,SetupTime).
close(Socket) ->
- inet_tls_dist:close(Socket).
+ inet_tls_dist:gen_close(inet6_tcp, Socket).
diff --git a/lib/ssl/src/inet_tls_dist.erl b/lib/ssl/src/inet_tls_dist.erl
index 0da4b3587f..47f400da6f 100644
--- a/lib/ssl/src/inet_tls_dist.erl
+++ b/lib/ssl/src/inet_tls_dist.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2011-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2011-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -21,17 +21,31 @@
%%
-module(inet_tls_dist).
--export([childspecs/0, listen/1, accept/1, accept_connection/5,
+-export([childspecs/0]).
+-export([listen/1, accept/1, accept_connection/5,
setup/5, close/1, select/1, is_node_name/1]).
%% Generalized dist API
-export([gen_listen/2, gen_accept/2, gen_accept_connection/6,
- gen_setup/6, gen_select/2]).
+ gen_setup/6, gen_close/2, gen_select/2]).
+
+-export([split_node/1, nodelay/0]).
-include_lib("kernel/include/net_address.hrl").
-include_lib("kernel/include/dist.hrl").
-include_lib("kernel/include/dist_util.hrl").
+%% -undef(trace).
+%% -define(trace(Fmt,Args),
+%% erlang:display(
+%% [erlang:convert_time_unit(
+%% erlang:monotonic_time()
+%% - erlang:system_info(start_time), native, microsecond),
+%% node(),
+%% lists:flatten(io_lib:format(Fmt, Args))])).
+
+%% -------------------------------------------------------------------------
+
childspecs() ->
{ok, [{ssl_dist_sup,{ssl_dist_sup, start_link, []},
permanent, infinity, supervisor, [ssl_dist_sup]}]}.
@@ -40,51 +54,186 @@ select(Node) ->
gen_select(inet_tcp, Node).
gen_select(Driver, Node) ->
- case split_node(atom_to_list(Node), $@, []) of
- [_, Host] ->
- case inet:getaddr(Host, Driver:family()) of
+ case split_node(Node) of
+ false ->
+ false;
+ Host ->
+ case Driver:getaddr(Host) of
{ok, _} -> true;
_ -> false
- end;
- _ ->
- false
+ end
end.
-is_node_name(Node) when is_atom(Node) ->
- select(Node);
-is_node_name(_) ->
- false.
+%% -------------------------------------------------------------------------
+
+is_node_name(Node) ->
+ case split_node(Node) of
+ false ->
+ false;
+ _Host ->
+ true
+ end.
+
+%% -------------------------------------------------------------------------
listen(Name) ->
gen_listen(inet_tcp, Name).
gen_listen(Driver, Name) ->
- ssl_tls_dist_proxy:listen(Driver, Name).
+ case inet_tcp_dist:gen_listen(Driver, Name) of
+ {ok, {Socket, Address, Creation}} ->
+ inet:setopts(Socket, [{packet, 4}]),
+ {ok, {Socket, Address#net_address{protocol=tls}, Creation}};
+ Other ->
+ Other
+ end.
+
+%% -------------------------------------------------------------------------
accept(Listen) ->
gen_accept(inet_tcp, Listen).
gen_accept(Driver, Listen) ->
- ssl_tls_dist_proxy:accept(Driver, Listen).
+ Kernel = self(),
+ spawn_opt(
+ fun () ->
+ accept_loop(Driver, Listen, Kernel)
+ end,
+ [link, {priority, max}]).
+
+accept_loop(Driver, Listen, Kernel) ->
+ ?trace("~p~n",[{?MODULE, accept_loop, self()}]),
+ case Driver:accept(Listen) of
+ {ok, Socket} ->
+ Opts = get_ssl_options(server),
+ wait_for_code_server(),
+ case ssl:ssl_accept(
+ Socket, [{active, false}, {packet, 4}] ++ Opts) of
+ {ok, SslSocket} ->
+ DistCtrl = ssl_tls_dist_ctrl:start(SslSocket),
+ ?trace("~p~n",
+ [{?MODULE, accept_loop, accepted,
+ SslSocket, DistCtrl, self()}]),
+ ok = ssl:controlling_process(SslSocket, DistCtrl),
+ Kernel !
+ {accept, self(), DistCtrl, Driver:family(), tls},
+ receive
+ {Kernel, controller, Pid} ->
+ ?trace("~p~n",
+ [{?MODULE, accept_loop,
+ controller, self()}]),
+ ssl_tls_dist_ctrl:set_supervisor(DistCtrl, Pid),
+ Pid ! {self(), controller};
+ {Kernel, unsupported_protocol} ->
+ ?trace("~p~n",
+ [{?MODULE, accept_loop,
+ unsupported_protocol, self()}]),
+ exit(unsupported_protocol)
+ end,
+ accept_loop(Driver, Listen, Kernel);
+ {error, {options, _}} = Error ->
+ %% Bad options: that's probably our fault.
+ %% Let's log that.
+ error_logger:error_msg(
+ "Cannot accept TLS distribution connection: ~s~n",
+ [ssl:format_error(Error)]),
+ gen_tcp:close(Socket);
+ _ ->
+ gen_tcp:close(Socket)
+ end;
+ Error ->
+ exit(Error)
+ end,
+ accept_loop(Driver, Listen, Kernel).
+
+wait_for_code_server() ->
+ %% This is an ugly hack. Upgrading a socket to TLS requires the
+ %% crypto module to be loaded. Loading the crypto module triggers
+ %% its on_load function, which calls code:priv_dir/1 to find the
+ %% directory where its NIF library is. However, distribution is
+ %% started earlier than the code server, so the code server is not
+ %% necessarily started yet, and code:priv_dir/1 might fail because
+ %% of that, if we receive an incoming connection on the
+ %% distribution port early enough.
+ %%
+ %% If the on_load function of a module fails, the module is
+ %% unloaded, and the function call that triggered loading it fails
+ %% with 'undef', which is rather confusing.
+ %%
+ %% Thus, the ssl_tls_dist_proxy process will terminate, and be
+ %% restarted by ssl_dist_sup. However, it won't have any memory
+ %% of being asked by net_kernel to listen for incoming
+ %% connections. Hence, the node will believe that it's open for
+ %% distribution, but it actually isn't.
+ %%
+ %% So let's avoid that by waiting for the code server to start.
+ case whereis(code_server) of
+ undefined ->
+ timer:sleep(10),
+ wait_for_code_server();
+ Pid when is_pid(Pid) ->
+ ok
+ end.
+
+%% -------------------------------------------------------------------------
-accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime) ->
- gen_accept_connection(inet_tcp, AcceptPid, Socket, MyNode, Allowed, SetupTime).
+accept_connection(AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) ->
+ gen_accept_connection(
+ inet_tcp, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime).
-gen_accept_connection(Driver, AcceptPid, Socket, MyNode, Allowed, SetupTime) ->
+gen_accept_connection(
+ Driver, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) ->
Kernel = self(),
- spawn_link(fun() -> do_accept(Driver, Kernel, AcceptPid, Socket,
- MyNode, Allowed, SetupTime) end).
+ spawn_opt(
+ fun() ->
+ do_accept(
+ Driver, Kernel, AcceptPid, DistCtrl,
+ MyNode, Allowed, SetupTime)
+ end,
+ [link, {priority, max}]).
+
+do_accept(Driver, Kernel, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) ->
+ receive
+ {AcceptPid, controller} ->
+ Timer = dist_util:start_timer(SetupTime),
+ case check_ip(Driver, DistCtrl) of
+ true ->
+ HSData0 = ssl_tls_dist_ctrl:hs_data_common(DistCtrl),
+ HSData =
+ HSData0#hs_data{
+ kernel_pid = Kernel,
+ this_node = MyNode,
+ socket = DistCtrl,
+ timer = Timer,
+ this_flags = 0,
+ allowed = Allowed},
+ dist_util:handshake_other_started(HSData);
+ {false,IP} ->
+ error_logger:error_msg(
+ "** Connection attempt from "
+ "disallowed IP ~w ** ~n", [IP]),
+ ?shutdown(no_node)
+ end
+ end.
-setup(Node, Type, MyNode, LongOrShortNames,SetupTime) ->
- gen_setup(inet_tcp, Node, Type, MyNode, LongOrShortNames,SetupTime).
-gen_setup(Driver, Node, Type, MyNode, LongOrShortNames,SetupTime) ->
+
+setup(Node, Type, MyNode, LongOrShortNames, SetupTime) ->
+ gen_setup(inet_tcp, Node, Type, MyNode, LongOrShortNames, SetupTime).
+
+gen_setup(Driver, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
Kernel = self(),
- spawn_opt(fun() -> do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) end, [link, {priority, max}]).
-
+ spawn_opt(
+ fun() ->
+ do_setup(
+ Driver, Kernel, Node, Type,
+ MyNode, LongOrShortNames, SetupTime)
+ end,
+ [link, {priority, max}]).
+
do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
[Name, Address] = splitnode(Driver, Node, LongOrShortNames),
- case inet:getaddr(Address, Driver:family()) of
+ case Driver:getaddr(Address) of
{ok, Ip} ->
Timer = dist_util:start_timer(SetupTime),
ErlEpmd = net_kernel:epmd_module(),
@@ -92,12 +241,37 @@ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
{port, TcpPort, Version} ->
?trace("port_please(~p) -> version ~p~n",
[Node,Version]),
+ Opts = connect_options(get_ssl_options(client)),
dist_util:reset_timer(Timer),
- case ssl_tls_dist_proxy:connect(Driver, Ip, TcpPort) of
- {ok, Socket} ->
- HSData = connect_hs_data(Kernel, Node, MyNode, Socket,
- Timer, Version, Ip, TcpPort, Address,
- Type),
+ case ssl:connect(
+ Ip, TcpPort,
+ [binary, {active, false}, {packet, 4},
+ Driver:family(), nodelay()] ++ Opts) of
+ {ok, SslSocket} ->
+ ?trace("~p~n",
+ [{?MODULE, do_setup,
+ ssl_socket, SslSocket}]),
+ DistCtrl = ssl_tls_dist_ctrl:start(SslSocket),
+ ssl_tls_dist_ctrl:set_supervisor(
+ DistCtrl, self()),
+ ok =
+ ssl:controlling_process(
+ SslSocket, DistCtrl),
+ HSData0 =
+ ssl_tls_dist_ctrl:hs_data_common(DistCtrl),
+ HSData =
+ HSData0#hs_data{
+ kernel_pid = Kernel,
+ other_node = Node,
+ this_node = MyNode,
+ socket = DistCtrl,
+ timer = Timer,
+ this_flags = 0,
+ other_version = Version,
+ request_type = Type},
+ ?trace("~p~n",
+ [{?MODULE, do_setup,
+ handshake_we_started, HSData}]),
dist_util:handshake_we_started(HSData);
Other ->
%% Other Node may have closed since
@@ -105,7 +279,8 @@ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
?trace("other node (~p) "
"closed since port_please.~n",
[Node]),
- ?shutdown2(Node, {shutdown, {connect_failed, Other}})
+ ?shutdown2(Node,
+ {shutdown, {connect_failed, Other}})
end;
Other ->
?trace("port_please (~p) "
@@ -113,38 +288,25 @@ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
?shutdown2(Node, {shutdown, {port_please_failed, Other}})
end;
Other ->
- ?trace("inet_getaddr(~p) "
- "failed (~p).~n", [Node,Other]),
- ?shutdown2(Node, {shutdown, {inet_getaddr_failed, Other}})
+ ?trace("~w:getaddr(~p) "
+ "failed (~p).~n", [Driver, Address, Other]),
+ ?shutdown2(Node, {shutdown, {getaddr_failed, Other}})
end.
close(Socket) ->
- gen_tcp:close(Socket),
- ok.
+ gen_close(inet, Socket).
+
+gen_close(Driver, Socket) ->
+ Driver:close(Socket).
-do_accept(Driver, Kernel, AcceptPid, Socket, MyNode, Allowed, SetupTime) ->
- process_flag(priority, max),
- receive
- {AcceptPid, controller} ->
- Timer = dist_util:start_timer(SetupTime),
- case check_ip(Driver, Socket) of
- true ->
- HSData = accept_hs_data(Kernel, MyNode, Socket, Timer, Allowed),
- dist_util:handshake_other_started(HSData);
- {false,IP} ->
- error_logger:error_msg("** Connection attempt from "
- "disallowed IP ~w ** ~n", [IP]),
- ?shutdown(no_node)
- end
- end.
%% ------------------------------------------------------------
%% Do only accept new connection attempts from nodes at our
%% own LAN, if the check_ip environment parameter is true.
%% ------------------------------------------------------------
-check_ip(Driver, Socket) ->
+check_ip(Driver, DistCtrl) ->
case application:get_env(check_ip) of
{ok, true} ->
- case get_ifs(Socket) of
+ case get_ifs(DistCtrl) of
{ok, IFs, IP} ->
check_ip(Driver, IFs, IP);
_ ->
@@ -154,9 +316,19 @@ check_ip(Driver, Socket) ->
true
end.
-get_ifs(Socket) ->
+check_ip(Driver, [{OwnIP, _, Netmask}|IFs], PeerIP) ->
+ case {Driver:mask(Netmask, PeerIP), Driver:mask(Netmask, OwnIP)} of
+ {M, M} -> true;
+ _ -> check_ip(IFs, PeerIP)
+ end;
+check_ip(_Driver, [], PeerIP) ->
+ {false, PeerIP}.
+
+get_ifs(DistCtrl) ->
+ Socket = ssl_tls_dist_ctrl:get_socket(DistCtrl),
case inet:peername(Socket) of
{ok, {IP, _}} ->
+ %% XXX this is seriously broken for IPv6
case inet:getif(Socket) of
{ok, IFs} -> {ok, IFs, IP};
Error -> Error
@@ -165,14 +337,6 @@ get_ifs(Socket) ->
Error
end.
-check_ip(Driver, [{OwnIP, _, Netmask}|IFs], PeerIP) ->
- case {Driver:mask(Netmask, PeerIP), Driver:mask(Netmask, OwnIP)} of
- {M, M} -> true;
- _ -> check_ip(IFs, PeerIP)
- end;
-check_ip(_Driver, [], PeerIP) ->
- {false, PeerIP}.
-
%% If Node is illegal terminate the connection setup!!
splitnode(Driver, Node, LongOrShortNames) ->
@@ -196,23 +360,34 @@ check_node(Driver, Name, Node, Host, LongOrShortNames) ->
{ok, _} ->
[Name, Host];
_ ->
- error_logger:error_msg("** System running to use "
- "fully qualified "
- "hostnames **~n"
- "** Hostname ~s is illegal **~n",
- [Host]),
+ error_logger:error_msg(
+ "** System running to use "
+ "fully qualified hostnames **~n"
+ "** Hostname ~s is illegal **~n",
+ [Host]),
?shutdown(Node)
end;
[_, _ | _] when LongOrShortNames == shortnames ->
- error_logger:error_msg("** System NOT running to use fully qualified "
- "hostnames **~n"
- "** Hostname ~s is illegal **~n",
- [Host]),
+ error_logger:error_msg(
+ "** System NOT running to use "
+ "fully qualified hostnames **~n"
+ "** Hostname ~s is illegal **~n",
+ [Host]),
?shutdown(Node);
_ ->
[Name, Host]
end.
+split_node(Node) when is_atom(Node) ->
+ case split_node(atom_to_list(Node), $@, []) of
+ [_, Host] ->
+ Host;
+ _ ->
+ false
+ end;
+split_node(_) ->
+ false.
+%%
split_node([Chr|T], Chr, Ack) ->
[lists:reverse(Ack)|split_node(T, Chr, [])];
split_node([H|T], Chr, Ack) ->
@@ -220,70 +395,119 @@ split_node([H|T], Chr, Ack) ->
split_node([], _, Ack) ->
[lists:reverse(Ack)].
-connect_hs_data(Kernel, Node, MyNode, Socket, Timer, Version, Ip, TcpPort, Address, Type) ->
- common_hs_data(Kernel, MyNode, Socket, Timer,
- #hs_data{other_node = Node,
- other_version = Version,
- f_address =
- fun(_,_) ->
- #net_address{address = {Ip,TcpPort},
- host = Address,
- protocol = proxy,
- family = inet}
- end,
- request_type = Type
- }).
-
-accept_hs_data(Kernel, MyNode, Socket, Timer, Allowed) ->
- common_hs_data(Kernel, MyNode, Socket, Timer, #hs_data{
- allowed = Allowed,
- f_address = fun get_remote_id/2
- }).
-
-common_hs_data(Kernel, MyNode, Socket, Timer, HsData) ->
- HsData#hs_data{
- kernel_pid = Kernel,
- this_node = MyNode,
- socket = Socket,
- timer = Timer,
- this_flags = 0,
- f_send =
- fun(S,D) ->
- gen_tcp:send(S,D)
- end,
- f_recv =
- fun(S,N,T) ->
- gen_tcp:recv(S,N,T)
- end,
- f_setopts_pre_nodeup =
- fun(S) ->
- inet:setopts(S, [{active, false}, {packet, 4}])
- end,
- f_setopts_post_nodeup =
- fun(S) ->
- inet:setopts(S, [{deliver, port},{active, true}])
- end,
- f_getll =
- fun(S) ->
- inet:getll(S)
- end,
- mf_tick =
- fun(S) ->
- gen_tcp:send(S, <<>>)
- end,
- mf_getstat =
- fun(S) ->
- {ok, Stats} = inet:getstat(S, [recv_cnt, send_cnt, send_pend]),
- R = proplists:get_value(recv_cnt, Stats, 0),
- W = proplists:get_value(send_cnt, Stats, 0),
- P = proplists:get_value(send_pend, Stats, 0),
- {ok, R,W,P}
- end}.
-
-get_remote_id(Socket, _Node) ->
- case ssl_tls_dist_proxy:get_tcp_address(Socket) of
- {ok, Address} ->
- Address;
- {error, _Reason} ->
- ?shutdown(no_node)
+%% -------------------------------------------------------------------------
+
+connect_options(Opts) ->
+ case application:get_env(kernel, inet_dist_connect_options) of
+ {ok,ConnectOpts} ->
+ lists:ukeysort(1, ConnectOpts ++ Opts);
+ _ ->
+ Opts
+ end.
+
+%% we may not always want the nodelay behaviour
+%% for performance reasons
+nodelay() ->
+ case application:get_env(kernel, dist_nodelay) of
+ undefined ->
+ {nodelay, true};
+ {ok, true} ->
+ {nodelay, true};
+ {ok, false} ->
+ {nodelay, false};
+ _ ->
+ {nodelay, true}
+ end.
+
+
+get_ssl_options(Type) ->
+ case init:get_argument(ssl_dist_opt) of
+ {ok, Args} ->
+ [{erl_dist, true} | ssl_options(Type, lists:append(Args))];
+ _ ->
+ [{erl_dist, true}]
+ end.
+
+ssl_options(_,[]) ->
+ [];
+ssl_options(server, ["client_" ++ _, _Value |T]) ->
+ ssl_options(server,T);
+ssl_options(client, ["server_" ++ _, _Value|T]) ->
+ ssl_options(client,T);
+ssl_options(server, ["server_certfile", Value|T]) ->
+ [{certfile, Value} | ssl_options(server,T)];
+ssl_options(client, ["client_certfile", Value | T]) ->
+ [{certfile, Value} | ssl_options(client,T)];
+ssl_options(server, ["server_cacertfile", Value|T]) ->
+ [{cacertfile, Value} | ssl_options(server,T)];
+ssl_options(client, ["client_cacertfile", Value|T]) ->
+ [{cacertfile, Value} | ssl_options(client,T)];
+ssl_options(server, ["server_keyfile", Value|T]) ->
+ [{keyfile, Value} | ssl_options(server,T)];
+ssl_options(client, ["client_keyfile", Value|T]) ->
+ [{keyfile, Value} | ssl_options(client,T)];
+ssl_options(server, ["server_password", Value|T]) ->
+ [{password, Value} | ssl_options(server,T)];
+ssl_options(client, ["client_password", Value|T]) ->
+ [{password, Value} | ssl_options(client,T)];
+ssl_options(server, ["server_verify", Value|T]) ->
+ [{verify, atomize(Value)} | ssl_options(server,T)];
+ssl_options(client, ["client_verify", Value|T]) ->
+ [{verify, atomize(Value)} | ssl_options(client,T)];
+ssl_options(server, ["server_verify_fun", Value|T]) ->
+ [{verify_fun, verify_fun(Value)} | ssl_options(server,T)];
+ssl_options(client, ["client_verify_fun", Value|T]) ->
+ [{verify_fun, verify_fun(Value)} | ssl_options(client,T)];
+ssl_options(server, ["server_crl_check", Value|T]) ->
+ [{crl_check, atomize(Value)} | ssl_options(server,T)];
+ssl_options(client, ["client_crl_check", Value|T]) ->
+ [{crl_check, atomize(Value)} | ssl_options(client,T)];
+ssl_options(server, ["server_crl_cache", Value|T]) ->
+ [{crl_cache, termify(Value)} | ssl_options(server,T)];
+ssl_options(client, ["client_crl_cache", Value|T]) ->
+ [{crl_cache, termify(Value)} | ssl_options(client,T)];
+ssl_options(server, ["server_reuse_sessions", Value|T]) ->
+ [{reuse_sessions, atomize(Value)} | ssl_options(server,T)];
+ssl_options(client, ["client_reuse_sessions", Value|T]) ->
+ [{reuse_sessions, atomize(Value)} | ssl_options(client,T)];
+ssl_options(server, ["server_secure_renegotiate", Value|T]) ->
+ [{secure_renegotiate, atomize(Value)} | ssl_options(server,T)];
+ssl_options(client, ["client_secure_renegotiate", Value|T]) ->
+ [{secure_renegotiate, atomize(Value)} | ssl_options(client,T)];
+ssl_options(server, ["server_depth", Value|T]) ->
+ [{depth, list_to_integer(Value)} | ssl_options(server,T)];
+ssl_options(client, ["client_depth", Value|T]) ->
+ [{depth, list_to_integer(Value)} | ssl_options(client,T)];
+ssl_options(server, ["server_hibernate_after", Value|T]) ->
+ [{hibernate_after, list_to_integer(Value)} | ssl_options(server,T)];
+ssl_options(client, ["client_hibernate_after", Value|T]) ->
+ [{hibernate_after, list_to_integer(Value)} | ssl_options(client,T)];
+ssl_options(server, ["server_ciphers", Value|T]) ->
+ [{ciphers, Value} | ssl_options(server,T)];
+ssl_options(client, ["client_ciphers", Value|T]) ->
+ [{ciphers, Value} | ssl_options(client,T)];
+ssl_options(server, ["server_dhfile", Value|T]) ->
+ [{dhfile, Value} | ssl_options(server,T)];
+ssl_options(server, ["server_fail_if_no_peer_cert", Value|T]) ->
+ [{fail_if_no_peer_cert, atomize(Value)} | ssl_options(server,T)];
+ssl_options(Type, Opts) ->
+ error(malformed_ssl_dist_opt, [Type, Opts]).
+
+atomize(List) when is_list(List) ->
+ list_to_atom(List);
+atomize(Atom) when is_atom(Atom) ->
+ Atom.
+
+termify(String) when is_list(String) ->
+ {ok, Tokens, _} = erl_scan:string(String ++ "."),
+ {ok, Term} = erl_parse:parse_term(Tokens),
+ Term.
+
+verify_fun(Value) ->
+ case termify(Value) of
+ {Mod, Func, State} when is_atom(Mod), is_atom(Func) ->
+ Fun = fun Mod:Func/3,
+ {Fun, State};
+ _ ->
+ error(malformed_ssl_dist_opt, [Value])
end.
diff --git a/lib/ssl/src/ssl.app.src b/lib/ssl/src/ssl.app.src
index 064dcd6892..a5286b6747 100644
--- a/lib/ssl/src/ssl.app.src
+++ b/lib/ssl/src/ssl.app.src
@@ -37,7 +37,7 @@
%% Erlang Distribution over SSL/TLS
inet_tls_dist,
inet6_tls_dist,
- ssl_tls_dist_proxy,
+ ssl_tls_dist_ctrl,
ssl_dist_sup,
ssl_dist_connection_sup,
ssl_dist_admin_sup,
diff --git a/lib/ssl/src/ssl_dist_sup.erl b/lib/ssl/src/ssl_dist_sup.erl
index 690b896919..c241a9bced 100644
--- a/lib/ssl/src/ssl_dist_sup.erl
+++ b/lib/ssl/src/ssl_dist_sup.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2011-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2011-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -46,8 +46,7 @@ start_link() ->
init([]) ->
AdminSup = ssl_admin_child_spec(),
ConnectionSup = ssl_connection_sup(),
- ProxyServer = proxy_server_child_spec(),
- {ok, {{one_for_all, 10, 3600}, [AdminSup, ProxyServer, ConnectionSup]}}.
+ {ok, {{one_for_all, 10, 3600}, [AdminSup, ConnectionSup]}}.
%%--------------------------------------------------------------------
%%% Internal functions
@@ -69,12 +68,3 @@ ssl_connection_sup() ->
Modules = [ssl_connection_sup],
Type = supervisor,
{Name, StartFunc, Restart, Shutdown, Type, Modules}.
-
-proxy_server_child_spec() ->
- Name = ssl_tls_dist_proxy,
- StartFunc = {ssl_tls_dist_proxy, start_link, []},
- Restart = permanent,
- Shutdown = 4000,
- Modules = [ssl_tls_dist_proxy],
- Type = worker,
- {Name, StartFunc, Restart, Shutdown, Type, Modules}.
diff --git a/lib/ssl/src/ssl_tls_dist_ctrl.erl b/lib/ssl/src/ssl_tls_dist_ctrl.erl
new file mode 100644
index 0000000000..cbb39e71ec
--- /dev/null
+++ b/lib/ssl/src/ssl_tls_dist_ctrl.erl
@@ -0,0 +1,380 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(ssl_tls_dist_ctrl).
+
+-include_lib("kernel/include/dist.hrl").
+-include_lib("kernel/include/dist_util.hrl").
+-include_lib("kernel/include/net_address.hrl").
+
+-export([start/1, get_socket/1, set_supervisor/2, hs_data_common/1]).
+
+%%% ------------------------------------------------------------
+
+%% In order to avoid issues with lingering signal binaries
+%% we enable off-heap message queue data as well as fullsweep
+%% after 0. The fullsweeps will be cheap since we have more
+%% or less no live data.
+common_spawn_opts() ->
+ [{message_queue_data, off_heap},
+ {fullsweep_after, 0}].
+
+%%% ------------------------------------------------------------
+
+start(SslSocket) ->
+ spawn_opt(
+ fun () ->
+ setup(SslSocket)
+ end,
+ [{priority, max}] ++ common_spawn_opts()).
+
+get_socket(DistCtrl) ->
+ call(DistCtrl, get_socket).
+
+set_supervisor(DistCtrl, Pid) ->
+ call(DistCtrl, {set_supervisor, Pid}).
+
+hs_data_common(DistCtrl) ->
+ TickHandler = call(DistCtrl, tick_handler),
+ SslSocket = get_socket(DistCtrl),
+ #hs_data{
+ f_send =
+ fun (Ctrl, Packet) when Ctrl == DistCtrl ->
+ call(Ctrl, {send, Packet})
+ end,
+ f_recv =
+ fun (Ctrl, Length, Timeout) when Ctrl == DistCtrl ->
+ case call(Ctrl, {recv, Length, Timeout}) of
+ {ok, Bin} when is_binary(Bin) ->
+ {ok, binary_to_list(Bin)};
+ Other ->
+ Other
+ end
+ end,
+ f_setopts_pre_nodeup =
+ fun (Ctrl) when Ctrl == DistCtrl ->
+ call(Ctrl, pre_nodeup)
+ end,
+ f_setopts_post_nodeup =
+ fun (Ctrl) when Ctrl == DistCtrl ->
+ call(Ctrl, post_nodeup)
+ end,
+ f_getll =
+ fun (Ctrl) when Ctrl == DistCtrl ->
+ call(Ctrl, getll)
+ end,
+ f_handshake_complete =
+ fun (Ctrl, Node, DHandle) when Ctrl == DistCtrl ->
+ call(Ctrl, {handshake_complete, Node, DHandle})
+ end,
+ f_address =
+ fun (Ctrl, Node) when Ctrl == DistCtrl ->
+ case call(Ctrl, {check_address, Node}) of
+ {error, no_node} ->
+ %% No '@' or more than one '@' in node name.
+ ?shutdown(no_node);
+ Res ->
+ Res
+ end
+ end,
+ mf_setopts =
+ fun (Ctrl, Opts) when Ctrl == DistCtrl ->
+ case setopts_filter(Opts) of
+ [] ->
+ ssl:setopts(SslSocket, Opts);
+ Opts1 ->
+ {error, {badopts,Opts1}}
+ end
+ end,
+ mf_getopts =
+ fun (Ctrl, Opts) when Ctrl == DistCtrl ->
+ ssl:getopts(SslSocket, Opts)
+ end,
+ mf_getstat =
+ fun (Ctrl) when Ctrl == DistCtrl ->
+ case ssl:getstat(
+ SslSocket, [recv_cnt, send_cnt, send_pend]) of
+ {ok, Stat} ->
+ split_stat(Stat,0,0,0);
+ Error ->
+ Error
+ end
+ end,
+ mf_tick =
+ fun (Ctrl) when Ctrl == DistCtrl ->
+ TickHandler ! tick
+ end}.
+
+%%% ------------------------------------------------------------
+
+call(DistCtrl, Msg) ->
+ Ref = erlang:monitor(process, DistCtrl),
+ DistCtrl ! {Ref, self(), Msg},
+ receive
+ {Ref, Res} ->
+ erlang:demonitor(Ref, [flush]),
+ Res;
+ {'DOWN', Ref, process, DistCtrl, Reason} ->
+ exit({dist_controller_exit, Reason})
+ end.
+
+setopts_filter(Opts) ->
+ [Opt || {K,_} = Opt <- Opts,
+ K =:= active orelse K =:= deliver orelse K =:= packet].
+
+split_stat([{recv_cnt, R}|Stat], _, W, P) ->
+ split_stat(Stat, R, W, P);
+split_stat([{send_cnt, W}|Stat], R, _, P) ->
+ split_stat(Stat, R, W, P);
+split_stat([{send_pend, P}|Stat], R, W, _) ->
+ split_stat(Stat, R, W, P);
+split_stat([], R, W, P) ->
+ {ok, R, W, P}.
+
+%%% ------------------------------------------------------------
+%%% Distribution controller processes
+%%% ------------------------------------------------------------
+
+%%
+%% There will be five parties working together when the
+%% connection is up:
+%% - The SSL socket. Providing a TLS connection
+%% to the other node.
+%% - The output handler. It will dispatch all outgoing
+%% traffic from the VM to the gen_tcp socket. This
+%% process is registered as distribution controller
+%% for this channel with the VM.
+%% - The input handler. It will dispatch all incoming
+%% traffic from the gen_tcp socket to the VM. This
+%% process is also the socket owner and receives
+%% incoming traffic using active-N.
+%% - The tick handler. Dispatches asynchronous tick
+%% requests to the socket. It executes on max priority
+%% since it is important to get ticks through to the
+%% other end.
+%% - The channel supervisor (provided by dist_util). It
+%% monitors traffic. Issue tick requests to the tick
+%% handler when no outgoing traffic is seen and bring
+%% the connection down if no incoming traffic is seen.
+%% This process also executes on max priority.
+%%
+%% These parties are linked togheter so should one
+%% of them fail, all of them are terminated and the
+%% connection is taken down.
+%%
+
+
+%%
+%% The tick handler process writes a tick to the
+%% socket when it receives a 'tick' message from
+%% the connection supervisor.
+%%
+%% We are not allowed to block the connection
+%% superviser when writing a tick and we also want
+%% the tick to go through even during a heavily
+%% loaded system. gen_tcp does not have a
+%% non-blocking send operation exposed in its API
+%% and we don't want to run the distribution
+%% controller under high priority. Therefore this
+%% separate process with max prio that dispatches
+%% ticks.
+%%
+tick_handler(SslSocket) ->
+ receive
+ tick ->
+ %% May block due to busy port...
+ sock_send(SslSocket, ""),
+ flush_ticks(SslSocket);
+ _ ->
+ tick_handler(SslSocket)
+ end.
+
+flush_ticks(SslSocket) ->
+ receive
+ tick ->
+ flush_ticks(SslSocket)
+ after 0 ->
+ tick_handler(SslSocket)
+ end.
+
+setup(SslSocket) ->
+ TickHandler =
+ spawn_opt(
+ fun () ->
+ tick_handler(SslSocket)
+ end,
+ [link, {priority, max}] ++ common_spawn_opts()),
+ setup_loop(SslSocket, TickHandler, undefined).
+
+%%
+%% During the handshake phase we loop in setup().
+%% When the connection is up we spawn an input handler and
+%% continue as output handler.
+%%
+setup_loop(SslSocket, TickHandler, Sup) ->
+ receive
+ {ssl_closed, SslSocket} ->
+ exit(connection_closed);
+ {ssl_error, SslSocket} ->
+ exit(connection_closed);
+
+ {Ref, From, {set_supervisor, Pid}} ->
+ Res = link(Pid),
+ From ! {Ref, Res},
+ setup_loop(SslSocket, TickHandler, Pid);
+
+ {Ref, From, tick_handler} ->
+ From ! {Ref, TickHandler},
+ setup_loop(SslSocket, TickHandler, Sup);
+
+ {Ref, From, get_socket} ->
+ From ! {Ref, SslSocket},
+ setup_loop(SslSocket, TickHandler, Sup);
+
+ {Ref, From, {send, Packet}} ->
+ Res = ssl:send(SslSocket, Packet),
+ From ! {Ref, Res},
+ setup_loop(SslSocket, TickHandler, Sup);
+
+ {Ref, From, {recv, Length, Timeout}} ->
+ Res = ssl:recv(SslSocket, Length, Timeout),
+ From ! {Ref, Res},
+ setup_loop(SslSocket, TickHandler, Sup);
+
+ {Ref, From, {check_address, Node}} ->
+ Res =
+ case ssl:peername(SslSocket) of
+ {ok, Address} ->
+ case inet_tls_dist:split_node(Node) of
+ false ->
+ {error, no_node};
+ Host ->
+ #net_address{
+ address=Address, host=Host,
+ protocol=tls, family=inet}
+ end
+ end,
+ From ! {Ref, Res},
+ setup_loop(SslSocket, TickHandler, Sup);
+
+ {Ref, From, pre_nodeup} ->
+ Res =
+ ssl:setopts(
+ SslSocket,
+ [{active, false}, {packet, 4}, inet_tls_dist:nodelay()]),
+ From ! {Ref, Res},
+ setup_loop(SslSocket, TickHandler, Sup);
+
+ {Ref, From, post_nodeup} ->
+ Res =
+ ssl:setopts(
+ SslSocket,
+ [{active, once}, {packet, 4}, inet_tls_dist:nodelay()]),
+ From ! {Ref, Res},
+ setup_loop(SslSocket, TickHandler, Sup);
+
+ {Ref, From, getll} ->
+ From ! {Ref, {ok, self()}},
+ setup_loop(SslSocket, TickHandler, Sup);
+
+ {Ref, From, {handshake_complete, _Node, DHandle}} ->
+ From ! {Ref, ok},
+ %% Handshake complete! Begin dispatching traffic...
+ %% From now on we execute on normal priority
+ process_flag(priority, normal),
+ erlang:dist_ctrl_get_data_notification(DHandle),
+ loop(DHandle, SslSocket)
+ end.
+
+loop(DHandle, SslSocket) ->
+ receive
+ dist_data ->
+ %% Outgoing data from this node...
+ try send_data(DHandle, SslSocket)
+ catch _ : _ -> death_row()
+ end,
+ loop(DHandle, SslSocket);
+
+ {send, From, Ref, Data} ->
+ %% This is for testing only!
+ %%
+ %% Needed by some OTP distribution
+ %% test suites...
+ sock_send(SslSocket, Data),
+ From ! {Ref, ok},
+ loop(DHandle, SslSocket);
+
+ {ssl_closed, SslSocket} ->
+ %% Connection to remote node terminated...
+ exit(connection_closed);
+ {ssl_error, SslSocket, _Reason} ->
+ %% Connection to remote node terminated...
+ exit(connection_closed);
+ {ssl, SslSocket, Data} ->
+ %% Incoming data from remote node...
+ ok = ssl:setopts(SslSocket, [{active, once}]),
+ try erlang:dist_ctrl_put_data(DHandle, Data)
+ catch _ : _ -> death_row()
+ end,
+ loop(DHandle, SslSocket);
+
+ _ ->
+ %% Drop garbage message...
+ loop(DHandle, SslSocket)
+ end.
+
+send_data(DHandle, SslSocket) ->
+ case erlang:dist_ctrl_get_data(DHandle) of
+ none ->
+ erlang:dist_ctrl_get_data_notification(DHandle);
+ Data ->
+ sock_send(SslSocket, Data),
+ send_data(DHandle, SslSocket)
+ end.
+
+sock_send(SslSocket, Data) ->
+ try ssl:send(SslSocket, Data) of
+ ok -> ok;
+ {error, Reason} ->
+ death_row({send_error, Reason})
+ catch
+ Type:Reason ->
+ death_row({send_error, {Type, Reason}})
+ end.
+
+death_row() ->
+ death_row(connection_closed).
+
+death_row(normal) ->
+ %% We do not want to exit with normal
+ %% exit reason since it wont bring down
+ %% linked processes...
+ death_row();
+death_row(Reason) ->
+ %% When the connection is on its way down operations
+ %% begin to fail. We catch the failures and call
+ %% this function waiting for termination. We should
+ %% be terminated by one of our links to the other
+ %% involved parties that began bringing the
+ %% connection down. By waiting for termination we
+ %% avoid altering the exit reason for the connection
+ %% teardown. We however limit the wait to 5 seconds
+ %% and bring down the connection ourselves if not
+ %% terminated...
+ receive after 5000 -> exit(Reason) end.