From c2180b0e570baf90cbf567381212bb223fc73d37 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Fri, 8 Sep 2017 17:01:45 +0200 Subject: Rewrite dist ctrl from port to process --- lib/ssl/src/Makefile | 4 +- lib/ssl/src/inet6_tls_dist.erl | 7 +- lib/ssl/src/inet_tls_dist.erl | 500 +++++++++++++++++++++++++++----------- lib/ssl/src/ssl.app.src | 2 +- lib/ssl/src/ssl_dist_sup.erl | 14 +- lib/ssl/src/ssl_tls_dist_ctrl.erl | 380 +++++++++++++++++++++++++++++ 6 files changed, 751 insertions(+), 156 deletions(-) create mode 100644 lib/ssl/src/ssl_tls_dist_ctrl.erl (limited to 'lib') diff --git a/lib/ssl/src/Makefile b/lib/ssl/src/Makefile index 2e7df9792e..f6fe23b584 100644 --- a/lib/ssl/src/Makefile +++ b/lib/ssl/src/Makefile @@ -1,7 +1,7 @@ # # %CopyrightBegin% # -# Copyright Ericsson AB 1999-2016. All Rights Reserved. +# Copyright Ericsson AB 1999-2017. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -88,7 +88,7 @@ MODULES= \ ssl_v3 \ tls_v1 \ dtls_v1 \ - ssl_tls_dist_proxy + ssl_tls_dist_ctrl INTERNAL_HRL_FILES = \ ssl_alert.hrl ssl_cipher.hrl \ diff --git a/lib/ssl/src/inet6_tls_dist.erl b/lib/ssl/src/inet6_tls_dist.erl index ffd7296f93..96ce4d493a 100644 --- a/lib/ssl/src/inet6_tls_dist.erl +++ b/lib/ssl/src/inet6_tls_dist.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2015. All Rights Reserved. +%% Copyright Ericsson AB 2015-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -21,7 +21,8 @@ %% -module(inet6_tls_dist). --export([childspecs/0, listen/1, accept/1, accept_connection/5, +-export([childspecs/0]). +-export([listen/1, accept/1, accept_connection/5, setup/5, close/1, select/1]). childspecs() -> @@ -43,4 +44,4 @@ setup(Node, Type, MyNode, LongOrShortNames,SetupTime) -> inet_tls_dist:gen_setup(inet6_tcp, Node, Type, MyNode, LongOrShortNames,SetupTime). close(Socket) -> - inet_tls_dist:close(Socket). + inet_tls_dist:gen_close(inet6_tcp, Socket). diff --git a/lib/ssl/src/inet_tls_dist.erl b/lib/ssl/src/inet_tls_dist.erl index 0da4b3587f..47f400da6f 100644 --- a/lib/ssl/src/inet_tls_dist.erl +++ b/lib/ssl/src/inet_tls_dist.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2011-2016. All Rights Reserved. +%% Copyright Ericsson AB 2011-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -21,17 +21,31 @@ %% -module(inet_tls_dist). --export([childspecs/0, listen/1, accept/1, accept_connection/5, +-export([childspecs/0]). +-export([listen/1, accept/1, accept_connection/5, setup/5, close/1, select/1, is_node_name/1]). %% Generalized dist API -export([gen_listen/2, gen_accept/2, gen_accept_connection/6, - gen_setup/6, gen_select/2]). + gen_setup/6, gen_close/2, gen_select/2]). + +-export([split_node/1, nodelay/0]). -include_lib("kernel/include/net_address.hrl"). -include_lib("kernel/include/dist.hrl"). -include_lib("kernel/include/dist_util.hrl"). +%% -undef(trace). +%% -define(trace(Fmt,Args), +%% erlang:display( +%% [erlang:convert_time_unit( +%% erlang:monotonic_time() +%% - erlang:system_info(start_time), native, microsecond), +%% node(), +%% lists:flatten(io_lib:format(Fmt, Args))])). + +%% ------------------------------------------------------------------------- + childspecs() -> {ok, [{ssl_dist_sup,{ssl_dist_sup, start_link, []}, permanent, infinity, supervisor, [ssl_dist_sup]}]}. @@ -40,51 +54,186 @@ select(Node) -> gen_select(inet_tcp, Node). gen_select(Driver, Node) -> - case split_node(atom_to_list(Node), $@, []) of - [_, Host] -> - case inet:getaddr(Host, Driver:family()) of + case split_node(Node) of + false -> + false; + Host -> + case Driver:getaddr(Host) of {ok, _} -> true; _ -> false - end; - _ -> - false + end end. -is_node_name(Node) when is_atom(Node) -> - select(Node); -is_node_name(_) -> - false. +%% ------------------------------------------------------------------------- + +is_node_name(Node) -> + case split_node(Node) of + false -> + false; + _Host -> + true + end. + +%% ------------------------------------------------------------------------- listen(Name) -> gen_listen(inet_tcp, Name). gen_listen(Driver, Name) -> - ssl_tls_dist_proxy:listen(Driver, Name). + case inet_tcp_dist:gen_listen(Driver, Name) of + {ok, {Socket, Address, Creation}} -> + inet:setopts(Socket, [{packet, 4}]), + {ok, {Socket, Address#net_address{protocol=tls}, Creation}}; + Other -> + Other + end. + +%% ------------------------------------------------------------------------- accept(Listen) -> gen_accept(inet_tcp, Listen). gen_accept(Driver, Listen) -> - ssl_tls_dist_proxy:accept(Driver, Listen). + Kernel = self(), + spawn_opt( + fun () -> + accept_loop(Driver, Listen, Kernel) + end, + [link, {priority, max}]). + +accept_loop(Driver, Listen, Kernel) -> + ?trace("~p~n",[{?MODULE, accept_loop, self()}]), + case Driver:accept(Listen) of + {ok, Socket} -> + Opts = get_ssl_options(server), + wait_for_code_server(), + case ssl:ssl_accept( + Socket, [{active, false}, {packet, 4}] ++ Opts) of + {ok, SslSocket} -> + DistCtrl = ssl_tls_dist_ctrl:start(SslSocket), + ?trace("~p~n", + [{?MODULE, accept_loop, accepted, + SslSocket, DistCtrl, self()}]), + ok = ssl:controlling_process(SslSocket, DistCtrl), + Kernel ! + {accept, self(), DistCtrl, Driver:family(), tls}, + receive + {Kernel, controller, Pid} -> + ?trace("~p~n", + [{?MODULE, accept_loop, + controller, self()}]), + ssl_tls_dist_ctrl:set_supervisor(DistCtrl, Pid), + Pid ! {self(), controller}; + {Kernel, unsupported_protocol} -> + ?trace("~p~n", + [{?MODULE, accept_loop, + unsupported_protocol, self()}]), + exit(unsupported_protocol) + end, + accept_loop(Driver, Listen, Kernel); + {error, {options, _}} = Error -> + %% Bad options: that's probably our fault. + %% Let's log that. + error_logger:error_msg( + "Cannot accept TLS distribution connection: ~s~n", + [ssl:format_error(Error)]), + gen_tcp:close(Socket); + _ -> + gen_tcp:close(Socket) + end; + Error -> + exit(Error) + end, + accept_loop(Driver, Listen, Kernel). + +wait_for_code_server() -> + %% This is an ugly hack. Upgrading a socket to TLS requires the + %% crypto module to be loaded. Loading the crypto module triggers + %% its on_load function, which calls code:priv_dir/1 to find the + %% directory where its NIF library is. However, distribution is + %% started earlier than the code server, so the code server is not + %% necessarily started yet, and code:priv_dir/1 might fail because + %% of that, if we receive an incoming connection on the + %% distribution port early enough. + %% + %% If the on_load function of a module fails, the module is + %% unloaded, and the function call that triggered loading it fails + %% with 'undef', which is rather confusing. + %% + %% Thus, the ssl_tls_dist_proxy process will terminate, and be + %% restarted by ssl_dist_sup. However, it won't have any memory + %% of being asked by net_kernel to listen for incoming + %% connections. Hence, the node will believe that it's open for + %% distribution, but it actually isn't. + %% + %% So let's avoid that by waiting for the code server to start. + case whereis(code_server) of + undefined -> + timer:sleep(10), + wait_for_code_server(); + Pid when is_pid(Pid) -> + ok + end. + +%% ------------------------------------------------------------------------- -accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime) -> - gen_accept_connection(inet_tcp, AcceptPid, Socket, MyNode, Allowed, SetupTime). +accept_connection(AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> + gen_accept_connection( + inet_tcp, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime). -gen_accept_connection(Driver, AcceptPid, Socket, MyNode, Allowed, SetupTime) -> +gen_accept_connection( + Driver, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> Kernel = self(), - spawn_link(fun() -> do_accept(Driver, Kernel, AcceptPid, Socket, - MyNode, Allowed, SetupTime) end). + spawn_opt( + fun() -> + do_accept( + Driver, Kernel, AcceptPid, DistCtrl, + MyNode, Allowed, SetupTime) + end, + [link, {priority, max}]). + +do_accept(Driver, Kernel, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> + receive + {AcceptPid, controller} -> + Timer = dist_util:start_timer(SetupTime), + case check_ip(Driver, DistCtrl) of + true -> + HSData0 = ssl_tls_dist_ctrl:hs_data_common(DistCtrl), + HSData = + HSData0#hs_data{ + kernel_pid = Kernel, + this_node = MyNode, + socket = DistCtrl, + timer = Timer, + this_flags = 0, + allowed = Allowed}, + dist_util:handshake_other_started(HSData); + {false,IP} -> + error_logger:error_msg( + "** Connection attempt from " + "disallowed IP ~w ** ~n", [IP]), + ?shutdown(no_node) + end + end. -setup(Node, Type, MyNode, LongOrShortNames,SetupTime) -> - gen_setup(inet_tcp, Node, Type, MyNode, LongOrShortNames,SetupTime). -gen_setup(Driver, Node, Type, MyNode, LongOrShortNames,SetupTime) -> + +setup(Node, Type, MyNode, LongOrShortNames, SetupTime) -> + gen_setup(inet_tcp, Node, Type, MyNode, LongOrShortNames, SetupTime). + +gen_setup(Driver, Node, Type, MyNode, LongOrShortNames, SetupTime) -> Kernel = self(), - spawn_opt(fun() -> do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) end, [link, {priority, max}]). - + spawn_opt( + fun() -> + do_setup( + Driver, Kernel, Node, Type, + MyNode, LongOrShortNames, SetupTime) + end, + [link, {priority, max}]). + do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> [Name, Address] = splitnode(Driver, Node, LongOrShortNames), - case inet:getaddr(Address, Driver:family()) of + case Driver:getaddr(Address) of {ok, Ip} -> Timer = dist_util:start_timer(SetupTime), ErlEpmd = net_kernel:epmd_module(), @@ -92,12 +241,37 @@ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> {port, TcpPort, Version} -> ?trace("port_please(~p) -> version ~p~n", [Node,Version]), + Opts = connect_options(get_ssl_options(client)), dist_util:reset_timer(Timer), - case ssl_tls_dist_proxy:connect(Driver, Ip, TcpPort) of - {ok, Socket} -> - HSData = connect_hs_data(Kernel, Node, MyNode, Socket, - Timer, Version, Ip, TcpPort, Address, - Type), + case ssl:connect( + Ip, TcpPort, + [binary, {active, false}, {packet, 4}, + Driver:family(), nodelay()] ++ Opts) of + {ok, SslSocket} -> + ?trace("~p~n", + [{?MODULE, do_setup, + ssl_socket, SslSocket}]), + DistCtrl = ssl_tls_dist_ctrl:start(SslSocket), + ssl_tls_dist_ctrl:set_supervisor( + DistCtrl, self()), + ok = + ssl:controlling_process( + SslSocket, DistCtrl), + HSData0 = + ssl_tls_dist_ctrl:hs_data_common(DistCtrl), + HSData = + HSData0#hs_data{ + kernel_pid = Kernel, + other_node = Node, + this_node = MyNode, + socket = DistCtrl, + timer = Timer, + this_flags = 0, + other_version = Version, + request_type = Type}, + ?trace("~p~n", + [{?MODULE, do_setup, + handshake_we_started, HSData}]), dist_util:handshake_we_started(HSData); Other -> %% Other Node may have closed since @@ -105,7 +279,8 @@ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> ?trace("other node (~p) " "closed since port_please.~n", [Node]), - ?shutdown2(Node, {shutdown, {connect_failed, Other}}) + ?shutdown2(Node, + {shutdown, {connect_failed, Other}}) end; Other -> ?trace("port_please (~p) " @@ -113,38 +288,25 @@ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> ?shutdown2(Node, {shutdown, {port_please_failed, Other}}) end; Other -> - ?trace("inet_getaddr(~p) " - "failed (~p).~n", [Node,Other]), - ?shutdown2(Node, {shutdown, {inet_getaddr_failed, Other}}) + ?trace("~w:getaddr(~p) " + "failed (~p).~n", [Driver, Address, Other]), + ?shutdown2(Node, {shutdown, {getaddr_failed, Other}}) end. close(Socket) -> - gen_tcp:close(Socket), - ok. + gen_close(inet, Socket). + +gen_close(Driver, Socket) -> + Driver:close(Socket). -do_accept(Driver, Kernel, AcceptPid, Socket, MyNode, Allowed, SetupTime) -> - process_flag(priority, max), - receive - {AcceptPid, controller} -> - Timer = dist_util:start_timer(SetupTime), - case check_ip(Driver, Socket) of - true -> - HSData = accept_hs_data(Kernel, MyNode, Socket, Timer, Allowed), - dist_util:handshake_other_started(HSData); - {false,IP} -> - error_logger:error_msg("** Connection attempt from " - "disallowed IP ~w ** ~n", [IP]), - ?shutdown(no_node) - end - end. %% ------------------------------------------------------------ %% Do only accept new connection attempts from nodes at our %% own LAN, if the check_ip environment parameter is true. %% ------------------------------------------------------------ -check_ip(Driver, Socket) -> +check_ip(Driver, DistCtrl) -> case application:get_env(check_ip) of {ok, true} -> - case get_ifs(Socket) of + case get_ifs(DistCtrl) of {ok, IFs, IP} -> check_ip(Driver, IFs, IP); _ -> @@ -154,9 +316,19 @@ check_ip(Driver, Socket) -> true end. -get_ifs(Socket) -> +check_ip(Driver, [{OwnIP, _, Netmask}|IFs], PeerIP) -> + case {Driver:mask(Netmask, PeerIP), Driver:mask(Netmask, OwnIP)} of + {M, M} -> true; + _ -> check_ip(IFs, PeerIP) + end; +check_ip(_Driver, [], PeerIP) -> + {false, PeerIP}. + +get_ifs(DistCtrl) -> + Socket = ssl_tls_dist_ctrl:get_socket(DistCtrl), case inet:peername(Socket) of {ok, {IP, _}} -> + %% XXX this is seriously broken for IPv6 case inet:getif(Socket) of {ok, IFs} -> {ok, IFs, IP}; Error -> Error @@ -165,14 +337,6 @@ get_ifs(Socket) -> Error end. -check_ip(Driver, [{OwnIP, _, Netmask}|IFs], PeerIP) -> - case {Driver:mask(Netmask, PeerIP), Driver:mask(Netmask, OwnIP)} of - {M, M} -> true; - _ -> check_ip(IFs, PeerIP) - end; -check_ip(_Driver, [], PeerIP) -> - {false, PeerIP}. - %% If Node is illegal terminate the connection setup!! splitnode(Driver, Node, LongOrShortNames) -> @@ -196,23 +360,34 @@ check_node(Driver, Name, Node, Host, LongOrShortNames) -> {ok, _} -> [Name, Host]; _ -> - error_logger:error_msg("** System running to use " - "fully qualified " - "hostnames **~n" - "** Hostname ~s is illegal **~n", - [Host]), + error_logger:error_msg( + "** System running to use " + "fully qualified hostnames **~n" + "** Hostname ~s is illegal **~n", + [Host]), ?shutdown(Node) end; [_, _ | _] when LongOrShortNames == shortnames -> - error_logger:error_msg("** System NOT running to use fully qualified " - "hostnames **~n" - "** Hostname ~s is illegal **~n", - [Host]), + error_logger:error_msg( + "** System NOT running to use " + "fully qualified hostnames **~n" + "** Hostname ~s is illegal **~n", + [Host]), ?shutdown(Node); _ -> [Name, Host] end. +split_node(Node) when is_atom(Node) -> + case split_node(atom_to_list(Node), $@, []) of + [_, Host] -> + Host; + _ -> + false + end; +split_node(_) -> + false. +%% split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])]; split_node([H|T], Chr, Ack) -> @@ -220,70 +395,119 @@ split_node([H|T], Chr, Ack) -> split_node([], _, Ack) -> [lists:reverse(Ack)]. -connect_hs_data(Kernel, Node, MyNode, Socket, Timer, Version, Ip, TcpPort, Address, Type) -> - common_hs_data(Kernel, MyNode, Socket, Timer, - #hs_data{other_node = Node, - other_version = Version, - f_address = - fun(_,_) -> - #net_address{address = {Ip,TcpPort}, - host = Address, - protocol = proxy, - family = inet} - end, - request_type = Type - }). - -accept_hs_data(Kernel, MyNode, Socket, Timer, Allowed) -> - common_hs_data(Kernel, MyNode, Socket, Timer, #hs_data{ - allowed = Allowed, - f_address = fun get_remote_id/2 - }). - -common_hs_data(Kernel, MyNode, Socket, Timer, HsData) -> - HsData#hs_data{ - kernel_pid = Kernel, - this_node = MyNode, - socket = Socket, - timer = Timer, - this_flags = 0, - f_send = - fun(S,D) -> - gen_tcp:send(S,D) - end, - f_recv = - fun(S,N,T) -> - gen_tcp:recv(S,N,T) - end, - f_setopts_pre_nodeup = - fun(S) -> - inet:setopts(S, [{active, false}, {packet, 4}]) - end, - f_setopts_post_nodeup = - fun(S) -> - inet:setopts(S, [{deliver, port},{active, true}]) - end, - f_getll = - fun(S) -> - inet:getll(S) - end, - mf_tick = - fun(S) -> - gen_tcp:send(S, <<>>) - end, - mf_getstat = - fun(S) -> - {ok, Stats} = inet:getstat(S, [recv_cnt, send_cnt, send_pend]), - R = proplists:get_value(recv_cnt, Stats, 0), - W = proplists:get_value(send_cnt, Stats, 0), - P = proplists:get_value(send_pend, Stats, 0), - {ok, R,W,P} - end}. - -get_remote_id(Socket, _Node) -> - case ssl_tls_dist_proxy:get_tcp_address(Socket) of - {ok, Address} -> - Address; - {error, _Reason} -> - ?shutdown(no_node) +%% ------------------------------------------------------------------------- + +connect_options(Opts) -> + case application:get_env(kernel, inet_dist_connect_options) of + {ok,ConnectOpts} -> + lists:ukeysort(1, ConnectOpts ++ Opts); + _ -> + Opts + end. + +%% we may not always want the nodelay behaviour +%% for performance reasons +nodelay() -> + case application:get_env(kernel, dist_nodelay) of + undefined -> + {nodelay, true}; + {ok, true} -> + {nodelay, true}; + {ok, false} -> + {nodelay, false}; + _ -> + {nodelay, true} + end. + + +get_ssl_options(Type) -> + case init:get_argument(ssl_dist_opt) of + {ok, Args} -> + [{erl_dist, true} | ssl_options(Type, lists:append(Args))]; + _ -> + [{erl_dist, true}] + end. + +ssl_options(_,[]) -> + []; +ssl_options(server, ["client_" ++ _, _Value |T]) -> + ssl_options(server,T); +ssl_options(client, ["server_" ++ _, _Value|T]) -> + ssl_options(client,T); +ssl_options(server, ["server_certfile", Value|T]) -> + [{certfile, Value} | ssl_options(server,T)]; +ssl_options(client, ["client_certfile", Value | T]) -> + [{certfile, Value} | ssl_options(client,T)]; +ssl_options(server, ["server_cacertfile", Value|T]) -> + [{cacertfile, Value} | ssl_options(server,T)]; +ssl_options(client, ["client_cacertfile", Value|T]) -> + [{cacertfile, Value} | ssl_options(client,T)]; +ssl_options(server, ["server_keyfile", Value|T]) -> + [{keyfile, Value} | ssl_options(server,T)]; +ssl_options(client, ["client_keyfile", Value|T]) -> + [{keyfile, Value} | ssl_options(client,T)]; +ssl_options(server, ["server_password", Value|T]) -> + [{password, Value} | ssl_options(server,T)]; +ssl_options(client, ["client_password", Value|T]) -> + [{password, Value} | ssl_options(client,T)]; +ssl_options(server, ["server_verify", Value|T]) -> + [{verify, atomize(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_verify", Value|T]) -> + [{verify, atomize(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_verify_fun", Value|T]) -> + [{verify_fun, verify_fun(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_verify_fun", Value|T]) -> + [{verify_fun, verify_fun(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_crl_check", Value|T]) -> + [{crl_check, atomize(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_crl_check", Value|T]) -> + [{crl_check, atomize(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_crl_cache", Value|T]) -> + [{crl_cache, termify(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_crl_cache", Value|T]) -> + [{crl_cache, termify(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_reuse_sessions", Value|T]) -> + [{reuse_sessions, atomize(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_reuse_sessions", Value|T]) -> + [{reuse_sessions, atomize(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_secure_renegotiate", Value|T]) -> + [{secure_renegotiate, atomize(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_secure_renegotiate", Value|T]) -> + [{secure_renegotiate, atomize(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_depth", Value|T]) -> + [{depth, list_to_integer(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_depth", Value|T]) -> + [{depth, list_to_integer(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_hibernate_after", Value|T]) -> + [{hibernate_after, list_to_integer(Value)} | ssl_options(server,T)]; +ssl_options(client, ["client_hibernate_after", Value|T]) -> + [{hibernate_after, list_to_integer(Value)} | ssl_options(client,T)]; +ssl_options(server, ["server_ciphers", Value|T]) -> + [{ciphers, Value} | ssl_options(server,T)]; +ssl_options(client, ["client_ciphers", Value|T]) -> + [{ciphers, Value} | ssl_options(client,T)]; +ssl_options(server, ["server_dhfile", Value|T]) -> + [{dhfile, Value} | ssl_options(server,T)]; +ssl_options(server, ["server_fail_if_no_peer_cert", Value|T]) -> + [{fail_if_no_peer_cert, atomize(Value)} | ssl_options(server,T)]; +ssl_options(Type, Opts) -> + error(malformed_ssl_dist_opt, [Type, Opts]). + +atomize(List) when is_list(List) -> + list_to_atom(List); +atomize(Atom) when is_atom(Atom) -> + Atom. + +termify(String) when is_list(String) -> + {ok, Tokens, _} = erl_scan:string(String ++ "."), + {ok, Term} = erl_parse:parse_term(Tokens), + Term. + +verify_fun(Value) -> + case termify(Value) of + {Mod, Func, State} when is_atom(Mod), is_atom(Func) -> + Fun = fun Mod:Func/3, + {Fun, State}; + _ -> + error(malformed_ssl_dist_opt, [Value]) end. diff --git a/lib/ssl/src/ssl.app.src b/lib/ssl/src/ssl.app.src index 064dcd6892..a5286b6747 100644 --- a/lib/ssl/src/ssl.app.src +++ b/lib/ssl/src/ssl.app.src @@ -37,7 +37,7 @@ %% Erlang Distribution over SSL/TLS inet_tls_dist, inet6_tls_dist, - ssl_tls_dist_proxy, + ssl_tls_dist_ctrl, ssl_dist_sup, ssl_dist_connection_sup, ssl_dist_admin_sup, diff --git a/lib/ssl/src/ssl_dist_sup.erl b/lib/ssl/src/ssl_dist_sup.erl index 690b896919..c241a9bced 100644 --- a/lib/ssl/src/ssl_dist_sup.erl +++ b/lib/ssl/src/ssl_dist_sup.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2011-2016. All Rights Reserved. +%% Copyright Ericsson AB 2011-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -46,8 +46,7 @@ start_link() -> init([]) -> AdminSup = ssl_admin_child_spec(), ConnectionSup = ssl_connection_sup(), - ProxyServer = proxy_server_child_spec(), - {ok, {{one_for_all, 10, 3600}, [AdminSup, ProxyServer, ConnectionSup]}}. + {ok, {{one_for_all, 10, 3600}, [AdminSup, ConnectionSup]}}. %%-------------------------------------------------------------------- %%% Internal functions @@ -69,12 +68,3 @@ ssl_connection_sup() -> Modules = [ssl_connection_sup], Type = supervisor, {Name, StartFunc, Restart, Shutdown, Type, Modules}. - -proxy_server_child_spec() -> - Name = ssl_tls_dist_proxy, - StartFunc = {ssl_tls_dist_proxy, start_link, []}, - Restart = permanent, - Shutdown = 4000, - Modules = [ssl_tls_dist_proxy], - Type = worker, - {Name, StartFunc, Restart, Shutdown, Type, Modules}. diff --git a/lib/ssl/src/ssl_tls_dist_ctrl.erl b/lib/ssl/src/ssl_tls_dist_ctrl.erl new file mode 100644 index 0000000000..cbb39e71ec --- /dev/null +++ b/lib/ssl/src/ssl_tls_dist_ctrl.erl @@ -0,0 +1,380 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2017. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% +-module(ssl_tls_dist_ctrl). + +-include_lib("kernel/include/dist.hrl"). +-include_lib("kernel/include/dist_util.hrl"). +-include_lib("kernel/include/net_address.hrl"). + +-export([start/1, get_socket/1, set_supervisor/2, hs_data_common/1]). + +%%% ------------------------------------------------------------ + +%% In order to avoid issues with lingering signal binaries +%% we enable off-heap message queue data as well as fullsweep +%% after 0. The fullsweeps will be cheap since we have more +%% or less no live data. +common_spawn_opts() -> + [{message_queue_data, off_heap}, + {fullsweep_after, 0}]. + +%%% ------------------------------------------------------------ + +start(SslSocket) -> + spawn_opt( + fun () -> + setup(SslSocket) + end, + [{priority, max}] ++ common_spawn_opts()). + +get_socket(DistCtrl) -> + call(DistCtrl, get_socket). + +set_supervisor(DistCtrl, Pid) -> + call(DistCtrl, {set_supervisor, Pid}). + +hs_data_common(DistCtrl) -> + TickHandler = call(DistCtrl, tick_handler), + SslSocket = get_socket(DistCtrl), + #hs_data{ + f_send = + fun (Ctrl, Packet) when Ctrl == DistCtrl -> + call(Ctrl, {send, Packet}) + end, + f_recv = + fun (Ctrl, Length, Timeout) when Ctrl == DistCtrl -> + case call(Ctrl, {recv, Length, Timeout}) of + {ok, Bin} when is_binary(Bin) -> + {ok, binary_to_list(Bin)}; + Other -> + Other + end + end, + f_setopts_pre_nodeup = + fun (Ctrl) when Ctrl == DistCtrl -> + call(Ctrl, pre_nodeup) + end, + f_setopts_post_nodeup = + fun (Ctrl) when Ctrl == DistCtrl -> + call(Ctrl, post_nodeup) + end, + f_getll = + fun (Ctrl) when Ctrl == DistCtrl -> + call(Ctrl, getll) + end, + f_handshake_complete = + fun (Ctrl, Node, DHandle) when Ctrl == DistCtrl -> + call(Ctrl, {handshake_complete, Node, DHandle}) + end, + f_address = + fun (Ctrl, Node) when Ctrl == DistCtrl -> + case call(Ctrl, {check_address, Node}) of + {error, no_node} -> + %% No '@' or more than one '@' in node name. + ?shutdown(no_node); + Res -> + Res + end + end, + mf_setopts = + fun (Ctrl, Opts) when Ctrl == DistCtrl -> + case setopts_filter(Opts) of + [] -> + ssl:setopts(SslSocket, Opts); + Opts1 -> + {error, {badopts,Opts1}} + end + end, + mf_getopts = + fun (Ctrl, Opts) when Ctrl == DistCtrl -> + ssl:getopts(SslSocket, Opts) + end, + mf_getstat = + fun (Ctrl) when Ctrl == DistCtrl -> + case ssl:getstat( + SslSocket, [recv_cnt, send_cnt, send_pend]) of + {ok, Stat} -> + split_stat(Stat,0,0,0); + Error -> + Error + end + end, + mf_tick = + fun (Ctrl) when Ctrl == DistCtrl -> + TickHandler ! tick + end}. + +%%% ------------------------------------------------------------ + +call(DistCtrl, Msg) -> + Ref = erlang:monitor(process, DistCtrl), + DistCtrl ! {Ref, self(), Msg}, + receive + {Ref, Res} -> + erlang:demonitor(Ref, [flush]), + Res; + {'DOWN', Ref, process, DistCtrl, Reason} -> + exit({dist_controller_exit, Reason}) + end. + +setopts_filter(Opts) -> + [Opt || {K,_} = Opt <- Opts, + K =:= active orelse K =:= deliver orelse K =:= packet]. + +split_stat([{recv_cnt, R}|Stat], _, W, P) -> + split_stat(Stat, R, W, P); +split_stat([{send_cnt, W}|Stat], R, _, P) -> + split_stat(Stat, R, W, P); +split_stat([{send_pend, P}|Stat], R, W, _) -> + split_stat(Stat, R, W, P); +split_stat([], R, W, P) -> + {ok, R, W, P}. + +%%% ------------------------------------------------------------ +%%% Distribution controller processes +%%% ------------------------------------------------------------ + +%% +%% There will be five parties working together when the +%% connection is up: +%% - The SSL socket. Providing a TLS connection +%% to the other node. +%% - The output handler. It will dispatch all outgoing +%% traffic from the VM to the gen_tcp socket. This +%% process is registered as distribution controller +%% for this channel with the VM. +%% - The input handler. It will dispatch all incoming +%% traffic from the gen_tcp socket to the VM. This +%% process is also the socket owner and receives +%% incoming traffic using active-N. +%% - The tick handler. Dispatches asynchronous tick +%% requests to the socket. It executes on max priority +%% since it is important to get ticks through to the +%% other end. +%% - The channel supervisor (provided by dist_util). It +%% monitors traffic. Issue tick requests to the tick +%% handler when no outgoing traffic is seen and bring +%% the connection down if no incoming traffic is seen. +%% This process also executes on max priority. +%% +%% These parties are linked togheter so should one +%% of them fail, all of them are terminated and the +%% connection is taken down. +%% + + +%% +%% The tick handler process writes a tick to the +%% socket when it receives a 'tick' message from +%% the connection supervisor. +%% +%% We are not allowed to block the connection +%% superviser when writing a tick and we also want +%% the tick to go through even during a heavily +%% loaded system. gen_tcp does not have a +%% non-blocking send operation exposed in its API +%% and we don't want to run the distribution +%% controller under high priority. Therefore this +%% separate process with max prio that dispatches +%% ticks. +%% +tick_handler(SslSocket) -> + receive + tick -> + %% May block due to busy port... + sock_send(SslSocket, ""), + flush_ticks(SslSocket); + _ -> + tick_handler(SslSocket) + end. + +flush_ticks(SslSocket) -> + receive + tick -> + flush_ticks(SslSocket) + after 0 -> + tick_handler(SslSocket) + end. + +setup(SslSocket) -> + TickHandler = + spawn_opt( + fun () -> + tick_handler(SslSocket) + end, + [link, {priority, max}] ++ common_spawn_opts()), + setup_loop(SslSocket, TickHandler, undefined). + +%% +%% During the handshake phase we loop in setup(). +%% When the connection is up we spawn an input handler and +%% continue as output handler. +%% +setup_loop(SslSocket, TickHandler, Sup) -> + receive + {ssl_closed, SslSocket} -> + exit(connection_closed); + {ssl_error, SslSocket} -> + exit(connection_closed); + + {Ref, From, {set_supervisor, Pid}} -> + Res = link(Pid), + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Pid); + + {Ref, From, tick_handler} -> + From ! {Ref, TickHandler}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, get_socket} -> + From ! {Ref, SslSocket}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, {send, Packet}} -> + Res = ssl:send(SslSocket, Packet), + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, {recv, Length, Timeout}} -> + Res = ssl:recv(SslSocket, Length, Timeout), + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, {check_address, Node}} -> + Res = + case ssl:peername(SslSocket) of + {ok, Address} -> + case inet_tls_dist:split_node(Node) of + false -> + {error, no_node}; + Host -> + #net_address{ + address=Address, host=Host, + protocol=tls, family=inet} + end + end, + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, pre_nodeup} -> + Res = + ssl:setopts( + SslSocket, + [{active, false}, {packet, 4}, inet_tls_dist:nodelay()]), + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, post_nodeup} -> + Res = + ssl:setopts( + SslSocket, + [{active, once}, {packet, 4}, inet_tls_dist:nodelay()]), + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, getll} -> + From ! {Ref, {ok, self()}}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, {handshake_complete, _Node, DHandle}} -> + From ! {Ref, ok}, + %% Handshake complete! Begin dispatching traffic... + %% From now on we execute on normal priority + process_flag(priority, normal), + erlang:dist_ctrl_get_data_notification(DHandle), + loop(DHandle, SslSocket) + end. + +loop(DHandle, SslSocket) -> + receive + dist_data -> + %% Outgoing data from this node... + try send_data(DHandle, SslSocket) + catch _ : _ -> death_row() + end, + loop(DHandle, SslSocket); + + {send, From, Ref, Data} -> + %% This is for testing only! + %% + %% Needed by some OTP distribution + %% test suites... + sock_send(SslSocket, Data), + From ! {Ref, ok}, + loop(DHandle, SslSocket); + + {ssl_closed, SslSocket} -> + %% Connection to remote node terminated... + exit(connection_closed); + {ssl_error, SslSocket, _Reason} -> + %% Connection to remote node terminated... + exit(connection_closed); + {ssl, SslSocket, Data} -> + %% Incoming data from remote node... + ok = ssl:setopts(SslSocket, [{active, once}]), + try erlang:dist_ctrl_put_data(DHandle, Data) + catch _ : _ -> death_row() + end, + loop(DHandle, SslSocket); + + _ -> + %% Drop garbage message... + loop(DHandle, SslSocket) + end. + +send_data(DHandle, SslSocket) -> + case erlang:dist_ctrl_get_data(DHandle) of + none -> + erlang:dist_ctrl_get_data_notification(DHandle); + Data -> + sock_send(SslSocket, Data), + send_data(DHandle, SslSocket) + end. + +sock_send(SslSocket, Data) -> + try ssl:send(SslSocket, Data) of + ok -> ok; + {error, Reason} -> + death_row({send_error, Reason}) + catch + Type:Reason -> + death_row({send_error, {Type, Reason}}) + end. + +death_row() -> + death_row(connection_closed). + +death_row(normal) -> + %% We do not want to exit with normal + %% exit reason since it wont bring down + %% linked processes... + death_row(); +death_row(Reason) -> + %% When the connection is on its way down operations + %% begin to fail. We catch the failures and call + %% this function waiting for termination. We should + %% be terminated by one of our links to the other + %% involved parties that began bringing the + %% connection down. By waiting for termination we + %% avoid altering the exit reason for the connection + %% teardown. We however limit the wait to 5 seconds + %% and bring down the connection ourselves if not + %% terminated... + receive after 5000 -> exit(Reason) end. -- cgit v1.2.3 From 278cc4e5a2552397437999f482bcc4237ed36d35 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Thu, 14 Sep 2017 16:11:27 +0200 Subject: Separate in and out in dist ctrl --- lib/ssl/src/inet_tls_dist.erl | 8 +++-- lib/ssl/src/ssl_tls_dist_ctrl.erl | 69 ++++++++++++++++++++++++++------------- 2 files changed, 51 insertions(+), 26 deletions(-) (limited to 'lib') diff --git a/lib/ssl/src/inet_tls_dist.erl b/lib/ssl/src/inet_tls_dist.erl index 47f400da6f..65dab189a3 100644 --- a/lib/ssl/src/inet_tls_dist.erl +++ b/lib/ssl/src/inet_tls_dist.erl @@ -345,11 +345,13 @@ splitnode(Driver, Node, LongOrShortNames) -> Host = lists:append(Tail), check_node(Driver, Name, Node, Host, LongOrShortNames); [_] -> - error_logger:error_msg("** Nodename ~p illegal, no '@' character **~n", - [Node]), + error_logger:error_msg( + "** Nodename ~p illegal, no '@' character **~n", + [Node]), ?shutdown(Node); _ -> - error_logger:error_msg("** Nodename ~p illegal **~n", [Node]), + error_logger:error_msg( + "** Nodename ~p illegal **~n", [Node]), ?shutdown(Node) end. diff --git a/lib/ssl/src/ssl_tls_dist_ctrl.erl b/lib/ssl/src/ssl_tls_dist_ctrl.erl index cbb39e71ec..c1d5bc689e 100644 --- a/lib/ssl/src/ssl_tls_dist_ctrl.erl +++ b/lib/ssl/src/ssl_tls_dist_ctrl.erl @@ -277,7 +277,7 @@ setup_loop(SslSocket, TickHandler, Sup) -> Res = ssl:setopts( SslSocket, - [{active, false}, {packet, 4}, inet_tls_dist:nodelay()]), + [{packet, 4}, inet_tls_dist:nodelay()]), From ! {Ref, Res}, setup_loop(SslSocket, TickHandler, Sup); @@ -285,7 +285,7 @@ setup_loop(SslSocket, TickHandler, Sup) -> Res = ssl:setopts( SslSocket, - [{active, once}, {packet, 4}, inet_tls_dist:nodelay()]), + [{packet, 4}, inet_tls_dist:nodelay()]), From ! {Ref, Res}, setup_loop(SslSocket, TickHandler, Sup); @@ -296,30 +296,33 @@ setup_loop(SslSocket, TickHandler, Sup) -> {Ref, From, {handshake_complete, _Node, DHandle}} -> From ! {Ref, ok}, %% Handshake complete! Begin dispatching traffic... + %% + %% Use a dedicated input process to push the + %% input-output-flow-control-deadlock problem + %% to the SSL implementation. + InputHandler = + spawn_opt( + fun () -> + link(Sup), + ssl:setopts(SslSocket, [{active, once}]), + receive + DHandle -> + input_loop(DHandle, SslSocket) + end + end, + [link] ++ common_spawn_opts()), + ok = ssl:controlling_process(SslSocket, InputHandler), + ok = erlang:dist_ctrl_input_handler(DHandle, InputHandler), + InputHandler ! DHandle, + %% %% From now on we execute on normal priority process_flag(priority, normal), erlang:dist_ctrl_get_data_notification(DHandle), - loop(DHandle, SslSocket) + output_loop(DHandle, SslSocket) end. -loop(DHandle, SslSocket) -> +input_loop(DHandle, SslSocket) -> receive - dist_data -> - %% Outgoing data from this node... - try send_data(DHandle, SslSocket) - catch _ : _ -> death_row() - end, - loop(DHandle, SslSocket); - - {send, From, Ref, Data} -> - %% This is for testing only! - %% - %% Needed by some OTP distribution - %% test suites... - sock_send(SslSocket, Data), - From ! {Ref, ok}, - loop(DHandle, SslSocket); - {ssl_closed, SslSocket} -> %% Connection to remote node terminated... exit(connection_closed); @@ -330,13 +333,33 @@ loop(DHandle, SslSocket) -> %% Incoming data from remote node... ok = ssl:setopts(SslSocket, [{active, once}]), try erlang:dist_ctrl_put_data(DHandle, Data) - catch _ : _ -> death_row() + catch _:_ -> death_row() end, - loop(DHandle, SslSocket); + input_loop(DHandle, SslSocket); + _ -> + %% Drop garbage message... + input_loop(DHandle, SslSocket) + end. +output_loop(DHandle, SslSocket) -> + receive + dist_data -> + %% Outgoing data from this node... + try send_data(DHandle, SslSocket) + catch _ : _ -> death_row() + end, + output_loop(DHandle, SslSocket); + {send, From, Ref, Data} -> + %% This is for testing only! + %% + %% Needed by some OTP distribution + %% test suites... + sock_send(SslSocket, Data), + From ! {Ref, ok}, + output_loop(DHandle, SslSocket); _ -> %% Drop garbage message... - loop(DHandle, SslSocket) + output_loop(DHandle, SslSocket) end. send_data(DHandle, SslSocket) -> -- cgit v1.2.3 From cb3a074c2c191b5465205706710aa1a8e2d4e0ee Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Fri, 15 Sep 2017 09:35:27 +0200 Subject: Avoid dialyzer warning --- lib/ssl/src/ssl_tls_dist_ctrl.erl | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) (limited to 'lib') diff --git a/lib/ssl/src/ssl_tls_dist_ctrl.erl b/lib/ssl/src/ssl_tls_dist_ctrl.erl index c1d5bc689e..1504d1c1e4 100644 --- a/lib/ssl/src/ssl_tls_dist_ctrl.erl +++ b/lib/ssl/src/ssl_tls_dist_ctrl.erl @@ -381,14 +381,16 @@ sock_send(SslSocket, Data) -> death_row({send_error, {Type, Reason}}) end. +-spec death_row() -> no_return(). death_row() -> death_row(connection_closed). -death_row(normal) -> - %% We do not want to exit with normal - %% exit reason since it wont bring down - %% linked processes... - death_row(); +-spec death_row(term()) -> no_return(). +%% death_row(normal) -> +%% %% We do not want to exit with normal +%% %% exit reason since it wont bring down +%% %% linked processes... +%% death_row(); death_row(Reason) -> %% When the connection is on its way down operations %% begin to fail. We catch the failures and call -- cgit v1.2.3 From 275da9e0e7f876ec7c9b9fe3405f1ca40fdbbd17 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Fri, 15 Sep 2017 11:24:28 +0200 Subject: Remove ssl_tls_dist_proxy --- lib/ssl/src/inet_tls_dist.erl | 2 +- lib/ssl/src/ssl_tls_dist_proxy.erl | 479 ------------------------------------- 2 files changed, 1 insertion(+), 480 deletions(-) delete mode 100644 lib/ssl/src/ssl_tls_dist_proxy.erl (limited to 'lib') diff --git a/lib/ssl/src/inet_tls_dist.erl b/lib/ssl/src/inet_tls_dist.erl index 65dab189a3..ef2a608b3c 100644 --- a/lib/ssl/src/inet_tls_dist.erl +++ b/lib/ssl/src/inet_tls_dist.erl @@ -160,7 +160,7 @@ wait_for_code_server() -> %% unloaded, and the function call that triggered loading it fails %% with 'undef', which is rather confusing. %% - %% Thus, the ssl_tls_dist_proxy process will terminate, and be + %% Thus, the ssl_tls_dist_ctrl process will terminate, and be %% restarted by ssl_dist_sup. However, it won't have any memory %% of being asked by net_kernel to listen for incoming %% connections. Hence, the node will believe that it's open for diff --git a/lib/ssl/src/ssl_tls_dist_proxy.erl b/lib/ssl/src/ssl_tls_dist_proxy.erl deleted file mode 100644 index 08947f24dd..0000000000 --- a/lib/ssl/src/ssl_tls_dist_proxy.erl +++ /dev/null @@ -1,479 +0,0 @@ -%% -%% %CopyrightBegin% -%% -%% Copyright Ericsson AB 2011-2016. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%% -%% %CopyrightEnd% -%% --module(ssl_tls_dist_proxy). - - --export([listen/2, accept/2, connect/3, get_tcp_address/1]). --export([init/1, start_link/0, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, ssl_options/2]). - --include_lib("kernel/include/net_address.hrl"). - --record(state, - {listen, - accept_loop - }). - --define(PPRE, 4). --define(PPOST, 4). - - -%%==================================================================== -%% Internal application API -%%==================================================================== - -listen(Driver, Name) -> - gen_server:call(?MODULE, {listen, Driver, Name}, infinity). - -accept(Driver, Listen) -> - gen_server:call(?MODULE, {accept, Driver, Listen}, infinity). - -connect(Driver, Ip, Port) -> - gen_server:call(?MODULE, {connect, Driver, Ip, Port}, infinity). - - -do_listen(Options) -> - {First,Last} = case application:get_env(kernel,inet_dist_listen_min) of - {ok,N} when is_integer(N) -> - case application:get_env(kernel, - inet_dist_listen_max) of - {ok,M} when is_integer(M) -> - {N,M}; - _ -> - {N,N} - end; - _ -> - {0,0} - end, - do_listen(First, Last, listen_options([{backlog,128}|Options])). - -do_listen(First,Last,_) when First > Last -> - {error,eaddrinuse}; -do_listen(First,Last,Options) -> - case gen_tcp:listen(First, Options) of - {error, eaddrinuse} -> - do_listen(First+1,Last,Options); - Other -> - Other - end. - -listen_options(Opts0) -> - Opts1 = - case application:get_env(kernel, inet_dist_use_interface) of - {ok, Ip} -> - [{ip, Ip} | Opts0]; - _ -> - Opts0 - end, - case application:get_env(kernel, inet_dist_listen_options) of - {ok,ListenOpts} -> - ListenOpts ++ Opts1; - _ -> - Opts1 - end. - -connect_options(Opts) -> - case application:get_env(kernel, inet_dist_connect_options) of - {ok,ConnectOpts} -> - lists:ukeysort(1, ConnectOpts ++ Opts); - _ -> - Opts - end. - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -init([]) -> - process_flag(priority, max), - {ok, #state{}}. - -handle_call({listen, Driver, Name}, _From, State) -> - case gen_tcp:listen(0, [{active, false}, {packet,?PPRE}, {ip, loopback}]) of - {ok, Socket} -> - {ok, World} = do_listen([{active, false}, binary, {packet,?PPRE}, {reuseaddr, true}, - Driver:family()]), - {ok, TcpAddress} = get_tcp_address(Socket), - {ok, WorldTcpAddress} = get_tcp_address(World), - {_,Port} = WorldTcpAddress#net_address.address, - ErlEpmd = net_kernel:epmd_module(), - case ErlEpmd:register_node(Name, Port, Driver) of - {ok, Creation} -> - {reply, {ok, {Socket, TcpAddress, Creation}}, - State#state{listen={Socket, World}}}; - {error, _} = Error -> - {reply, Error, State} - end; - Error -> - {reply, Error, State} - end; - -handle_call({accept, _Driver, Listen}, {From, _}, State = #state{listen={_, World}}) -> - Self = self(), - ErtsPid = spawn_link(fun() -> accept_loop(Self, erts, Listen, From) end), - WorldPid = spawn_link(fun() -> accept_loop(Self, world, World, Listen) end), - {reply, ErtsPid, State#state{accept_loop={ErtsPid, WorldPid}}}; - -handle_call({connect, Driver, Ip, Port}, {From, _}, State) -> - Me = self(), - Pid = spawn_link(fun() -> setup_proxy(Driver, Ip, Port, Me) end), - receive - {Pid, go_ahead, LPort} -> - Res = {ok, Socket} = try_connect(LPort), - case gen_tcp:controlling_process(Socket, From) of - {error, badarg} = Error -> {reply, Error, State}; % From is dead anyway. - ok -> - flush_old_controller(From, Socket), - {reply, Res, State} - end; - {Pid, Error} -> - {reply, Error, State} - end; - -handle_call(_What, _From, State) -> - {reply, ok, State}. - -handle_cast(_What, State) -> - {noreply, State}. - -handle_info(_What, State) -> - {noreply, State}. - -terminate(_Reason, _St) -> - ok. - -code_change(_OldVsn, St, _Extra) -> - {ok, St}. - -%%-------------------------------------------------------------------- -%%% Internal functions -%%-------------------------------------------------------------------- -get_tcp_address(Socket) -> - case inet:sockname(Socket) of - {ok, Address} -> - {ok, Host} = inet:gethostname(), - NetAddress = #net_address{ - address = Address, - host = Host, - protocol = proxy, - family = inet - }, - {ok, NetAddress}; - {error, _} = Error -> Error - end. - -accept_loop(Proxy, erts = Type, Listen, Extra) -> - process_flag(priority, max), - case gen_tcp:accept(Listen) of - {ok, Socket} -> - Extra ! {accept,self(),Socket,inet,proxy}, - receive - {_Kernel, controller, Pid} -> - inet:setopts(Socket, [nodelay()]), - ok = gen_tcp:controlling_process(Socket, Pid), - flush_old_controller(Pid, Socket), - Pid ! {self(), controller}; - {_Kernel, unsupported_protocol} -> - exit(unsupported_protocol) - end; - {error, closed} -> - %% The listening socket is closed: the proxy process is - %% shutting down. Exit normally, to avoid generating a - %% spurious error report. - exit(normal); - Error -> - exit(Error) - end, - accept_loop(Proxy, Type, Listen, Extra); - -accept_loop(Proxy, world = Type, Listen, Extra) -> - process_flag(priority, max), - case gen_tcp:accept(Listen) of - {ok, Socket} -> - Opts = get_ssl_options(server), - wait_for_code_server(), - case ssl:ssl_accept(Socket, Opts) of - {ok, SslSocket} -> - PairHandler = - spawn_link(fun() -> - setup_connection(SslSocket, Extra) - end), - ok = ssl:controlling_process(SslSocket, PairHandler), - flush_old_controller(PairHandler, SslSocket); - {error, {options, _}} = Error -> - %% Bad options: that's probably our fault. Let's log that. - error_logger:error_msg("Cannot accept TLS distribution connection: ~s~n", - [ssl:format_error(Error)]), - gen_tcp:close(Socket); - _ -> - gen_tcp:close(Socket) - end; - Error -> - exit(Error) - end, - accept_loop(Proxy, Type, Listen, Extra). - -wait_for_code_server() -> - %% This is an ugly hack. Upgrading a socket to TLS requires the - %% crypto module to be loaded. Loading the crypto module triggers - %% its on_load function, which calls code:priv_dir/1 to find the - %% directory where its NIF library is. However, distribution is - %% started earlier than the code server, so the code server is not - %% necessarily started yet, and code:priv_dir/1 might fail because - %% of that, if we receive an incoming connection on the - %% distribution port early enough. - %% - %% If the on_load function of a module fails, the module is - %% unloaded, and the function call that triggered loading it fails - %% with 'undef', which is rather confusing. - %% - %% Thus, the ssl_tls_dist_proxy process will terminate, and be - %% restarted by ssl_dist_sup. However, it won't have any memory - %% of being asked by net_kernel to listen for incoming - %% connections. Hence, the node will believe that it's open for - %% distribution, but it actually isn't. - %% - %% So let's avoid that by waiting for the code server to start. - case whereis(code_server) of - undefined -> - timer:sleep(10), - wait_for_code_server(); - Pid when is_pid(Pid) -> - ok - end. - -try_connect(Port) -> - case gen_tcp:connect({127,0,0,1}, Port, [{active, false}, {packet,?PPRE}, nodelay()]) of - R = {ok, _S} -> - R; - {error, _R} -> - try_connect(Port) - end. - -setup_proxy(Driver, Ip, Port, Parent) -> - process_flag(trap_exit, true), - Opts = connect_options(get_ssl_options(client)), - case ssl:connect(Ip, Port, [{active, true}, binary, {packet,?PPRE}, nodelay(), - Driver:family()] ++ Opts) of - {ok, World} -> - {ok, ErtsL} = gen_tcp:listen(0, [{active, true}, {ip, loopback}, binary, {packet,?PPRE}]), - {ok, #net_address{address={_,LPort}}} = get_tcp_address(ErtsL), - Parent ! {self(), go_ahead, LPort}, - case gen_tcp:accept(ErtsL) of - {ok, Erts} -> - %% gen_tcp:close(ErtsL), - loop_conn_setup(World, Erts); - Err -> - Parent ! {self(), Err} - end; - {error, {options, _}} = Err -> - %% Bad options: that's probably our fault. Let's log that. - error_logger:error_msg("Cannot open TLS distribution connection: ~s~n", - [ssl:format_error(Err)]), - Parent ! {self(), Err}; - Err -> - Parent ! {self(), Err} - end. - - -%% we may not always want the nodelay behaviour -%% %% for performance reasons - -nodelay() -> - case application:get_env(kernel, dist_nodelay) of - undefined -> - {nodelay, true}; - {ok, true} -> - {nodelay, true}; - {ok, false} -> - {nodelay, false}; - _ -> - {nodelay, true} - end. - -setup_connection(World, ErtsListen) -> - process_flag(trap_exit, true), - {ok, TcpAddress} = get_tcp_address(ErtsListen), - {_Addr,Port} = TcpAddress#net_address.address, - {ok, Erts} = gen_tcp:connect({127,0,0,1}, Port, [{active, true}, binary, {packet,?PPRE}, nodelay()]), - ssl:setopts(World, [{active,true}, {packet,?PPRE}, nodelay()]), - loop_conn_setup(World, Erts). - -loop_conn_setup(World, Erts) -> - receive - {ssl, World, Data = <<$a, _/binary>>} -> - gen_tcp:send(Erts, Data), - ssl:setopts(World, [{packet,?PPOST}, nodelay()]), - inet:setopts(Erts, [{packet,?PPOST}, nodelay()]), - loop_conn(World, Erts); - {tcp, Erts, Data = <<$a, _/binary>>} -> - ssl:send(World, Data), - ssl:setopts(World, [{packet,?PPOST}, nodelay()]), - inet:setopts(Erts, [{packet,?PPOST}, nodelay()]), - loop_conn(World, Erts); - {ssl, World, Data = <<_, _/binary>>} -> - gen_tcp:send(Erts, Data), - loop_conn_setup(World, Erts); - {tcp, Erts, Data = <<_, _/binary>>} -> - ssl:send(World, Data), - loop_conn_setup(World, Erts); - {ssl, World, Data} -> - gen_tcp:send(Erts, Data), - loop_conn_setup(World, Erts); - {tcp, Erts, Data} -> - ssl:send(World, Data), - loop_conn_setup(World, Erts); - {tcp_closed, Erts} -> - ssl:close(World); - {ssl_closed, World} -> - gen_tcp:close(Erts); - {ssl_error, World, _} -> - - ssl:close(World) - end. - -loop_conn(World, Erts) -> - receive - {ssl, World, Data} -> - gen_tcp:send(Erts, Data), - loop_conn(World, Erts); - {tcp, Erts, Data} -> - ssl:send(World, Data), - loop_conn(World, Erts); - {tcp_closed, Erts} -> - ssl:close(World); - {ssl_closed, World} -> - gen_tcp:close(Erts); - {ssl_error, World, _} -> - ssl:close(World) - end. - -get_ssl_options(Type) -> - case init:get_argument(ssl_dist_opt) of - {ok, Args} -> - [{erl_dist, true} | ssl_options(Type, lists:append(Args))]; - _ -> - [{erl_dist, true}] - end. - -ssl_options(_,[]) -> - []; -ssl_options(server, ["client_" ++ _, _Value |T]) -> - ssl_options(server,T); -ssl_options(client, ["server_" ++ _, _Value|T]) -> - ssl_options(client,T); -ssl_options(server, ["server_certfile", Value|T]) -> - [{certfile, Value} | ssl_options(server,T)]; -ssl_options(client, ["client_certfile", Value | T]) -> - [{certfile, Value} | ssl_options(client,T)]; -ssl_options(server, ["server_cacertfile", Value|T]) -> - [{cacertfile, Value} | ssl_options(server,T)]; -ssl_options(client, ["client_cacertfile", Value|T]) -> - [{cacertfile, Value} | ssl_options(client,T)]; -ssl_options(server, ["server_keyfile", Value|T]) -> - [{keyfile, Value} | ssl_options(server,T)]; -ssl_options(client, ["client_keyfile", Value|T]) -> - [{keyfile, Value} | ssl_options(client,T)]; -ssl_options(server, ["server_password", Value|T]) -> - [{password, Value} | ssl_options(server,T)]; -ssl_options(client, ["client_password", Value|T]) -> - [{password, Value} | ssl_options(client,T)]; -ssl_options(server, ["server_verify", Value|T]) -> - [{verify, atomize(Value)} | ssl_options(server,T)]; -ssl_options(client, ["client_verify", Value|T]) -> - [{verify, atomize(Value)} | ssl_options(client,T)]; -ssl_options(server, ["server_verify_fun", Value|T]) -> - [{verify_fun, verify_fun(Value)} | ssl_options(server,T)]; -ssl_options(client, ["client_verify_fun", Value|T]) -> - [{verify_fun, verify_fun(Value)} | ssl_options(client,T)]; -ssl_options(server, ["server_crl_check", Value|T]) -> - [{crl_check, atomize(Value)} | ssl_options(server,T)]; -ssl_options(client, ["client_crl_check", Value|T]) -> - [{crl_check, atomize(Value)} | ssl_options(client,T)]; -ssl_options(server, ["server_crl_cache", Value|T]) -> - [{crl_cache, termify(Value)} | ssl_options(server,T)]; -ssl_options(client, ["client_crl_cache", Value|T]) -> - [{crl_cache, termify(Value)} | ssl_options(client,T)]; -ssl_options(server, ["server_reuse_sessions", Value|T]) -> - [{reuse_sessions, atomize(Value)} | ssl_options(server,T)]; -ssl_options(client, ["client_reuse_sessions", Value|T]) -> - [{reuse_sessions, atomize(Value)} | ssl_options(client,T)]; -ssl_options(server, ["server_secure_renegotiate", Value|T]) -> - [{secure_renegotiate, atomize(Value)} | ssl_options(server,T)]; -ssl_options(client, ["client_secure_renegotiate", Value|T]) -> - [{secure_renegotiate, atomize(Value)} | ssl_options(client,T)]; -ssl_options(server, ["server_depth", Value|T]) -> - [{depth, list_to_integer(Value)} | ssl_options(server,T)]; -ssl_options(client, ["client_depth", Value|T]) -> - [{depth, list_to_integer(Value)} | ssl_options(client,T)]; -ssl_options(server, ["server_hibernate_after", Value|T]) -> - [{hibernate_after, list_to_integer(Value)} | ssl_options(server,T)]; -ssl_options(client, ["client_hibernate_after", Value|T]) -> - [{hibernate_after, list_to_integer(Value)} | ssl_options(client,T)]; -ssl_options(server, ["server_ciphers", Value|T]) -> - [{ciphers, Value} | ssl_options(server,T)]; -ssl_options(client, ["client_ciphers", Value|T]) -> - [{ciphers, Value} | ssl_options(client,T)]; -ssl_options(server, ["server_dhfile", Value|T]) -> - [{dhfile, Value} | ssl_options(server,T)]; -ssl_options(server, ["server_fail_if_no_peer_cert", Value|T]) -> - [{fail_if_no_peer_cert, atomize(Value)} | ssl_options(server,T)]; -ssl_options(Type, Opts) -> - error(malformed_ssl_dist_opt, [Type, Opts]). - -atomize(List) when is_list(List) -> - list_to_atom(List); -atomize(Atom) when is_atom(Atom) -> - Atom. - -termify(String) when is_list(String) -> - {ok, Tokens, _} = erl_scan:string(String ++ "."), - {ok, Term} = erl_parse:parse_term(Tokens), - Term. - -verify_fun(Value) -> - case termify(Value) of - {Mod, Func, State} when is_atom(Mod), is_atom(Func) -> - Fun = fun Mod:Func/3, - {Fun, State}; - _ -> - error(malformed_ssl_dist_opt, [Value]) - end. - -flush_old_controller(Pid, Socket) -> - receive - {tcp, Socket, Data} -> - Pid ! {tcp, Socket, Data}, - flush_old_controller(Pid, Socket); - {tcp_closed, Socket} -> - Pid ! {tcp_closed, Socket}, - flush_old_controller(Pid, Socket); - {ssl, Socket, Data} -> - Pid ! {ssl, Socket, Data}, - flush_old_controller(Pid, Socket); - {ssl_closed, Socket} -> - Pid ! {ssl_closed, Socket}, - flush_old_controller(Pid, Socket) - after 0 -> - ok - end. -- cgit v1.2.3 From 6e28a7909c665cc316d657dda02a2b8655ecc5da Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Thu, 21 Sep 2017 16:18:37 +0200 Subject: Remove ssl_tls_dist_ctrl process --- lib/ssl/src/dtls_connection.erl | 1 + lib/ssl/src/inet_tls_dist.erl | 329 +++++++++++++++++++++++++++++----------- lib/ssl/src/ssl_connection.erl | 242 ++++++++++++++++++++++++----- lib/ssl/src/tls_connection.erl | 10 +- 4 files changed, 454 insertions(+), 128 deletions(-) (limited to 'lib') diff --git a/lib/ssl/src/dtls_connection.erl b/lib/ssl/src/dtls_connection.erl index ff3e69bae5..56ed232b2d 100644 --- a/lib/ssl/src/dtls_connection.erl +++ b/lib/ssl/src/dtls_connection.erl @@ -541,6 +541,7 @@ handle_info(new_cookie_secret, StateName, previous_cookie_secret => Secret}}}; handle_info(Msg, StateName, State) -> ssl_connection:handle_info(Msg, StateName, State). +%%% ssl_connection:StateName(info, Msg, State, ?MODULE). handle_call(Event, From, StateName, State) -> diff --git a/lib/ssl/src/inet_tls_dist.erl b/lib/ssl/src/inet_tls_dist.erl index ef2a608b3c..72cf73e79c 100644 --- a/lib/ssl/src/inet_tls_dist.erl +++ b/lib/ssl/src/inet_tls_dist.erl @@ -31,18 +31,33 @@ -export([split_node/1, nodelay/0]). +-export([dbg/0]). % Debug + -include_lib("kernel/include/net_address.hrl"). -include_lib("kernel/include/dist.hrl"). -include_lib("kernel/include/dist_util.hrl"). -%% -undef(trace). -%% -define(trace(Fmt,Args), -%% erlang:display( -%% [erlang:convert_time_unit( -%% erlang:monotonic_time() -%% - erlang:system_info(start_time), native, microsecond), -%% node(), -%% lists:flatten(io_lib:format(Fmt, Args))])). +-include("ssl_api.hrl"). + +%%%-undef(trace). +%%%%%-define(trace, true). +%%%-ifdef(trace). +%%%trace(Module, FunctionName, Line, Info, Value) -> +%%% erlang:display( +%%% [{erlang:convert_time_unit( +%%% erlang:monotonic_time() +%%% - erlang:system_info(start_time), native, microsecond), +%%% node(), self()}, +%%% {Module, FunctionName, Line}, Info, Value]), +%%% Value. +%%%-else. +%%%trace(_Module, _FunctionName, _Line, _Info, Value) -> Value. +%%%-endif. +%%%-undef(trace). +%%%-define( +%%% trace(Info, Body), +%%% trace(?MODULE, ?FUNCTION_NAME, ?LINE, (Info), begin Body end)). +trace(Term) -> Term. %% ------------------------------------------------------------------------- @@ -76,6 +91,130 @@ is_node_name(Node) -> %% ------------------------------------------------------------------------- +hs_data_common(#sslsocket{pid = DistCtrl} = SslSocket) -> + #hs_data{ + f_send = + fun (Ctrl, Packet) when Ctrl == DistCtrl -> + f_send(SslSocket, Packet) + end, + f_recv = + fun (Ctrl, Length, Timeout) when Ctrl == DistCtrl -> + f_recv(SslSocket, Length, Timeout) + end, + f_setopts_pre_nodeup = + fun (Ctrl) when Ctrl == DistCtrl -> + f_setopts_pre_nodeup(SslSocket) + end, + f_setopts_post_nodeup = + fun (Ctrl) when Ctrl == DistCtrl -> +%%% sys:trace(Ctrl, true), + f_setopts_post_nodeup(SslSocket) + end, + f_getll = + fun (Ctrl) when Ctrl == DistCtrl -> + f_getll(DistCtrl) + end, + f_address = + fun (Ctrl, Node) when Ctrl == DistCtrl -> + f_address(SslSocket, Node) + end, + mf_tick = + fun (Ctrl) when Ctrl == DistCtrl -> + mf_tick(DistCtrl) + end, + mf_getstat = + fun (Ctrl) when Ctrl == DistCtrl -> + mf_getstat(SslSocket) + end, + mf_setopts = + fun (Ctrl, Opts) when Ctrl == DistCtrl -> + mf_setopts(SslSocket, Opts) + end, + mf_getopts = + fun (Ctrl, Opts) when Ctrl == DistCtrl -> + mf_getopts(SslSocket, Opts) + end, + f_handshake_complete = + fun (Ctrl, Node, DHandle) when Ctrl == DistCtrl -> + f_handshake_complete(DistCtrl, Node, DHandle) + end}. + +f_send(SslSocket, Packet) -> + ssl:send(SslSocket, Packet). + +f_recv(SslSocket, Length, Timeout) -> + case ssl:recv(SslSocket, Length, Timeout) of + {ok, Bin} when is_binary(Bin) -> + {ok, binary_to_list(Bin)}; + Other -> + Other + end. + +f_setopts_pre_nodeup(_SslSocket) -> + ok. + +f_setopts_post_nodeup(_SslSocket) -> + ok. + +f_getll(DistCtrl) -> + {ok, DistCtrl}. + +f_address(SslSocket, Node) -> + case ssl:peername(SslSocket) of + {ok, Address} -> + case split_node(Node) of + false -> + {error, no_node}; + Host -> + #net_address{ + address=Address, host=Host, + protocol=tls, family=inet} + end + end. + +mf_tick(DistCtrl) -> + DistCtrl ! tick, + ok. + +mf_getstat(SslSocket) -> + case ssl:getstat( + SslSocket, [recv_cnt, send_cnt, send_pend]) of + {ok, Stat} -> + split_stat(Stat,0,0,0); + Error -> + Error + end. + +mf_setopts(SslSocket, Opts) -> + case setopts_filter(Opts) of + [] -> + ssl:setopts(SslSocket, Opts); + Opts1 -> + {error, {badopts,Opts1}} + end. + +mf_getopts(SslSocket, Opts) -> + ssl:getopts(SslSocket, Opts). + +f_handshake_complete(DistCtrl, Node, DHandle) -> + ssl_connection:handshake_complete(DistCtrl, Node, DHandle). + + +setopts_filter(Opts) -> + [Opt || {K,_} = Opt <- Opts, + K =:= active orelse K =:= deliver orelse K =:= packet]. + +split_stat([{recv_cnt, R}|Stat], _, W, P) -> + split_stat(Stat, R, W, P); +split_stat([{send_cnt, W}|Stat], R, _, P) -> + split_stat(Stat, R, W, P); +split_stat([{send_pend, P}|Stat], R, W, _) -> + split_stat(Stat, R, W, P); +split_stat([], R, W, P) -> + {ok, R, W, P}. + +%% ------------------------------------------------------------------------- + listen(Name) -> gen_listen(inet_tcp, Name). @@ -95,40 +234,33 @@ accept(Listen) -> gen_accept(Driver, Listen) -> Kernel = self(), - spawn_opt( - fun () -> - accept_loop(Driver, Listen, Kernel) - end, - [link, {priority, max}]). + monitor_pid( + spawn_opt( + fun () -> + accept_loop(Driver, Listen, Kernel) + end, + [link, {priority, max}])). accept_loop(Driver, Listen, Kernel) -> - ?trace("~p~n",[{?MODULE, accept_loop, self()}]), case Driver:accept(Listen) of {ok, Socket} -> Opts = get_ssl_options(server), wait_for_code_server(), case ssl:ssl_accept( Socket, [{active, false}, {packet, 4}] ++ Opts) of - {ok, SslSocket} -> - DistCtrl = ssl_tls_dist_ctrl:start(SslSocket), - ?trace("~p~n", - [{?MODULE, accept_loop, accepted, - SslSocket, DistCtrl, self()}]), - ok = ssl:controlling_process(SslSocket, DistCtrl), - Kernel ! - {accept, self(), DistCtrl, Driver:family(), tls}, + {ok, #sslsocket{pid = DistCtrl} = SslSocket} -> + monitor_pid(DistCtrl), + trace( + Kernel ! + {accept, self(), DistCtrl, + Driver:family(), tls}), receive {Kernel, controller, Pid} -> - ?trace("~p~n", - [{?MODULE, accept_loop, - controller, self()}]), - ssl_tls_dist_ctrl:set_supervisor(DistCtrl, Pid), - Pid ! {self(), controller}; + ok = ssl:controlling_process(SslSocket, Pid), + trace( + Pid ! {self(), controller}); {Kernel, unsupported_protocol} -> - ?trace("~p~n", - [{?MODULE, accept_loop, - unsupported_protocol, self()}]), - exit(unsupported_protocol) + exit(trace(unsupported_protocol)) end, accept_loop(Driver, Listen, Kernel); {error, {options, _}} = Error -> @@ -137,12 +269,14 @@ accept_loop(Driver, Listen, Kernel) -> error_logger:error_msg( "Cannot accept TLS distribution connection: ~s~n", [ssl:format_error(Error)]), - gen_tcp:close(Socket); - _ -> - gen_tcp:close(Socket) + _ = trace(Error), + gen_tcp:close(Socket); + Other -> + _ = trace(Other), + gen_tcp:close(Socket) end; Error -> - exit(Error) + exit(trace(Error)) end, accept_loop(Driver, Listen, Kernel). @@ -184,21 +318,23 @@ accept_connection(AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> gen_accept_connection( Driver, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> Kernel = self(), - spawn_opt( - fun() -> - do_accept( - Driver, Kernel, AcceptPid, DistCtrl, - MyNode, Allowed, SetupTime) - end, - [link, {priority, max}]). + monitor_pid( + spawn_opt( + fun() -> + do_accept( + Driver, Kernel, AcceptPid, DistCtrl, + MyNode, Allowed, SetupTime) + end, + [link, {priority, max}])). do_accept(Driver, Kernel, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> + SslSocket = ssl_connection:get_sslsocket(DistCtrl), receive {AcceptPid, controller} -> Timer = dist_util:start_timer(SetupTime), - case check_ip(Driver, DistCtrl) of + case check_ip(Driver, SslSocket) of true -> - HSData0 = ssl_tls_dist_ctrl:hs_data_common(DistCtrl), + HSData0 = hs_data_common(SslSocket), HSData = HSData0#hs_data{ kernel_pid = Kernel, @@ -207,12 +343,12 @@ do_accept(Driver, Kernel, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> timer = Timer, this_flags = 0, allowed = Allowed}, - dist_util:handshake_other_started(HSData); + dist_util:handshake_other_started(trace(HSData)); {false,IP} -> error_logger:error_msg( "** Connection attempt from " "disallowed IP ~w ** ~n", [IP]), - ?shutdown(no_node) + ?shutdown(trace(no_node)) end end. @@ -223,42 +359,33 @@ setup(Node, Type, MyNode, LongOrShortNames, SetupTime) -> gen_setup(Driver, Node, Type, MyNode, LongOrShortNames, SetupTime) -> Kernel = self(), - spawn_opt( - fun() -> - do_setup( - Driver, Kernel, Node, Type, - MyNode, LongOrShortNames, SetupTime) - end, - [link, {priority, max}]). + monitor_pid( + spawn_opt( + fun() -> + do_setup( + Driver, Kernel, Node, Type, + MyNode, LongOrShortNames, SetupTime) + end, + [link, {priority, max}])). do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> [Name, Address] = splitnode(Driver, Node, LongOrShortNames), case Driver:getaddr(Address) of {ok, Ip} -> - Timer = dist_util:start_timer(SetupTime), + Timer = trace(dist_util:start_timer(SetupTime)), ErlEpmd = net_kernel:epmd_module(), case ErlEpmd:port_please(Name, Ip) of {port, TcpPort, Version} -> - ?trace("port_please(~p) -> version ~p~n", - [Node,Version]), - Opts = connect_options(get_ssl_options(client)), + Opts = trace(connect_options(get_ssl_options(client))), dist_util:reset_timer(Timer), case ssl:connect( Ip, TcpPort, [binary, {active, false}, {packet, 4}, Driver:family(), nodelay()] ++ Opts) of - {ok, SslSocket} -> - ?trace("~p~n", - [{?MODULE, do_setup, - ssl_socket, SslSocket}]), - DistCtrl = ssl_tls_dist_ctrl:start(SslSocket), - ssl_tls_dist_ctrl:set_supervisor( - DistCtrl, self()), - ok = - ssl:controlling_process( - SslSocket, DistCtrl), - HSData0 = - ssl_tls_dist_ctrl:hs_data_common(DistCtrl), + {ok, #sslsocket{pid = DistCtrl} = SslSocket} -> + monitor_pid(DistCtrl), + ok = ssl:controlling_process(SslSocket, self()), + HSData0 = hs_data_common(SslSocket), HSData = HSData0#hs_data{ kernel_pid = Kernel, @@ -269,44 +396,37 @@ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> this_flags = 0, other_version = Version, request_type = Type}, - ?trace("~p~n", - [{?MODULE, do_setup, - handshake_we_started, HSData}]), - dist_util:handshake_we_started(HSData); + dist_util:handshake_we_started(trace(HSData)); Other -> %% Other Node may have closed since %% port_please ! - ?trace("other node (~p) " - "closed since port_please.~n", - [Node]), - ?shutdown2(Node, - {shutdown, {connect_failed, Other}}) + ?shutdown2( + Node, + trace({shutdown, {connect_failed, Other}})) end; Other -> - ?trace("port_please (~p) " - "failed.~n", [Node]), - ?shutdown2(Node, {shutdown, {port_please_failed, Other}}) + ?shutdown2( + Node, + trace({shutdown, {port_please_failed, Other}})) end; Other -> - ?trace("~w:getaddr(~p) " - "failed (~p).~n", [Driver, Address, Other]), - ?shutdown2(Node, {shutdown, {getaddr_failed, Other}}) + ?shutdown2(Node, trace({shutdown, {getaddr_failed, Other}})) end. close(Socket) -> gen_close(inet, Socket). gen_close(Driver, Socket) -> - Driver:close(Socket). + trace(Driver:close(Socket)). %% ------------------------------------------------------------ %% Do only accept new connection attempts from nodes at our %% own LAN, if the check_ip environment parameter is true. %% ------------------------------------------------------------ -check_ip(Driver, DistCtrl) -> +check_ip(Driver, SslSocket) -> case application:get_env(check_ip) of {ok, true} -> - case get_ifs(DistCtrl) of + case get_ifs(SslSocket) of {ok, IFs, IP} -> check_ip(Driver, IFs, IP); _ -> @@ -324,8 +444,7 @@ check_ip(Driver, [{OwnIP, _, Netmask}|IFs], PeerIP) -> check_ip(_Driver, [], PeerIP) -> {false, PeerIP}. -get_ifs(DistCtrl) -> - Socket = ssl_tls_dist_ctrl:get_socket(DistCtrl), +get_ifs(#sslsocket{fd = {gen_tcp, Socket, _}}) -> case inet:peername(Socket) of {ok, {IP, _}} -> %% XXX this is seriously broken for IPv6 @@ -513,3 +632,35 @@ verify_fun(Value) -> _ -> error(malformed_ssl_dist_opt, [Value]) end. + +%% ------------------------------------------------------------------------- + +%% Keep an eye on distribution Pid:s we know of +monitor_pid(Pid) -> + spawn( + fun () -> + MRef = erlang:monitor(process, Pid), + receive + {'DOWN', MRef, _, _, normal} -> + error_logger:error_report( + [dist_proc_died, + {reason, normal}, + {pid, Pid}]); + {'DOWN', MRef, _, _, Reason} -> + error_logger:info_report( + [dist_proc_died, + {reason, Reason}, + {pid, Pid}]) + end + end), + Pid. + +dbg() -> + dbg:stop(), + dbg:tracer(), + dbg:p(all, c), + dbg:tpl(?MODULE, cx), + dbg:tpl(erlang, dist_ctrl_get_data_notification, cx), + dbg:tpl(erlang, dist_ctrl_get_data, cx), + dbg:tpl(erlang, dist_ctrl_put_data, cx), + ok. diff --git a/lib/ssl/src/ssl_connection.erl b/lib/ssl/src/ssl_connection.erl index 5d48325719..008e7e1704 100644 --- a/lib/ssl/src/ssl_connection.erl +++ b/lib/ssl/src/ssl_connection.erl @@ -44,12 +44,14 @@ -export([send/2, recv/3, close/2, shutdown/2, new_user/2, get_opts/2, set_opts/2, peer_certificate/1, renegotiation/1, negotiated_protocol/1, prf/5, + get_sslsocket/1, handshake_complete/3, connection_information/2, handle_common_event/5 ]). %% General gen_statem state functions with extra callback argument %% to determine if it is an SSL/TLS or DTLS gen_statem machine --export([init/4, hello/4, abbreviated/4, certify/4, cipher/4, connection/4, downgrade/4]). +-export([init/4, hello/4, abbreviated/4, certify/4, cipher/4, + connection/4, death_row/4, downgrade/4]). %% gen_statem callbacks -export([terminate/3, format_status/2]). @@ -262,6 +264,13 @@ peer_certificate(ConnectionPid) -> renegotiation(ConnectionPid) -> call(ConnectionPid, renegotiate). + +get_sslsocket(ConnectionPid) -> + call(ConnectionPid, get_sslsocket). + +handshake_complete(ConnectionPid, Node, DHandle) -> + call(ConnectionPid, {handshake_complete, Node, DHandle}). + %%-------------------------------------------------------------------- -spec prf(pid(), binary() | 'master_secret', binary(), [binary() | ssl:prf_random()], non_neg_integer()) -> @@ -359,6 +368,12 @@ init({call, From}, {start, {Opts, EmOpts}, Timeout}, socket_options = SockOpts} = State0, Connection) -> try SslOpts = ssl:handle_options(Opts, OrigSSLOptions), + case SslOpts of + #ssl_options{erl_dist = true} -> + process_flag(priority, max); + _ -> + ok + end, State = ssl_config(SslOpts, Role, State0), init({call, From}, {start, Timeout}, State#state{ssl_options = SslOpts, socket_options = new_emulated(EmOpts, SockOpts)}, Connection) @@ -748,7 +763,7 @@ cipher(Type, Msg, State, Connection) -> #state{}, tls_connection | dtls_connection) -> gen_statem:state_function_result(). %%-------------------------------------------------------------------- -connection({call, From}, {application_data, Data}, +connection({call, {FromPid, _} = From}, {application_data, Data}, #state{protocol_cb = Connection} = State, Connection) -> %% We should look into having a worker process to do this to %% parallize send and receive decoding and not block the receiver @@ -756,7 +771,13 @@ connection({call, From}, {application_data, Data}, try write_application_data(Data, From, State) catch throw:Error -> - hibernate_after(connection, State, [{reply, From, Error}]) + case self() of + FromPid -> + {stop, {shutdown, Error}}; + _ -> + hibernate_after( + connection, State, [{reply, From, Error}]) + end end; connection({call, RecvFrom}, {recv, N, Timeout}, #state{protocol_cb = Connection, socket_options = @@ -784,6 +805,28 @@ connection({call, From}, negotiated_protocol, #state{negotiated_protocol = SelectedProtocol} = State, _) -> hibernate_after(connection, State, [{reply, From, {ok, SelectedProtocol}}]); +connection( + {call, From}, {handshake_complete, _Node, DHandle}, + #state{ + ssl_options = #ssl_options{erl_dist = true}, + socket_options = SockOpts, + protocol_specific = ProtocolSpecific} = State, + Connection) -> + %% From now on we execute on normal priority + process_flag(priority, normal), + try erlang:dist_ctrl_get_data_notification(DHandle) of + _ -> + NewState = + State#state{ + socket_options = + SockOpts#socket_options{active = true}, + protocol_specific = + ProtocolSpecific#{d_handle => DHandle}}, + {Record, NewerState} = Connection:next_record_if_active(NewState), + Connection:next_event(connection, Record, NewerState, [{reply, From, ok}]) + catch _:Reason -> + death_row(State, Reason) + end; connection({call, From}, Msg, State, Connection) -> handle_call(Msg, From, connection, State, Connection); connection(info, Msg, State, _) -> @@ -793,6 +836,30 @@ connection(internal, {recv, _}, State, Connection) -> connection(Type, Msg, State, Connection) -> handle_common_event(Type, Msg, connection, State, Connection). +%%-------------------------------------------------------------------- +-spec death_row(gen_statem:event_type(), term(), + #state{}, tls_connection | dtls_connection) -> + gen_statem:state_function_result(). +%%-------------------------------------------------------------------- +%% We just wait for the owner to die which triggers the monitor, +%% or the socket may die too +death_row( + info, {'DOWN', MonitorRef, _, _, Reason}, + #state{user_application={MonitorRef,_Pid} = State}, + _) -> + {stop, {shutdown, Reason}, State}; +death_row( + info, {'EXIT', Socket, Reason}, #state{socket = Socket} = State, _) -> + {stop, {shutdown, Reason}, State}; +death_row(state_timeout, Reason, _State, _Connection) -> + {stop, {shutdown,Reason}}; +death_row(_Type, _Msg, State, _Connection) -> + {keep_state, State, [postpone]}. + +%% State entry function +death_row(State, Reason) -> + {next_state, death_row, State, [{state_timeout, 5000, Reason}]}. + %%-------------------------------------------------------------------- -spec downgrade(gen_statem:event_type(), term(), #state{}, tls_connection | dtls_connection) -> @@ -804,10 +871,10 @@ downgrade(internal, #alert{description = ?CLOSE_NOTIFY}, tls_socket:setopts(Transport, Socket, [{active, false}, {packet, 0}, {mode, binary}]), Transport:controlling_process(Socket, Pid), gen_statem:reply(From, {ok, Socket}), - {stop, normal, State}; + stop_normal(State); downgrade(timeout, downgrade, #state{downgrade = {_, From}} = State, _) -> gen_statem:reply(From, {error, timeout}), - {stop, normal, State}; + stop_normal(State); downgrade(Type, Event, State, Connection) -> handle_common_event(Type, Event, downgrade, State, Connection). @@ -877,7 +944,7 @@ handle_call({shutdown, How0}, From, _, #state{transport_cb = Transport, negotiated_version = Version, connection_states = ConnectionStates, - socket = Socket}, Connection) -> + socket = Socket} = State, Connection) -> case How0 of How when How == write; How == both -> Alert = ?ALERT_REC(?WARNING, ?CLOSE_NOTIFY), @@ -893,7 +960,7 @@ handle_call({shutdown, How0}, From, _, {keep_state_and_data, [{reply, From, ok}]}; Error -> gen_statem:reply(From, {error, Error}), - {stop, normal} + stop_normal(State) end; handle_call({recv, _N, _Timeout}, From, _, #state{socket_options = @@ -928,6 +995,15 @@ handle_call({set_opts, Opts0}, From, StateName, handle_call(renegotiate, From, StateName, _, _) when StateName =/= connection -> {keep_state_and_data, [{reply, From, {error, already_renegotiating}}]}; + +handle_call( + get_sslsocket, From, _StateName, + #state{transport_cb = Transport, socket = Socket, tracker = Tracker}, + Connection) -> + SslSocket = + Connection:socket(self(), Transport, Socket, Connection, Tracker), + {keep_state_and_data, [{reply, From, SslSocket}]}; + handle_call({prf, Secret, Label, Seed, WantedLength}, From, _, #state{connection_states = ConnectionStates, negotiated_version = Version}, _) -> @@ -964,18 +1040,19 @@ handle_info({ErrorTag, Socket, econnaborted}, StateName, tracker = Tracker} = State) when StateName =/= connection -> alert_user(Transport, Tracker,Socket, StartFrom, ?ALERT_REC(?FATAL, ?CLOSE_NOTIFY), Role, Connection), - {stop, normal, State}; + stop_normal(State); handle_info({ErrorTag, Socket, Reason}, StateName, #state{socket = Socket, error_tag = ErrorTag} = State) -> Report = io_lib:format("SSL: Socket error: ~p ~n", [Reason]), - error_logger:info_report(Report), + error_logger:error_report(Report), handle_normal_shutdown(?ALERT_REC(?FATAL, ?CLOSE_NOTIFY), StateName, State), - {stop, normal, State}; + stop_normal(State); -handle_info({'DOWN', MonitorRef, _, _, _}, _, - State = #state{user_application={MonitorRef,_Pid}}) -> - {stop, normal, State}; +handle_info( + {'DOWN', MonitorRef, _, _, _}, _, + #state{user_application={MonitorRef,_Pid}} = State) -> + stop_normal(State); %%% So that terminate will be run when supervisor issues shutdown handle_info({'EXIT', _Sup, shutdown}, _StateName, State) -> @@ -983,6 +1060,8 @@ handle_info({'EXIT', _Sup, shutdown}, _StateName, State) -> handle_info({'EXIT', Socket, normal}, _StateName, #state{socket = Socket} = State) -> %% Handle as transport close" {stop, {shutdown, transport_closed}, State}; +handle_info({'EXIT', Socket, Reason}, _StateName, #state{socket = Socket} = State) -> + {stop, {shutdown, Reason}, State}; handle_info(allow_renegotiate, StateName, State) -> {next_state, StateName, State#state{allow_renegotiate = true}}; @@ -1001,11 +1080,62 @@ handle_info({cancel_start_or_recv, RecvFrom}, StateName, handle_info({cancel_start_or_recv, _RecvFrom}, StateName, State) -> {next_state, StateName, State#state{timer = undefined}}; +handle_info( + dist_data = Msg, + connection, + #state{ + ssl_options = #ssl_options{erl_dist = true}, + protocol_specific = #{d_handle := DHandle}} = State) -> + eat_msgs(Msg), + try send_dist_data(connection, State, DHandle, []) + catch _:Reason -> + death_row(State, Reason) + end; +handle_info( + tick = Msg, + connection, + #state{ + ssl_options = #ssl_options{erl_dist = true}, + protocol_specific = #{d_handle := _}}) -> + eat_msgs(Msg), + {keep_state_and_data, + [{next_event, {call, {self(), undefined}}, {application_data, <<>>}}]}; + handle_info(Msg, StateName, #state{socket = Socket, error_tag = Tag} = State) -> Report = io_lib:format("SSL: Got unexpected info: ~p ~n", [{Msg, Tag, Socket}]), error_logger:info_report(Report), {next_state, StateName, State}. +send_dist_data(StateName, State, DHandle, Acc) -> + case erlang:dist_ctrl_get_data(DHandle) of + none -> + erlang:dist_ctrl_get_data_notification(DHandle), + hibernate_after(StateName, State, lists:reverse(Acc)); + Data -> + send_dist_data( + StateName, State, DHandle, + [{next_event, {call, {self(), undefined}}, {application_data, Data}} + |Acc]) + end. + +%% Overload mitigation +eat_msgs(Msg) -> + receive Msg -> eat_msgs(Msg) + after 0 -> ok + end. + +%% When running with erl_dist the stop reason 'normal' +%% would be too silent and prevent cleanup +stop_normal(State) -> + Reason = + case State of + #state{ssl_options = #ssl_options{erl_dist = true}} -> + {shutdown, normal}; + _ -> + normal + end, + {stop, Reason, State}. + %%-------------------------------------------------------------------- %% gen_statem callbacks %%-------------------------------------------------------------------- @@ -1080,7 +1210,7 @@ format_status(terminate, [_, StateName, State]) -> %%-------------------------------------------------------------------- %%% %%-------------------------------------------------------------------- -write_application_data(Data0, From, +write_application_data(Data0, {FromPid, _} = From, #state{socket = Socket, negotiated_version = Version, protocol_cb = Connection, @@ -1095,10 +1225,19 @@ write_application_data(Data0, From, Connection:renegotiate(State#state{renegotiation = {true, internal}}, [{next_event, {call, From}, {application_data, Data0}}]); false -> - {Msgs, ConnectionStates} = Connection:encode_data(Data, Version, ConnectionStates0), - Result = Connection:send(Transport, Socket, Msgs), - ssl_connection:hibernate_after(connection, State#state{connection_states = ConnectionStates}, - [{reply, From, Result}]) + {Msgs, ConnectionStates} = + Connection:encode_data(Data, Version, ConnectionStates0), + NewState = State#state{connection_states = ConnectionStates}, + case Connection:send(Transport, Socket, Msgs) of + ok when FromPid =:= self() -> + hibernate_after(connection, NewState, []); + Error when FromPid =:= self() -> + {stop, {shutdown, Error}, NewState}; + ok -> + hibernate_after(connection, NewState, [{reply, From, ok}]); + Result -> + hibernate_after(connection, NewState, [{reply, From, Result}]) + end end. read_application_data(Data, #state{user_application = {_Mon, Pid}, @@ -1118,30 +1257,57 @@ read_application_data(Data, #state{user_application = {_Mon, Pid}, end, case get_data(SOpts, BytesToRead, Buffer1) of {ok, ClientData, Buffer} -> % Send data - SocketOpt = deliver_app_data(Transport, Socket, SOpts, - ClientData, Pid, RecvFrom, Tracker, Connection), - cancel_timer(Timer), - State = State0#state{user_data_buffer = Buffer, - start_or_recv_from = undefined, - timer = undefined, - bytes_to_read = undefined, - socket_options = SocketOpt - }, - if - SocketOpt#socket_options.active =:= false; Buffer =:= <<>> -> - %% Passive mode, wait for active once or recv - %% Active and empty, get more data - Connection:next_record_if_active(State); - true -> %% We have more data - read_application_data(<<>>, State) - end; + case State0 of + #state{ + ssl_options = #ssl_options{erl_dist = true}, + protocol_specific = #{d_handle := DHandle}} -> + State = + State0#state{ + user_data_buffer = Buffer, + bytes_to_read = undefined}, + try erlang:dist_ctrl_put_data(DHandle, ClientData) of + _ + when SOpts#socket_options.active =:= false; + Buffer =:= <<>> -> + %% Passive mode, wait for active once or recv + %% Active and empty, get more data + Connection:next_record_if_active(State); + _ -> %% We have more data + read_application_data(<<>>, State) + catch _:Reason -> + death_row(State, Reason) + end; + _ -> + SocketOpt = + deliver_app_data( + Transport, Socket, SOpts, + ClientData, Pid, RecvFrom, Tracker, Connection), + cancel_timer(Timer), + State = + State0#state{ + user_data_buffer = Buffer, + start_or_recv_from = undefined, + timer = undefined, + bytes_to_read = undefined, + socket_options = SocketOpt + }, + if + SocketOpt#socket_options.active =:= false; + Buffer =:= <<>> -> + %% Passive mode, wait for active once or recv + %% Active and empty, get more data + Connection:next_record_if_active(State); + true -> %% We have more data + read_application_data(<<>>, State) + end + end; {more, Buffer} -> % no reply, we need more data Connection:next_record(State0#state{user_data_buffer = Buffer}); {passive, Buffer} -> Connection:next_record_if_active(State0#state{user_data_buffer = Buffer}); {error,_Reason} -> %% Invalid packet in packet mode deliver_packet_error(Transport, Socket, SOpts, Buffer1, Pid, RecvFrom, Tracker, Connection), - {stop, normal, State0} + stop_normal(State0) end. %%-------------------------------------------------------------------- %%% @@ -1151,12 +1317,12 @@ handle_alert(#alert{level = ?FATAL} = Alert, StateName, protocol_cb = Connection, ssl_options = SslOpts, start_or_recv_from = From, host = Host, port = Port, session = Session, user_application = {_Mon, Pid}, - role = Role, socket_options = Opts, tracker = Tracker}) -> + role = Role, socket_options = Opts, tracker = Tracker} = State) -> invalidate_session(Role, Host, Port, Session), log_alert(SslOpts#ssl_options.log_alert, Role, Connection:protocol_name(), StateName, Alert#alert{role = opposite_role(Role)}), alert_user(Transport, Tracker, Socket, StateName, Opts, Pid, From, Alert, Role, Connection), - {stop, normal}; + stop_normal(State); handle_alert(#alert{level = ?WARNING, description = ?CLOSE_NOTIFY} = Alert, StateName, State) -> diff --git a/lib/ssl/src/tls_connection.erl b/lib/ssl/src/tls_connection.erl index e3ffbea3d3..1c506fe951 100644 --- a/lib/ssl/src/tls_connection.erl +++ b/lib/ssl/src/tls_connection.erl @@ -65,7 +65,7 @@ %% gen_statem state functions -export([init/3, error/3, downgrade/3, %% Initiation and take down states hello/3, certify/3, cipher/3, abbreviated/3, %% Handshake states - connection/3]). + connection/3, death_row/3]). %% gen_statem callbacks -export([callback_mode/0, terminate/3, code_change/4, format_status/2]). @@ -377,6 +377,13 @@ connection(internal, #client_hello{}, connection(Type, Event, State) -> ssl_connection:connection(Type, Event, State, ?MODULE). +%%-------------------------------------------------------------------- +-spec death_row(gen_statem:event_type(), term(), #state{}) -> + gen_statem:state_function_result(). +%%-------------------------------------------------------------------- +death_row(Type, Event, State) -> + ssl_connection:death_row(Type, Event, State, ?MODULE). + %%-------------------------------------------------------------------- -spec downgrade(gen_statem:event_type(), term(), #state{}) -> gen_statem:state_function_result(). @@ -435,6 +442,7 @@ handle_info({CloseTag, Socket}, StateName, end; handle_info(Msg, StateName, State) -> ssl_connection:handle_info(Msg, StateName, State). +%%% ssl_connection:StateName(info, Msg, State, ?MODULE). handle_common_event(internal, #alert{} = Alert, StateName, #state{negotiated_version = Version} = State) -> -- cgit v1.2.3 From 80103ef6d8e1dd2f3be1ec33924c74f4f9362301 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Wed, 27 Sep 2017 11:43:34 +0200 Subject: Remove ssl_tls_dist_ctrl module --- lib/ssl/src/Makefile | 3 +- lib/ssl/src/inet_tls_dist.erl | 2 +- lib/ssl/src/ssl.app.src | 1 - lib/ssl/src/ssl_tls_dist_ctrl.erl | 405 -------------------------------------- 4 files changed, 2 insertions(+), 409 deletions(-) delete mode 100644 lib/ssl/src/ssl_tls_dist_ctrl.erl (limited to 'lib') diff --git a/lib/ssl/src/Makefile b/lib/ssl/src/Makefile index f6fe23b584..8eba5cf347 100644 --- a/lib/ssl/src/Makefile +++ b/lib/ssl/src/Makefile @@ -87,8 +87,7 @@ MODULES= \ ssl_v2 \ ssl_v3 \ tls_v1 \ - dtls_v1 \ - ssl_tls_dist_ctrl + dtls_v1 INTERNAL_HRL_FILES = \ ssl_alert.hrl ssl_cipher.hrl \ diff --git a/lib/ssl/src/inet_tls_dist.erl b/lib/ssl/src/inet_tls_dist.erl index 72cf73e79c..131c3b56a5 100644 --- a/lib/ssl/src/inet_tls_dist.erl +++ b/lib/ssl/src/inet_tls_dist.erl @@ -294,7 +294,7 @@ wait_for_code_server() -> %% unloaded, and the function call that triggered loading it fails %% with 'undef', which is rather confusing. %% - %% Thus, the ssl_tls_dist_ctrl process will terminate, and be + %% Thus, the accept process will terminate, and be %% restarted by ssl_dist_sup. However, it won't have any memory %% of being asked by net_kernel to listen for incoming %% connections. Hence, the node will believe that it's open for diff --git a/lib/ssl/src/ssl.app.src b/lib/ssl/src/ssl.app.src index a5286b6747..a75193a008 100644 --- a/lib/ssl/src/ssl.app.src +++ b/lib/ssl/src/ssl.app.src @@ -37,7 +37,6 @@ %% Erlang Distribution over SSL/TLS inet_tls_dist, inet6_tls_dist, - ssl_tls_dist_ctrl, ssl_dist_sup, ssl_dist_connection_sup, ssl_dist_admin_sup, diff --git a/lib/ssl/src/ssl_tls_dist_ctrl.erl b/lib/ssl/src/ssl_tls_dist_ctrl.erl deleted file mode 100644 index 1504d1c1e4..0000000000 --- a/lib/ssl/src/ssl_tls_dist_ctrl.erl +++ /dev/null @@ -1,405 +0,0 @@ -%% -%% %CopyrightBegin% -%% -%% Copyright Ericsson AB 2017. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%% -%% %CopyrightEnd% -%% --module(ssl_tls_dist_ctrl). - --include_lib("kernel/include/dist.hrl"). --include_lib("kernel/include/dist_util.hrl"). --include_lib("kernel/include/net_address.hrl"). - --export([start/1, get_socket/1, set_supervisor/2, hs_data_common/1]). - -%%% ------------------------------------------------------------ - -%% In order to avoid issues with lingering signal binaries -%% we enable off-heap message queue data as well as fullsweep -%% after 0. The fullsweeps will be cheap since we have more -%% or less no live data. -common_spawn_opts() -> - [{message_queue_data, off_heap}, - {fullsweep_after, 0}]. - -%%% ------------------------------------------------------------ - -start(SslSocket) -> - spawn_opt( - fun () -> - setup(SslSocket) - end, - [{priority, max}] ++ common_spawn_opts()). - -get_socket(DistCtrl) -> - call(DistCtrl, get_socket). - -set_supervisor(DistCtrl, Pid) -> - call(DistCtrl, {set_supervisor, Pid}). - -hs_data_common(DistCtrl) -> - TickHandler = call(DistCtrl, tick_handler), - SslSocket = get_socket(DistCtrl), - #hs_data{ - f_send = - fun (Ctrl, Packet) when Ctrl == DistCtrl -> - call(Ctrl, {send, Packet}) - end, - f_recv = - fun (Ctrl, Length, Timeout) when Ctrl == DistCtrl -> - case call(Ctrl, {recv, Length, Timeout}) of - {ok, Bin} when is_binary(Bin) -> - {ok, binary_to_list(Bin)}; - Other -> - Other - end - end, - f_setopts_pre_nodeup = - fun (Ctrl) when Ctrl == DistCtrl -> - call(Ctrl, pre_nodeup) - end, - f_setopts_post_nodeup = - fun (Ctrl) when Ctrl == DistCtrl -> - call(Ctrl, post_nodeup) - end, - f_getll = - fun (Ctrl) when Ctrl == DistCtrl -> - call(Ctrl, getll) - end, - f_handshake_complete = - fun (Ctrl, Node, DHandle) when Ctrl == DistCtrl -> - call(Ctrl, {handshake_complete, Node, DHandle}) - end, - f_address = - fun (Ctrl, Node) when Ctrl == DistCtrl -> - case call(Ctrl, {check_address, Node}) of - {error, no_node} -> - %% No '@' or more than one '@' in node name. - ?shutdown(no_node); - Res -> - Res - end - end, - mf_setopts = - fun (Ctrl, Opts) when Ctrl == DistCtrl -> - case setopts_filter(Opts) of - [] -> - ssl:setopts(SslSocket, Opts); - Opts1 -> - {error, {badopts,Opts1}} - end - end, - mf_getopts = - fun (Ctrl, Opts) when Ctrl == DistCtrl -> - ssl:getopts(SslSocket, Opts) - end, - mf_getstat = - fun (Ctrl) when Ctrl == DistCtrl -> - case ssl:getstat( - SslSocket, [recv_cnt, send_cnt, send_pend]) of - {ok, Stat} -> - split_stat(Stat,0,0,0); - Error -> - Error - end - end, - mf_tick = - fun (Ctrl) when Ctrl == DistCtrl -> - TickHandler ! tick - end}. - -%%% ------------------------------------------------------------ - -call(DistCtrl, Msg) -> - Ref = erlang:monitor(process, DistCtrl), - DistCtrl ! {Ref, self(), Msg}, - receive - {Ref, Res} -> - erlang:demonitor(Ref, [flush]), - Res; - {'DOWN', Ref, process, DistCtrl, Reason} -> - exit({dist_controller_exit, Reason}) - end. - -setopts_filter(Opts) -> - [Opt || {K,_} = Opt <- Opts, - K =:= active orelse K =:= deliver orelse K =:= packet]. - -split_stat([{recv_cnt, R}|Stat], _, W, P) -> - split_stat(Stat, R, W, P); -split_stat([{send_cnt, W}|Stat], R, _, P) -> - split_stat(Stat, R, W, P); -split_stat([{send_pend, P}|Stat], R, W, _) -> - split_stat(Stat, R, W, P); -split_stat([], R, W, P) -> - {ok, R, W, P}. - -%%% ------------------------------------------------------------ -%%% Distribution controller processes -%%% ------------------------------------------------------------ - -%% -%% There will be five parties working together when the -%% connection is up: -%% - The SSL socket. Providing a TLS connection -%% to the other node. -%% - The output handler. It will dispatch all outgoing -%% traffic from the VM to the gen_tcp socket. This -%% process is registered as distribution controller -%% for this channel with the VM. -%% - The input handler. It will dispatch all incoming -%% traffic from the gen_tcp socket to the VM. This -%% process is also the socket owner and receives -%% incoming traffic using active-N. -%% - The tick handler. Dispatches asynchronous tick -%% requests to the socket. It executes on max priority -%% since it is important to get ticks through to the -%% other end. -%% - The channel supervisor (provided by dist_util). It -%% monitors traffic. Issue tick requests to the tick -%% handler when no outgoing traffic is seen and bring -%% the connection down if no incoming traffic is seen. -%% This process also executes on max priority. -%% -%% These parties are linked togheter so should one -%% of them fail, all of them are terminated and the -%% connection is taken down. -%% - - -%% -%% The tick handler process writes a tick to the -%% socket when it receives a 'tick' message from -%% the connection supervisor. -%% -%% We are not allowed to block the connection -%% superviser when writing a tick and we also want -%% the tick to go through even during a heavily -%% loaded system. gen_tcp does not have a -%% non-blocking send operation exposed in its API -%% and we don't want to run the distribution -%% controller under high priority. Therefore this -%% separate process with max prio that dispatches -%% ticks. -%% -tick_handler(SslSocket) -> - receive - tick -> - %% May block due to busy port... - sock_send(SslSocket, ""), - flush_ticks(SslSocket); - _ -> - tick_handler(SslSocket) - end. - -flush_ticks(SslSocket) -> - receive - tick -> - flush_ticks(SslSocket) - after 0 -> - tick_handler(SslSocket) - end. - -setup(SslSocket) -> - TickHandler = - spawn_opt( - fun () -> - tick_handler(SslSocket) - end, - [link, {priority, max}] ++ common_spawn_opts()), - setup_loop(SslSocket, TickHandler, undefined). - -%% -%% During the handshake phase we loop in setup(). -%% When the connection is up we spawn an input handler and -%% continue as output handler. -%% -setup_loop(SslSocket, TickHandler, Sup) -> - receive - {ssl_closed, SslSocket} -> - exit(connection_closed); - {ssl_error, SslSocket} -> - exit(connection_closed); - - {Ref, From, {set_supervisor, Pid}} -> - Res = link(Pid), - From ! {Ref, Res}, - setup_loop(SslSocket, TickHandler, Pid); - - {Ref, From, tick_handler} -> - From ! {Ref, TickHandler}, - setup_loop(SslSocket, TickHandler, Sup); - - {Ref, From, get_socket} -> - From ! {Ref, SslSocket}, - setup_loop(SslSocket, TickHandler, Sup); - - {Ref, From, {send, Packet}} -> - Res = ssl:send(SslSocket, Packet), - From ! {Ref, Res}, - setup_loop(SslSocket, TickHandler, Sup); - - {Ref, From, {recv, Length, Timeout}} -> - Res = ssl:recv(SslSocket, Length, Timeout), - From ! {Ref, Res}, - setup_loop(SslSocket, TickHandler, Sup); - - {Ref, From, {check_address, Node}} -> - Res = - case ssl:peername(SslSocket) of - {ok, Address} -> - case inet_tls_dist:split_node(Node) of - false -> - {error, no_node}; - Host -> - #net_address{ - address=Address, host=Host, - protocol=tls, family=inet} - end - end, - From ! {Ref, Res}, - setup_loop(SslSocket, TickHandler, Sup); - - {Ref, From, pre_nodeup} -> - Res = - ssl:setopts( - SslSocket, - [{packet, 4}, inet_tls_dist:nodelay()]), - From ! {Ref, Res}, - setup_loop(SslSocket, TickHandler, Sup); - - {Ref, From, post_nodeup} -> - Res = - ssl:setopts( - SslSocket, - [{packet, 4}, inet_tls_dist:nodelay()]), - From ! {Ref, Res}, - setup_loop(SslSocket, TickHandler, Sup); - - {Ref, From, getll} -> - From ! {Ref, {ok, self()}}, - setup_loop(SslSocket, TickHandler, Sup); - - {Ref, From, {handshake_complete, _Node, DHandle}} -> - From ! {Ref, ok}, - %% Handshake complete! Begin dispatching traffic... - %% - %% Use a dedicated input process to push the - %% input-output-flow-control-deadlock problem - %% to the SSL implementation. - InputHandler = - spawn_opt( - fun () -> - link(Sup), - ssl:setopts(SslSocket, [{active, once}]), - receive - DHandle -> - input_loop(DHandle, SslSocket) - end - end, - [link] ++ common_spawn_opts()), - ok = ssl:controlling_process(SslSocket, InputHandler), - ok = erlang:dist_ctrl_input_handler(DHandle, InputHandler), - InputHandler ! DHandle, - %% - %% From now on we execute on normal priority - process_flag(priority, normal), - erlang:dist_ctrl_get_data_notification(DHandle), - output_loop(DHandle, SslSocket) - end. - -input_loop(DHandle, SslSocket) -> - receive - {ssl_closed, SslSocket} -> - %% Connection to remote node terminated... - exit(connection_closed); - {ssl_error, SslSocket, _Reason} -> - %% Connection to remote node terminated... - exit(connection_closed); - {ssl, SslSocket, Data} -> - %% Incoming data from remote node... - ok = ssl:setopts(SslSocket, [{active, once}]), - try erlang:dist_ctrl_put_data(DHandle, Data) - catch _:_ -> death_row() - end, - input_loop(DHandle, SslSocket); - _ -> - %% Drop garbage message... - input_loop(DHandle, SslSocket) - end. - -output_loop(DHandle, SslSocket) -> - receive - dist_data -> - %% Outgoing data from this node... - try send_data(DHandle, SslSocket) - catch _ : _ -> death_row() - end, - output_loop(DHandle, SslSocket); - {send, From, Ref, Data} -> - %% This is for testing only! - %% - %% Needed by some OTP distribution - %% test suites... - sock_send(SslSocket, Data), - From ! {Ref, ok}, - output_loop(DHandle, SslSocket); - _ -> - %% Drop garbage message... - output_loop(DHandle, SslSocket) - end. - -send_data(DHandle, SslSocket) -> - case erlang:dist_ctrl_get_data(DHandle) of - none -> - erlang:dist_ctrl_get_data_notification(DHandle); - Data -> - sock_send(SslSocket, Data), - send_data(DHandle, SslSocket) - end. - -sock_send(SslSocket, Data) -> - try ssl:send(SslSocket, Data) of - ok -> ok; - {error, Reason} -> - death_row({send_error, Reason}) - catch - Type:Reason -> - death_row({send_error, {Type, Reason}}) - end. - --spec death_row() -> no_return(). -death_row() -> - death_row(connection_closed). - --spec death_row(term()) -> no_return(). -%% death_row(normal) -> -%% %% We do not want to exit with normal -%% %% exit reason since it wont bring down -%% %% linked processes... -%% death_row(); -death_row(Reason) -> - %% When the connection is on its way down operations - %% begin to fail. We catch the failures and call - %% this function waiting for termination. We should - %% be terminated by one of our links to the other - %% involved parties that began bringing the - %% connection down. By waiting for termination we - %% avoid altering the exit reason for the connection - %% teardown. We however limit the wait to 5 seconds - %% and bring down the connection ourselves if not - %% terminated... - receive after 5000 -> exit(Reason) end. -- cgit v1.2.3 From 0a0f28747f6765103f160b75f5114be866ed28ce Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Wed, 27 Sep 2017 11:44:19 +0200 Subject: Pass all info's to the ssl_connection state function --- lib/ssl/src/dtls_connection.erl | 3 +-- lib/ssl/src/ssl_connection.erl | 43 +++++++++++++++++++++-------------------- lib/ssl/src/tls_connection.erl | 3 +-- 3 files changed, 24 insertions(+), 25 deletions(-) (limited to 'lib') diff --git a/lib/ssl/src/dtls_connection.erl b/lib/ssl/src/dtls_connection.erl index 56ed232b2d..ad560c0756 100644 --- a/lib/ssl/src/dtls_connection.erl +++ b/lib/ssl/src/dtls_connection.erl @@ -540,8 +540,7 @@ handle_info(new_cookie_secret, StateName, CookieInfo#{current_cookie_secret => dtls_v1:cookie_secret(), previous_cookie_secret => Secret}}}; handle_info(Msg, StateName, State) -> - ssl_connection:handle_info(Msg, StateName, State). -%%% ssl_connection:StateName(info, Msg, State, ?MODULE). + ssl_connection:StateName(info, Msg, State, ?MODULE). handle_call(Event, From, StateName, State) -> diff --git a/lib/ssl/src/ssl_connection.erl b/lib/ssl/src/ssl_connection.erl index 008e7e1704..dd62ccddcd 100644 --- a/lib/ssl/src/ssl_connection.erl +++ b/lib/ssl/src/ssl_connection.erl @@ -829,6 +829,26 @@ connection( end; connection({call, From}, Msg, State, Connection) -> handle_call(Msg, From, connection, State, Connection); +connection( + info, dist_data = Msg, + #state{ + ssl_options = #ssl_options{erl_dist = true}, + protocol_specific = #{d_handle := DHandle}} = State, + _) -> + eat_msgs(Msg), + try send_dist_data(connection, State, DHandle, []) + catch _:Reason -> + death_row(State, Reason) + end; +connection( + info, tick = Msg, + #state{ + ssl_options = #ssl_options{erl_dist = true}, + protocol_specific = #{d_handle := _}}, + _) -> + eat_msgs(Msg), + {keep_state_and_data, + [{next_event, {call, {self(), undefined}}, {application_data, <<>>}}]}; connection(info, Msg, State, _) -> handle_info(Msg, connection, State); connection(internal, {recv, _}, State, Connection) -> @@ -1080,32 +1100,13 @@ handle_info({cancel_start_or_recv, RecvFrom}, StateName, handle_info({cancel_start_or_recv, _RecvFrom}, StateName, State) -> {next_state, StateName, State#state{timer = undefined}}; -handle_info( - dist_data = Msg, - connection, - #state{ - ssl_options = #ssl_options{erl_dist = true}, - protocol_specific = #{d_handle := DHandle}} = State) -> - eat_msgs(Msg), - try send_dist_data(connection, State, DHandle, []) - catch _:Reason -> - death_row(State, Reason) - end; -handle_info( - tick = Msg, - connection, - #state{ - ssl_options = #ssl_options{erl_dist = true}, - protocol_specific = #{d_handle := _}}) -> - eat_msgs(Msg), - {keep_state_and_data, - [{next_event, {call, {self(), undefined}}, {application_data, <<>>}}]}; - handle_info(Msg, StateName, #state{socket = Socket, error_tag = Tag} = State) -> Report = io_lib:format("SSL: Got unexpected info: ~p ~n", [{Msg, Tag, Socket}]), error_logger:info_report(Report), {next_state, StateName, State}. + + send_dist_data(StateName, State, DHandle, Acc) -> case erlang:dist_ctrl_get_data(DHandle) of none -> diff --git a/lib/ssl/src/tls_connection.erl b/lib/ssl/src/tls_connection.erl index 1c506fe951..16b30a92c6 100644 --- a/lib/ssl/src/tls_connection.erl +++ b/lib/ssl/src/tls_connection.erl @@ -441,8 +441,7 @@ handle_info({CloseTag, Socket}, StateName, next_event(StateName, no_record, State) end; handle_info(Msg, StateName, State) -> - ssl_connection:handle_info(Msg, StateName, State). -%%% ssl_connection:StateName(info, Msg, State, ?MODULE). + ssl_connection:StateName(info, Msg, State, ?MODULE). handle_common_event(internal, #alert{} = Alert, StateName, #state{negotiated_version = Version} = State) -> -- cgit v1.2.3 From 90b1e711cbb4b614ef8cdd396586b29624173612 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Thu, 28 Sep 2017 14:53:04 +0200 Subject: Disable debug function --- lib/ssl/src/inet_tls_dist.erl | 61 ++++++++++++++++--------------------------- 1 file changed, 23 insertions(+), 38 deletions(-) (limited to 'lib') diff --git a/lib/ssl/src/inet_tls_dist.erl b/lib/ssl/src/inet_tls_dist.erl index 131c3b56a5..d644cbe66a 100644 --- a/lib/ssl/src/inet_tls_dist.erl +++ b/lib/ssl/src/inet_tls_dist.erl @@ -39,26 +39,6 @@ -include("ssl_api.hrl"). -%%%-undef(trace). -%%%%%-define(trace, true). -%%%-ifdef(trace). -%%%trace(Module, FunctionName, Line, Info, Value) -> -%%% erlang:display( -%%% [{erlang:convert_time_unit( -%%% erlang:monotonic_time() -%%% - erlang:system_info(start_time), native, microsecond), -%%% node(), self()}, -%%% {Module, FunctionName, Line}, Info, Value]), -%%% Value. -%%%-else. -%%%trace(_Module, _FunctionName, _Line, _Info, Value) -> Value. -%%%-endif. -%%%-undef(trace). -%%%-define( -%%% trace(Info, Body), -%%% trace(?MODULE, ?FUNCTION_NAME, ?LINE, (Info), begin Body end)). -trace(Term) -> Term. - %% ------------------------------------------------------------------------- childspecs() -> @@ -247,7 +227,8 @@ accept_loop(Driver, Listen, Kernel) -> Opts = get_ssl_options(server), wait_for_code_server(), case ssl:ssl_accept( - Socket, [{active, false}, {packet, 4}] ++ Opts) of + Socket, [{active, false}, {packet, 4}] ++ Opts, + net_kernel:connecttime()) of {ok, #sslsocket{pid = DistCtrl} = SslSocket} -> monitor_pid(DistCtrl), trace( @@ -381,7 +362,8 @@ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> case ssl:connect( Ip, TcpPort, [binary, {active, false}, {packet, 4}, - Driver:family(), nodelay()] ++ Opts) of + Driver:family(), nodelay()] ++ Opts, + net_kernel:connecttime()) of {ok, #sslsocket{pid = DistCtrl} = SslSocket} -> monitor_pid(DistCtrl), ok = ssl:controlling_process(SslSocket, self()), @@ -635,24 +617,27 @@ verify_fun(Value) -> %% ------------------------------------------------------------------------- +%% Trace point +trace(Term) -> Term. + %% Keep an eye on distribution Pid:s we know of monitor_pid(Pid) -> - spawn( - fun () -> - MRef = erlang:monitor(process, Pid), - receive - {'DOWN', MRef, _, _, normal} -> - error_logger:error_report( - [dist_proc_died, - {reason, normal}, - {pid, Pid}]); - {'DOWN', MRef, _, _, Reason} -> - error_logger:info_report( - [dist_proc_died, - {reason, Reason}, - {pid, Pid}]) - end - end), + %%spawn( + %% fun () -> + %% MRef = erlang:monitor(process, Pid), + %% receive + %% {'DOWN', MRef, _, _, normal} -> + %% error_logger:error_report( + %% [dist_proc_died, + %% {reason, normal}, + %% {pid, Pid}]); + %% {'DOWN', MRef, _, _, Reason} -> + %% error_logger:info_report( + %% [dist_proc_died, + %% {reason, Reason}, + %% {pid, Pid}]) + %% end + %% end), Pid. dbg() -> -- cgit v1.2.3 From cb1b585a710a3f5c724749ce267f55d3c1f85503 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Fri, 29 Sep 2017 14:39:12 +0200 Subject: Update runtime dependencies --- lib/ssl/src/ssl.app.src | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'lib') diff --git a/lib/ssl/src/ssl.app.src b/lib/ssl/src/ssl.app.src index a75193a008..afb9c0f14c 100644 --- a/lib/ssl/src/ssl.app.src +++ b/lib/ssl/src/ssl.app.src @@ -62,7 +62,7 @@ {applications, [crypto, public_key, kernel, stdlib]}, {env, []}, {mod, {ssl_app, []}}, - {runtime_dependencies, ["stdlib-3.2","public_key-1.2","kernel-3.0", - "erts-7.0","crypto-3.3", "inets-5.10.7"]}]}. + {runtime_dependencies, ["stdlib-3.2","public_key-1.2","kernel-6.0", + "erts-10.0","crypto-3.3", "inets-5.10.7"]}]}. -- cgit v1.2.3