From c2180b0e570baf90cbf567381212bb223fc73d37 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Fri, 8 Sep 2017 17:01:45 +0200 Subject: Rewrite dist ctrl from port to process --- lib/ssl/src/Makefile | 4 +- lib/ssl/src/inet6_tls_dist.erl | 7 +- lib/ssl/src/inet_tls_dist.erl | 500 +++++++++++++++++++++++++++----------- lib/ssl/src/ssl.app.src | 2 +- lib/ssl/src/ssl_dist_sup.erl | 14 +- lib/ssl/src/ssl_tls_dist_ctrl.erl | 380 +++++++++++++++++++++++++++++ 6 files changed, 751 insertions(+), 156 deletions(-) create mode 100644 lib/ssl/src/ssl_tls_dist_ctrl.erl (limited to 'lib/ssl/src') diff --git a/lib/ssl/src/Makefile b/lib/ssl/src/Makefile index 2e7df9792e..f6fe23b584 100644 --- a/lib/ssl/src/Makefile +++ b/lib/ssl/src/Makefile @@ -1,7 +1,7 @@ # # %CopyrightBegin% # -# Copyright Ericsson AB 1999-2016. All Rights Reserved. +# Copyright Ericsson AB 1999-2017. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -88,7 +88,7 @@ MODULES= \ ssl_v3 \ tls_v1 \ dtls_v1 \ - ssl_tls_dist_proxy + ssl_tls_dist_ctrl INTERNAL_HRL_FILES = \ ssl_alert.hrl ssl_cipher.hrl \ diff --git a/lib/ssl/src/inet6_tls_dist.erl b/lib/ssl/src/inet6_tls_dist.erl index ffd7296f93..96ce4d493a 100644 --- a/lib/ssl/src/inet6_tls_dist.erl +++ b/lib/ssl/src/inet6_tls_dist.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2015. All Rights Reserved. +%% Copyright Ericsson AB 2015-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -21,7 +21,8 @@ %% -module(inet6_tls_dist). --export([childspecs/0, listen/1, accept/1, accept_connection/5, +-export([childspecs/0]). +-export([listen/1, accept/1, accept_connection/5, setup/5, close/1, select/1]). 
childspecs() -> @@ -43,4 +44,4 @@ setup(Node, Type, MyNode, LongOrShortNames,SetupTime) -> inet_tls_dist:gen_setup(inet6_tcp, Node, Type, MyNode, LongOrShortNames,SetupTime). close(Socket) -> - inet_tls_dist:close(Socket). + inet_tls_dist:gen_close(inet6_tcp, Socket). diff --git a/lib/ssl/src/inet_tls_dist.erl b/lib/ssl/src/inet_tls_dist.erl index 0da4b3587f..47f400da6f 100644 --- a/lib/ssl/src/inet_tls_dist.erl +++ b/lib/ssl/src/inet_tls_dist.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2011-2016. All Rights Reserved. +%% Copyright Ericsson AB 2011-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -21,17 +21,31 @@ %% -module(inet_tls_dist). --export([childspecs/0, listen/1, accept/1, accept_connection/5, +-export([childspecs/0]). +-export([listen/1, accept/1, accept_connection/5, setup/5, close/1, select/1, is_node_name/1]). %% Generalized dist API -export([gen_listen/2, gen_accept/2, gen_accept_connection/6, - gen_setup/6, gen_select/2]). + gen_setup/6, gen_close/2, gen_select/2]). + +-export([split_node/1, nodelay/0]). -include_lib("kernel/include/net_address.hrl"). -include_lib("kernel/include/dist.hrl"). -include_lib("kernel/include/dist_util.hrl"). +%% -undef(trace). +%% -define(trace(Fmt,Args), +%% erlang:display( +%% [erlang:convert_time_unit( +%% erlang:monotonic_time() +%% - erlang:system_info(start_time), native, microsecond), +%% node(), +%% lists:flatten(io_lib:format(Fmt, Args))])). + +%% ------------------------------------------------------------------------- + childspecs() -> {ok, [{ssl_dist_sup,{ssl_dist_sup, start_link, []}, permanent, infinity, supervisor, [ssl_dist_sup]}]}. @@ -40,51 +54,186 @@ select(Node) -> gen_select(inet_tcp, Node). 
gen_select(Driver, Node) -> - case split_node(atom_to_list(Node), $@, []) of - [_, Host] -> - case inet:getaddr(Host, Driver:family()) of + case split_node(Node) of + false -> + false; + Host -> + case Driver:getaddr(Host) of {ok, _} -> true; _ -> false - end; - _ -> - false + end end. -is_node_name(Node) when is_atom(Node) -> - select(Node); -is_node_name(_) -> - false. +%% ------------------------------------------------------------------------- + +is_node_name(Node) -> + case split_node(Node) of + false -> + false; + _Host -> + true + end. + +%% ------------------------------------------------------------------------- listen(Name) -> gen_listen(inet_tcp, Name). gen_listen(Driver, Name) -> - ssl_tls_dist_proxy:listen(Driver, Name). + case inet_tcp_dist:gen_listen(Driver, Name) of + {ok, {Socket, Address, Creation}} -> + inet:setopts(Socket, [{packet, 4}]), + {ok, {Socket, Address#net_address{protocol=tls}, Creation}}; + Other -> + Other + end. + +%% ------------------------------------------------------------------------- accept(Listen) -> gen_accept(inet_tcp, Listen). gen_accept(Driver, Listen) -> - ssl_tls_dist_proxy:accept(Driver, Listen). + Kernel = self(), + spawn_opt( + fun () -> + accept_loop(Driver, Listen, Kernel) + end, + [link, {priority, max}]). + +accept_loop(Driver, Listen, Kernel) -> + ?trace("~p~n",[{?MODULE, accept_loop, self()}]), + case Driver:accept(Listen) of + {ok, Socket} -> + Opts = get_ssl_options(server), + wait_for_code_server(), + case ssl:ssl_accept( + Socket, [{active, false}, {packet, 4}] ++ Opts) of + {ok, SslSocket} -> + DistCtrl = ssl_tls_dist_ctrl:start(SslSocket), + ?trace("~p~n", + [{?MODULE, accept_loop, accepted, + SslSocket, DistCtrl, self()}]), + ok = ssl:controlling_process(SslSocket, DistCtrl), + Kernel ! + {accept, self(), DistCtrl, Driver:family(), tls}, + receive + {Kernel, controller, Pid} -> + ?trace("~p~n", + [{?MODULE, accept_loop, + controller, self()}]), + ssl_tls_dist_ctrl:set_supervisor(DistCtrl, Pid), + Pid ! 
{self(), controller}; + {Kernel, unsupported_protocol} -> + ?trace("~p~n", + [{?MODULE, accept_loop, + unsupported_protocol, self()}]), + exit(unsupported_protocol) + end; + %% No recursive call here: the tail call after the outer case loops. + {error, {options, _}} = Error -> + %% Bad options: that's probably our fault. + %% Let's log that. + error_logger:error_msg( + "Cannot accept TLS distribution connection: ~s~n", + [ssl:format_error(Error)]), + gen_tcp:close(Socket); + _ -> + gen_tcp:close(Socket) + end; + Error -> + exit(Error) + end, + accept_loop(Driver, Listen, Kernel). + +wait_for_code_server() -> + %% This is an ugly hack. Upgrading a socket to TLS requires the + %% crypto module to be loaded. Loading the crypto module triggers + %% its on_load function, which calls code:priv_dir/1 to find the + %% directory where its NIF library is. However, distribution is + %% started earlier than the code server, so the code server is not + %% necessarily started yet, and code:priv_dir/1 might fail because + %% of that, if we receive an incoming connection on the + %% distribution port early enough. + %% + %% If the on_load function of a module fails, the module is + %% unloaded, and the function call that triggered loading it fails + %% with 'undef', which is rather confusing. + %% + %% In the former proxy based implementation this made the + %% ssl_tls_dist_proxy process terminate and get restarted by + %% ssl_dist_sup without any memory of having been asked by + %% net_kernel to listen, so the node believed that it was open + %% for distribution although it actually wasn't. + %% + %% So let's avoid that by waiting for the code server to start. + case whereis(code_server) of + undefined -> + timer:sleep(10), + wait_for_code_server(); + Pid when is_pid(Pid) -> + ok + end. + +%% ------------------------------------------------------------------------- -accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime) -> - gen_accept_connection(inet_tcp, AcceptPid, Socket, MyNode, Allowed, SetupTime).
+accept_connection(AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> + gen_accept_connection( + inet_tcp, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime). -gen_accept_connection(Driver, AcceptPid, Socket, MyNode, Allowed, SetupTime) -> +gen_accept_connection( + Driver, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> Kernel = self(), - spawn_link(fun() -> do_accept(Driver, Kernel, AcceptPid, Socket, - MyNode, Allowed, SetupTime) end). + spawn_opt( + fun() -> + do_accept( + Driver, Kernel, AcceptPid, DistCtrl, + MyNode, Allowed, SetupTime) + end, + [link, {priority, max}]). + +do_accept(Driver, Kernel, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> + receive + {AcceptPid, controller} -> + Timer = dist_util:start_timer(SetupTime), + case check_ip(Driver, DistCtrl) of + true -> + HSData0 = ssl_tls_dist_ctrl:hs_data_common(DistCtrl), + HSData = + HSData0#hs_data{ + kernel_pid = Kernel, + this_node = MyNode, + socket = DistCtrl, + timer = Timer, + this_flags = 0, + allowed = Allowed}, + dist_util:handshake_other_started(HSData); + {false,IP} -> + error_logger:error_msg( + "** Connection attempt from " + "disallowed IP ~w ** ~n", [IP]), + ?shutdown(no_node) + end + end. -setup(Node, Type, MyNode, LongOrShortNames,SetupTime) -> - gen_setup(inet_tcp, Node, Type, MyNode, LongOrShortNames,SetupTime). -gen_setup(Driver, Node, Type, MyNode, LongOrShortNames,SetupTime) -> + +setup(Node, Type, MyNode, LongOrShortNames, SetupTime) -> + gen_setup(inet_tcp, Node, Type, MyNode, LongOrShortNames, SetupTime). + +gen_setup(Driver, Node, Type, MyNode, LongOrShortNames, SetupTime) -> Kernel = self(), - spawn_opt(fun() -> do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) end, [link, {priority, max}]). - + spawn_opt( + fun() -> + do_setup( + Driver, Kernel, Node, Type, + MyNode, LongOrShortNames, SetupTime) + end, + [link, {priority, max}]). 
+ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> [Name, Address] = splitnode(Driver, Node, LongOrShortNames), - case inet:getaddr(Address, Driver:family()) of + case Driver:getaddr(Address) of {ok, Ip} -> Timer = dist_util:start_timer(SetupTime), ErlEpmd = net_kernel:epmd_module(), @@ -92,12 +241,37 @@ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> {port, TcpPort, Version} -> ?trace("port_please(~p) -> version ~p~n", [Node,Version]), + Opts = connect_options(get_ssl_options(client)), dist_util:reset_timer(Timer), - case ssl_tls_dist_proxy:connect(Driver, Ip, TcpPort) of - {ok, Socket} -> - HSData = connect_hs_data(Kernel, Node, MyNode, Socket, - Timer, Version, Ip, TcpPort, Address, - Type), + case ssl:connect( + Ip, TcpPort, + [binary, {active, false}, {packet, 4}, + Driver:family(), nodelay()] ++ Opts) of + {ok, SslSocket} -> + ?trace("~p~n", + [{?MODULE, do_setup, + ssl_socket, SslSocket}]), + DistCtrl = ssl_tls_dist_ctrl:start(SslSocket), + ssl_tls_dist_ctrl:set_supervisor( + DistCtrl, self()), + ok = + ssl:controlling_process( + SslSocket, DistCtrl), + HSData0 = + ssl_tls_dist_ctrl:hs_data_common(DistCtrl), + HSData = + HSData0#hs_data{ + kernel_pid = Kernel, + other_node = Node, + this_node = MyNode, + socket = DistCtrl, + timer = Timer, + this_flags = 0, + other_version = Version, + request_type = Type}, + ?trace("~p~n", + [{?MODULE, do_setup, + handshake_we_started, HSData}]), dist_util:handshake_we_started(HSData); Other -> %% Other Node may have closed since @@ -105,7 +279,8 @@ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> ?trace("other node (~p) " "closed since port_please.~n", [Node]), - ?shutdown2(Node, {shutdown, {connect_failed, Other}}) + ?shutdown2(Node, + {shutdown, {connect_failed, Other}}) end; Other -> ?trace("port_please (~p) " @@ -113,38 +288,25 @@ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> ?shutdown2(Node, {shutdown, 
{port_please_failed, Other}}) end; Other -> - ?trace("inet_getaddr(~p) " - "failed (~p).~n", [Node,Other]), - ?shutdown2(Node, {shutdown, {inet_getaddr_failed, Other}}) + ?trace("~w:getaddr(~p) " + "failed (~p).~n", [Driver, Address, Other]), + ?shutdown2(Node, {shutdown, {getaddr_failed, Other}}) end. close(Socket) -> - gen_tcp:close(Socket), - ok. + gen_close(inet, Socket). + +gen_close(Driver, Socket) -> + Driver:close(Socket). -do_accept(Driver, Kernel, AcceptPid, Socket, MyNode, Allowed, SetupTime) -> - process_flag(priority, max), - receive - {AcceptPid, controller} -> - Timer = dist_util:start_timer(SetupTime), - case check_ip(Driver, Socket) of - true -> - HSData = accept_hs_data(Kernel, MyNode, Socket, Timer, Allowed), - dist_util:handshake_other_started(HSData); - {false,IP} -> - error_logger:error_msg("** Connection attempt from " - "disallowed IP ~w ** ~n", [IP]), - ?shutdown(no_node) - end - end. %% ------------------------------------------------------------ %% Do only accept new connection attempts from nodes at our %% own LAN, if the check_ip environment parameter is true. %% ------------------------------------------------------------ -check_ip(Driver, Socket) -> +check_ip(Driver, DistCtrl) -> case application:get_env(check_ip) of {ok, true} -> - case get_ifs(Socket) of + case get_ifs(DistCtrl) of {ok, IFs, IP} -> check_ip(Driver, IFs, IP); _ -> @@ -154,9 +316,19 @@ check_ip(Driver, Socket) -> true end. -get_ifs(Socket) -> +check_ip(Driver, [{OwnIP, _, Netmask}|IFs], PeerIP) -> + case {Driver:mask(Netmask, PeerIP), Driver:mask(Netmask, OwnIP)} of + {M, M} -> true; + _ -> check_ip(Driver, IFs, PeerIP) + end; +check_ip(_Driver, [], PeerIP) -> + {false, PeerIP}. + +get_ifs(DistCtrl) -> + Socket = ssl_tls_dist_ctrl:get_socket(DistCtrl), case inet:peername(Socket) of {ok, {IP, _}} -> + %% XXX this is seriously broken for IPv6 case inet:getif(Socket) of {ok, IFs} -> {ok, IFs, IP}; Error -> Error @@ -165,14 +337,6 @@ get_ifs(Socket) -> Error end.
-check_ip(Driver, [{OwnIP, _, Netmask}|IFs], PeerIP) -> - case {Driver:mask(Netmask, PeerIP), Driver:mask(Netmask, OwnIP)} of - {M, M} -> true; - _ -> check_ip(IFs, PeerIP) - end; -check_ip(_Driver, [], PeerIP) -> - {false, PeerIP}. - %% If Node is illegal terminate the connection setup!! splitnode(Driver, Node, LongOrShortNames) -> @@ -196,23 +360,34 @@ check_node(Driver, Name, Node, Host, LongOrShortNames) -> {ok, _} -> [Name, Host]; _ -> - error_logger:error_msg("** System running to use " - "fully qualified " - "hostnames **~n" - "** Hostname ~s is illegal **~n", - [Host]), + error_logger:error_msg( + "** System running to use " + "fully qualified hostnames **~n" + "** Hostname ~s is illegal **~n", + [Host]), ?shutdown(Node) end; [_, _ | _] when LongOrShortNames == shortnames -> - error_logger:error_msg("** System NOT running to use fully qualified " - "hostnames **~n" - "** Hostname ~s is illegal **~n", - [Host]), + error_logger:error_msg( + "** System NOT running to use " + "fully qualified hostnames **~n" + "** Hostname ~s is illegal **~n", + [Host]), ?shutdown(Node); _ -> [Name, Host] end. +split_node(Node) when is_atom(Node) -> + case split_node(atom_to_list(Node), $@, []) of + [_, Host] -> + Host; + _ -> + false + end; +split_node(_) -> + false. +%% split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])]; split_node([H|T], Chr, Ack) -> @@ -220,70 +395,119 @@ split_node([H|T], Chr, Ack) -> split_node([], _, Ack) -> [lists:reverse(Ack)]. -connect_hs_data(Kernel, Node, MyNode, Socket, Timer, Version, Ip, TcpPort, Address, Type) -> - common_hs_data(Kernel, MyNode, Socket, Timer, - #hs_data{other_node = Node, - other_version = Version, - f_address = - fun(_,_) -> - #net_address{address = {Ip,TcpPort}, - host = Address, - protocol = proxy, - family = inet} - end, - request_type = Type - }). 
- -accept_hs_data(Kernel, MyNode, Socket, Timer, Allowed) -> - common_hs_data(Kernel, MyNode, Socket, Timer, #hs_data{ - allowed = Allowed, - f_address = fun get_remote_id/2 - }). - -common_hs_data(Kernel, MyNode, Socket, Timer, HsData) -> - HsData#hs_data{ - kernel_pid = Kernel, - this_node = MyNode, - socket = Socket, - timer = Timer, - this_flags = 0, - f_send = - fun(S,D) -> - gen_tcp:send(S,D) - end, - f_recv = - fun(S,N,T) -> - gen_tcp:recv(S,N,T) - end, - f_setopts_pre_nodeup = - fun(S) -> - inet:setopts(S, [{active, false}, {packet, 4}]) - end, - f_setopts_post_nodeup = - fun(S) -> - inet:setopts(S, [{deliver, port},{active, true}]) - end, - f_getll = - fun(S) -> - inet:getll(S) - end, - mf_tick = - fun(S) -> - gen_tcp:send(S, <<>>) - end, - mf_getstat = - fun(S) -> - {ok, Stats} = inet:getstat(S, [recv_cnt, send_cnt, send_pend]), - R = proplists:get_value(recv_cnt, Stats, 0), - W = proplists:get_value(send_cnt, Stats, 0), - P = proplists:get_value(send_pend, Stats, 0), - {ok, R,W,P} - end}. - -get_remote_id(Socket, _Node) -> - case ssl_tls_dist_proxy:get_tcp_address(Socket) of - {ok, Address} -> - Address; - {error, _Reason} -> - ?shutdown(no_node) +%% ------------------------------------------------------------------------- + +connect_options(Opts) -> + case application:get_env(kernel, inet_dist_connect_options) of + {ok,ConnectOpts} -> + lists:ukeysort(1, ConnectOpts ++ Opts); + _ -> + Opts + end. + +%% we may not always want the nodelay behaviour +%% for performance reasons +nodelay() -> + case application:get_env(kernel, dist_nodelay) of + undefined -> + {nodelay, true}; + {ok, true} -> + {nodelay, true}; + {ok, false} -> + {nodelay, false}; + _ -> + {nodelay, true} + end. + + +get_ssl_options(Type) -> + case init:get_argument(ssl_dist_opt) of + {ok, Args} -> + [{erl_dist, true} | ssl_options(Type, lists:append(Args))]; + _ -> + [{erl_dist, true}] + end. 
+ +ssl_options(_,[]) -> + []; +ssl_options(server, ["client_" ++ _, _Value |T]) -> + ssl_options(server,T); +ssl_options(client, ["server_" ++ _, _Value|T]) -> + ssl_options(client,T); +ssl_options(server, ["server_certfile", Value|T]) -> + [{certfile, Value} | ssl_options(server,T)]; +ssl_options(client, ["client_certfile", Value | T]) -> + [{certfile, Value} | ssl_options(client,T)]; +ssl_options(server, ["server_cacertfile", Value|T]) -> + [{cacertfile, Value} | ssl_options(server,T)]; +ssl_options(client, ["client_cacertfile", Value|T]) -> + [{cacertfile, Value} | ssl_options(client,T)]; +ssl_options(server, ["server_keyfile", Value|T]) -> + [{keyfile, Value} | ssl_options(server,T)]; +ssl_options(client, ["client_keyfile", Value|T]) -> + [{keyfile, Value} | ssl_options(client,T)]; +ssl_options(server, ["server_password", Value|T]) -> + [{password, Value} | ssl_options(server,T)]; +ssl_options(client, ["client_password", Value|T]) -> + [{password, Value} | ssl_options(client,T)]; +ssl_options(server, ["server_verify", Value|T]) -> + [{verify, atomize(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_verify", Value|T]) -> + [{verify, atomize(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_verify_fun", Value|T]) -> + [{verify_fun, verify_fun(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_verify_fun", Value|T]) -> + [{verify_fun, verify_fun(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_crl_check", Value|T]) -> + [{crl_check, atomize(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_crl_check", Value|T]) -> + [{crl_check, atomize(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_crl_cache", Value|T]) -> + [{crl_cache, termify(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_crl_cache", Value|T]) -> + [{crl_cache, termify(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_reuse_sessions", Value|T]) -> + [{reuse_sessions, atomize(Value)} | 
ssl_options(server,T)]; +ssl_options(client, ["client_reuse_sessions", Value|T]) -> + [{reuse_sessions, atomize(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_secure_renegotiate", Value|T]) -> + [{secure_renegotiate, atomize(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_secure_renegotiate", Value|T]) -> + [{secure_renegotiate, atomize(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_depth", Value|T]) -> + [{depth, list_to_integer(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_depth", Value|T]) -> + [{depth, list_to_integer(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_hibernate_after", Value|T]) -> + [{hibernate_after, list_to_integer(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_hibernate_after", Value|T]) -> + [{hibernate_after, list_to_integer(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_ciphers", Value|T]) -> + [{ciphers, Value} | ssl_options(server,T)]; +ssl_options(client, ["client_ciphers", Value|T]) -> + [{ciphers, Value} | ssl_options(client,T)]; +ssl_options(server, ["server_dhfile", Value|T]) -> + [{dhfile, Value} | ssl_options(server,T)]; +ssl_options(server, ["server_fail_if_no_peer_cert", Value|T]) -> + [{fail_if_no_peer_cert, atomize(Value)} | ssl_options(server,T)]; +ssl_options(Type, Opts) -> + error(malformed_ssl_dist_opt, [Type, Opts]). + +atomize(List) when is_list(List) -> + list_to_atom(List); +atomize(Atom) when is_atom(Atom) -> + Atom. + +termify(String) when is_list(String) -> + {ok, Tokens, _} = erl_scan:string(String ++ "."), + {ok, Term} = erl_parse:parse_term(Tokens), + Term. + +verify_fun(Value) -> + case termify(Value) of + {Mod, Func, State} when is_atom(Mod), is_atom(Func) -> + Fun = fun Mod:Func/3, + {Fun, State}; + _ -> + error(malformed_ssl_dist_opt, [Value]) end. 
diff --git a/lib/ssl/src/ssl.app.src b/lib/ssl/src/ssl.app.src index 064dcd6892..a5286b6747 100644 --- a/lib/ssl/src/ssl.app.src +++ b/lib/ssl/src/ssl.app.src @@ -37,7 +37,7 @@ %% Erlang Distribution over SSL/TLS inet_tls_dist, inet6_tls_dist, - ssl_tls_dist_proxy, + ssl_tls_dist_ctrl, ssl_dist_sup, ssl_dist_connection_sup, ssl_dist_admin_sup, diff --git a/lib/ssl/src/ssl_dist_sup.erl b/lib/ssl/src/ssl_dist_sup.erl index 690b896919..c241a9bced 100644 --- a/lib/ssl/src/ssl_dist_sup.erl +++ b/lib/ssl/src/ssl_dist_sup.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2011-2016. All Rights Reserved. +%% Copyright Ericsson AB 2011-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -46,8 +46,7 @@ start_link() -> init([]) -> AdminSup = ssl_admin_child_spec(), ConnectionSup = ssl_connection_sup(), - ProxyServer = proxy_server_child_spec(), - {ok, {{one_for_all, 10, 3600}, [AdminSup, ProxyServer, ConnectionSup]}}. + {ok, {{one_for_all, 10, 3600}, [AdminSup, ConnectionSup]}}. %%-------------------------------------------------------------------- %%% Internal functions @@ -69,12 +68,3 @@ ssl_connection_sup() -> Modules = [ssl_connection_sup], Type = supervisor, {Name, StartFunc, Restart, Shutdown, Type, Modules}. - -proxy_server_child_spec() -> - Name = ssl_tls_dist_proxy, - StartFunc = {ssl_tls_dist_proxy, start_link, []}, - Restart = permanent, - Shutdown = 4000, - Modules = [ssl_tls_dist_proxy], - Type = worker, - {Name, StartFunc, Restart, Shutdown, Type, Modules}. diff --git a/lib/ssl/src/ssl_tls_dist_ctrl.erl b/lib/ssl/src/ssl_tls_dist_ctrl.erl new file mode 100644 index 0000000000..cbb39e71ec --- /dev/null +++ b/lib/ssl/src/ssl_tls_dist_ctrl.erl @@ -0,0 +1,380 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2017. All Rights Reserved. 
+%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% +-module(ssl_tls_dist_ctrl). + +-include_lib("kernel/include/dist.hrl"). +-include_lib("kernel/include/dist_util.hrl"). +-include_lib("kernel/include/net_address.hrl"). + +-export([start/1, get_socket/1, set_supervisor/2, hs_data_common/1]). + +%%% ------------------------------------------------------------ + +%% In order to avoid issues with lingering signal binaries +%% we enable off-heap message queue data as well as fullsweep +%% after 0. The fullsweeps will be cheap since we have more +%% or less no live data. +common_spawn_opts() -> + [{message_queue_data, off_heap}, + {fullsweep_after, 0}]. + +%%% ------------------------------------------------------------ + +start(SslSocket) -> + spawn_opt( + fun () -> + setup(SslSocket) + end, + [{priority, max}] ++ common_spawn_opts()). + +get_socket(DistCtrl) -> + call(DistCtrl, get_socket). + +set_supervisor(DistCtrl, Pid) -> + call(DistCtrl, {set_supervisor, Pid}). 
+ +hs_data_common(DistCtrl) -> + TickHandler = call(DistCtrl, tick_handler), + SslSocket = get_socket(DistCtrl), + #hs_data{ + f_send = + fun (Ctrl, Packet) when Ctrl == DistCtrl -> + call(Ctrl, {send, Packet}) + end, + f_recv = + fun (Ctrl, Length, Timeout) when Ctrl == DistCtrl -> + case call(Ctrl, {recv, Length, Timeout}) of + {ok, Bin} when is_binary(Bin) -> + {ok, binary_to_list(Bin)}; + Other -> + Other + end + end, + f_setopts_pre_nodeup = + fun (Ctrl) when Ctrl == DistCtrl -> + call(Ctrl, pre_nodeup) + end, + f_setopts_post_nodeup = + fun (Ctrl) when Ctrl == DistCtrl -> + call(Ctrl, post_nodeup) + end, + f_getll = + fun (Ctrl) when Ctrl == DistCtrl -> + call(Ctrl, getll) + end, + f_handshake_complete = + fun (Ctrl, Node, DHandle) when Ctrl == DistCtrl -> + call(Ctrl, {handshake_complete, Node, DHandle}) + end, + f_address = + fun (Ctrl, Node) when Ctrl == DistCtrl -> + case call(Ctrl, {check_address, Node}) of + {error, no_node} -> + %% No '@' or more than one '@' in node name. + ?shutdown(no_node); + Res -> + Res + end + end, + mf_setopts = + fun (Ctrl, Opts) when Ctrl == DistCtrl -> + case setopts_filter(Opts) of + [] -> + ssl:setopts(SslSocket, Opts); + Opts1 -> + {error, {badopts,Opts1}} + end + end, + mf_getopts = + fun (Ctrl, Opts) when Ctrl == DistCtrl -> + ssl:getopts(SslSocket, Opts) + end, + mf_getstat = + fun (Ctrl) when Ctrl == DistCtrl -> + case ssl:getstat( + SslSocket, [recv_cnt, send_cnt, send_pend]) of + {ok, Stat} -> + split_stat(Stat,0,0,0); + Error -> + Error + end + end, + mf_tick = + fun (Ctrl) when Ctrl == DistCtrl -> + TickHandler ! tick + end}. + +%%% ------------------------------------------------------------ + +call(DistCtrl, Msg) -> + Ref = erlang:monitor(process, DistCtrl), + DistCtrl ! {Ref, self(), Msg}, + receive + {Ref, Res} -> + erlang:demonitor(Ref, [flush]), + Res; + {'DOWN', Ref, process, DistCtrl, Reason} -> + exit({dist_controller_exit, Reason}) + end. 
+ +setopts_filter(Opts) -> + [Opt || {K,_} = Opt <- Opts, + K =:= active orelse K =:= deliver orelse K =:= packet]. + +split_stat([{recv_cnt, R}|Stat], _, W, P) -> + split_stat(Stat, R, W, P); +split_stat([{send_cnt, W}|Stat], R, _, P) -> + split_stat(Stat, R, W, P); +split_stat([{send_pend, P}|Stat], R, W, _) -> + split_stat(Stat, R, W, P); +split_stat([], R, W, P) -> + {ok, R, W, P}. + +%%% ------------------------------------------------------------ +%%% Distribution controller processes +%%% ------------------------------------------------------------ + +%% +%% The following parties work together when the +%% connection is up: +%% - The SSL socket. Providing a TLS connection +%% to the other node. +%% - The output handler. It will dispatch all outgoing +%% traffic from the VM to the SSL socket. This +%% process is registered as distribution controller +%% for this channel with the VM. +%% - The input handler. It will dispatch all incoming +%% traffic from the SSL socket to the VM. In this +%% module it is the same process as the output handler; it +%% owns the socket and receives using {active, once}. +%% - The tick handler. Dispatches asynchronous tick +%% requests to the socket. It executes on max priority +%% since it is important to get ticks through to the +%% other end. +%% - The channel supervisor (provided by dist_util). It +%% monitors traffic. Issue tick requests to the tick +%% handler when no outgoing traffic is seen and bring +%% the connection down if no incoming traffic is seen. +%% This process also executes on max priority. +%% +%% These parties are linked together so should one +%% of them fail, all of them are terminated and the +%% connection is taken down. +%% + + +%% +%% The tick handler process writes a tick to the +%% socket when it receives a 'tick' message from +%% the connection supervisor.
+%% +%% We are not allowed to block the connection +%% supervisor when writing a tick and we also want +%% the tick to go through even during a heavily +%% loaded system. gen_tcp does not have a +%% non-blocking send operation exposed in its API +%% and we don't want to run the distribution +%% controller under high priority. Therefore this +%% separate process with max prio that dispatches +%% ticks. +%% +tick_handler(SslSocket) -> + receive + tick -> + %% May block due to busy port... + sock_send(SslSocket, ""), + flush_ticks(SslSocket); + _ -> + tick_handler(SslSocket) + end. + +flush_ticks(SslSocket) -> + receive + tick -> + flush_ticks(SslSocket) + after 0 -> + tick_handler(SslSocket) + end. + +setup(SslSocket) -> + TickHandler = + spawn_opt( + fun () -> + tick_handler(SslSocket) + end, + [link, {priority, max}] ++ common_spawn_opts()), + setup_loop(SslSocket, TickHandler, undefined). + +%% +%% During the handshake phase we loop in setup_loop(). +%% When the connection is up we continue in loop(), which +%% handles both incoming and outgoing traffic. +%% +setup_loop(SslSocket, TickHandler, Sup) -> + receive + {ssl_closed, SslSocket} -> + exit(connection_closed); + {ssl_error, SslSocket, _Reason} -> + exit(connection_closed); + + {Ref, From, {set_supervisor, Pid}} -> + Res = link(Pid), + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Pid); + + {Ref, From, tick_handler} -> + From ! {Ref, TickHandler}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, get_socket} -> + From ! {Ref, SslSocket}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, {send, Packet}} -> + Res = ssl:send(SslSocket, Packet), + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, {recv, Length, Timeout}} -> + Res = ssl:recv(SslSocket, Length, Timeout), + From !
{Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, {check_address, Node}} -> + Res = + case ssl:peername(SslSocket) of + {ok, Address} -> + case inet_tls_dist:split_node(Node) of + false -> + {error, no_node}; + Host -> + #net_address{ + address=Address, host=Host, + protocol=tls, family=inet} + end + end, + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, pre_nodeup} -> + Res = + ssl:setopts( + SslSocket, + [{active, false}, {packet, 4}, inet_tls_dist:nodelay()]), + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, post_nodeup} -> + Res = + ssl:setopts( + SslSocket, + [{active, once}, {packet, 4}, inet_tls_dist:nodelay()]), + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, getll} -> + From ! {Ref, {ok, self()}}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, {handshake_complete, _Node, DHandle}} -> + From ! {Ref, ok}, + %% Handshake complete! Begin dispatching traffic... + %% From now on we execute on normal priority + process_flag(priority, normal), + erlang:dist_ctrl_get_data_notification(DHandle), + loop(DHandle, SslSocket) + end. + +loop(DHandle, SslSocket) -> + receive + dist_data -> + %% Outgoing data from this node... + try send_data(DHandle, SslSocket) + catch _ : _ -> death_row() + end, + loop(DHandle, SslSocket); + + {send, From, Ref, Data} -> + %% This is for testing only! + %% + %% Needed by some OTP distribution + %% test suites... + sock_send(SslSocket, Data), + From ! {Ref, ok}, + loop(DHandle, SslSocket); + + {ssl_closed, SslSocket} -> + %% Connection to remote node terminated... + exit(connection_closed); + {ssl_error, SslSocket, _Reason} -> + %% Connection to remote node terminated... + exit(connection_closed); + {ssl, SslSocket, Data} -> + %% Incoming data from remote node... 
+ ok = ssl:setopts(SslSocket, [{active, once}]), + try erlang:dist_ctrl_put_data(DHandle, Data) + catch _ : _ -> death_row() + end, + loop(DHandle, SslSocket); + + _ -> + %% Drop garbage message... + loop(DHandle, SslSocket) + end. + +send_data(DHandle, SslSocket) -> + case erlang:dist_ctrl_get_data(DHandle) of + none -> + erlang:dist_ctrl_get_data_notification(DHandle); + Data -> + sock_send(SslSocket, Data), + send_data(DHandle, SslSocket) + end. + +sock_send(SslSocket, Data) -> + try ssl:send(SslSocket, Data) of + ok -> ok; + {error, Reason} -> + death_row({send_error, Reason}) + catch + Type:Reason -> + death_row({send_error, {Type, Reason}}) + end. + +death_row() -> + death_row(connection_closed). + +death_row(normal) -> + %% We do not want to exit with normal + %% exit reason since it wont bring down + %% linked processes... + death_row(); +death_row(Reason) -> + %% When the connection is on its way down operations + %% begin to fail. We catch the failures and call + %% this function waiting for termination. We should + %% be terminated by one of our links to the other + %% involved parties that began bringing the + %% connection down. By waiting for termination we + %% avoid altering the exit reason for the connection + %% teardown. We however limit the wait to 5 seconds + %% and bring down the connection ourselves if not + %% terminated... + receive after 5000 -> exit(Reason) end. -- cgit v1.2.3