diff options
Diffstat (limited to 'lib/ssl/src/ssl_tls_dist_ctrl.erl')
-rw-r--r-- | lib/ssl/src/ssl_tls_dist_ctrl.erl | 380 |
1 files changed, 380 insertions, 0 deletions
diff --git a/lib/ssl/src/ssl_tls_dist_ctrl.erl b/lib/ssl/src/ssl_tls_dist_ctrl.erl new file mode 100644 index 0000000000..cbb39e71ec --- /dev/null +++ b/lib/ssl/src/ssl_tls_dist_ctrl.erl @@ -0,0 +1,380 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2017. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% +-module(ssl_tls_dist_ctrl). + +-include_lib("kernel/include/dist.hrl"). +-include_lib("kernel/include/dist_util.hrl"). +-include_lib("kernel/include/net_address.hrl"). + +-export([start/1, get_socket/1, set_supervisor/2, hs_data_common/1]). + +%%% ------------------------------------------------------------ + +%% In order to avoid issues with lingering signal binaries +%% we enable off-heap message queue data as well as fullsweep +%% after 0. The fullsweeps will be cheap since we have more +%% or less no live data. +common_spawn_opts() -> + [{message_queue_data, off_heap}, + {fullsweep_after, 0}]. + +%%% ------------------------------------------------------------ + +start(SslSocket) -> + spawn_opt( + fun () -> + setup(SslSocket) + end, + [{priority, max}] ++ common_spawn_opts()). + +get_socket(DistCtrl) -> + call(DistCtrl, get_socket). + +set_supervisor(DistCtrl, Pid) -> + call(DistCtrl, {set_supervisor, Pid}). + +hs_data_common(DistCtrl) -> + TickHandler = call(DistCtrl, tick_handler), + SslSocket = get_socket(DistCtrl), + #hs_data{ + f_send = + fun (Ctrl, Packet) when Ctrl == DistCtrl -> + call(Ctrl, {send, Packet}) + end, + f_recv = + fun (Ctrl, Length, Timeout) when Ctrl == DistCtrl -> + case call(Ctrl, {recv, Length, Timeout}) of + {ok, Bin} when is_binary(Bin) -> + {ok, binary_to_list(Bin)}; + Other -> + Other + end + end, + f_setopts_pre_nodeup = + fun (Ctrl) when Ctrl == DistCtrl -> + call(Ctrl, pre_nodeup) + end, + f_setopts_post_nodeup = + fun (Ctrl) when Ctrl == DistCtrl -> + call(Ctrl, post_nodeup) + end, + f_getll = + fun (Ctrl) when Ctrl == DistCtrl -> + call(Ctrl, getll) + end, + f_handshake_complete = + fun (Ctrl, Node, DHandle) when Ctrl == DistCtrl -> + call(Ctrl, {handshake_complete, Node, DHandle}) + end, + f_address = + fun (Ctrl, Node) when Ctrl == DistCtrl -> + case call(Ctrl, {check_address, Node}) of + {error, no_node} -> + %% No '@' or more than one '@' in node name. + ?shutdown(no_node); + Res -> + Res + end + end, + mf_setopts = + fun (Ctrl, Opts) when Ctrl == DistCtrl -> + case setopts_filter(Opts) of + [] -> + ssl:setopts(SslSocket, Opts); + Opts1 -> + {error, {badopts,Opts1}} + end + end, + mf_getopts = + fun (Ctrl, Opts) when Ctrl == DistCtrl -> + ssl:getopts(SslSocket, Opts) + end, + mf_getstat = + fun (Ctrl) when Ctrl == DistCtrl -> + case ssl:getstat( + SslSocket, [recv_cnt, send_cnt, send_pend]) of + {ok, Stat} -> + split_stat(Stat,0,0,0); + Error -> + Error + end + end, + mf_tick = + fun (Ctrl) when Ctrl == DistCtrl -> + TickHandler ! tick + end}. + +%%% ------------------------------------------------------------ + +call(DistCtrl, Msg) -> + Ref = erlang:monitor(process, DistCtrl), + DistCtrl ! {Ref, self(), Msg}, + receive + {Ref, Res} -> + erlang:demonitor(Ref, [flush]), + Res; + {'DOWN', Ref, process, DistCtrl, Reason} -> + exit({dist_controller_exit, Reason}) + end. + +setopts_filter(Opts) -> + [Opt || {K,_} = Opt <- Opts, + K =:= active orelse K =:= deliver orelse K =:= packet]. + +split_stat([{recv_cnt, R}|Stat], _, W, P) -> + split_stat(Stat, R, W, P); +split_stat([{send_cnt, W}|Stat], R, _, P) -> + split_stat(Stat, R, W, P); +split_stat([{send_pend, P}|Stat], R, W, _) -> + split_stat(Stat, R, W, P); +split_stat([], R, W, P) -> + {ok, R, W, P}. + +%%% ------------------------------------------------------------ +%%% Distribution controller processes +%%% ------------------------------------------------------------ + +%% +%% There will be five parties working together when the +%% connection is up: +%% - The SSL socket. Providing a TLS connection +%% to the other node. +%% - The output handler. It will dispatch all outgoing +%% traffic from the VM to the gen_tcp socket. This +%% process is registered as distribution controller +%% for this channel with the VM. +%% - The input handler. It will dispatch all incoming +%% traffic from the gen_tcp socket to the VM. This +%% process is also the socket owner and receives +%% incoming traffic using active-N. +%% - The tick handler. Dispatches asynchronous tick +%% requests to the socket. It executes on max priority +%% since it is important to get ticks through to the +%% other end. +%% - The channel supervisor (provided by dist_util). It +%% monitors traffic. Issue tick requests to the tick +%% handler when no outgoing traffic is seen and bring +%% the connection down if no incoming traffic is seen. +%% This process also executes on max priority. +%% +%% These parties are linked togheter so should one +%% of them fail, all of them are terminated and the +%% connection is taken down. +%% + + +%% +%% The tick handler process writes a tick to the +%% socket when it receives a 'tick' message from +%% the connection supervisor. +%% +%% We are not allowed to block the connection +%% superviser when writing a tick and we also want +%% the tick to go through even during a heavily +%% loaded system. gen_tcp does not have a +%% non-blocking send operation exposed in its API +%% and we don't want to run the distribution +%% controller under high priority. Therefore this +%% separate process with max prio that dispatches +%% ticks. +%% +tick_handler(SslSocket) -> + receive + tick -> + %% May block due to busy port... + sock_send(SslSocket, ""), + flush_ticks(SslSocket); + _ -> + tick_handler(SslSocket) + end. + +flush_ticks(SslSocket) -> + receive + tick -> + flush_ticks(SslSocket) + after 0 -> + tick_handler(SslSocket) + end. + +setup(SslSocket) -> + TickHandler = + spawn_opt( + fun () -> + tick_handler(SslSocket) + end, + [link, {priority, max}] ++ common_spawn_opts()), + setup_loop(SslSocket, TickHandler, undefined). + +%% +%% During the handshake phase we loop in setup(). +%% When the connection is up we spawn an input handler and +%% continue as output handler. +%% +setup_loop(SslSocket, TickHandler, Sup) -> + receive + {ssl_closed, SslSocket} -> + exit(connection_closed); + {ssl_error, SslSocket} -> + exit(connection_closed); + + {Ref, From, {set_supervisor, Pid}} -> + Res = link(Pid), + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Pid); + + {Ref, From, tick_handler} -> + From ! {Ref, TickHandler}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, get_socket} -> + From ! {Ref, SslSocket}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, {send, Packet}} -> + Res = ssl:send(SslSocket, Packet), + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, {recv, Length, Timeout}} -> + Res = ssl:recv(SslSocket, Length, Timeout), + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, {check_address, Node}} -> + Res = + case ssl:peername(SslSocket) of + {ok, Address} -> + case inet_tls_dist:split_node(Node) of + false -> + {error, no_node}; + Host -> + #net_address{ + address=Address, host=Host, + protocol=tls, family=inet} + end + end, + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, pre_nodeup} -> + Res = + ssl:setopts( + SslSocket, + [{active, false}, {packet, 4}, inet_tls_dist:nodelay()]), + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, post_nodeup} -> + Res = + ssl:setopts( + SslSocket, + [{active, once}, {packet, 4}, inet_tls_dist:nodelay()]), + From ! {Ref, Res}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, getll} -> + From ! {Ref, {ok, self()}}, + setup_loop(SslSocket, TickHandler, Sup); + + {Ref, From, {handshake_complete, _Node, DHandle}} -> + From ! {Ref, ok}, + %% Handshake complete! Begin dispatching traffic... + %% From now on we execute on normal priority + process_flag(priority, normal), + erlang:dist_ctrl_get_data_notification(DHandle), + loop(DHandle, SslSocket) + end. + +loop(DHandle, SslSocket) -> + receive + dist_data -> + %% Outgoing data from this node... + try send_data(DHandle, SslSocket) + catch _ : _ -> death_row() + end, + loop(DHandle, SslSocket); + + {send, From, Ref, Data} -> + %% This is for testing only! + %% + %% Needed by some OTP distribution + %% test suites... + sock_send(SslSocket, Data), + From ! {Ref, ok}, + loop(DHandle, SslSocket); + + {ssl_closed, SslSocket} -> + %% Connection to remote node terminated... + exit(connection_closed); + {ssl_error, SslSocket, _Reason} -> + %% Connection to remote node terminated... + exit(connection_closed); + {ssl, SslSocket, Data} -> + %% Incoming data from remote node... + ok = ssl:setopts(SslSocket, [{active, once}]), + try erlang:dist_ctrl_put_data(DHandle, Data) + catch _ : _ -> death_row() + end, + loop(DHandle, SslSocket); + + _ -> + %% Drop garbage message... + loop(DHandle, SslSocket) + end. + +send_data(DHandle, SslSocket) -> + case erlang:dist_ctrl_get_data(DHandle) of + none -> + erlang:dist_ctrl_get_data_notification(DHandle); + Data -> + sock_send(SslSocket, Data), + send_data(DHandle, SslSocket) + end. + +sock_send(SslSocket, Data) -> + try ssl:send(SslSocket, Data) of + ok -> ok; + {error, Reason} -> + death_row({send_error, Reason}) + catch + Type:Reason -> + death_row({send_error, {Type, Reason}}) + end. + +death_row() -> + death_row(connection_closed). + +death_row(normal) -> + %% We do not want to exit with normal + %% exit reason since it wont bring down + %% linked processes... + death_row(); +death_row(Reason) -> + %% When the connection is on its way down operations + %% begin to fail. We catch the failures and call + %% this function waiting for termination. We should + %% be terminated by one of our links to the other + %% involved parties that began bringing the + %% connection down. By waiting for termination we + %% avoid altering the exit reason for the connection + %% teardown. We however limit the wait to 5 seconds + %% and bring down the connection ourselves if not + %% terminated... + receive after 5000 -> exit(Reason) end. |