%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %% %% %CopyrightEnd% %% %% ------------------------------------------------------------------------- %% %% Module for encrypted Erlang protocol - a minimal encrypted %% distribution protocol based on only a shared secret %% and the crypto application %% -module(inet_crypto_dist). -define(DIST_NAME, inet_crypto). -define(DIST_PROTO, crypto). -define(DRIVER, inet_tcp). -define(FAMILY, inet). -export([listen/1, accept/1, accept_connection/5, setup/5, close/1, select/1, is_node_name/1]). %% Generalized dist API, for sibling IPv6 module inet6_crypto_dist -export([gen_listen/2, gen_accept/2, gen_accept_connection/6, gen_setup/6, gen_close/2, gen_select/2]). -export([nodelay/0]). %% Debug %%%-compile(export_all). -export([dbg/0, test_server/0, test_client/1]). -include_lib("kernel/include/net_address.hrl"). -include_lib("kernel/include/dist.hrl"). -include_lib("kernel/include/dist_util.hrl"). -define(PACKET_SIZE, 65536). -define(BUFFER_SIZE, (?PACKET_SIZE bsl 4)). %% ------------------------------------------------------------------------- -record(params, {socket, dist_handle, hmac_algorithm = sha256, aead_cipher = aes_gcm, rekey_key, iv = 12, key = 16, tag_len = 16, rekey_interval = 262144 }). params(Socket) -> #params{socket = Socket}. -record(key_pair, {type = ecdh, %% The curve choice greatly affects setup time, %% we really want an Edwards curve but that would %% require a very new openssl version. %% Twisted brainpool curves (*t1) are faster than %% non-twisted (*r1), 256 is much faster than 384, %% and so on... %%% params = brainpoolP384t1, params = brainpoolP256t1, public, private}). -define(KEY_PAIR_LIFE_TIME, 3600000). % 1 hour -define(KEY_PAIR_LIFE_COUNT, 256). % Number of connection setups %% ------------------------------------------------------------------------- %% Keep the node's public/private key pair in the process state %% of a key pair server linked to the acceptor process. %% Create the key pair the first time it is needed %% so crypto gets time to start first. %% start_key_pair_server() -> monitor_dist_proc( spawn_link( fun () -> register(?MODULE, self()), key_pair_server() end)). key_pair_server() -> key_pair_server(undefined, undefined, undefined). %% key_pair_server(KeyPair) -> key_pair_server( KeyPair, erlang:start_timer(?KEY_PAIR_LIFE_TIME, self(), discard), ?KEY_PAIR_LIFE_COUNT). %% key_pair_server(_KeyPair, Timer, 0) -> cancel_timer(Timer), key_pair_server(); key_pair_server(KeyPair, Timer, Count) -> receive {Pid, Tag, get_key_pair} -> case KeyPair of undefined -> KeyPair_1 = generate_key_pair(), Pid ! {Tag, KeyPair_1}, key_pair_server(KeyPair_1); #key_pair{} -> Pid ! {Tag, KeyPair}, key_pair_server(KeyPair, Timer, Count - 1) end; {Pid, Tag, get_new_key_pair} -> cancel_timer(Timer), KeyPair_1 = generate_key_pair(), Pid ! {Tag, KeyPair_1}, key_pair_server(KeyPair_1); {timeout, Timer, discard} when is_reference(Timer) -> key_pair_server() end. generate_key_pair() -> #key_pair{type = Type, params = Params} = #key_pair{}, {Public, Private} = crypto:generate_key(Type, Params), #key_pair{public = Public, private = Private}. cancel_timer(undefined) -> ok; cancel_timer(Timer) -> case erlang:cancel_timer(Timer) of false -> receive {timeout, Timer, _} -> ok end; _RemainingTime -> ok end. get_key_pair() -> call_key_pair_server(get_key_pair). get_new_key_pair() -> call_key_pair_server(get_new_key_pair). call_key_pair_server(Request) -> Pid = whereis(?MODULE), Ref = erlang:monitor(process, Pid), Pid ! {self(), Ref, Request}, receive {Ref, Reply} -> erlang:demonitor(Ref, [flush]), Reply; {'DOWN', Ref, process, Pid, Reason} -> error(Reason) end. compute_shared_secret( #key_pair{ type = PublicKeyType, params = PublicKeyParams, private = PrivKey}, PubKey) -> %% crypto:compute_key(PublicKeyType, PubKey, PrivKey, PublicKeyParams). %% ------------------------------------------------------------------------- %% Erlang distribution plugin structure explained to myself %% ------- %% These are the processes involved in the distribution: %% * net_kernel %% * The Acceptor %% * The Controller | Handshaker | Ticker %% * The DistCtrl process that may be split into: %% + The Output controller %% + The Input controller %% For the regular inet_tcp_dist distribution module, DistCtrl %% is not one or two processes, but one port - a gen_tcp socket %% %% When the VM is started with the argument "-proto_dist inet_crypto" %% net_kernel registers the module inet_crypto_dist acli,oams distribution %% module. net_kernel calls listen/1 to create a listen socket %% and then accept/1 with the listen socket as argument to spawn %% the Acceptor process, which is linked to net_kernel. Apparently %% the listen socket is owned by net_kernel - I wonder if it could %% be owned by the Acceptor process instead... %% %% The Acceptor process calls blocking accept on the listen socket %% and when an incoming socket is returned it spawns the DistCtrl %% process a linked to the Acceptor. The ownership of the accepted %% socket is transferred to the DistCtrl process. %% A message is sent to net_kernel to inform it that an incoming %% connection has appeared and the Acceptor awaits a reply from net_kernel. %% %% net_kernel then calls accept_connection/5 to spawn the Controller | %% Handshaker | Ticker process that is linked to net_kernel. %% The Controller then awaits a message from the Acceptor process. %% %% When net_kernel has spawned the Controller it replies with a message %% to the Acceptor that then calls DistCtrl to changes its links %% so DistCtrl ends up linked to the Controller and not to the Acceptor. %% The Acceptor then sends a message to the Controller. The Controller %% then changes role into the Handshaker creates a #hs_data{} record %% and calls dist_util:handshake_other_started/1. After this %% the Acceptor goes back into a blocking accept on the listen socket. %% %% For the regular distribution inet_tcp_dist DistCtrl is a gen_tcp socket %% and when it is a process it also acts as a socket. The #hs_data{} %% record used by dist_util presents a set of funs that are used %% by dist_util to perform the distribution handshake. These funs %% make sure to transfer the handshake messages through the DistCtrl %% "socket". %% %% When the handshake is finished a fun for this purpose in #hs_data{} %% is called, which tells DistCtrl that it does not need to be prepared %% for any more #hs_data{} handshake calls. The DistCtrl process in this %% module then spawns the Input controller process that gets ownership %% of the connection's gen_tcp socket and changes into {active, N} mode %% so now it gets all incoming traffic and delivers that to the VM. %% The original DistCtrl process changes role into the Output controller %% process and starts asking the VM for outbound messages and transfers %% them on the connection socket. %% %% The Handshaker now changes into the Ticker role, and uses only two %% functions in the #hs_data{} record; one to get socket statistics %% and one to send a tick. None of these may block for any reason %% in particular not for a congested socket since that would destroy %% connection supervision. %% %% %% For an connection net_kernel calls setup/5 which spawns the %% Controller process as linked to net_kernel. This Controller process %% connects to the other node's listen socket and when that is succesful %% spawns the DistCtrl process as linked to the controller and transfers %% socket ownership to it. %% %% Then the Controller creates the #hs_data{} record and calls %% dist_util:handshake_we_started/1 which changes the process role %% into Handshaker. %% %% When the distribution handshake is finished the procedure is just %% as for an incoming connection above. %% %% %% To sum it up. %% %% There is an Acceptor process that is linked to net_kernel and %% informs it when new connections arrive. %% %% net_kernel spawns Controllers for incoming and for outgoing connections. %% these Controllers use the DistCtrl processes to do distribution %% handshake and after that becomes Tickers that supervise the connection. %% %% The Controller | Handshaker | Ticker is linked to net_kernel, and to %% DistCtrl, one or both. If any of these connection processes would die %% all others should be killed by the links. Therefore none of them may %% terminate with reason 'normal'. %% ------------------------------------------------------------------------- -compile({inline, [socket_options/0]}). socket_options() -> [binary, {active, false}, {packet, 2}, {nodelay, true}, {sndbuf, ?BUFFER_SIZE}, {recbuf, ?BUFFER_SIZE}, {buffer, ?BUFFER_SIZE}]. %% ------------------------------------------------------------------------- %% select/1 is called by net_kernel to ask if this distribution protocol %% is willing to handle Node %% select(Node) -> gen_select(Node, ?DRIVER). gen_select(Node, Driver) -> case dist_util:split_node(Node) of {node, _, Host} -> case Driver:getaddr(Host) of {ok, _} -> true; _ -> false end; _ -> false end. %% ------------------------------------------------------------------------- is_node_name(Node) -> dist_util:is_node_name(Node). %% ------------------------------------------------------------------------- %% Called by net_kernel to create a listen socket for this %% distribution protocol. This listen socket is used by %% the Acceptor process. %% listen(Name) -> gen_listen(Name, ?DRIVER). gen_listen(Name, Driver) -> case inet_tcp_dist:gen_listen(Driver, Name) of {ok, {Socket, Address, Creation}} -> inet:setopts(Socket, socket_options()), {ok, {Socket, Address#net_address{protocol = ?DIST_PROTO}, Creation}}; Other -> Other end. %% ------------------------------------------------------------------------- %% Called by net_kernel to spawn the Acceptor process that awaits %% new connection in a blocking accept and informs net_kernel %% when a new connection has appeared, and starts the DistCtrl %% "socket" process for the connection. %% accept(Listen) -> gen_accept(Listen, ?DRIVER). gen_accept(Listen, Driver) -> NetKernel = self(), %% %% Spawn Acceptor process %% monitor_dist_proc( spawn_opt( fun () -> start_key_pair_server(), accept_loop(Listen, Driver, NetKernel) end, [link, {priority, max}])). accept_loop(Listen, Driver, NetKernel) -> case Driver:accept(trace(Listen)) of {ok, Socket} -> wait_for_code_server(), Timeout = net_kernel:connecttime(), DistCtrl = start_dist_ctrl(trace(Socket), Timeout), %% DistCtrl is a "socket" NetKernel ! trace({accept, self(), DistCtrl, Driver:family(), ?DIST_PROTO}), receive {NetKernel, controller, Controller} -> call_dist_ctrl(DistCtrl, {controller, Controller, self()}), Controller ! {self(), controller, Socket}; {NetKernel, unsupported_protocol} -> exit(unsupported_protocol) end, accept_loop(Listen, Driver, NetKernel); AcceptError -> exit({accept, AcceptError}) end. wait_for_code_server() -> %% This is an ugly hack. Starting encryption on a connection %% requires the crypto module to be loaded. Loading the crypto %% module triggers its on_load function, which calls %% code:priv_dir/1 to find the directory where its NIF library is. %% However, distribution is started earlier than the code server, %% so the code server is not necessarily started yet, and %% code:priv_dir/1 might fail because of that, if we receive %% an incoming connection on the distribution port early enough. %% %% If the on_load function of a module fails, the module is %% unloaded, and the function call that triggered loading it fails %% with 'undef', which is rather confusing. %% %% So let's avoid that by waiting for the code server to start. %% case whereis(code_server) of undefined -> timer:sleep(10), wait_for_code_server(); Pid when is_pid(Pid) -> ok end. %% ------------------------------------------------------------------------- %% Called by net_kernel when a new connection has appeared, to spawn %% a Controller process that performs the handshake with the new node, %% and then becomes the Ticker connection supervisor. %% ------------------------------------------------------------------------- accept_connection(Acceptor, DistCtrl, MyNode, Allowed, SetupTime) -> gen_accept_connection( Acceptor, DistCtrl, MyNode, Allowed, SetupTime, ?DRIVER). gen_accept_connection( Acceptor, DistCtrl, MyNode, Allowed, SetupTime, Driver) -> NetKernel = self(), %% %% Spawn Controller/handshaker/ticker process %% monitor_dist_proc( spawn_opt( fun() -> do_accept( Acceptor, DistCtrl, trace(MyNode), Allowed, SetupTime, Driver, NetKernel) end, [link, {priority, max}])). do_accept( Acceptor, DistCtrl, MyNode, Allowed, SetupTime, Driver, NetKernel) -> %% receive {Acceptor, controller, Socket} -> Timer = dist_util:start_timer(SetupTime), HSData = hs_data_common( NetKernel, MyNode, DistCtrl, Timer, Socket, Driver:family()), HSData_1 = HSData#hs_data{ this_node = MyNode, this_flags = 0, allowed = Allowed}, dist_util:handshake_other_started(trace(HSData_1)) end. %% ------------------------------------------------------------------------- %% Called by net_kernel to spawn a Controller process that sets up %% a new connection to another Erlang node, performs the handshake %% with the other it, and then becomes the Ticker process %% that supervises the connection. %% ------------------------------------------------------------------------- setup(Node, Type, MyNode, LongOrShortNames, SetupTime) -> gen_setup(Node, Type, MyNode, LongOrShortNames, SetupTime, ?DRIVER). gen_setup(Node, Type, MyNode, LongOrShortNames, SetupTime, Driver) -> NetKernel = self(), %% %% Spawn Controller/handshaker/ticker process %% monitor_dist_proc( spawn_opt( setup_fun( Node, Type, MyNode, LongOrShortNames, SetupTime, Driver, NetKernel), [link, {priority, max}])). -spec setup_fun(_,_,_,_,_,_,_) -> fun(() -> no_return()). setup_fun( Node, Type, MyNode, LongOrShortNames, SetupTime, Driver, NetKernel) -> %% fun() -> do_setup( trace(Node), Type, MyNode, LongOrShortNames, SetupTime, Driver, NetKernel) end. -spec do_setup(_,_,_,_,_,_,_) -> no_return(). do_setup( Node, Type, MyNode, LongOrShortNames, SetupTime, Driver, NetKernel) -> %% {Name, Address} = split_node(Driver, Node, LongOrShortNames), ErlEpmd = net_kernel:epmd_module(), {ARMod, ARFun} = get_address_resolver(ErlEpmd, Driver), Timer = trace(dist_util:start_timer(SetupTime)), case ARMod:ARFun(Name, Address, Driver:family()) of {ok, Ip, TcpPort, Version} -> do_setup_connect( Node, Type, MyNode, Timer, Driver, NetKernel, Ip, TcpPort, Version); {ok, Ip} -> case ErlEpmd:port_please(Name, Ip) of {port, TcpPort, Version} -> do_setup_connect( Node, Type, MyNode, Timer, Driver, NetKernel, Ip, TcpPort, trace(Version)); Other -> _ = trace( {ErlEpmd, port_please, [Name, Ip], Other}), ?shutdown(Node) end; Other -> _ = trace( {ARMod, ARFun, [Name, Address, Driver:family()], Other}), ?shutdown(Node) end. -spec do_setup_connect(_,_,_,_,_,_,_,_,_) -> no_return(). do_setup_connect( Node, Type, MyNode, Timer, Driver, NetKernel, Ip, TcpPort, Version) -> dist_util:reset_timer(Timer), ConnectOpts = trace(connect_options(socket_options())), case Driver:connect(Ip, TcpPort, ConnectOpts) of {ok, Socket} -> DistCtrl = try start_dist_ctrl(Socket, net_kernel:connecttime()) catch error : {dist_ctrl, _} = DistCtrlError -> _ = trace(DistCtrlError), ?shutdown(Node) end, %% DistCtrl is a "socket" HSData = hs_data_common( NetKernel, MyNode, DistCtrl, Timer, Socket, Driver:family()), HSData_1 = HSData#hs_data{ other_node = Node, this_flags = 0, other_version = Version, request_type = Type}, dist_util:handshake_we_started(trace(HSData_1)); ConnectError -> _ = trace( {Driver, connect, [Ip, TcpPort, ConnectOpts], ConnectError}), ?shutdown(Node) end. %% ------------------------------------------------------------------------- %% close/1 is only called by net_kernel on the socket returned by listen/1. close(Socket) -> gen_close(Socket, ?DRIVER). gen_close(Socket, Driver) -> Driver:close(trace(Socket)). %% ------------------------------------------------------------------------- hs_data_common(NetKernel, MyNode, DistCtrl, Timer, Socket, Family) -> %% Field 'socket' below is set to DistCtrl, which makes %% the distribution handshake process (ticker) call %% the funs below with DistCtrl as the S argument. %% So, S =:= DistCtrl below... #hs_data{ kernel_pid = NetKernel, this_node = MyNode, socket = DistCtrl, timer = Timer, %% f_send = % -> ok | {error, closed}=>?shutdown() fun (S, Packet) when S =:= DistCtrl -> try call_dist_ctrl(S, {send, Packet}) catch error : {dist_ctrl, Reason} -> _ = trace(Reason), {error, closed} end end, f_recv = % -> {ok, List} | Other=>?shutdown() fun (S, 0, infinity) when S =:= DistCtrl -> try call_dist_ctrl(S, recv) of {ok, Bin} when is_binary(Bin) -> {ok, binary_to_list(Bin)}; Error -> Error catch error : {dist_ctrl, Reason} -> {error, trace(Reason)} end end, f_setopts_pre_nodeup = fun (S) when S =:= DistCtrl -> ok end, f_setopts_post_nodeup = fun (S) when S =:= DistCtrl -> ok end, f_getll = fun (S) when S =:= DistCtrl -> {ok, S} %% DistCtrl is the distribution port end, f_address = % -> #net_address{} | ?shutdown() fun (S, Node) when S =:= DistCtrl -> try call_dist_ctrl(S, peername) of {ok, Address} -> case dist_util:split_node(Node) of {node, _, Host} -> #net_address{ address = Address, host = Host, protocol = ?DIST_PROTO, family = Family}; _ -> ?shutdown(Node) end; Error -> _ = trace(Error), ?shutdown(Node) catch error : {dist_ctrl, Reason} -> _ = trace(Reason), ?shutdown(Node) end end, f_handshake_complete = % -> ok | ?shutdown() fun (S, Node, DistHandle) when S =:= DistCtrl -> try call_dist_ctrl(S, {handshake_complete, DistHandle}) catch error : {dist_ctrl, Reason} -> _ = trace(Reason), ?shutdown(Node) end end, %% %% mf_tick/1, mf_getstat/1, mf_setopts/2 and mf_getopts/2 %% are called by the ticker any time after f_handshake_complete/3 %% so they may not block the caller even for congested socket mf_tick = fun (S) when S =:= DistCtrl -> S ! dist_tick end, mf_getstat = % -> {ok, RecvCnt, SendCnt, SendPend} | Other=>ignore_it fun (S) when S =:= DistCtrl -> case inet:getstat(Socket, [recv_cnt, send_cnt, send_pend]) of {ok, Stat} -> split_stat(Stat, 0, 0, 0); Error -> trace(Error) end end, mf_setopts = fun (S, Opts) when S =:= DistCtrl -> inet:setopts(Socket, setopts_filter(Opts)) end, mf_getopts = fun (S, Opts) when S =:= DistCtrl -> inet:getopts(Socket, Opts) end}. setopts_filter(Opts) -> [Opt || Opt <- Opts, case Opt of {K, _} when K =:= active; K =:= deliver; K =:= packet -> false; K when K =:= list; K =:= binary -> false; K when K =:= inet; K =:= inet6 -> false; _ -> true end]. split_stat([{recv_cnt, R}|Stat], _, W, P) -> split_stat(Stat, R, W, P); split_stat([{send_cnt, W}|Stat], R, _, P) -> split_stat(Stat, R, W, P); split_stat([{send_pend, P}|Stat], R, W, _) -> split_stat(Stat, R, W, P); split_stat([], R, W, P) -> {ok, R, W, P}. %% ------------------------------------------------------------ %% Determine if EPMD module supports address resolving. Default %% is to use inet_tcp:getaddr/2. %% ------------------------------------------------------------ get_address_resolver(EpmdModule, _Driver) -> case erlang:function_exported(EpmdModule, address_please, 3) of true -> {EpmdModule, address_please}; _ -> {erl_epmd, address_please} end. %% If Node is illegal terminate the connection setup!! split_node(Driver, Node, LongOrShortNames) -> case dist_util:split_node(Node) of {node, Name, Host} -> check_node(Driver, Node, Name, Host, LongOrShortNames); {host, _} -> error_logger:error_msg( "** Nodename ~p illegal, no '@' character **~n", [Node]), ?shutdown2(Node, trace({illegal_node_n@me, Node})); _ -> error_logger:error_msg( "** Nodename ~p illegal **~n", [Node]), ?shutdown2(Node, trace({illegal_node_name, Node})) end. check_node(Driver, Node, Name, Host, LongOrShortNames) -> case string:split(Host, ".", all) of [_] when LongOrShortNames =:= longnames -> case Driver:parse_address(Host) of {ok, _} -> {Name, Host}; _ -> error_logger:error_msg( "** System running to use " "fully qualified hostnames **~n" "** Hostname ~s is illegal **~n", [Host]), ?shutdown2(Node, trace({not_longnames, Host})) end; [_, _|_] when LongOrShortNames =:= shortnames -> error_logger:error_msg( "** System NOT running to use " "fully qualified hostnames **~n" "** Hostname ~s is illegal **~n", [Host]), ?shutdown2(Node, trace({not_shortnames, Host})); _ -> {Name, Host} end. %% ------------------------------------------------------------------------- connect_options(Opts) -> case application:get_env(kernel, inet_dist_connect_options) of {ok, ConnectOpts} -> Opts ++ setopts_filter(ConnectOpts); _ -> Opts end. %% we may not always want the nodelay behaviour %% for performance reasons nodelay() -> case application:get_env(kernel, dist_nodelay) of undefined -> {nodelay, true}; {ok, true} -> {nodelay, true}; {ok, false} -> {nodelay, false}; _ -> {nodelay, true} end. %% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% %% The DistCtrl process(es). %% %% At net_kernel handshake_complete spawns off the input controller that %% takes over the socket ownership, and itself becomes the output controller %% %% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% XXX Missing to "productified": %%% * Cryptoanalysis by experts, this is crypto amateur work. %%% * Is it useful over inet_tls_dist; i.e to not have to bother %%% with certificates but instead manage a secret cluster cookie? %%% * An application to belong to (kernel) %%% * Restart and/or code reload policy (not needed in kernel) %%% * Fitting into the epmd/Erlang distro protocol version framework %%% (something needs to be created for multiple protocols, epmd, %%% multiple address families, fallback to previous version, etc) %% Debug client and server test_server() -> {ok, Listen} = gen_tcp:listen(0, socket_options()), {ok, Port} = inet:port(Listen), io:format(?MODULE_STRING":test_client(~w).~n", [Port]), {ok, Socket} = gen_tcp:accept(Listen), test(Socket). test_client(Port) -> {ok, Socket} = gen_tcp:connect(localhost, Port, socket_options()), test(Socket). test(Socket) -> start_dist_ctrl(Socket, 10000). %% ------------------------------------------------------------------------- start_dist_ctrl(Socket, Timeout) -> Secret = atom_to_binary(auth:get_cookie(), latin1), Controller = self(), Server = monitor_dist_proc( spawn_opt( fun () -> receive {?MODULE, From, start} -> {SendParams, RecvParams} = init(Socket, Secret), reply(From, self()), handshake(SendParams, 1, RecvParams, 1, Controller) end end, [link, {priority, max}, {message_queue_data, off_heap}, {fullsweep_after, 0}])), ok = gen_tcp:controlling_process(Socket, Server), call_dist_ctrl(Server, start, Timeout). call_dist_ctrl(Server, Msg) -> call_dist_ctrl(Server, Msg, infinity). %% call_dist_ctrl(Server, Msg, Timeout) -> Ref = erlang:monitor(process, Server), Server ! {?MODULE, {Ref, self()}, Msg}, receive {Ref, Res} -> erlang:demonitor(Ref, [flush]), Res; {'DOWN', Ref, process, Server, Reason} -> error({dist_ctrl, Reason}) after Timeout -> % Timeout < infinity is only used by start_dist_ctrl/2 receive {'DOWN', Ref, process, Server, _} -> receive {Ref, _} -> ok after 0 -> ok end, error({dist_ctrl, timeout}) %% Server will be killed by link end end. reply({Ref, Pid}, Msg) -> Pid ! {Ref, Msg}, ok. %% ------------------------------------------------------------------------- -define(TCP_ACTIVE, 16). -define(CHUNK_SIZE, (?PACKET_SIZE - 512)). -define(HANDSHAKE_CHUNK, 1). -define(DATA_CHUNK, 2). -define(TICK_CHUNK, 3). -define(REKEY_CHUNK, 4). %% ------------------------------------------------------------------------- %% Crypto strategy %% ------- %% The crypto strategy is as simple as possible to get an encrypted %% connection as benchmark reference. It is geared around AEAD %% ciphers in particular AES-GCM. %% %% The init message and the start message must fit in the TCP buffers %% since both sides start with sending the init message, waits %% for the other end's init message, sends the start message %% and waits for the other end's start message. So if the send %% blocks we have a deadlock. %% %% The init + start sequence tries to implement Password Encrypted %% Key Exchange using a node public/private key pair and the %% shared secret (the Cookie) to create session encryption keys %% that can not be re-created if the shared secret is compromized, %% which should create forward secrecy. You need both nodes' %% key pairs and the shared secret to decrypt the traffic %% between the nodes. %% %% All exchanged messages uses {packet, 2} i.e 16 bit size header. %% %% The init message contains a random number and encrypted: the public key %% and two random numbers. The encryption is done with Key and IV hashed %% from the unencrypted random number and the shared secret. %% %% The other node's public key is used with the own node's private %% key to create a shared key that is hashed with one of the encrypted %% random numbers from each side to create Key and IV for the session. %% %% The start message contains the two encrypted random numbers %% this time encrypted with the session keys for verification %% by the other side, plus the rekey interval. The rekey interval %% is just there to get an early check for if the other side's %% maximum rekey interal is acceptable, it is just an embryo %% of some better check. Any side may rekey earlier but if the %% rekey interval is exceeded the connection fails. %% %% Subsequent encrypted messages has the sequence number and the length %% of the message as AAD data, and an incrementing IV. These messages %% has got a message type that differentiates data from ticks and rekeys. %% Ticks have a random size in an attempt to make them less obvious to spot. %% %% Rekeying is done by the sender that creates a new key pair and %% a new shared secret from the other end's public key and with %% this and the current key and iv hashes a new key and iv. %% The new public key is sent to the other end that uses it %% and its old private key to create the same new shared %% secret and from that a new key and iv. %% So the receiver keeps its private key, and the sender keeps %% the receivers public key for the connection's life time. %% While the sender generates a new key pair at every rekey, %% which changes the shared secret at every rekey. %% %% The only reaction to errors is to crash noisily (?) wich will bring %% down the connection and hopefully produce something useful %% in the local log, but all the other end sees is a closed connection. %% ------------------------------------------------------------------------- init(Socket, Secret) -> #key_pair{public = PubKey} = KeyPair = get_key_pair(), Params = params(Socket), {R2, R3, Msg} = init_msg(Params, PubKey, Secret), ok = gen_tcp:send(Socket, Msg), init_recv(Params, Secret, KeyPair, R2, R3). init_recv( #params{socket = Socket, iv = IVLen} = Params, Secret, KeyPair, R2, R3) -> %% {ok, InitMsg} = gen_tcp:recv(Socket, 0), IVSaltLen = IVLen - 6, try case init_msg(Params, Secret, KeyPair, R2, R3, InitMsg) of {#params{iv = <>} = SendParams, RecvParams, SendStartMsg} -> ok = gen_tcp:send(Socket, SendStartMsg), {ok, RecvStartMsg} = gen_tcp:recv(Socket, 0), #params{ iv = <>} = RecvParams_1 = start_msg(RecvParams, R2, R3, RecvStartMsg), {SendParams#params{iv = {IV2ASalt, IV2ANo}}, RecvParams_1#params{iv = {IV2BSalt, IV2BNo}}} end catch error : Reason : Stacktrace-> _ = trace({Reason, Stacktrace}), exit(connection_closed) end. init_msg( #params{ hmac_algorithm = HmacAlgo, aead_cipher = AeadCipher, key = KeyLen, iv = IVLen, tag_len = TagLen}, PubKeyA, Secret) -> %% RLen = KeyLen + IVLen, <> = crypto:strong_rand_bytes(3 * RLen), {Key1A, IV1A} = hmac_key_iv(HmacAlgo, R1A, Secret, KeyLen, IVLen), Plaintext = [R2A, R3A, PubKeyA], MsgLen = byte_size(R1A) + TagLen + iolist_size(Plaintext), AAD = [<>, R1A], {Ciphertext, Tag} = crypto:block_encrypt(AeadCipher, Key1A, IV1A, {AAD, Plaintext, TagLen}), Msg = [R1A, Tag, Ciphertext], {R2A, R3A, Msg}. %% init_msg( #params{ hmac_algorithm = HmacAlgo, aead_cipher = AeadCipher, key = KeyLen, iv = IVLen, tag_len = TagLen, rekey_interval = RekeyInterval} = Params, Secret, KeyPair, R2A, R3A, Msg) -> %% RLen = KeyLen + IVLen, case Msg of <> -> {Key1B, IV1B} = hmac_key_iv(HmacAlgo, R1B, Secret, KeyLen, IVLen), MsgLen = byte_size(Msg), AAD = [<>, R1B], case crypto:block_decrypt( AeadCipher, Key1B, IV1B, {AAD, Ciphertext, Tag}) of <> -> SharedSecret = compute_shared_secret(KeyPair, PubKeyB), %% {Key2A, IV2A} = hmac_key_iv( HmacAlgo, SharedSecret, [R2A, R3B], KeyLen, IVLen), SendParams = Params#params{ rekey_key = PubKeyB, key = Key2A, iv = IV2A}, %% StartCleartext = [R2B, R3B, <>], StartMsgLen = TagLen + iolist_size(StartCleartext), StartAAD = <>, {StartCiphertext, StartTag} = crypto:block_encrypt( AeadCipher, Key2A, IV2A, {StartAAD, StartCleartext, TagLen}), StartMsg = [StartTag, StartCiphertext], %% {Key2B, IV2B} = hmac_key_iv( HmacAlgo, SharedSecret, [R2B, R3A], KeyLen, IVLen), RecvParams = Params#params{ rekey_key = KeyPair, key = Key2B, iv = IV2B}, %% {SendParams, RecvParams, StartMsg} end end. start_msg( #params{ aead_cipher = AeadCipher, key = Key2B, iv = IV2B, tag_len = TagLen, rekey_interval = RekeyIntervalA} = RecvParams, R2A, R3A, Msg) -> %% case Msg of <> -> KeyLen = byte_size(Key2B), IVLen = byte_size(IV2B), RLen = KeyLen + IVLen, MsgLen = byte_size(Msg), AAD = <>, case crypto:block_decrypt( AeadCipher, Key2B, IV2B, {AAD, Ciphertext, Tag}) of <> when RekeyIntervalA =< (RekeyIntervalB bsl 2), RekeyIntervalB =< (RekeyIntervalA bsl 2) -> RecvParams#params{rekey_interval = RekeyIntervalB} end end. hmac_key_iv(HmacAlgo, MacKey, Data, KeyLen, IVLen) -> <> = crypto:hmac(HmacAlgo, MacKey, Data, KeyLen + IVLen), {Key, IV}. %% ------------------------------------------------------------------------- %% net_kernel distribution handshake in progress %% handshake( SendParams, SendSeq, #params{socket = Socket} = RecvParams, RecvSeq, Controller) -> receive {?MODULE, From, {controller, Controller_1, Parent}} -> Result = link(Controller_1), true = unlink(Parent), reply(From, Result), handshake(SendParams, SendSeq, RecvParams, RecvSeq, Controller_1); {?MODULE, From, {handshake_complete, DistHandle}} -> InputHandler = monitor_dist_proc( spawn_opt( fun () -> link(Controller), receive DistHandle -> ok = inet:setopts( Socket, [{active, ?TCP_ACTIVE}, nodelay()]), input_handler( RecvParams#params{ dist_handle = DistHandle}, RecvSeq, empty_q(), infinity) end end, [link, {priority, normal}, {message_queue_data, off_heap}, {fullsweep_after, 0}])), _ = monitor(process, InputHandler), % For the benchmark test ok = gen_tcp:controlling_process(Socket, InputHandler), ok = erlang:dist_ctrl_input_handler(DistHandle, InputHandler), InputHandler ! DistHandle, crypto:rand_seed_alg(crypto_cache), reply(From, ok), process_flag(priority, normal), erlang:dist_ctrl_get_data_notification(DistHandle), output_handler( SendParams#params{dist_handle = DistHandle}, SendSeq); %% {?MODULE, From, {send, Data}} -> case encrypt_and_send_chunk( SendParams, SendSeq, [?HANDSHAKE_CHUNK, Data]) of {SendParams_1, SendSeq_1, ok} -> reply(From, ok), handshake( SendParams_1, SendSeq_1, RecvParams, RecvSeq, Controller); {_, _, Error} -> reply(From, {error, closed}), death_row({send, trace(Error)}) end; {?MODULE, From, recv} -> case recv_and_decrypt_chunk(RecvParams, RecvSeq) of {RecvParams_1, RecvSeq_1, {ok, _} = Reply} -> reply(From, Reply), handshake( SendParams, SendSeq, RecvParams_1, RecvSeq_1, Controller); {_, _, Error} -> reply(From, Error), death_row({recv, trace(Error)}) end; {?MODULE, From, peername} -> reply(From, inet:peername(Socket)), handshake(SendParams, SendSeq, RecvParams, RecvSeq, Controller); %% _Alien -> handshake(SendParams, SendSeq, RecvParams, RecvSeq, Controller) end. recv_and_decrypt_chunk(#params{socket = Socket} = RecvParams, RecvSeq) -> case gen_tcp:recv(Socket, 0) of {ok, Chunk} -> case decrypt_chunk(RecvParams, RecvSeq, Chunk) of <> -> {RecvParams, RecvSeq + 1, {ok, Cleartext}}; OtherChunk when is_binary(OtherChunk) -> {RecvParams, RecvSeq + 1, {error, decrypt_error}}; #params{} = RecvParams_1 -> recv_and_decrypt_chunk(RecvParams_1, 0); error -> {RecvParams, RecvSeq, {error, decrypt_error}} end; Error -> {RecvParams, RecvSeq, Error} end. %% ------------------------------------------------------------------------- %% Output handler process %% %% The game here is to flush all dist_data and dist_tick messages, %% prioritize dist_data over dist_tick, and to not use selective receive output_handler(Params, Seq) -> receive Msg -> case Msg of dist_data -> output_handler_data(Params, Seq); dist_tick -> output_handler_tick(Params, Seq); Other -> %% Ignore _ = trace(Other), output_handler(Params, Seq) end end. output_handler_data(Params, Seq) -> receive Msg -> case Msg of dist_data -> output_handler_data(Params, Seq); dist_tick -> output_handler_data(Params, Seq); Other -> %% Ignore _ = trace(Other), output_handler_data(Params, Seq) end after 0 -> DistHandle = Params#params.dist_handle, Q = get_data(DistHandle, empty_q()), {Params_1, Seq_1} = output_handler_send(Params, Seq, Q), erlang:dist_ctrl_get_data_notification(DistHandle), output_handler(Params_1, Seq_1) end. output_handler_tick(Params, Seq) -> receive Msg -> case Msg of dist_data -> output_handler_data(Params, Seq); dist_tick -> output_handler_tick(Params, Seq); Other -> %% Ignore _ = trace(Other), output_handler_tick(Params, Seq) end after 0 -> TickSize = 7 + rand:uniform(56), TickData = binary:copy(<<0>>, TickSize), case encrypt_and_send_chunk(Params, Seq, [?TICK_CHUNK, TickData]) of {Params_1, Seq_1, ok} -> output_handler(Params_1, Seq_1); {_, _, Error} -> _ = trace(Error), death_row() end end. output_handler_send(Params, Seq, {_, Size, _} = Q) -> if ?CHUNK_SIZE < Size -> output_handler_send(Params, Seq, Q, ?CHUNK_SIZE); true -> case get_data(Params#params.dist_handle, Q) of {_, 0, _} -> {Params, Seq}; {_, Size, _} = Q_1 -> % Got no more output_handler_send(Params, Seq, Q_1, Size); Q_1 -> output_handler_send(Params, Seq, Q_1) end end. output_handler_send(Params, Seq, Q, Size) -> {Cleartext, Q_1} = deq_iovec(Size, Q), case encrypt_and_send_chunk(Params, Seq, [?DATA_CHUNK, Cleartext]) of {Params_1, Seq_1, ok} -> output_handler_send(Params_1, Seq_1, Q_1); {_, _, Error} -> _ = trace(Error), death_row() end. %% ------------------------------------------------------------------------- %% Input handler process %% %% Here is T = 0|infinity to steer if we should try to receive %% more data or not; start with infinity, and when we get some %% data try with 0 to see if more is waiting input_handler(#params{socket = Socket} = Params, Seq, Q, T) -> receive Msg -> case Msg of {tcp_passive, Socket} -> ok = inet:setopts(Socket, [{active, ?TCP_ACTIVE}]), Q_1 = case T of 0 -> deliver_data(Params#params.dist_handle, Q); infinity -> Q end, input_handler(Params, Seq, Q_1, infinity); {tcp, Socket, Chunk} -> input_chunk(Params, Seq, Q, T, Chunk); {tcp_closed, Socket} -> exit(connection_closed); Other -> %% Ignore... _ = trace(Other), input_handler(Params, Seq, Q, T) end after T -> Q_1 = deliver_data(Params#params.dist_handle, Q), input_handler(Params, Seq, Q_1, infinity) end. input_chunk(Params, Seq, Q, T, Chunk) -> case decrypt_chunk(Params, Seq, Chunk) of <> -> input_handler(Params, Seq + 1, enq_binary(Cleartext, Q), 0); <> -> input_handler(Params, Seq + 1, Q, T); OtherChunk when is_binary(OtherChunk) -> _ = trace(invalid_chunk), exit(connection_closed); #params{} = Params_1 -> input_handler(Params_1, 0, Q, T); error -> _ = trace(decrypt_error), exit(connection_closed) end. %% ------------------------------------------------------------------------- %% erlang:dist_ctrl_* helpers %% Get data for sending from the VM and place it in a queue %% get_data(DistHandle, {Front, Size, Rear}) -> get_data(DistHandle, Front, Size, Rear). %% get_data(DistHandle, Front, Size, Rear) -> case erlang:dist_ctrl_get_data(DistHandle) of none -> {Front, Size, Rear}; Bin when is_binary(Bin) -> Len = byte_size(Bin), get_data( DistHandle, Front, Size + 4 + Len, [Bin, <>|Rear]); [Bin1, Bin2] -> Len = byte_size(Bin1) + byte_size(Bin2), get_data( DistHandle, Front, Size + 4 + Len, [Bin2, Bin1, <>|Rear]); Iovec -> Len = iolist_size(Iovec), get_data( DistHandle, Front, Size + 4 + Len, lists:reverse(Iovec, [<>|Rear])) end. %% De-packet and deliver received data to the VM from a queue %% deliver_data(DistHandle, Q) -> case Q of {[], Size, []} -> Size = 0, % Assert Q; {[], Size, Rear} -> [Bin|Front] = lists:reverse(Rear), deliver_data(DistHandle, Front, Size, [], Bin); {[Bin|Front], Size, Rear} -> deliver_data(DistHandle, Front, Size, Rear, Bin) end. %% deliver_data(DistHandle, Front, Size, Rear, Bin) -> case Bin of <> -> erlang:dist_ctrl_put_data(DistHandle, DataA), erlang:dist_ctrl_put_data(DistHandle, DataB), deliver_data( DistHandle, Front, Size - (4 + DataSizeA + 4 + DataSizeB), Rear, Rest); <> -> erlang:dist_ctrl_put_data(DistHandle, Data), deliver_data(DistHandle, Front, Size - (4 + DataSize), Rear, Rest); <> -> TotalSize = 4 + DataSize, if TotalSize =< Size -> BinSize = byte_size(Bin), {MoreData, Q} = deq_iovec( TotalSize - BinSize, Front, Size - BinSize, Rear), erlang:dist_ctrl_put_data(DistHandle, [FirstData|MoreData]), deliver_data(DistHandle, Q); true -> % Incomplete data {[Bin|Front], Size, Rear} end; <<_/binary>> -> BinSize = byte_size(Bin), if 4 =< Size -> % Fragmented header - extract a header bin {RestHeader, {Front_1, _Size_1, Rear_1}} = deq_iovec(4 - BinSize, Front, Size - BinSize, Rear), Header = iolist_to_binary([Bin|RestHeader]), deliver_data(DistHandle, Front_1, Size, Rear_1, Header); true -> % Incomplete header {[Bin|Front], Size, Rear} end end. %% ------------------------------------------------------------------------- %% Encryption and decryption helpers encrypt_and_send_chunk( #params{ socket = Socket, rekey_interval = Seq, rekey_key = PubKeyB, key = Key, iv = {IVSalt, IVNo}, hmac_algorithm = HmacAlgo} = Params, Seq, Cleartext) -> %% KeyLen = byte_size(Key), IVSaltLen = byte_size(IVSalt), #key_pair{public = PubKeyA} = KeyPair = get_new_key_pair(), case gen_tcp:send( Socket, encrypt_chunk(Params, Seq, [?REKEY_CHUNK, PubKeyA])) of ok -> SharedSecret = compute_shared_secret(KeyPair, PubKeyB), IV = <<(IVNo + Seq):48>>, {Key_1, <>} = hmac_key_iv( HmacAlgo, SharedSecret, [Key, IVSalt, IV], KeyLen, IVSaltLen + 6), Params_1 = Params#params{key = Key_1, iv = {IVSalt_1, IVNo_1}}, Result = gen_tcp:send(Socket, encrypt_chunk(Params_1, 0, Cleartext)), {Params_1, 1, Result}; SendError -> {Params, Seq + 1, SendError} end; encrypt_and_send_chunk(#params{socket = Socket} = Params, Seq, Cleartext) -> Result = gen_tcp:send(Socket, encrypt_chunk(Params, Seq, Cleartext)), {Params, Seq + 1, Result}. encrypt_chunk( #params{ aead_cipher = AeadCipher, iv = {IVSalt, IVNo}, key = Key, tag_len = TagLen}, Seq, Cleartext) -> %% ChunkLen = iolist_size(Cleartext) + TagLen, AAD = <>, IVBin = <>, {Ciphertext, CipherTag} = crypto:block_encrypt(AeadCipher, Key, IVBin, {AAD, Cleartext, TagLen}), Chunk = [Ciphertext,CipherTag], Chunk. decrypt_chunk( #params{ aead_cipher = AeadCipher, iv = {IVSalt, IVNo}, key = Key, tag_len = TagLen} = Params, Seq, Chunk) -> %% ChunkLen = byte_size(Chunk), if ChunkLen < TagLen -> error; true -> AAD = <>, IVBin = <>, CiphertextLen = ChunkLen - TagLen, case Chunk of <> -> block_decrypt( Params, Seq, AeadCipher, Key, IVBin, {AAD, Ciphertext, CipherTag}); _ -> error end end. block_decrypt( #params{ rekey_key = #key_pair{public = PubKeyA} = KeyPair, rekey_interval = RekeyInterval} = Params, Seq, AeadCipher, Key, IV, Data) -> %% case crypto:block_decrypt(AeadCipher, Key, IV, Data) of <> -> PubKeyLen = byte_size(PubKeyA), case Rest of <> -> SharedSecret = compute_shared_secret(KeyPair, PubKeyB), KeyLen = byte_size(Key), IVLen = byte_size(IV), IVSaltLen = IVLen - 6, {Key_1, <>} = hmac_key_iv( Params#params.hmac_algorithm, SharedSecret, [Key, IV], KeyLen, IVLen), Params#params{iv = {IVSalt, IVNo}, key = Key_1}; _ -> error end; Chunk when is_binary(Chunk) -> case Seq of RekeyInterval -> %% This was one chunk too many without rekeying error; _ -> Chunk end; error -> error end. %% ------------------------------------------------------------------------- %% Queue of binaries i.e an iovec queue empty_q() -> {[], 0, []}. enq_binary(Bin, {Front, Size, Rear}) -> {Front, Size + byte_size(Bin), [Bin|Rear]}. deq_iovec(GetSize, {Front, Size, Rear}) when GetSize =< Size -> deq_iovec(GetSize, Front, Size, Rear, []). %% deq_iovec(GetSize, Front, Size, Rear) -> deq_iovec(GetSize, Front, Size, Rear, []). %% deq_iovec(GetSize, [], Size, Rear, Acc) -> deq_iovec(GetSize, lists:reverse(Rear), Size, [], Acc); deq_iovec(GetSize, [Bin|Front], Size, Rear, Acc) -> BinSize = byte_size(Bin), if BinSize < GetSize -> deq_iovec( GetSize - BinSize, Front, Size - BinSize, Rear, [Bin|Acc]); GetSize < BinSize -> {Bin1,Bin2} = erlang:split_binary(Bin, GetSize), {lists:reverse(Acc, [Bin1]), {[Bin2|Front], Size - GetSize, Rear}}; true -> {lists:reverse(Acc, [Bin]), {Front, Size - BinSize, Rear}} end. %% ------------------------------------------------------------------------- death_row() -> death_row(connection_closed). %% death_row(normal) -> death_row(connection_closed); death_row(Reason) -> receive after 5000 -> exit(Reason) end. %% ------------------------------------------------------------------------- %% Trace point trace(Term) -> Term. %% Keep an eye on this Pid (debug) -ifndef(undefined). monitor_dist_proc(Pid) -> Pid. -else. monitor_dist_proc(Pid) -> spawn( fun () -> MRef = erlang:monitor(process, Pid), receive {'DOWN', MRef, _, _, normal} -> error_logger:error_report( [dist_proc_died, {reason, normal}, {pid, Pid}]); {'DOWN', MRef, _, _, Reason} -> error_logger:info_report( [dist_proc_died, {reason, Reason}, {pid, Pid}]) end end), Pid. -endif. dbg() -> dbg:stop(), dbg:tracer(), dbg:p(all, c), dbg:tpl(?MODULE, trace, cx), dbg:tpl(erlang, dist_ctrl_get_data_notification, cx), dbg:tpl(erlang, dist_ctrl_get_data, cx), dbg:tpl(erlang, dist_ctrl_put_data, cx), ok.