aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kernel/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/kernel/src')
-rw-r--r--lib/kernel/src/code_server.erl26
-rw-r--r--lib/kernel/src/dist_util.erl71
-rw-r--r--lib/kernel/src/erl_epmd.erl6
-rw-r--r--lib/kernel/src/inet.erl1
-rw-r--r--lib/kernel/src/inet6_tcp_dist.erl7
-rw-r--r--lib/kernel/src/inet_tcp_dist.erl29
-rw-r--r--lib/kernel/src/net_kernel.erl128
-rw-r--r--lib/kernel/src/os.erl64
8 files changed, 272 insertions, 60 deletions
diff --git a/lib/kernel/src/code_server.erl b/lib/kernel/src/code_server.erl
index 6174136507..48541ec500 100644
--- a/lib/kernel/src/code_server.erl
+++ b/lib/kernel/src/code_server.erl
@@ -135,10 +135,14 @@ split_paths([], _S, Path, Paths) ->
-spec call(term()) -> term().
call(Req) ->
+ Ref = erlang:monitor(process, ?MODULE),
?MODULE ! {code_call, self(), Req},
receive
{?MODULE, Reply} ->
- Reply
+ erlang:demonitor(Ref,[flush]),
+ Reply;
+ {'DOWN',Ref,process,_,_} ->
+ exit({'DOWN',code_server,Req})
end.
reply(Pid, Res) ->
@@ -933,14 +937,20 @@ del_ebin(Dir) ->
filename:join(del_ebin_1(filename:split(Dir))).
del_ebin_1([Parent,App,"ebin"]) ->
- Ext = archive_extension(),
- case filename:basename(Parent, Ext) of
- Parent ->
- %% Plain directory.
+ case filename:basename(Parent) of
+ [] ->
+ %% Parent is the root directory
[Parent,App];
- Archive ->
- %% Archive.
- [Archive]
+ _ ->
+ Ext = archive_extension(),
+ case filename:basename(Parent, Ext) of
+ Parent ->
+ %% Plain directory.
+ [Parent,App];
+ Archive ->
+ %% Archive.
+ [Archive]
+ end
end;
del_ebin_1([H|T]) ->
[H|del_ebin_1(T)];
diff --git a/lib/kernel/src/dist_util.erl b/lib/kernel/src/dist_util.erl
index 47d0c1b861..8d2fc4d4b7 100644
--- a/lib/kernel/src/dist_util.erl
+++ b/lib/kernel/src/dist_util.erl
@@ -143,7 +143,11 @@ handshake_other_started(#hs_data{request_type=ReqType}=HSData0) ->
ChallengeB = recv_challenge_reply(HSData, ChallengeA, MyCookie),
send_challenge_ack(HSData, gen_digest(ChallengeB, HisCookie)),
?debug({dist_util, self(), accept_connection, Node}),
- connection(HSData).
+ connection(HSData);
+
+handshake_other_started(OldHsData) when element(1,OldHsData) =:= hs_data ->
+ handshake_other_started(convert_old_hsdata(OldHsData)).
+
%%
%% check if connecting node is allowed to connect
@@ -330,7 +334,20 @@ handshake_we_started(#hs_data{request_type=ReqType,
gen_digest(ChallengeA,HisCookie)),
reset_timer(NewHSData#hs_data.timer),
recv_challenge_ack(NewHSData, MyChallenge, MyCookie),
- connection(NewHSData).
+ connection(NewHSData);
+
+handshake_we_started(OldHsData) when element(1,OldHsData) =:= hs_data ->
+ handshake_we_started(convert_old_hsdata(OldHsData)).
+
+convert_old_hsdata({hs_data, KP, ON, TN, S, T, TF, A, OV, OF, OS, FS, FR,
+ FS_PRE, FS_POST, FG, FA, MFT, MFG, RT}) ->
+ #hs_data{
+ kernel_pid = KP, other_node = ON, this_node = TN, socket = S, timer = T,
+ this_flags = TF, allowed = A, other_version = OV, other_flags = OF,
+ other_started = OS, f_send = FS, f_recv = FR, f_setopts_pre_nodeup = FS_PRE,
+ f_setopts_post_nodeup = FS_POST, f_getll = FG, f_address = FA,
+ mf_tick = MFT, mf_getstat = MFG, request_type = RT}.
+
%% --------------------------------------------------------------
%% The connection has been established.
@@ -350,15 +367,15 @@ connection(#hs_data{other_node = Node,
mark_nodeup(HSData,Address),
case FPostNodeup(Socket) of
ok ->
- con_loop(HSData#hs_data.kernel_pid,
- Node,
- Socket,
- Address,
- HSData#hs_data.this_node,
- PType,
- #tick{},
- HSData#hs_data.mf_tick,
- HSData#hs_data.mf_getstat);
+ con_loop({HSData#hs_data.kernel_pid,
+ Node,
+ Socket,
+ PType,
+ HSData#hs_data.mf_tick,
+ HSData#hs_data.mf_getstat,
+ HSData#hs_data.mf_setopts,
+ HSData#hs_data.mf_getopts},
+ #tick{});
_ ->
?shutdown2(Node, connection_setup_failed)
end;
@@ -454,8 +471,8 @@ mark_nodeup(#hs_data{kernel_pid = Kernel,
?shutdown(Node)
end.
-con_loop(Kernel, Node, Socket, TcpAddress,
- MyNode, Type, Tick, MFTick, MFGetstat) ->
+con_loop({Kernel, Node, Socket, Type, MFTick, MFGetstat, MFSetOpts, MFGetOpts}=ConData,
+ Tick) ->
receive
{tcp_closed, Socket} ->
?shutdown2(Node, connection_closed);
@@ -468,15 +485,12 @@ con_loop(Kernel, Node, Socket, TcpAddress,
_ ->
ignore_it
end,
- con_loop(Kernel, Node, Socket, TcpAddress, MyNode, Type,
- Tick, MFTick, MFGetstat);
+ con_loop(ConData, Tick);
{Kernel, tick} ->
case send_tick(Socket, Tick, Type,
MFTick, MFGetstat) of
{ok, NewTick} ->
- con_loop(Kernel, Node, Socket, TcpAddress,
- MyNode, Type, NewTick, MFTick,
- MFGetstat);
+ con_loop(ConData, NewTick);
{error, not_responding} ->
error_msg("** Node ~p not responding **~n"
"** Removing (timedout) connection **~n",
@@ -489,13 +503,24 @@ con_loop(Kernel, Node, Socket, TcpAddress,
case MFGetstat(Socket) of
{ok, Read, Write, _} ->
From ! {self(), get_status, {ok, Read, Write}},
- con_loop(Kernel, Node, Socket, TcpAddress,
- MyNode,
- Type, Tick,
- MFTick, MFGetstat);
+ con_loop(ConData, Tick);
_ ->
?shutdown2(Node, get_status_failed)
- end
+ end;
+ {From, Ref, {setopts, Opts}} ->
+ Ret = case MFSetOpts of
+ undefined -> {error, enotsup};
+ _ -> MFSetOpts(Socket, Opts)
+ end,
+ From ! {Ref, Ret},
+ con_loop(ConData, Tick);
+ {From, Ref, {getopts, Opts}} ->
+ Ret = case MFGetOpts of
+ undefined -> {error, enotsup};
+ _ -> MFGetOpts(Socket, Opts)
+ end,
+ From ! {Ref, Ret},
+ con_loop(ConData, Tick)
end.
diff --git a/lib/kernel/src/erl_epmd.erl b/lib/kernel/src/erl_epmd.erl
index f8ef4a475d..7bc9e2ede3 100644
--- a/lib/kernel/src/erl_epmd.erl
+++ b/lib/kernel/src/erl_epmd.erl
@@ -103,6 +103,10 @@ names(EpmdAddr) ->
register_node(Name, PortNo) ->
register_node(Name, PortNo, inet).
+register_node(Name, PortNo, inet_tcp) ->
+ register_node(Name, PortNo, inet);
+register_node(Name, PortNo, inet6_tcp) ->
+ register_node(Name, PortNo, inet6);
register_node(Name, PortNo, Family) ->
gen_server:call(erl_epmd, {register, Name, PortNo, Family}, infinity).
@@ -403,8 +407,6 @@ select_best_version(L1, _H1, _L2, H2) when L1 > H2 ->
0;
select_best_version(_L1, H1, L2, _H2) when L2 > H1 ->
0;
-select_best_version(_L1, H1, L2, _H2) when L2 > H1 ->
- 0;
select_best_version(_L1, H1, _L2, H2) ->
erlang:min(H1, H2).
diff --git a/lib/kernel/src/inet.erl b/lib/kernel/src/inet.erl
index a91a6ed517..75dd800c6b 100644
--- a/lib/kernel/src/inet.erl
+++ b/lib/kernel/src/inet.erl
@@ -75,6 +75,7 @@
-export_type([address_family/0, hostent/0, hostname/0, ip4_address/0,
ip6_address/0, ip_address/0, port_number/0,
local_address/0, socket_address/0, returned_non_ip_address/0,
+ socket_setopt/0, socket_getopt/0,
posix/0, socket/0, stat_option/0]).
%% imports
-import(lists, [append/1, duplicate/2, filter/2, foldl/3]).
diff --git a/lib/kernel/src/inet6_tcp_dist.erl b/lib/kernel/src/inet6_tcp_dist.erl
index 3aa61973af..9b6c2745d5 100644
--- a/lib/kernel/src/inet6_tcp_dist.erl
+++ b/lib/kernel/src/inet6_tcp_dist.erl
@@ -24,6 +24,7 @@
-export([listen/1, accept/1, accept_connection/5,
setup/5, close/1, select/1, is_node_name/1]).
+-export([setopts/2, getopts/2]).
%% ------------------------------------------------------------
%% Select this protocol based on node name
@@ -72,3 +73,9 @@ close(Socket) ->
is_node_name(Node) when is_atom(Node) ->
inet_tcp_dist:is_node_name(Node).
+
+setopts(S, Opts) ->
+ inet_tcp_dist:setopts(S, Opts).
+
+getopts(S, Opts) ->
+ inet_tcp_dist:getopts(S, Opts).
diff --git a/lib/kernel/src/inet_tcp_dist.erl b/lib/kernel/src/inet_tcp_dist.erl
index f91d7ef7c3..3084bd599a 100644
--- a/lib/kernel/src/inet_tcp_dist.erl
+++ b/lib/kernel/src/inet_tcp_dist.erl
@@ -24,13 +24,16 @@
-export([listen/1, accept/1, accept_connection/5,
setup/5, close/1, select/1, is_node_name/1]).
+%% Optional
+-export([setopts/2, getopts/2]).
+
%% Generalized dist API
-export([gen_listen/2, gen_accept/2, gen_accept_connection/6,
gen_setup/6, gen_select/2]).
%% internal exports
--export([accept_loop/3,do_accept/7,do_setup/7,getstat/1]).
+-export([accept_loop/3,do_accept/7,do_setup/7,getstat/1,tick/2]).
-import(error_logger,[error_msg/2]).
@@ -74,7 +77,7 @@ gen_listen(Driver, Name) ->
TcpAddress = get_tcp_address(Driver, Socket),
{_,Port} = TcpAddress#net_address.address,
ErlEpmd = net_kernel:epmd_module(),
- case ErlEpmd:register_node(Name, Port) of
+ case ErlEpmd:register_node(Name, Port, Driver) of
{ok, Creation} ->
{ok, {Socket, TcpAddress, Creation}};
Error ->
@@ -215,8 +218,10 @@ do_accept(Driver, Kernel, AcceptPid, Socket, MyNode, Allowed, SetupTime) ->
inet:getll(S)
end,
f_address = fun(S, Node) -> get_remote_id(Driver, S, Node) end,
- mf_tick = fun(S) -> tick(Driver, S) end,
- mf_getstat = fun ?MODULE:getstat/1
+ mf_tick = fun(S) -> ?MODULE:tick(Driver, S) end,
+ mf_getstat = fun ?MODULE:getstat/1,
+ mf_setopts = fun ?MODULE:setopts/2,
+ mf_getopts = fun ?MODULE:getopts/2
},
dist_util:handshake_other_started(HSData);
{false,IP} ->
@@ -320,6 +325,7 @@ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
{packet, 4},
nodelay()])
end,
+
f_getll = fun inet:getll/1,
f_address =
fun(_,_) ->
@@ -329,9 +335,11 @@ do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
protocol = tcp,
family = AddressFamily}
end,
- mf_tick = fun(S) -> tick(Driver, S) end,
+ mf_tick = fun(S) -> ?MODULE:tick(Driver, S) end,
mf_getstat = fun ?MODULE:getstat/1,
- request_type = Type
+ request_type = Type,
+ mf_setopts = fun ?MODULE:setopts/2,
+ mf_getopts = fun ?MODULE:getopts/2
},
dist_util:handshake_we_started(HSData);
_ ->
@@ -492,3 +500,12 @@ split_stat([], R, W, P) ->
{ok, R, W, P}.
+setopts(S, Opts) ->
+ case [Opt || {K,_}=Opt <- Opts,
+ K =:= active orelse K =:= deliver orelse K =:= packet] of
+ [] -> inet:setopts(S,Opts);
+ Opts1 -> {error, {badopts,Opts1}}
+ end.
+
+getopts(S, Opts) ->
+ inet:getopts(S, Opts).
diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl
index ac19f4935b..0c679e7349 100644
--- a/lib/kernel/src/net_kernel.erl
+++ b/lib/kernel/src/net_kernel.erl
@@ -59,6 +59,8 @@
connect_node/1,
monitor_nodes/1,
monitor_nodes/2,
+ setopts/2,
+ getopts/2,
start/1,
stop/0]).
@@ -111,7 +113,7 @@
}).
-record(listen, {
- listen, %% listen pid
+ listen, %% listen socket
accept, %% accepting pid
address, %% #net_address
module %% proto module
@@ -384,7 +386,7 @@ init({Name, LongOrShortNames, TickT, CleanHalt}) ->
connections =
ets:new(sys_dist,[named_table,
protected,
- {keypos, 2}]),
+ {keypos, #connection.node}]),
listen = Listeners,
allowed = [],
verbose = 0
@@ -554,6 +556,38 @@ handle_call({new_ticktime,_T,_TP},
#state{tick = #tick_change{time = T}} = State) ->
async_reply({reply, {ongoing_change_to, T}, State}, From);
+handle_call({setopts, new, Opts}, From, State) ->
+ Ret = setopts_new(Opts, State),
+ async_reply({reply, Ret, State}, From);
+
+handle_call({setopts, Node, Opts}, From, State) ->
+ Return =
+ case ets:lookup(sys_dist, Node) of
+ [Conn] when Conn#connection.state =:= up ->
+ case call_owner(Conn#connection.owner, {setopts, Opts}) of
+ {ok, Ret} -> Ret;
+ _ -> {error, noconnection}
+ end;
+
+ _ ->
+ {error, noconnection}
+ end,
+ async_reply({reply, Return, State}, From);
+
+handle_call({getopts, Node, Opts}, From, State) ->
+ Return =
+ case ets:lookup(sys_dist, Node) of
+ [Conn] when Conn#connection.state =:= up ->
+ case call_owner(Conn#connection.owner, {getopts, Opts}) of
+ {ok, Ret} -> Ret;
+ _ -> {error, noconnection}
+ end;
+
+ _ ->
+ {error, noconnection}
+ end,
+ async_reply({reply, Return, State}, From);
+
handle_call(_Msg, _From, State) ->
{noreply, State}.
@@ -1608,3 +1642,93 @@ async_gen_server_reply(From, Msg) ->
{'EXIT', _} ->
ok
end.
+
+call_owner(Owner, Msg) ->
+ Mref = monitor(process, Owner),
+ Owner ! {self(), Mref, Msg},
+ receive
+ {Mref, Reply} ->
+ erlang:demonitor(Mref, [flush]),
+ {ok, Reply};
+ {'DOWN', Mref, _, _, _} ->
+ error
+ end.
+
+
+-spec setopts(Node, Options) -> ok | {error, Reason} | ignored when
+ Node :: node() | new,
+ Options :: [inet:socket_setopt()],
+ Reason :: inet:posix() | noconnection.
+
+setopts(Node, Opts) when is_atom(Node), is_list(Opts) ->
+ request({setopts, Node, Opts}).
+
+setopts_new(Opts, State) ->
+ %% First try setopts on listening socket(s)
+ %% Bail out on failure.
+ %% If successful, we are pretty sure Opts are ok
+ %% and we continue with config params and pending connections.
+ case setopts_on_listen(Opts, State#state.listen) of
+ ok ->
+ setopts_new_1(Opts);
+ Fail -> Fail
+ end.
+
+setopts_on_listen(_, []) -> ok;
+setopts_on_listen(Opts, [#listen {listen = LSocket, module = Mod} | T]) ->
+ try Mod:setopts(LSocket, Opts) of
+ ok ->
+ setopts_on_listen(Opts, T);
+ Fail -> Fail
+ catch
+ error:undef -> {error, enotsup}
+ end.
+
+setopts_new_1(Opts) ->
+ ConnectOpts = case application:get_env(kernel, inet_dist_connect_options) of
+ {ok, CO} -> CO;
+ _ -> []
+ end,
+ application:set_env(kernel, inet_dist_connect_options,
+ merge_opts(Opts,ConnectOpts)),
+ ListenOpts = case application:get_env(kernel, inet_dist_listen_options) of
+ {ok, LO} -> LO;
+ _ -> []
+ end,
+ application:set_env(kernel, inet_dist_listen_options,
+ merge_opts(Opts, ListenOpts)),
+ case lists:keyfind(nodelay, 1, Opts) of
+ {nodelay, ND} when is_boolean(ND) ->
+ application:set_env(kernel, dist_nodelay, ND);
+ _ -> ignore
+ end,
+
+ %% Update any pending connections
+ PendingConns = ets:select(sys_dist, [{'_',
+ [{'=/=',{element,#connection.state,'$_'},up}],
+ ['$_']}]),
+ lists:foreach(fun(#connection{state = pending, owner = Owner}) ->
+ call_owner(Owner, {setopts, Opts});
+ (#connection{state = up_pending, pending_owner = Owner}) ->
+ call_owner(Owner, {setopts, Opts});
+ (_) -> ignore
+ end, PendingConns),
+ ok.
+
+merge_opts([], B) ->
+ B;
+merge_opts([H|T], B0) ->
+ {Key, _} = H,
+ B1 = lists:filter(fun({K,_}) -> K =/= Key end, B0),
+ merge_opts(T, [H | B1]).
+
+-spec getopts(Node, Options) ->
+ {'ok', OptionValues} | {'error', Reason} | ignored when
+ Node :: node(),
+ Options :: [inet:socket_getopt()],
+ OptionValues :: [inet:socket_setopt()],
+ Reason :: inet:posix() | noconnection.
+
+getopts(Node, Opts) when is_atom(Node), is_list(Opts) ->
+ request({getopts, Node, Opts}).
+
diff --git a/lib/kernel/src/os.erl b/lib/kernel/src/os.erl
index f0ad26b1f2..05bbf1069e 100644
--- a/lib/kernel/src/os.erl
+++ b/lib/kernel/src/os.erl
@@ -226,11 +226,13 @@ extensions() ->
Command :: atom() | io_lib:chars().
cmd(Cmd) ->
validate(Cmd),
- {SpawnCmd, SpawnOpts, SpawnInput} = mk_cmd(os:type(), Cmd),
+ {SpawnCmd, SpawnOpts, SpawnInput, Eot} = mk_cmd(os:type(), Cmd),
Port = open_port({spawn, SpawnCmd}, [binary, stderr_to_stdout,
- stream, in, eof, hide | SpawnOpts]),
+ stream, in, hide | SpawnOpts]),
+ MonRef = erlang:monitor(port, Port),
true = port_command(Port, SpawnInput),
- Bytes = get_data(Port, []),
+ Bytes = get_data(Port, MonRef, Eot, []),
+ demonitor(MonRef, [flush]),
String = unicode:characters_to_list(Bytes),
if %% Convert to unicode list if possible otherwise return bytes
is_list(String) -> String;
@@ -243,7 +245,7 @@ mk_cmd({win32,Wtype}, Cmd) ->
{false,_} -> lists:concat(["cmd /c", Cmd]);
{Cspec,_} -> lists:concat([Cspec," /c",Cmd])
end,
- {Command, [], []};
+ {Command, [], [], <<>>};
mk_cmd(OsType,Cmd) when is_atom(Cmd) ->
mk_cmd(OsType, atom_to_list(Cmd));
mk_cmd(_,Cmd) ->
@@ -252,7 +254,8 @@ mk_cmd(_,Cmd) ->
{"/bin/sh -s unix:cmd", [out],
%% We insert a new line after the command, in case the command
%% contains a comment character.
- ["(", unicode:characters_to_binary(Cmd), "\n); exit\n"]}.
+ ["(", unicode:characters_to_binary(Cmd), "\n); echo \"\^D\"\n"],
+ <<$\^D>>}.
validate(Atom) when is_atom(Atom) ->
ok;
@@ -267,21 +270,44 @@ validate1([List|Rest]) when is_list(List) ->
validate1([]) ->
ok.
-get_data(Port, Sofar) ->
+get_data(Port, MonRef, Eot, Sofar) ->
receive
{Port, {data, Bytes}} ->
- get_data(Port, [Sofar,Bytes]);
- {Port, eof} ->
- Port ! {self(), close},
- receive
- {Port, closed} ->
- true
- end,
- receive
- {'EXIT', Port, _} ->
- ok
- after 1 -> % force context switch
- ok
- end,
+ case eot(Bytes, Eot) of
+ more ->
+ get_data(Port, MonRef, Eot, [Sofar,Bytes]);
+ Last ->
+ Port ! {self(), close},
+ flush_until_closed(Port),
+ flush_exit(Port),
+ iolist_to_binary([Sofar, Last])
+ end;
+ {'DOWN', MonRef, _, _ , _} ->
+ flush_exit(Port),
iolist_to_binary(Sofar)
end.
+
+eot(_Bs, <<>>) ->
+ more;
+eot(Bs, Eot) ->
+ case binary:match(Bs, Eot) of
+ nomatch -> more;
+ {Pos, _} ->
+ binary:part(Bs,{0, Pos})
+ end.
+
+flush_until_closed(Port) ->
+ receive
+ {Port, {data, _Bytes}} ->
+ flush_until_closed(Port);
+ {Port, closed} ->
+ true
+ end.
+
+flush_exit(Port) ->
+ receive
+ {'EXIT', Port, _} ->
+ ok
+ after 1 -> % force context switch
+ ok
+ end.