aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kernel/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/kernel/src')
-rw-r--r--lib/kernel/src/Makefile18
-rw-r--r--lib/kernel/src/code_server.erl27
-rw-r--r--lib/kernel/src/disk_log.erl4
-rw-r--r--lib/kernel/src/dist_util.erl135
-rw-r--r--lib/kernel/src/erl_boot_server.erl37
-rw-r--r--lib/kernel/src/error_handler.erl4
-rw-r--r--lib/kernel/src/erts_debug.erl12
-rw-r--r--lib/kernel/src/file.erl65
-rw-r--r--lib/kernel/src/file_int.hrl33
-rw-r--r--lib/kernel/src/file_io_server.erl73
-rw-r--r--lib/kernel/src/file_server.erl137
-rw-r--r--lib/kernel/src/group_history.erl4
-rw-r--r--lib/kernel/src/hipe_unified_loader.erl6
-rw-r--r--lib/kernel/src/inet.erl22
-rw-r--r--lib/kernel/src/inet_hosts.erl5
-rw-r--r--lib/kernel/src/inet_int.hrl8
-rw-r--r--lib/kernel/src/inet_res.erl28
-rw-r--r--lib/kernel/src/kernel.app.src9
-rw-r--r--lib/kernel/src/kernel.appup.src6
-rw-r--r--lib/kernel/src/kernel.erl11
-rw-r--r--lib/kernel/src/kernel_refc.erl139
-rw-r--r--lib/kernel/src/net_kernel.erl352
-rw-r--r--lib/kernel/src/os.erl117
-rw-r--r--lib/kernel/src/raw_file_io.erl75
-rw-r--r--lib/kernel/src/raw_file_io_compressed.erl134
-rw-r--r--lib/kernel/src/raw_file_io_deflate.erl159
-rw-r--r--lib/kernel/src/raw_file_io_delayed.erl320
-rw-r--r--lib/kernel/src/raw_file_io_inflate.erl261
-rw-r--r--lib/kernel/src/raw_file_io_list.erl128
-rw-r--r--lib/kernel/src/raw_file_io_raw.erl25
-rw-r--r--lib/kernel/src/rpc.erl15
31 files changed, 1895 insertions, 474 deletions
diff --git a/lib/kernel/src/Makefile b/lib/kernel/src/Makefile
index 5946620f0f..0bc9f121a0 100644
--- a/lib/kernel/src/Makefile
+++ b/lib/kernel/src/Makefile
@@ -106,6 +106,7 @@ MODULES = \
inet_sctp \
kernel \
kernel_config \
+ kernel_refc \
local_udp \
local_tcp \
net \
@@ -120,6 +121,13 @@ MODULES = \
user \
user_drv \
user_sup \
+ raw_file_io \
+ raw_file_io_compressed \
+ raw_file_io_inflate \
+ raw_file_io_deflate \
+ raw_file_io_delayed \
+ raw_file_io_list \
+ raw_file_io_raw \
wrap_log_reader
HRL_FILES= ../include/file.hrl ../include/inet.hrl ../include/inet_sctp.hrl \
@@ -226,7 +234,8 @@ $(EBIN)/disk_log_server.beam: disk_log.hrl
$(EBIN)/dist_util.beam: ../include/dist_util.hrl ../include/dist.hrl
$(EBIN)/erl_boot_server.beam: inet_boot.hrl
$(EBIN)/erl_epmd.beam: inet_int.hrl erl_epmd.hrl
-$(EBIN)/file.beam: ../include/file.hrl
+$(EBIN)/file.beam: ../include/file.hrl file_int.hrl
+$(EBIN)/file_io_server.beam: ../include/file.hrl file_int.hrl
$(EBIN)/gen_tcp.beam: inet_int.hrl
$(EBIN)/gen_udp.beam: inet_int.hrl
$(EBIN)/gen_sctp.beam: ../include/inet_sctp.hrl
@@ -254,3 +263,10 @@ $(EBIN)/net_kernel.beam: ../include/net_address.hrl
$(EBIN)/os.beam: ../include/file.hrl
$(EBIN)/ram_file.beam: ../include/file.hrl
$(EBIN)/wrap_log_reader.beam: disk_log.hrl ../include/file.hrl
+$(EBIN)/raw_file_io.beam: ../include/file.hrl file_int.hrl
+$(EBIN)/raw_file_io_compressed.beam: ../include/file.hrl file_int.hrl
+$(EBIN)/raw_file_io_inflate.beam: ../include/file.hrl file_int.hrl
+$(EBIN)/raw_file_io_deflate.beam: ../include/file.hrl file_int.hrl
+$(EBIN)/raw_file_io_delayed.beam: ../include/file.hrl file_int.hrl
+$(EBIN)/raw_file_io_list.beam: ../include/file.hrl file_int.hrl
+$(EBIN)/raw_file_io_raw.beam: ../include/file.hrl file_int.hrl
diff --git a/lib/kernel/src/code_server.erl b/lib/kernel/src/code_server.erl
index 418b0c50e1..f5a890cb95 100644
--- a/lib/kernel/src/code_server.erl
+++ b/lib/kernel/src/code_server.erl
@@ -340,8 +340,7 @@ handle_call(all_loaded, _From, S) ->
{reply,all_loaded(Db),S};
handle_call({get_object_code,Mod}, _From, St) when is_atom(Mod) ->
- Path = St#state.path,
- case mod_to_bin(Path, Mod) of
+ case get_object_code(St, Mod) of
{_,Bin,FName} -> {reply,{Mod,Bin,FName},St};
Error -> {reply,Error,St}
end;
@@ -1182,19 +1181,28 @@ load_file(Mod, From, St0) ->
end,
handle_pending_on_load(Action, Mod, From, St0).
-load_file_1(Mod, From, #state{path=Path}=St) ->
- case mod_to_bin(Path, Mod) of
+load_file_1(Mod, From, St) ->
+ case get_object_code(St, Mod) of
error ->
{reply,{error,nofile},St};
{Mod,Binary,File} ->
try_load_module_1(File, Mod, Binary, From, St)
end.
-mod_to_bin([Dir|Tail], Mod) ->
- File = filename:append(Dir, atom_to_list(Mod) ++ objfile_extension()),
+get_object_code(#state{path=Path}, Mod) when is_atom(Mod) ->
+ ModStr = atom_to_list(Mod),
+ case erl_prim_loader:is_basename(ModStr) of
+ true ->
+ mod_to_bin(Path, Mod, ModStr ++ objfile_extension());
+ false ->
+ error
+ end.
+
+mod_to_bin([Dir|Tail], Mod, ModFile) ->
+ File = filename:append(Dir, ModFile),
case erl_prim_loader:get_file(File) of
error ->
- mod_to_bin(Tail, Mod);
+ mod_to_bin(Tail, Mod, ModFile);
{ok,Bin,_} ->
case filename:pathtype(File) of
absolute ->
@@ -1203,10 +1211,9 @@ mod_to_bin([Dir|Tail], Mod) ->
{Mod,Bin,absname(File)}
end
end;
-mod_to_bin([], Mod) ->
+mod_to_bin([], Mod, ModFile) ->
%% At last, try also erl_prim_loader's own method
- File = to_list(Mod) ++ objfile_extension(),
- case erl_prim_loader:get_file(File) of
+ case erl_prim_loader:get_file(ModFile) of
error ->
error; % No more alternatives !
{ok,Bin,FName} ->
diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl
index 70cbf1c87c..99ea8dc384 100644
--- a/lib/kernel/src/disk_log.erl
+++ b/lib/kernel/src/disk_log.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1997-2017. All Rights Reserved.
+%% Copyright Ericsson AB 1997-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -266,7 +266,7 @@ inc_wrap_file(Log) ->
Size :: dlog_size(),
Reason :: no_such_log | nonode | {read_only_mode, Log}
| {blocked_log, Log}
- | {new_size_too_small, CurrentSize :: pos_integer()}
+ | {new_size_too_small, Log, CurrentSize :: pos_integer()}
| {badarg, size}
| {file_error, file:filename(), file_error()}.
change_size(Log, NewSize) ->
diff --git a/lib/kernel/src/dist_util.erl b/lib/kernel/src/dist_util.erl
index 08bd5946cd..3927b64b06 100644
--- a/lib/kernel/src/dist_util.erl
+++ b/lib/kernel/src/dist_util.erl
@@ -27,6 +27,7 @@
%%-compile(export_all).
-export([handshake_we_started/1, handshake_other_started/1,
+ strict_order_flags/0,
start_timer/1, setup_timer/2,
reset_timer/1, cancel_timer/1,
shutdown/3, shutdown/4]).
@@ -116,22 +117,8 @@ dflag2str(_) ->
"UNKNOWN".
-remove_flag(Flag, Flags) ->
- case Flags band Flag of
- 0 ->
- Flags;
- _ ->
- Flags - Flag
- end.
-
-adjust_flags(ThisFlags, OtherFlags, RejectFlags) ->
- case (?DFLAG_PUBLISHED band ThisFlags) band OtherFlags of
- 0 ->
- {remove_flag(?DFLAG_PUBLISHED, ThisFlags),
- remove_flag(?DFLAG_PUBLISHED, OtherFlags)};
- _ ->
- {ThisFlags, OtherFlags band (bnot RejectFlags)}
- end.
+adjust_flags(ThisFlags, OtherFlags) ->
+ ThisFlags band OtherFlags.
publish_flag(hidden, _) ->
0;
@@ -143,50 +130,35 @@ publish_flag(_, OtherNode) ->
0
end.
--define(DFLAGS_REMOVABLE,
- (?DFLAG_DIST_HDR_ATOM_CACHE
- bor ?DFLAG_HIDDEN_ATOM_CACHE
- bor ?DFLAG_ATOM_CACHE)).
-
--define(DFLAGS_ADDABLE,
- (?DFLAGS_ALL
- band (bnot (?DFLAG_PUBLISHED
- bor ?DFLAG_HIDDEN_ATOM_CACHE
- bor ?DFLAG_ATOM_CACHE)))).
-
--define(DFLAGS_THIS_DEFAULT,
- (?DFLAG_EXPORT_PTR_TAG
- bor ?DFLAG_EXTENDED_PIDS_PORTS
- bor ?DFLAG_EXTENDED_REFERENCES
- bor ?DFLAG_DIST_MONITOR
- bor ?DFLAG_FUN_TAGS
- bor ?DFLAG_DIST_MONITOR_NAME
- bor ?DFLAG_NEW_FUN_TAGS
- bor ?DFLAG_BIT_BINARIES
- bor ?DFLAG_NEW_FLOATS
- bor ?DFLAG_UNICODE_IO
- bor ?DFLAG_DIST_HDR_ATOM_CACHE
- bor ?DFLAG_SMALL_ATOM_TAGS
- bor ?DFLAG_UTF8_ATOMS
- bor ?DFLAG_MAP_TAG
- bor ?DFLAG_BIG_CREATION
- bor ?DFLAG_SEND_SENDER)).
-
-make_this_flags(RequestType, AddFlags, RemoveFlags, OtherNode) ->
- case RemoveFlags band (bnot ?DFLAGS_REMOVABLE) of
+
+%% Sync with dist.c
+-record(erts_dflags, {
+ default, % flags erts prefers
+ mandatory, % flags erts needs
+ addable, % flags local dist implementation is allowed to add
+ rejectable, % flags local dist implementation is allowed to reject
+ strict_order % flags for features needing strict order delivery
+}).
+
+-spec strict_order_flags() -> integer().
+strict_order_flags() ->
+ EDF = erts_internal:get_dflags(),
+ EDF#erts_dflags.strict_order.
+
+make_this_flags(RequestType, AddFlags, RejectFlags, OtherNode,
+ #erts_dflags{}=EDF) ->
+ case RejectFlags band (bnot EDF#erts_dflags.rejectable) of
0 -> ok;
Rerror -> exit({"Rejecting non rejectable flags", Rerror})
end,
- case AddFlags band (bnot ?DFLAGS_ADDABLE) of
+ case AddFlags band (bnot EDF#erts_dflags.addable) of
0 -> ok;
Aerror -> exit({"Adding non addable flags", Aerror})
end,
- Flgs0 = ?DFLAGS_THIS_DEFAULT,
+ Flgs0 = EDF#erts_dflags.default,
Flgs1 = Flgs0 bor publish_flag(RequestType, OtherNode),
Flgs2 = Flgs1 bor AddFlags,
- Flgs3 = Flgs2 band (bnot (?DFLAG_HIDDEN_ATOM_CACHE
- bor ?DFLAG_ATOM_CACHE)),
- Flgs3 band (bnot RemoveFlags).
+ Flgs2 band (bnot RejectFlags).
handshake_other_started(#hs_data{request_type=ReqType,
add_flags=AddFlgs0,
@@ -196,19 +168,18 @@ handshake_other_started(#hs_data{request_type=ReqType,
RejFlgs = convert_flags(RejFlgs0),
ReqFlgs = convert_flags(ReqFlgs0),
{PreOtherFlags,Node,Version} = recv_name(HSData0),
- PreThisFlags = make_this_flags(ReqType, AddFlgs, RejFlgs, Node),
- {ThisFlags, OtherFlags} = adjust_flags(PreThisFlags,
- PreOtherFlags,
- RejFlgs),
- HSData = HSData0#hs_data{this_flags=ThisFlags,
- other_flags=OtherFlags,
+ EDF = erts_internal:get_dflags(),
+ PreThisFlags = make_this_flags(ReqType, AddFlgs, RejFlgs, Node, EDF),
+ ChosenFlags = adjust_flags(PreThisFlags, PreOtherFlags),
+ HSData = HSData0#hs_data{this_flags=ChosenFlags,
+ other_flags=ChosenFlags,
other_version=Version,
other_node=Node,
other_started=true,
add_flags=AddFlgs,
reject_flags=RejFlgs,
require_flags=ReqFlgs},
- check_dflags(HSData),
+ check_dflags(HSData, EDF),
is_allowed(HSData),
?debug({"MD5 connection from ~p (V~p)~n",
[Node, HSData#hs_data.other_version]}),
@@ -247,13 +218,11 @@ is_allowed(#hs_data{other_node = Node,
check_dflags(#hs_data{other_node = Node,
other_flags = OtherFlags,
other_started = OtherStarted,
- require_flags = RequiredFlags} = HSData) ->
- Mandatory = ((?DFLAG_EXTENDED_REFERENCES
- bor ?DFLAG_EXTENDED_PIDS_PORTS
- bor ?DFLAG_UTF8_ATOMS)
- bor RequiredFlags),
- Missing = check_mandatory(0, ?DFLAGS_ALL, Mandatory,
- OtherFlags, []),
+ require_flags = RequiredFlags} = HSData,
+ #erts_dflags{}=EDF) ->
+
+ Mandatory = (EDF#erts_dflags.mandatory bor RequiredFlags),
+ Missing = check_mandatory(Mandatory, OtherFlags, []),
case Missing of
[] ->
ok;
@@ -273,21 +242,20 @@ check_dflags(#hs_data{other_node = Node,
?shutdown2(Node, {check_dflags_failed, Missing})
end.
-check_mandatory(_Bit, 0, _Mandatory, _OtherFlags, Missing) ->
+check_mandatory(0, _OtherFlags, Missing) ->
Missing;
-check_mandatory(Bit, Left, Mandatory, OtherFlags, Missing) ->
- DFlag = (1 bsl Bit),
- NewLeft = Left band (bnot DFlag),
- NewMissing = case {DFlag band Mandatory,
- DFlag band OtherFlags} of
- {DFlag, 0} ->
+check_mandatory(Mandatory, OtherFlags, Missing) ->
+ Left = Mandatory band (Mandatory - 1), % clear lowest set bit
+ DFlag = Mandatory bxor Left, % only lowest set bit
+ NewMissing = case DFlag band OtherFlags of
+ 0 ->
%% Mandatory and missing...
[dflag2str(DFlag) | Missing];
_ ->
- %% Not mandatory or present...
+ %% Mandatory and present...
Missing
end,
- check_mandatory(Bit+1, NewLeft, Mandatory, OtherFlags, NewMissing).
+ check_mandatory(Left, OtherFlags, NewMissing).
%% No nodedown will be sent if we fail before this process has
@@ -409,7 +377,8 @@ handshake_we_started(#hs_data{request_type=ReqType,
AddFlgs = convert_flags(AddFlgs0),
RejFlgs = convert_flags(RejFlgs0),
ReqFlgs = convert_flags(ReqFlgs0),
- PreThisFlags = make_this_flags(ReqType, AddFlgs, RejFlgs, Node),
+ EDF = erts_internal:get_dflags(),
+ PreThisFlags = make_this_flags(ReqType, AddFlgs, RejFlgs, Node, EDF),
HSData = PreHSData#hs_data{this_flags = PreThisFlags,
add_flags = AddFlgs,
reject_flags = RejFlgs,
@@ -417,13 +386,11 @@ handshake_we_started(#hs_data{request_type=ReqType,
send_name(HSData),
recv_status(HSData),
{PreOtherFlags,ChallengeA} = recv_challenge(HSData),
- {ThisFlags,OtherFlags} = adjust_flags(PreThisFlags,
- PreOtherFlags,
- RejFlgs),
- NewHSData = HSData#hs_data{this_flags = ThisFlags,
- other_flags = OtherFlags,
+ ChosenFlags = adjust_flags(PreThisFlags, PreOtherFlags),
+ NewHSData = HSData#hs_data{this_flags = ChosenFlags,
+ other_flags = ChosenFlags,
other_started = false},
- check_dflags(NewHSData),
+ check_dflags(NewHSData, EDF),
MyChallenge = gen_challenge(),
{MyCookie,HisCookie} = get_cookies(Node),
send_challenge_reply(NewHSData,MyChallenge,
@@ -538,8 +505,8 @@ do_setnode(#hs_data{other_node = Node, socket = Socket,
"no table space left for node ~w ** ~n",
[Node]),
?shutdown(Node);
- error:Other ->
- exit({Other, erlang:get_stacktrace()})
+ error:Other:Stacktrace ->
+ exit({Other, Stacktrace})
end;
_ ->
error_msg("** Distribution connection error, "
diff --git a/lib/kernel/src/erl_boot_server.erl b/lib/kernel/src/erl_boot_server.erl
index 2a38266579..4ac945ce01 100644
--- a/lib/kernel/src/erl_boot_server.erl
+++ b/lib/kernel/src/erl_boot_server.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1996-2017. All Rights Reserved.
+%% Copyright Ericsson AB 1996-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -58,13 +58,11 @@
-define(single_addr_mask, {255, 255, 255, 255}).
--type ip4_address() :: {0..255,0..255,0..255,0..255}.
-
--spec start(Slaves) -> {'ok', Pid} | {'error', What} when
+-spec start(Slaves) -> {'ok', Pid} | {'error', Reason} when
Slaves :: [Host],
- Host :: atom(),
+ Host :: inet:ip_address() | inet:hostname(),
Pid :: pid(),
- What :: any().
+ Reason :: {'badarg', Slaves}.
start(Slaves) ->
case check_arg(Slaves) of
@@ -74,11 +72,11 @@ start(Slaves) ->
{error, {badarg, Slaves}}
end.
--spec start_link(Slaves) -> {'ok', Pid} | {'error', What} when
+-spec start_link(Slaves) -> {'ok', Pid} | {'error', Reason} when
Slaves :: [Host],
- Host :: atom(),
+ Host :: inet:ip_address() | inet:hostname(),
Pid :: pid(),
- What :: any().
+ Reason :: {'badarg', Slaves}.
start_link(Slaves) ->
case check_arg(Slaves) of
@@ -104,10 +102,10 @@ check_arg([], Result) ->
check_arg(_, _Result) ->
error.
--spec add_slave(Slave) -> 'ok' | {'error', What} when
+-spec add_slave(Slave) -> 'ok' | {'error', Reason} when
Slave :: Host,
- Host :: atom(),
- What :: any().
+ Host :: inet:ip_address() | inet:hostname(),
+ Reason :: {'badarg', Slave}.
add_slave(Slave) ->
case inet:getaddr(Slave, inet) of
@@ -117,10 +115,10 @@ add_slave(Slave) ->
{error, {badarg, Slave}}
end.
--spec delete_slave(Slave) -> 'ok' | {'error', What} when
+-spec delete_slave(Slave) -> 'ok' | {'error', Reason} when
Slave :: Host,
- Host :: atom(),
- What :: any().
+ Host :: inet:ip_address() | inet:hostname(),
+ Reason :: {'badarg', Slave}.
delete_slave(Slave) ->
case inet:getaddr(Slave, inet) of
@@ -130,7 +128,7 @@ delete_slave(Slave) ->
{error, {badarg, Slave}}
end.
--spec add_subnet(Mask :: ip4_address(), Addr :: ip4_address()) ->
+-spec add_subnet(Netmask :: inet:ip_address(), Addr :: inet:ip_address()) ->
'ok' | {'error', any()}.
add_subnet(Mask, Addr) when is_tuple(Mask), is_tuple(Addr) ->
@@ -141,14 +139,15 @@ add_subnet(Mask, Addr) when is_tuple(Mask), is_tuple(Addr) ->
{error, empty_subnet}
end.
--spec delete_subnet(Mask :: ip4_address(), Addr :: ip4_address()) -> 'ok'.
+-spec delete_subnet(Netmask :: inet:ip_address(),
+ Addr :: inet:ip_address()) -> 'ok'.
delete_subnet(Mask, Addr) when is_tuple(Mask), is_tuple(Addr) ->
gen_server:call(boot_server, {delete, {Mask, Addr}}).
-spec which_slaves() -> Slaves when
- Slaves :: [Host],
- Host :: atom().
+ Slaves :: [Slave],
+ Slave :: {Netmask :: inet:ip_address(), Address :: inet:ip_address()}.
which_slaves() ->
gen_server:call(boot_server, which).
diff --git a/lib/kernel/src/error_handler.erl b/lib/kernel/src/error_handler.erl
index 59ca8e690d..a9582c6225 100644
--- a/lib/kernel/src/error_handler.erl
+++ b/lib/kernel/src/error_handler.erl
@@ -106,8 +106,8 @@ crash(M, F, A) ->
crash(Tuple) ->
try erlang:error(undef)
catch
- error:undef ->
- Stk = [Tuple|tl(erlang:get_stacktrace())],
+ error:undef:Stacktrace ->
+ Stk = [Tuple|tl(Stacktrace)],
erlang:raise(error, undef, Stk)
end.
diff --git a/lib/kernel/src/erts_debug.erl b/lib/kernel/src/erts_debug.erl
index 2887014c1c..3456c8511e 100644
--- a/lib/kernel/src/erts_debug.erl
+++ b/lib/kernel/src/erts_debug.erl
@@ -21,7 +21,7 @@
%% Low-level debugging support. EXPERIMENTAL!
--export([size/1,df/1,df/2,df/3,ic/1]).
+-export([size/1,df/1,df/2,df/3,dis_to_file/2,ic/1]).
%% This module contains the following *experimental* BIFs:
%% disassemble/1
@@ -378,6 +378,16 @@ df(Mod, Func, Arity) when is_atom(Mod), is_atom(Func) ->
catch _:_ -> {undef,Mod}
end.
+-spec dis_to_file(module(), file:filename()) -> df_ret().
+
+dis_to_file(Mod, Name) when is_atom(Mod) ->
+ try Mod:module_info(functions) of
+ Fs0 when is_list(Fs0) ->
+ Fs = [{Mod,Func,Arity} || {Func,Arity} <- Fs0],
+ dff(Name, Fs)
+ catch _:_ -> {undef,Mod}
+ end.
+
dff(Name, Fs) ->
case file:open(Name, [write,raw,delayed_write]) of
{ok,F} ->
diff --git a/lib/kernel/src/file.erl b/lib/kernel/src/file.erl
index 933f2d5f65..c2df1ee288 100644
--- a/lib/kernel/src/file.erl
+++ b/lib/kernel/src/file.erl
@@ -72,7 +72,7 @@
io_device/0, name/0, name_all/0, posix/0]).
%%% Includes and defines
--include("file.hrl").
+-include("file_int.hrl").
-define(FILE_IO_SERVER_TABLE, file_io_servers).
@@ -454,41 +454,23 @@ raw_write_file_info(Name, #file_info{} = Info) ->
Reason :: posix() | badarg | system_limit.
open(Item, ModeList) when is_list(ModeList) ->
- case lists:member(raw, ModeList) of
- %% Raw file, use ?PRIM_FILE to handle this file
- true ->
+ case {lists:member(raw, ModeList), lists:member(ram, ModeList)} of
+ {false, false} ->
+ %% File server file
Args = [file_name(Item) | ModeList],
case check_args(Args) of
ok ->
[FileName | _] = Args,
- %% We rely on the returned Handle (in {ok, Handle})
- %% being a pid() or a #file_descriptor{}
- ?PRIM_FILE:open(FileName, ModeList);
+ call(open, [FileName, ModeList]);
Error ->
Error
- end;
- false ->
- case lists:member(ram, ModeList) of
- %% RAM file, use ?RAM_FILE to handle this file
- true ->
- case check_args(ModeList) of
- ok ->
- ?RAM_FILE:open(Item, ModeList);
- Error ->
- Error
- end;
- %% File server file
- false ->
- Args = [file_name(Item) | ModeList],
- case check_args(Args) of
- ok ->
- [FileName | _] = Args,
- call(open, [FileName, ModeList]);
- Error ->
- Error
- end
- end
+ end;
+ {true, _Either} ->
+ raw_file_io:open(file_name(Item), ModeList);
+ {false, true} ->
+ ram_file:open(Item, ModeList)
end;
+
%% Old obsolete mode specification in atom or 2-tuple format
open(Item, Mode) ->
open(Item, mode_list(Mode)).
@@ -1254,15 +1236,18 @@ sendfile(File, _Sock, _Offet, _Bytes, _Opts) when is_pid(File) ->
sendfile(File, Sock, Offset, Bytes, []) ->
sendfile(File, Sock, Offset, Bytes, ?MAX_CHUNK_SIZE, [], [], []);
sendfile(File, Sock, Offset, Bytes, Opts) ->
- ChunkSize0 = proplists:get_value(chunk_size, Opts, ?MAX_CHUNK_SIZE),
- ChunkSize = if ChunkSize0 > ?MAX_CHUNK_SIZE ->
- ?MAX_CHUNK_SIZE;
- true -> ChunkSize0
- end,
- %% Support for headers, trailers and options has been removed because the
- %% Darwin and BSD API for using it does not play nice with
- %% non-blocking sockets. See unix_efile.c for more info.
- sendfile(File, Sock, Offset, Bytes, ChunkSize, [], [], Opts).
+ try proplists:get_value(chunk_size, Opts, ?MAX_CHUNK_SIZE) of
+ ChunkSize0 when is_integer(ChunkSize0) ->
+ ChunkSize = erlang:min(ChunkSize0, ?MAX_CHUNK_SIZE),
+ %% Support for headers, trailers and options has been removed
+ %% because the Darwin and BSD API for using it does not play nice
+ %% with non-blocking sockets. See unix_efile.c for more info.
+ sendfile(File, Sock, Offset, Bytes, ChunkSize, [], [], Opts);
+ _Other ->
+ {error, badarg}
+ catch
+ error:_ -> {error, badarg}
+ end.
%% sendfile/2
-spec sendfile(Filename, Socket) ->
@@ -1397,8 +1382,8 @@ eval_stream2({ok,Form,EndLine}, Fd, H, Last, E, Bs0) ->
try erl_eval:exprs(Form, Bs0) of
{value,V,Bs} ->
eval_stream(Fd, H, EndLine, {V}, E, Bs)
- catch Class:Reason ->
- Error = {EndLine,?MODULE,{Class,Reason,erlang:get_stacktrace()}},
+ catch Class:Reason:StackTrace ->
+ Error = {EndLine,?MODULE,{Class,Reason,StackTrace}},
eval_stream(Fd, H, EndLine, Last, [Error|E], Bs0)
end;
eval_stream2({error,What,EndLine}, Fd, H, Last, E, Bs) ->
diff --git a/lib/kernel/src/file_int.hrl b/lib/kernel/src/file_int.hrl
new file mode 100644
index 0000000000..bafc330c04
--- /dev/null
+++ b/lib/kernel/src/file_int.hrl
@@ -0,0 +1,33 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% Internal definitions for the 'file' module and friends.
+%%
+
+-ifndef(FILE_INTERNAL_HRL_).
+-define(FILE_INTERNAL_HRL_, 1).
+
+-include("file.hrl").
+
+-define(CALL_FD(Fd, Method, Args),
+ apply(Fd#file_descriptor.module, Method, [Fd | Args])).
+
+-endif.
diff --git a/lib/kernel/src/file_io_server.erl b/lib/kernel/src/file_io_server.erl
index deb7b315b1..34d5497a4a 100644
--- a/lib/kernel/src/file_io_server.erl
+++ b/lib/kernel/src/file_io_server.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2000-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2000-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -28,7 +28,8 @@
-record(state, {handle,owner,mref,buf,read_mode,unic}).
--define(PRIM_FILE, prim_file).
+-include("file_int.hrl").
+
-define(READ_SIZE_LIST, 128).
-define(READ_SIZE_BINARY, (8*1024)).
@@ -67,8 +68,9 @@ do_start(Spawn, Owner, FileName, ModeList) ->
erlang:dt_restore_tag(Utag),
%% process_flag(trap_exit, true),
case parse_options(ModeList) of
- {ReadMode, UnicodeMode, Opts} ->
- case ?PRIM_FILE:open(FileName, Opts) of
+ {ReadMode, UnicodeMode, Opts0} ->
+ Opts = maybe_add_read_ahead(ReadMode, Opts0),
+ case raw_file_io:open(FileName, [raw | Opts]) of
{error, Reason} = Error ->
Self ! {Ref, Error},
exit(Reason);
@@ -157,6 +159,24 @@ valid_enc({utf32,little}) ->
valid_enc(_Other) ->
{error,badarg}.
+%% Add a small read_ahead buffer if the file is opened for reading
+%% only in list mode and no read_ahead is already given.
+maybe_add_read_ahead(binary, Opts) ->
+ Opts;
+maybe_add_read_ahead(list, Opts) ->
+ P = fun(read_ahead) -> true;
+ ({read_ahead,_}) -> true;
+ (append) -> true;
+ (exclusive) -> true;
+ (write) -> true;
+ (_) -> false
+ end,
+ case lists:any(P, Opts) of
+ false ->
+ [{read_ahead, 4096}|Opts];
+ true ->
+ Opts
+ end.
server_loop(#state{mref = Mref} = State) ->
receive
@@ -205,7 +225,7 @@ io_reply(From, ReplyAs, Reply) ->
file_request({advise,Offset,Length,Advise},
#state{handle=Handle}=State) ->
- case ?PRIM_FILE:advise(Handle, Offset, Length, Advise) of
+ case ?CALL_FD(Handle, advise, [Offset, Length, Advise]) of
{error,Reason}=Reply ->
{stop,Reason,Reply,State};
Reply ->
@@ -213,7 +233,7 @@ file_request({advise,Offset,Length,Advise},
end;
file_request({allocate, Offset, Length},
#state{handle = Handle} = State) ->
- Reply = ?PRIM_FILE:allocate(Handle, Offset, Length),
+ Reply = ?CALL_FD(Handle, allocate, [Offset, Length]),
{reply, Reply, State};
file_request({pread,At,Sz}, State)
when At =:= cur;
@@ -256,7 +276,7 @@ file_request({pwrite,At,Data},
end;
file_request(datasync,
#state{handle=Handle}=State) ->
- case ?PRIM_FILE:datasync(Handle) of
+ case ?CALL_FD(Handle, datasync, []) of
{error,Reason}=Reply ->
{stop,Reason,Reply,State};
Reply ->
@@ -264,7 +284,7 @@ file_request(datasync,
end;
file_request(sync,
#state{handle=Handle}=State) ->
- case ?PRIM_FILE:sync(Handle) of
+ case ?CALL_FD(Handle, sync, []) of
{error,Reason}=Reply ->
{stop,Reason,Reply,State};
Reply ->
@@ -272,7 +292,7 @@ file_request(sync,
end;
file_request(close,
#state{handle=Handle}=State) ->
- case ?PRIM_FILE:close(Handle) of
+ case ?CALL_FD(Handle, close, []) of
{error,Reason}=Reply ->
{stop,Reason,Reply,State#state{buf= <<>>}};
Reply ->
@@ -288,7 +308,7 @@ file_request({position,At},
end;
file_request(truncate,
#state{handle=Handle}=State) ->
- case ?PRIM_FILE:truncate(Handle) of
+ case ?CALL_FD(Handle, truncate, []) of
{error,Reason}=Reply ->
{stop,Reason,Reply,State#state{buf= <<>>}};
Reply ->
@@ -398,7 +418,7 @@ io_request_loop([Request|Tail],
%%
put_chars(Chars, latin1, #state{handle=Handle, unic=latin1}=State) ->
NewState = State#state{buf = <<>>},
- case ?PRIM_FILE:write(Handle, Chars) of
+ case ?CALL_FD(Handle, write, [Chars]) of
{error,Reason}=Reply ->
{stop,Reason,Reply,NewState};
Reply ->
@@ -408,7 +428,7 @@ put_chars(Chars, InEncoding, #state{handle=Handle, unic=OutEncoding}=State) ->
NewState = State#state{buf = <<>>},
case unicode:characters_to_binary(Chars,InEncoding,OutEncoding) of
Bin when is_binary(Bin) ->
- case ?PRIM_FILE:write(Handle, Bin) of
+ case ?CALL_FD(Handle, write, [Bin]) of
{error,Reason}=Reply ->
{stop,Reason,Reply,NewState};
Reply ->
@@ -422,7 +442,7 @@ put_chars(Chars, InEncoding, #state{handle=Handle, unic=OutEncoding}=State) ->
get_line(S, {<<>>, Cont}, OutEnc,
#state{handle=Handle, read_mode=Mode, unic=InEnc}=State) ->
- case ?PRIM_FILE:read(Handle, read_size(Mode)) of
+ case ?CALL_FD(Handle, read, [read_size(Mode)]) of
{ok,Bin} ->
get_line(S, convert_enc([Cont, Bin], InEnc, OutEnc), OutEnc, State);
eof ->
@@ -472,7 +492,7 @@ get_chars(N, OutEnc,#state{handle=Handle,buf=Buf,read_mode=ReadMode,unic=latin1}
BufSize = byte_size(Buf),
NeedSize = N-BufSize,
Size = erlang:max(NeedSize, ?READ_SIZE_BINARY),
- case ?PRIM_FILE:read(Handle, Size) of
+ case ?CALL_FD(Handle, read, [Size]) of
{ok, B} ->
if BufSize+byte_size(B) < N ->
std_reply(cat(Buf, B, ReadMode,latin1,OutEnc), State);
@@ -504,7 +524,7 @@ get_chars(N, OutEnc,#state{handle=Handle,buf=Buf,read_mode=ReadMode,unic=InEncod
%% Need more, Try to read 4*needed in bytes...
NeedSize = (N - BufCount) * 4,
Size = erlang:max(NeedSize, ?READ_SIZE_BINARY),
- case ?PRIM_FILE:read(Handle, Size) of
+ case ?CALL_FD(Handle, read, [Size]) of
{ok, B} ->
NewBuf = list_to_binary([Buf,B]),
{NewCount,NewSplit} = count_and_find(NewBuf,N,InEncoding),
@@ -544,7 +564,7 @@ get_chars(Mod, Func, XtraArg, OutEnc, #state{buf=Buf}=State) ->
get_chars_empty(Mod, Func, XtraArg, S, latin1,
#state{handle=Handle,read_mode=ReadMode, unic=latin1}=State) ->
- case ?PRIM_FILE:read(Handle, read_size(ReadMode)) of
+ case ?CALL_FD(Handle, read, [read_size(ReadMode)]) of
{ok,Bin} ->
get_chars_apply(Mod, Func, XtraArg, S, latin1, State, Bin);
eof ->
@@ -554,7 +574,7 @@ get_chars_empty(Mod, Func, XtraArg, S, latin1,
end;
get_chars_empty(Mod, Func, XtraArg, S, OutEnc,
#state{handle=Handle,read_mode=ReadMode}=State) ->
- case ?PRIM_FILE:read(Handle, read_size(ReadMode)) of
+ case ?CALL_FD(Handle, read, [read_size(ReadMode)]) of
{ok,Bin} ->
get_chars_apply(Mod, Func, XtraArg, S, OutEnc, State, Bin);
eof ->
@@ -564,7 +584,7 @@ get_chars_empty(Mod, Func, XtraArg, S, OutEnc,
end.
get_chars_notempty(Mod, Func, XtraArg, S, OutEnc,
#state{handle=Handle,read_mode=ReadMode,buf = B}=State) ->
- case ?PRIM_FILE:read(Handle, read_size(ReadMode)) of
+ case ?CALL_FD(Handle, read, [read_size(ReadMode)]) of
{ok,Bin} ->
get_chars_apply(Mod, Func, XtraArg, S, OutEnc, State, list_to_binary([B,Bin]));
eof ->
@@ -918,13 +938,10 @@ cbv({utf32,little},_) ->
%% Compensates ?PRIM_FILE:position/2 for the number of bytes
%% we have buffered
position(Handle, At, Buf) ->
- ?PRIM_FILE:position(
- Handle,
- case At of
- cur ->
- {cur, -byte_size(Buf)};
- {cur, Offs} ->
- {cur, Offs-byte_size(Buf)};
- _ ->
- At
- end).
+ SeekTo =
+ case At of
+ {cur, Offs} -> {cur, Offs-byte_size(Buf)};
+ cur -> {cur, -byte_size(Buf)};
+ _ -> At
+ end,
+ ?CALL_FD(Handle, position, [SeekTo]).
diff --git a/lib/kernel/src/file_server.erl b/lib/kernel/src/file_server.erl
index 6e8f64d932..ecc1ffbdd6 100644
--- a/lib/kernel/src/file_server.erl
+++ b/lib/kernel/src/file_server.erl
@@ -63,7 +63,7 @@ stop() ->
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------
--type state() :: port(). % Internal type
+-type state() :: term(). % Internal type
%%----------------------------------------------------------------------
%% Func: init/1
@@ -77,14 +77,8 @@ stop() ->
init([]) ->
process_flag(trap_exit, true),
- case ?PRIM_FILE:start() of
- {ok, Handle} ->
- ?FILE_IO_SERVER_TABLE =
- ets:new(?FILE_IO_SERVER_TABLE, [named_table]),
- {ok, Handle};
- {error, Reason} ->
- {stop, Reason}
- end.
+ ?FILE_IO_SERVER_TABLE = ets:new(?FILE_IO_SERVER_TABLE, [named_table]),
+ {ok, undefined}.
%%----------------------------------------------------------------------
%% Func: handle_call/3
@@ -101,7 +95,7 @@ init([]) ->
{'reply', 'eof' | 'ok' | {'error', term()} | {'ok', term()}, state()} |
{'stop', 'normal', 'stopped', state()}.
-handle_call({open, Name, ModeList}, {Pid, _Tag} = _From, Handle)
+handle_call({open, Name, ModeList}, {Pid, _Tag} = _From, State)
when is_list(ModeList) ->
Child = ?FILE_IO_SERVER:start_link(Pid, Name, ModeList),
case Child of
@@ -110,78 +104,78 @@ handle_call({open, Name, ModeList}, {Pid, _Tag} = _From, Handle)
_ ->
ok
end,
- {reply, Child, Handle};
+ {reply, Child, State};
-handle_call({open, _Name, _Mode}, _From, Handle) ->
- {reply, {error, einval}, Handle};
+handle_call({open, _Name, _Mode}, _From, State) ->
+ {reply, {error, einval}, State};
-handle_call({read_file, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:read_file(Name), Handle};
+handle_call({read_file, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:read_file(Name), State};
-handle_call({write_file, Name, Bin}, _From, Handle) ->
- {reply, ?PRIM_FILE:write_file(Name, Bin), Handle};
+handle_call({write_file, Name, Bin}, _From, State) ->
+ {reply, ?PRIM_FILE:write_file(Name, Bin), State};
-handle_call({set_cwd, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:set_cwd(Handle, Name), Handle};
+handle_call({set_cwd, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:set_cwd(Name), State};
-handle_call({delete, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:delete(Handle, Name), Handle};
+handle_call({delete, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:delete(Name), State};
-handle_call({rename, Fr, To}, _From, Handle) ->
- {reply, ?PRIM_FILE:rename(Handle, Fr, To), Handle};
+handle_call({rename, Fr, To}, _From, State) ->
+ {reply, ?PRIM_FILE:rename(Fr, To), State};
-handle_call({make_dir, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:make_dir(Handle, Name), Handle};
+handle_call({make_dir, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:make_dir(Name), State};
-handle_call({del_dir, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:del_dir(Handle, Name), Handle};
+handle_call({del_dir, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:del_dir(Name), State};
-handle_call({list_dir, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:list_dir(Handle, Name), Handle};
-handle_call({list_dir_all, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:list_dir_all(Handle, Name), Handle};
+handle_call({list_dir, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:list_dir(Name), State};
+handle_call({list_dir_all, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:list_dir_all(Name), State};
-handle_call(get_cwd, _From, Handle) ->
- {reply, ?PRIM_FILE:get_cwd(Handle), Handle};
-handle_call({get_cwd}, _From, Handle) ->
- {reply, ?PRIM_FILE:get_cwd(Handle), Handle};
-handle_call({get_cwd, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:get_cwd(Handle, Name), Handle};
+handle_call(get_cwd, _From, State) ->
+ {reply, ?PRIM_FILE:get_cwd(), State};
+handle_call({get_cwd}, _From, State) ->
+ {reply, ?PRIM_FILE:get_cwd(), State};
+handle_call({get_cwd, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:get_cwd(Name), State};
-handle_call({read_file_info, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:read_file_info(Handle, Name), Handle};
+handle_call({read_file_info, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:read_file_info(Name), State};
-handle_call({read_file_info, Name, Opts}, _From, Handle) ->
- {reply, ?PRIM_FILE:read_file_info(Handle, Name, Opts), Handle};
+handle_call({read_file_info, Name, Opts}, _From, State) ->
+ {reply, ?PRIM_FILE:read_file_info(Name, Opts), State};
-handle_call({altname, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:altname(Handle, Name), Handle};
+handle_call({altname, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:altname(Name), State};
-handle_call({write_file_info, Name, Info}, _From, Handle) ->
- {reply, ?PRIM_FILE:write_file_info(Handle, Name, Info), Handle};
+handle_call({write_file_info, Name, Info}, _From, State) ->
+ {reply, ?PRIM_FILE:write_file_info(Name, Info), State};
-handle_call({write_file_info, Name, Info, Opts}, _From, Handle) ->
- {reply, ?PRIM_FILE:write_file_info(Handle, Name, Info, Opts), Handle};
+handle_call({write_file_info, Name, Info, Opts}, _From, State) ->
+ {reply, ?PRIM_FILE:write_file_info(Name, Info, Opts), State};
-handle_call({read_link_info, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:read_link_info(Handle, Name), Handle};
+handle_call({read_link_info, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:read_link_info(Name), State};
-handle_call({read_link_info, Name, Opts}, _From, Handle) ->
- {reply, ?PRIM_FILE:read_link_info(Handle, Name, Opts), Handle};
+handle_call({read_link_info, Name, Opts}, _From, State) ->
+ {reply, ?PRIM_FILE:read_link_info(Name, Opts), State};
-handle_call({read_link, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:read_link(Handle, Name), Handle};
-handle_call({read_link_all, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:read_link_all(Handle, Name), Handle};
+handle_call({read_link, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:read_link(Name), State};
+handle_call({read_link_all, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:read_link_all(Name), State};
-handle_call({make_link, Old, New}, _From, Handle) ->
- {reply, ?PRIM_FILE:make_link(Handle, Old, New), Handle};
+handle_call({make_link, Old, New}, _From, State) ->
+ {reply, ?PRIM_FILE:make_link(Old, New), State};
-handle_call({make_symlink, Old, New}, _From, Handle) ->
- {reply, ?PRIM_FILE:make_symlink(Handle, Old, New), Handle};
+handle_call({make_symlink, Old, New}, _From, State) ->
+ {reply, ?PRIM_FILE:make_symlink(Old, New), State};
handle_call({copy, SourceName, SourceOpts, DestName, DestOpts, Length},
- _From, Handle) ->
+ _From, State) ->
Reply =
case ?PRIM_FILE:open(SourceName, [read, binary | SourceOpts]) of
{ok, Source} ->
@@ -201,14 +195,14 @@ handle_call({copy, SourceName, SourceOpts, DestName, DestOpts, Length},
{error, _} = Error ->
Error
end,
- {reply, Reply, Handle};
+ {reply, Reply, State};
-handle_call(stop, _From, Handle) ->
- {stop, normal, stopped, Handle};
+handle_call(stop, _From, State) ->
+ {stop, normal, stopped, State};
-handle_call(Request, From, Handle) ->
+handle_call(Request, From, State) ->
error_logger:error_msg("handle_call(~tp, ~tp, _)", [Request, From]),
- {noreply, Handle}.
+ {noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_cast/2
@@ -233,14 +227,9 @@ handle_cast(Msg, State) ->
-spec handle_info(term(), state()) ->
{'noreply', state()} | {'stop', 'normal', state()}.
-handle_info({'EXIT', Pid, _Reason}, Handle) when is_pid(Pid) ->
+handle_info({'EXIT', Pid, _Reason}, State) when is_pid(Pid) ->
ets:delete(?FILE_IO_SERVER_TABLE, Pid),
- {noreply, Handle};
-
-handle_info({'EXIT', Handle, _Reason}, Handle) ->
- error_logger:error_msg("Port controlling ~w terminated in ~w",
- [?FILE_SERVER, ?MODULE]),
- {stop, normal, Handle};
+ {noreply, State};
handle_info(Info, State) ->
error_logger:error_msg("handle_Info(~tp, _)", [Info]),
@@ -254,8 +243,8 @@ handle_info(Info, State) ->
-spec terminate(term(), state()) -> 'ok'.
-terminate(_Reason, Handle) ->
- ?PRIM_FILE:stop(Handle).
+terminate(_Reason, _State) ->
+ ok.
%%----------------------------------------------------------------------
%% Func: code_change/3
diff --git a/lib/kernel/src/group_history.erl b/lib/kernel/src/group_history.erl
index 91f3663cc5..9745848992 100644
--- a/lib/kernel/src/group_history.erl
+++ b/lib/kernel/src/group_history.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2017. All Rights Reserved.
+%% Copyright Ericsson AB 2017-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -260,7 +260,7 @@ resize_log(Name, _OldSize, NewSize) ->
ok ->
show('$#erlang-history-resize-result',
"ok~n", []);
- {error, {new_size_too_small, _}} ->
+ {error, {new_size_too_small, _, _}} ->
show('$#erlang-history-resize-result',
"failed (new size is too small)~n", []),
disable_history();
diff --git a/lib/kernel/src/hipe_unified_loader.erl b/lib/kernel/src/hipe_unified_loader.erl
index f4c7c277ed..fd06f0f7d8 100644
--- a/lib/kernel/src/hipe_unified_loader.erl
+++ b/lib/kernel/src/hipe_unified_loader.erl
@@ -236,9 +236,10 @@ load_common(Mod, Bin, Beam, Architecture) ->
lists:foreach(fun({FE, DestAddress}) ->
hipe_bifs:set_native_address_in_fe(FE, DestAddress)
end, erase(closures_to_patch)),
- ok = hipe_bifs:commit_patch_load(LoaderState),
set_beam_call_traps(FunDefs),
- ok;
+ export_funs(FunDefs),
+ ok = hipe_bifs:commit_patch_load(LoaderState),
+ ok;
BeamBinary when is_binary(BeamBinary) ->
%% Find all closures in the code.
[] = erase(closures_to_patch), %Clean up, assertion.
@@ -274,6 +275,7 @@ needs_trampolines(Architecture) ->
arm -> true;
powerpc -> true;
ppc64 -> true;
+ amd64 -> true;
_ -> false
end.
diff --git a/lib/kernel/src/inet.erl b/lib/kernel/src/inet.erl
index dc20c21c77..4bad523dff 100644
--- a/lib/kernel/src/inet.erl
+++ b/lib/kernel/src/inet.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1997-2017. All Rights Reserved.
+%% Copyright Ericsson AB 1997-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -34,7 +34,8 @@
ip/1, stats/0, options/0,
pushf/3, popf/1, close/1, gethostname/0, gethostname/1,
parse_ipv4_address/1, parse_ipv6_address/1, parse_ipv4strict_address/1,
- parse_ipv6strict_address/1, parse_address/1, parse_strict_address/1, ntoa/1]).
+ parse_ipv6strict_address/1, parse_address/1, parse_strict_address/1,
+ ntoa/1, ipv4_mapped_ipv6_address/1]).
-export([connect_options/2, listen_options/2, udp_options/2, sctp_options/2]).
-export([udp_module/1, tcp_module/1, tcp_module/2, sctp_module/1]).
@@ -72,7 +73,7 @@
%% timer interface
-export([start_timer/1, timeout/1, timeout/2, stop_timer/1]).
--export_type([address_family/0, hostent/0, hostname/0, ip4_address/0,
+-export_type([address_family/0, socket_protocol/0, hostent/0, hostname/0, ip4_address/0,
ip6_address/0, ip_address/0, port_number/0,
local_address/0, socket_address/0, returned_non_ip_address/0,
socket_setopt/0, socket_getopt/0,
@@ -675,6 +676,14 @@ parse_address(Addr) ->
parse_strict_address(Addr) ->
inet_parse:strict_address(Addr).
+-spec ipv4_mapped_ipv6_address(ip_address()) -> ip_address().
+ipv4_mapped_ipv6_address({D1,D2,D3,D4})
+ when (D1 bor D2 bor D3 bor D4) < 256 ->
+ {0,0,0,0,0,16#ffff,(D1 bsl 8) bor D2,(D3 bsl 8) bor D4};
+ipv4_mapped_ipv6_address({D1,D2,D3,D4,D5,D6,D7,D8})
+ when (D1 bor D2 bor D3 bor D4 bor D5 bor D6 bor D7 bor D8) < 65536 ->
+ {D7 bsr 8,D7 band 255,D8 bsr 8,D8 band 255}.
+
%% Return a list of available options
options() ->
[
@@ -1244,9 +1253,7 @@ gethostbyname_string(Name, Type)
inet ->
inet_parse:ipv4_address(Name);
inet6 ->
- %% XXX should we really translate IPv4 addresses here
- %% even if we do not know if this host can do IPv6?
- inet_parse:ipv6_address(Name)
+ inet_parse:ipv6strict_address(Name)
end of
{ok,IP} ->
{ok,make_hostent(Name, [IP], [], Type)};
@@ -1452,11 +1459,14 @@ fdopen(Fd, Addr, Port, Opts, Protocol, Family, Type, Module) ->
%% socket stat
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+-spec i() -> ok.
i() -> i(tcp), i(udp), i(sctp).
+-spec i(socket_protocol()) -> ok.
i(Proto) -> i(Proto, [port, module, recv, sent, owner,
local_address, foreign_address, state, type]).
+-spec i(socket_protocol(), [atom()]) -> ok.
i(tcp, Fs) ->
ii(tcp_sockets(), Fs, tcp);
i(udp, Fs) ->
diff --git a/lib/kernel/src/inet_hosts.erl b/lib/kernel/src/inet_hosts.erl
index 0bdf00ac30..fc653bf0d3 100644
--- a/lib/kernel/src/inet_hosts.erl
+++ b/lib/kernel/src/inet_hosts.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1997-2016. All Rights Reserved.
+%% Copyright Ericsson AB 1997-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -72,9 +72,6 @@ gethostbyname(Name, Type, Byname, Byaddr) ->
gethostbyaddr({A,B,C,D}=IP) when ?ip(A,B,C,D) ->
gethostbyaddr(IP, inet);
-%% ipv4 only ipv6 address
-gethostbyaddr({0,0,0,0,0,16#ffff=F,G,H}) when ?ip6(0,0,0,0,0,F,G,H) ->
- gethostbyaddr({G bsr 8, G band 255, H bsr 8, H band 255});
gethostbyaddr({A,B,C,D,E,F,G,H}=IP) when ?ip6(A,B,C,D,E,F,G,H) ->
gethostbyaddr(IP, inet6);
gethostbyaddr(Addr) when is_list(Addr) ->
diff --git a/lib/kernel/src/inet_int.hrl b/lib/kernel/src/inet_int.hrl
index bc5b67f7bf..357e27826c 100644
--- a/lib/kernel/src/inet_int.hrl
+++ b/lib/kernel/src/inet_int.hrl
@@ -100,6 +100,8 @@
-define(TCP_REQ_RECV, 42).
-define(TCP_REQ_UNRECV, 43).
-define(TCP_REQ_SHUTDOWN, 44).
+-define(TCP_REQ_SENDFILE, 45).
+
%% UDP and SCTP requests
-define(PACKET_REQ_RECV, 60).
%%-define(SCTP_REQ_LISTEN, 61). MERGED
@@ -319,6 +321,12 @@
[((X) bsr 24) band 16#ff, ((X) bsr 16) band 16#ff,
((X) bsr 8) band 16#ff, (X) band 16#ff]).
+-define(int64(X),
+ [((X) bsr 56) band 16#ff, ((X) bsr 48) band 16#ff,
+ ((X) bsr 40) band 16#ff, ((X) bsr 32) band 16#ff,
+ ((X) bsr 24) band 16#ff, ((X) bsr 16) band 16#ff,
+ ((X) bsr 8) band 16#ff, (X) band 16#ff]).
+
-define(intAID(X), % For SCTP AssocID
?int32(X)).
diff --git a/lib/kernel/src/inet_res.erl b/lib/kernel/src/inet_res.erl
index 49aa5f8bda..6454802b04 100644
--- a/lib/kernel/src/inet_res.erl
+++ b/lib/kernel/src/inet_res.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1997-2016. All Rights Reserved.
+%% Copyright Ericsson AB 1997-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -349,9 +349,6 @@ gethostbyaddr_tm({A,B,C,D} = IP, Timer) when ?ip(A,B,C,D) ->
{ok, HEnt} -> {ok, HEnt};
_ -> res_gethostbyaddr(dn_in_addr_arpa(A,B,C,D), IP, Timer)
end;
-%% ipv4 only ipv6 address
-gethostbyaddr_tm({0,0,0,0,0,16#ffff,G,H},Timer) when is_integer(G+H) ->
- gethostbyaddr_tm({G div 256, G rem 256, H div 256, H rem 256},Timer);
gethostbyaddr_tm({A,B,C,D,E,F,G,H} = IP, Timer) when ?ip6(A,B,C,D,E,F,G,H) ->
inet_db:res_update_conf(),
case inet_db:gethostbyaddr(IP) of
@@ -431,28 +428,7 @@ gethostbyname(Name,Family,Timeout) ->
gethostbyname_tm(Name,inet,Timer) ->
getbyname_tm(Name,?S_A,Timer);
gethostbyname_tm(Name,inet6,Timer) ->
- case getbyname_tm(Name,?S_AAAA,Timer) of
- {ok,HEnt} -> {ok,HEnt};
- {error,nxdomain} ->
- case getbyname_tm(Name, ?S_A,Timer) of
- {ok, HEnt} ->
- %% rewrite to a ipv4 only ipv6 address
- {ok,
- HEnt#hostent {
- h_addrtype = inet6,
- h_length = 16,
- h_addr_list =
- lists:map(
- fun({A,B,C,D}) ->
- {0,0,0,0,0,16#ffff,A*256+B,C*256+D}
- end, HEnt#hostent.h_addr_list)
- }};
- Error ->
- Error
- end;
- Error ->
- Error
- end;
+ getbyname_tm(Name,?S_AAAA,Timer);
gethostbyname_tm(_Name, _Family, _Timer) ->
{error, einval}.
diff --git a/lib/kernel/src/kernel.app.src b/lib/kernel/src/kernel.app.src
index 080b11fc4d..82a3571da9 100644
--- a/lib/kernel/src/kernel.app.src
+++ b/lib/kernel/src/kernel.app.src
@@ -57,6 +57,7 @@
inet_tcp_dist,
kernel,
kernel_config,
+ kernel_refc,
local_tcp,
local_udp,
net,
@@ -88,6 +89,13 @@
inet_udp,
inet_sctp,
pg2,
+ raw_file_io,
+ raw_file_io_compressed,
+ raw_file_io_deflate,
+ raw_file_io_delayed,
+ raw_file_io_inflate,
+ raw_file_io_list,
+ raw_file_io_raw,
seq_trace,
standard_error,
wrap_log_reader]},
@@ -107,6 +115,7 @@
heart,
init,
kernel_config,
+ kernel_refc,
kernel_sup,
net_kernel,
net_sup,
diff --git a/lib/kernel/src/kernel.appup.src b/lib/kernel/src/kernel.appup.src
index fc5417597f..4ee497bbbd 100644
--- a/lib/kernel/src/kernel.appup.src
+++ b/lib/kernel/src/kernel.appup.src
@@ -18,7 +18,9 @@
%% %CopyrightEnd%
{"%VSN%",
%% Up from - max one major revision back
- [{<<"5\\.3(\\.[0-9]+)*">>,[restart_new_emulator]}], % OTP-20.*
+ [{<<"5\\.[0-3](\\.[0-9]+)*">>,[restart_new_emulator]}, % OTP-19.*, OTP-20.0
+ {<<"5\\.4(\\.[0-9]+)*">>,[restart_new_emulator]}], % OTP-20.1+
%% Down to - max one major revision back
- [{<<"5\\.3(\\.[0-9]+)*">>,[restart_new_emulator]}] % OTP-20.*
+ [{<<"5\\.[0-3](\\.[0-9]+)*">>,[restart_new_emulator]}, % OTP-19.*, OTP-20.0
+ {<<"5\\.4(\\.[0-9]+)*">>,[restart_new_emulator]}] % OTP-20.1+
}.
diff --git a/lib/kernel/src/kernel.erl b/lib/kernel/src/kernel.erl
index cba57088ec..0382764b39 100644
--- a/lib/kernel/src/kernel.erl
+++ b/lib/kernel/src/kernel.erl
@@ -111,6 +111,13 @@ init([]) ->
type => worker,
modules => [kernel_config]},
+ RefC = #{id => kernel_refc,
+ start => {kernel_refc, start_link, []},
+ restart => permanent,
+ shutdown => 2000,
+ type => worker,
+ modules => [kernel_refc]},
+
Code = #{id => code_server,
start => {code, start_link, []},
restart => permanent,
@@ -148,7 +155,7 @@ init([]) ->
case init:get_argument(mode) of
{ok, [["minimal"]]} ->
- {ok, {SupFlags, [Code, File, StdError, User, Config, SafeSup]}};
+ {ok, {SupFlags, [Code, File, StdError, User, Config, RefC, SafeSup]}};
_ ->
Rpc = #{id => rex,
start => {rpc, start_link, []},
@@ -199,7 +206,7 @@ init([]) ->
{ok, {SupFlags,
[Code, Rpc, Global, InetDb | DistAC] ++
[NetSup, GlGroup, File, SigSrv,
- StdError, User, Config, SafeSup] ++ Timer}}
+ StdError, User, Config, RefC, SafeSup] ++ Timer}}
end;
init(safe) ->
SupFlags = #{strategy => one_for_one,
diff --git a/lib/kernel/src/kernel_refc.erl b/lib/kernel/src/kernel_refc.erl
new file mode 100644
index 0000000000..05076dc885
--- /dev/null
+++ b/lib/kernel/src/kernel_refc.erl
@@ -0,0 +1,139 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(kernel_refc).
+
+-behaviour(gen_server).
+
+%% External exports
+-export([start_link/0, scheduler_wall_time/1]).
+%% Internal exports
+-export([init/1, handle_info/2, terminate/2]).
+-export([handle_call/3, handle_cast/2, code_change/3]).
+
+%%%-----------------------------------------------------------------
+%%% This module implements a process that handles reference counters for
+%%% various erts or other kernel resources which needs reference counting.
+%%%
+%%% Should not be documented nor used directly by user applications.
+%%%-----------------------------------------------------------------
+start_link() ->
+ gen_server:start_link({local,kernel_refc}, kernel_refc, [], []).
+
+-spec scheduler_wall_time(boolean()) -> boolean().
+scheduler_wall_time(Bool) ->
+ gen_server:call(kernel_refc, {scheduler_wall_time, self(), Bool}, infinity).
+
+%%-----------------------------------------------------------------
+%% Callback functions from gen_server
+%%-----------------------------------------------------------------
+
+-spec init([]) -> {'ok', map()} | {'stop', term()}.
+
+init([]) ->
+ resource(scheduler_wall_time, false),
+ {ok, #{scheduler_wall_time=>#{}}}.
+
+-spec handle_call(term(), term(), State) -> {'reply', term(), State}.
+handle_call({What, Who, false}, _From, State) ->
+ {Reply, Cnt} = do_stop(What, maps:get(What, State), Who),
+ {reply, Reply, State#{What:=Cnt}};
+handle_call({What, Who, true}, _From, State) ->
+ {Reply, Cnt} = do_start(What, maps:get(What, State), Who),
+ {reply, Reply, State#{What:=Cnt}};
+handle_call(_, _From, State) ->
+ {reply, badarg, State}.
+
+-spec handle_cast(term(), State) -> {'noreply', State}.
+handle_cast(_, State) ->
+ {noreply, State}.
+
+-spec handle_info(term(), State) -> {'noreply', State}.
+handle_info({'DOWN', _Ref, process, Pid, _}, State) ->
+ Cleanup = fun(Resource, Cnts) ->
+ cleanup(Resource, Cnts, Pid)
+ end,
+ {noreply, maps:map(Cleanup, State)};
+handle_info(_, State) ->
+ {noreply, State}.
+
+-spec terminate(term(), term()) -> 'ok'.
+terminate(_Reason, _State) ->
+ ok.
+
+-spec code_change(term(), State, term()) -> {'ok', State}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%-----------------------------------------------------------------
+%% Internal functions
+%%-----------------------------------------------------------------
+
+do_start(Resource, Cnt, Pid) ->
+ case maps:get(Pid, Cnt, undefined) of
+ undefined ->
+ Ref = erlang:monitor(process, Pid),
+ case any(Cnt) of
+ true ->
+ {true, Cnt#{Pid=>{1, Ref}}};
+ false ->
+ resource(Resource, true),
+ {false, Cnt#{Pid=>{1, Ref}}}
+ end;
+ {N, Ref} ->
+ {true, Cnt#{Pid=>{N+1, Ref}}}
+ end.
+
+do_stop(Resource, Cnt0, Pid) ->
+ case maps:get(Pid, Cnt0, undefined) of
+ undefined ->
+ {any(Cnt0), Cnt0};
+ {1, Ref} ->
+ erlang:demonitor(Ref, [flush]),
+ Cnt = maps:remove(Pid, Cnt0),
+ case any(Cnt) of
+ true ->
+ {true, Cnt};
+ false ->
+ resource(Resource, false),
+ {true, Cnt}
+ end;
+ {N, Ref} ->
+ {true, Cnt0#{Pid=>{N-1, Ref}}}
+ end.
+
+cleanup(Resource, Cnt0, Pid) ->
+ case maps:is_key(Pid, Cnt0) of
+ true ->
+ Cnt = maps:remove(Pid, Cnt0),
+ case any(Cnt) of
+ true ->
+ Cnt;
+ false ->
+ resource(Resource, false),
+ Cnt
+ end;
+ false ->
+ Cnt0
+ end.
+
+any(Cnt) -> maps:size(Cnt) > 0.
+
+resource(scheduler_wall_time, Enable) ->
+ _ = erts_internal:scheduler_wall_time(Enable).
diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl
index f36b4f1e6a..f38989d103 100644
--- a/lib/kernel/src/net_kernel.erl
+++ b/lib/kernel/src/net_kernel.erl
@@ -70,8 +70,8 @@
protocol_childspecs/0,
epmd_module/0]).
--export([connect/1, disconnect/1, hidden_connect/1, passive_cnct/1]).
--export([hidden_connect_node/1]). %% explicit connect
+-export([disconnect/1, passive_cnct/1]).
+-export([hidden_connect_node/1]).
-export([set_net_ticktime/1, set_net_ticktime/2, get_net_ticktime/0]).
-export([node_info/1, node_info/2, nodes_info/0,
@@ -122,6 +122,7 @@
-record(connection, {
node, %% remote node name
+ conn_id, %% Connection identity
state, %% pending | up | up_pending
owner, %% owner pid
pending_owner, %% possible new owner
@@ -247,14 +248,15 @@ ticktime_res(A) when is_atom(A) -> A.
%% Called though BIF's
-connect(Node) -> do_connect(Node, normal, false).
%%% Long timeout if blocked (== barred), only affects nodes with
%%% {dist_auto_connect, once} set.
-passive_cnct(Node) -> do_connect(Node, normal, true).
-disconnect(Node) -> request({disconnect, Node}).
+passive_cnct(Node) ->
+ case request({passive_cnct, Node}) of
+ ignored -> false;
+ Other -> Other
+ end.
-%% connect but not seen
-hidden_connect(Node) -> do_connect(Node, hidden, false).
+disconnect(Node) -> request({disconnect, Node}).
%% Should this node publish itself on Node?
publish_on_node(Node) when is_atom(Node) ->
@@ -272,67 +274,30 @@ connect_node(Node) when is_atom(Node) ->
hidden_connect_node(Node) when is_atom(Node) ->
request({connect, hidden, Node}).
-do_connect(Node, Type, WaitForBarred) -> %% Type = normal | hidden
- case catch ets:lookup(sys_dist, Node) of
- {'EXIT', _} ->
- ?connect_failure(Node,{table_missing, sys_dist}),
- false;
- [#barred_connection{}] ->
- case WaitForBarred of
- false ->
- false;
- true ->
- Pid = spawn(?MODULE,passive_connect_monitor,[self(),Node]),
- receive
- {Pid, true} ->
- %%io:format("Net Kernel: barred connection (~p) "
- %% "connected from other end.~n",[Node]),
- true;
- {Pid, false} ->
- ?connect_failure(Node,{barred_connection,
- ets:lookup(sys_dist, Node)}),
- %%io:format("Net Kernel: barred connection (~p) "
- %% "- failure.~n",[Node]),
- false
- end
- end;
- Else ->
- case application:get_env(kernel, dist_auto_connect) of
- {ok, never} ->
- ?connect_failure(Node,{dist_auto_connect,never}),
- false;
- % This might happen due to connection close
- % not beeing propagated to user space yet.
- % Save the day by just not connecting...
- {ok, once} when Else =/= [],
- (hd(Else))#connection.state =:= up ->
- ?connect_failure(Node,{barred_connection,
- ets:lookup(sys_dist, Node)}),
- false;
- _ ->
- request({connect, Type, Node})
- end
- end.
-passive_connect_monitor(Parent, Node) ->
+passive_connect_monitor(From, Node) ->
ok = monitor_nodes(true,[{node_type,all}]),
- case lists:member(Node,nodes([connected])) of
- true ->
- ok = monitor_nodes(false,[{node_type,all}]),
- Parent ! {self(),true};
- _ ->
- Ref = make_ref(),
- Tref = erlang:send_after(connecttime(),self(),Ref),
- receive
- Ref ->
- ok = monitor_nodes(false,[{node_type,all}]),
- Parent ! {self(), false};
- {nodeup,Node,_} ->
- ok = monitor_nodes(false,[{node_type,all}]),
- _ = erlang:cancel_timer(Tref),
- Parent ! {self(),true}
- end
- end.
+ Reply = case lists:member(Node,nodes([connected])) of
+ true ->
+ io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]),
+ true;
+ _ ->
+ receive
+ {nodeup,Node,_} ->
+ io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]),
+ true
+ after connecttime() ->
+ io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]),
+ false
+ end
+ end,
+ ok = monitor_nodes(false,[{node_type,all}]),
+ io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]),
+ {Pid, Tag} = From,
+ io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]),
+ erlang:send(Pid, {Tag, Reply}),
+ io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]).
+
%% If the net_kernel isn't running we ignore all requests to the
%% kernel, thus basically accepting them :-)
@@ -394,40 +359,135 @@ init({Name, LongOrShortNames, TickT, CleanHalt}) ->
end.
+do_auto_connect(Type, Node, ConnId, WaitForBarred, From, State) ->
+ ConnLookup = ets:lookup(sys_dist, Node),
+
+ case ConnLookup of
+ [#barred_connection{}] ->
+ case WaitForBarred of
+ false ->
+ {reply, false, State};
+ true ->
+ spawn(?MODULE,passive_connect_monitor,[From,Node]),
+ {noreply, State}
+ end;
+
+ [#connection{conn_id=ConnId, state = up}] ->
+ {reply, true, State};
+ [#connection{conn_id=ConnId, waiting=Waiting}=Conn] ->
+ case From of
+ noreply -> ok;
+ _ -> ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]})
+ end,
+ {noreply, State};
+
+ _ ->
+ case application:get_env(kernel, dist_auto_connect) of
+ {ok, never} ->
+ ?connect_failure(Node,{dist_auto_connect,never}),
+ {reply, false, State};
+
+ %% This might happen due to connection close
+ %% not beeing propagated to user space yet.
+ %% Save the day by just not connecting...
+ {ok, once} when ConnLookup =/= [],
+ (hd(ConnLookup))#connection.state =:= up ->
+ ?connect_failure(Node,{barred_connection,
+ ets:lookup(sys_dist, Node)}),
+ {reply, false, State};
+ _ ->
+ case setup(ConnLookup, Node,ConnId,Type,From,State) of
+ {ok, SetupPid} ->
+ Owners = [{SetupPid, Node} | State#state.conn_owners],
+ {noreply,State#state{conn_owners=Owners}};
+ _Error ->
+ ?connect_failure(Node, {setup_call, failed, _Error}),
+ {reply, false, State}
+ end
+ end
+ end.
+
+
+do_explicit_connect([#connection{conn_id = ConnId, state = up}], _, _, ConnId, _From, State) ->
+ {reply, true, State};
+do_explicit_connect([#connection{conn_id = ConnId}=Conn], _, _, ConnId, From, State)
+ when Conn#connection.state =:= pending;
+ Conn#connection.state =:= up_pending ->
+ Waiting = Conn#connection.waiting,
+ ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
+ {noreply, State};
+do_explicit_connect([#barred_connection{}], Type, Node, ConnId, From , State) ->
+ %% Barred connection only affects auto_connect, ignore it.
+ do_explicit_connect([], Type, Node, ConnId, From , State);
+do_explicit_connect(ConnLookup, Type, Node, ConnId, From , State) ->
+ case setup(ConnLookup, Node,ConnId,Type,From,State) of
+ {ok, SetupPid} ->
+ Owners = [{SetupPid, Node} | State#state.conn_owners],
+ {noreply,State#state{conn_owners=Owners}};
+ _Error ->
+ ?connect_failure(Node, {setup_call, failed, _Error}),
+ {reply, false, State}
+ end.
+
+-define(ERTS_DIST_CON_ID_MASK, 16#ffffff). % also in external.h
+
+verify_new_conn_id([], {Nr,_DHandle})
+ when (Nr band (bnot ?ERTS_DIST_CON_ID_MASK)) =:= 0 ->
+ true;
+verify_new_conn_id([#connection{conn_id = {Old,_}}], {New,_})
+ when New =:= ((Old+1) band ?ERTS_DIST_CON_ID_MASK) ->
+ true;
+verify_new_conn_id(_, _) ->
+ false.
+
+
+
%% ------------------------------------------------------------
%% handle_call.
%% ------------------------------------------------------------
%%
-%% Set up a connection to Node.
-%% The response is delayed until the connection is up and
-%% running.
+%% Passive auto-connect to Node.
+%% The response is delayed until the connection is up and running.
%%
-handle_call({connect, _, Node}, From, State) when Node =:= node() ->
+handle_call({passive_cnct, Node}, From, State) when Node =:= node() ->
+ async_reply({reply, true, State}, From);
+handle_call({passive_cnct, Node}, From, State) ->
+ verbose({passive_cnct, Node}, 1, State),
+ Type = normal,
+ WaitForBarred = true,
+ R = case (catch erts_internal:new_connection(Node)) of
+ {Nr,_DHandle}=ConnId when is_integer(Nr) ->
+ do_auto_connect(Type, Node, ConnId, WaitForBarred, From, State);
+
+ _Error ->
+ error_logger:error_msg("~n** Cannot get connection id for node ~w~n",
+ [Node]),
+ {reply, false, State}
+ end,
+
+ return_call(R, From);
+
+%%
+%% Explicit connect
+%% The response is delayed until the connection is up and running.
+%%
+handle_call({connect, _, Node, _, _}, From, State) when Node =:= node() ->
async_reply({reply, true, State}, From);
handle_call({connect, Type, Node}, From, State) ->
verbose({connect, Type, Node}, 1, State),
- case ets:lookup(sys_dist, Node) of
- [Conn] when Conn#connection.state =:= up ->
- async_reply({reply, true, State}, From);
- [Conn] when Conn#connection.state =:= pending ->
- Waiting = Conn#connection.waiting,
- ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
- {noreply, State};
- [Conn] when Conn#connection.state =:= up_pending ->
- Waiting = Conn#connection.waiting,
- ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
- {noreply, State};
- _ ->
- case setup(Node,Type,From,State) of
- {ok, SetupPid} ->
- Owners = [{SetupPid, Node} | State#state.conn_owners],
- {noreply,State#state{conn_owners=Owners}};
- _Error ->
- ?connect_failure(Node, {setup_call, failed, _Error}),
- async_reply({reply, false, State}, From)
- end
- end;
+ ConnLookup = ets:lookup(sys_dist, Node),
+ R = case (catch erts_internal:new_connection(Node)) of
+ {Nr,_DHandle}=ConnId when is_integer(Nr) ->
+ do_explicit_connect(ConnLookup, Type, Node, ConnId, From, State);
+
+ _Error ->
+ error_logger:error_msg("~n** Cannot get connection id for node ~w~n",
+ [Node]),
+ {reply, false, State}
+ end,
+ return_call(R, From);
+
%%
%% Close the connection to Node.
@@ -634,6 +694,26 @@ terminate(_Reason, State) ->
%% ------------------------------------------------------------
%%
+%% Asynchronous auto connect request
+%%
+handle_info({auto_connect,Node, Nr, DHandle}, State) ->
+ verbose({auto_connect, Node, Nr, DHandle}, 1, State),
+ ConnId = {Nr, DHandle},
+ NewState =
+ case do_auto_connect(normal, Node, ConnId, false, noreply, State) of
+ {noreply, S} -> %% Pending connection
+ S;
+
+ {reply, true, S} -> %% Already connected
+ S;
+
+ {reply, false, S} -> %% Connection refused
+ erts_internal:abort_connection(Node, ConnId),
+ S
+ end,
+ {noreply, NewState};
+
+%%
%% accept a new connection.
%%
handle_info({accept,AcceptPid,Socket,Family,Proto}, State) ->
@@ -713,14 +793,23 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) ->
AcceptPid ! {self(), {accept_pending, already_pending}},
{noreply, State};
_ ->
- ets:insert(sys_dist, #connection{node = Node,
- state = pending,
- owner = AcceptPid,
- address = Address,
- type = Type}),
- AcceptPid ! {self(),{accept_pending,ok}},
- Owners = [{AcceptPid,Node} | State#state.conn_owners],
- {noreply, State#state{conn_owners = Owners}}
+ case (catch erts_internal:new_connection(Node)) of
+ {Nr,_DHandle}=ConnId when is_integer(Nr) ->
+ ets:insert(sys_dist, #connection{node = Node,
+ conn_id = ConnId,
+ state = pending,
+ owner = AcceptPid,
+ address = Address,
+ type = Type}),
+ AcceptPid ! {self(),{accept_pending,ok}},
+ Owners = [{AcceptPid,Node} | State#state.conn_owners],
+ {noreply, State#state{conn_owners = Owners}};
+
+ _ ->
+ error_logger:error_msg("~n** Cannot get connection id for node ~w~n",
+ [Node]),
+ AcceptPid ! {self(),{accept_pending,nok_pending}}
+ end
end;
handle_info({SetupPid, {is_pending, Node}}, State) ->
@@ -906,6 +995,7 @@ pending_nodedown(Conn, Node, Type, State) ->
% Don't bar connections that have never been alive
%mark_sys_dist_nodedown(Node),
% - instead just delete the node:
+ erts_internal:abort_connection(Node, Conn#connection.conn_id),
ets:delete(sys_dist, Node),
reply_waiting(Node,Conn#connection.waiting, false),
case Type of
@@ -920,7 +1010,9 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) ->
AcceptPid = Conn#connection.pending_owner,
Owners = State#state.conn_owners,
Pend = lists:keydelete(AcceptPid, 1, State#state.pend_owners),
+ erts_internal:abort_connection(Node, Conn#connection.conn_id),
Conn1 = Conn#connection { owner = AcceptPid,
+ conn_id = erts_internal:new_connection(Node),
pending_owner = undefined,
state = pending },
ets:insert(sys_dist, Conn1),
@@ -928,15 +1020,16 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) ->
State#state{conn_owners = [{AcceptPid,Node}|Owners], pend_owners = Pend}.
-up_nodedown(_Conn, Node, _Reason, Type, State) ->
- mark_sys_dist_nodedown(Node),
+up_nodedown(Conn, Node, _Reason, Type, State) ->
+ mark_sys_dist_nodedown(Conn, Node),
case Type of
normal -> ?nodedown(Node, State);
_ -> ok
end,
State.
-mark_sys_dist_nodedown(Node) ->
+mark_sys_dist_nodedown(Conn, Node) ->
+ erts_internal:abort_connection(Node, Conn#connection.conn_id),
case application:get_env(kernel, dist_auto_connect) of
{ok, once} ->
ets:insert(sys_dist, #barred_connection{node = Node});
@@ -1179,15 +1272,8 @@ spawn_func(_,{From,Tag},M,F,A,Gleader) ->
%% Set up connection to a new node.
%% -----------------------------------------------------------
-setup(Node,Type,From,State) ->
- Allowed = State#state.allowed,
- case lists:member(Node, Allowed) of
- false when Allowed =/= [] ->
- error_msg("** Connection attempt with "
- "disallowed node ~w ** ~n", [Node]),
- {error, bad_node};
- _ ->
- case select_mod(Node, State#state.listen) of
+setup(ConnLookup, Node,ConnId,Type,From,State) ->
+ case setup_check(ConnLookup, Node, ConnId, State) of
{ok, L} ->
Mod = L#listen.module,
LAddr = L#listen.address,
@@ -1200,18 +1286,45 @@ setup(Node,Type,From,State) ->
Addr = LAddr#net_address {
address = undefined,
host = undefined },
+ Waiting = case From of
+ noreply -> [];
+ _ -> [From]
+ end,
ets:insert(sys_dist, #connection{node = Node,
+ conn_id = ConnId,
state = pending,
owner = Pid,
- waiting = [From],
+ waiting = Waiting,
address = Addr,
type = normal}),
{ok, Pid};
Error ->
Error
- end
end.
+setup_check(ConnLookup, Node, ConnId, State) ->
+ Allowed = State#state.allowed,
+ case lists:member(Node, Allowed) of
+ false when Allowed =/= [] ->
+ error_msg("** Connection attempt with "
+ "disallowed node ~w ** ~n", [Node]),
+ {error, bad_node};
+ _ ->
+ case verify_new_conn_id(ConnLookup, ConnId) of
+ false ->
+ error_msg("** Connection attempt to ~w with "
+ "bad connection id ~w ** ~n", [Node, ConnId]),
+ {error, bad_conn_id};
+ true ->
+ case select_mod(Node, State#state.listen) of
+ {ok, _L}=OK -> OK;
+ Error -> Error
+ end
+ end
+ end.
+
+
+
%%
%% Find a module that is willing to handle connection setup to Node
%%
@@ -1652,6 +1765,11 @@ verbose(_, _, _) ->
getnode(P) when is_pid(P) -> node(P);
getnode(P) -> P.
+return_call({noreply, _State}=R, _From) ->
+ R;
+return_call(R, From) ->
+ async_reply(R, From).
+
async_reply({reply, Msg, State}, From) ->
async_gen_server_reply(From, Msg),
{noreply, State}.
@@ -1659,16 +1777,16 @@ async_reply({reply, Msg, State}, From) ->
async_gen_server_reply(From, Msg) ->
{Pid, Tag} = From,
M = {Tag, Msg},
- case catch erlang:send(Pid, M, [nosuspend, noconnect]) of
+ try erlang:send(Pid, M, [nosuspend, noconnect]) of
ok ->
ok;
nosuspend ->
_ = spawn(fun() -> catch erlang:send(Pid, M, [noconnect]) end),
ok;
noconnect ->
- ok; % The gen module takes care of this case.
- {'EXIT', _} ->
- ok
+ ok % The gen module takes care of this case.
+ catch
+ _:_ -> ok
end.
call_owner(Owner, Msg) ->
diff --git a/lib/kernel/src/os.erl b/lib/kernel/src/os.erl
index b5f19d4b99..77c883f57f 100644
--- a/lib/kernel/src/os.erl
+++ b/lib/kernel/src/os.erl
@@ -21,18 +21,24 @@
%% Provides a common operating system interface.
--export([type/0, version/0, cmd/1, find_executable/1, find_executable/2]).
+-export([type/0, version/0, cmd/1, cmd/2, find_executable/1, find_executable/2]).
-include("file.hrl").
--export_type([env_var_name/0, env_var_value/0, env_var_name_value/0, command_input/0]).
+-export_type([env_var_name/0, env_var_value/0, env_var_name_value/0]).
+
+-export([getenv/0, getenv/1, getenv/2, putenv/2, unsetenv/1]).
%%% BIFs
--export([getenv/0, getenv/1, getenv/2, getpid/0,
- perf_counter/0, perf_counter/1,
- putenv/2, set_signal/2, system_time/0, system_time/1,
- timestamp/0, unsetenv/1]).
+-export([get_env_var/1, getpid/0, list_env_vars/0, perf_counter/0,
+ perf_counter/1, set_env_var/2, set_signal/2, system_time/0,
+ system_time/1, timestamp/0, unset_env_var/1]).
+
+-type os_command() :: atom() | io_lib:chars().
+-type os_command_opts() :: #{ max_size => non_neg_integer() | infinity }.
+
+-export_type([os_command/0, os_command_opts/0]).
-type env_var_name() :: nonempty_string().
@@ -40,31 +46,15 @@
-type env_var_name_value() :: nonempty_string().
--type command_input() :: atom() | io_lib:chars().
-
--spec getenv() -> [env_var_name_value()].
-
-getenv() -> erlang:nif_error(undef).
-
--spec getenv(VarName) -> Value | false when
- VarName :: env_var_name(),
- Value :: env_var_value().
-
-getenv(_) ->
+-spec list_env_vars() -> [{env_var_name(), env_var_value()}].
+list_env_vars() ->
erlang:nif_error(undef).
--spec getenv(VarName, DefaultValue) -> Value when
+-spec get_env_var(VarName) -> Value | false when
VarName :: env_var_name(),
- DefaultValue :: env_var_value(),
Value :: env_var_value().
-
-getenv(VarName, DefaultValue) ->
- case os:getenv(VarName) of
- false ->
- DefaultValue;
- Value ->
- Value
- end.
+get_env_var(_VarName) ->
+ erlang:nif_error(undef).
-spec getpid() -> Value when
Value :: string().
@@ -84,11 +74,10 @@ perf_counter() ->
perf_counter(Unit) ->
erlang:convert_time_unit(os:perf_counter(), perf_counter, Unit).
--spec putenv(VarName, Value) -> true when
+-spec set_env_var(VarName, Value) -> true when
VarName :: env_var_name(),
Value :: env_var_value().
-
-putenv(_, _) ->
+set_env_var(_, _) ->
erlang:nif_error(undef).
-spec system_time() -> integer().
@@ -108,10 +97,9 @@ system_time(_Unit) ->
timestamp() ->
erlang:nif_error(undef).
--spec unsetenv(VarName) -> true when
+-spec unset_env_var(VarName) -> true when
VarName :: env_var_name().
-
-unsetenv(_) ->
+unset_env_var(_) ->
erlang:nif_error(undef).
-spec set_signal(Signal, Option) -> 'ok' when
@@ -125,6 +113,39 @@ set_signal(_Signal, _Option) ->
%%% End of BIFs
+-spec getenv() -> [env_var_name_value()].
+getenv() ->
+ [lists:flatten([Key, $=, Value]) || {Key, Value} <- os:list_env_vars() ].
+
+-spec getenv(VarName) -> Value | false when
+ VarName :: env_var_name(),
+ Value :: env_var_value().
+getenv(VarName) ->
+ os:get_env_var(VarName).
+
+-spec getenv(VarName, DefaultValue) -> Value when
+ VarName :: env_var_name(),
+ DefaultValue :: env_var_value(),
+ Value :: env_var_value().
+getenv(VarName, DefaultValue) ->
+ case os:getenv(VarName) of
+ false ->
+ DefaultValue;
+ Value ->
+ Value
+ end.
+
+-spec putenv(VarName, Value) -> true when
+ VarName :: env_var_name(),
+ Value :: env_var_value().
+putenv(VarName, Value) ->
+ os:set_env_var(VarName, Value).
+
+-spec unsetenv(VarName) -> true when
+ VarName :: env_var_name().
+unsetenv(VarName) ->
+ os:unset_env_var(VarName).
+
-spec type() -> {Osfamily, Osname} when
Osfamily :: unix | win32,
Osname :: atom().
@@ -242,14 +263,20 @@ extensions() ->
%% Executes the given command in the default shell for the operating system.
-spec cmd(Command) -> string() when
- Command :: os:command_input().
+ Command :: os_command().
cmd(Cmd) ->
+ cmd(Cmd, #{ }).
+
+-spec cmd(Command, Options) -> string() when
+ Command :: os_command(),
+ Options :: os_command_opts().
+cmd(Cmd, Opts) ->
{SpawnCmd, SpawnOpts, SpawnInput, Eot} = mk_cmd(os:type(), validate(Cmd)),
Port = open_port({spawn, SpawnCmd}, [binary, stderr_to_stdout,
stream, in, hide | SpawnOpts]),
MonRef = erlang:monitor(port, Port),
true = port_command(Port, SpawnInput),
- Bytes = get_data(Port, MonRef, Eot, []),
+ Bytes = get_data(Port, MonRef, Eot, [], 0, maps:get(max_size, Opts, infinity)),
demonitor(MonRef, [flush]),
String = unicode:characters_to_list(Bytes),
if %% Convert to unicode list if possible otherwise return bytes
@@ -314,12 +341,13 @@ validate2([List|Rest]) when is_list(List) ->
validate2(List),
validate2(Rest).
-get_data(Port, MonRef, Eot, Sofar) ->
+get_data(Port, MonRef, Eot, Sofar, Size, Max) ->
receive
{Port, {data, Bytes}} ->
- case eot(Bytes, Eot) of
+ case eot(Bytes, Eot, Size, Max) of
more ->
- get_data(Port, MonRef, Eot, [Sofar,Bytes]);
+ get_data(Port, MonRef, Eot, [Sofar, Bytes],
+ Size + byte_size(Bytes), Max);
Last ->
catch port_close(Port),
flush_until_down(Port, MonRef),
@@ -330,13 +358,16 @@ get_data(Port, MonRef, Eot, Sofar) ->
iolist_to_binary(Sofar)
end.
-eot(_Bs, <<>>) ->
+eot(Bs, <<>>, Size, Max) when Size + byte_size(Bs) < Max ->
more;
-eot(Bs, Eot) ->
+eot(Bs, <<>>, Size, Max) ->
+ binary:part(Bs, {0, Max - Size});
+eot(Bs, Eot, Size, Max) ->
case binary:match(Bs, Eot) of
- nomatch -> more;
- {Pos, _} ->
- binary:part(Bs,{0, Pos})
+ {Pos, _} when Size + Pos < Max ->
+ binary:part(Bs,{0, Pos});
+ _ ->
+ eot(Bs, <<>>, Size, Max)
end.
%% When port_close returns we know that all the
diff --git a/lib/kernel/src/raw_file_io.erl b/lib/kernel/src/raw_file_io.erl
new file mode 100644
index 0000000000..e3c07c8f78
--- /dev/null
+++ b/lib/kernel/src/raw_file_io.erl
@@ -0,0 +1,75 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(raw_file_io).
+
+-export([open/2]).
+
+open(Filename, Modes) ->
+ %% Layers are applied in this order, and the listed modules will call this
+ %% function again as necessary. eg. a raw compressed delayed file in list
+ %% mode will walk through [_list -> _compressed -> _delayed -> _raw].
+ ModuleOrder = [{raw_file_io_list, fun match_list/1},
+ {raw_file_io_compressed, fun match_compressed/1},
+ {raw_file_io_delayed, fun match_delayed/1},
+ {raw_file_io_raw, fun match_raw/1}],
+ open_1(ModuleOrder, Filename, add_implicit_modes(Modes)).
+open_1([], _Filename, _Modes) ->
+ error(badarg);
+open_1([{Module, Match} | Rest], Filename, Modes) ->
+ case lists:any(Match, Modes) of
+ true ->
+ {Options, ChildModes} =
+ lists:partition(fun(Mode) -> Match(Mode) end, Modes),
+ Module:open_layer(Filename, ChildModes, Options);
+ false ->
+ open_1(Rest, Filename, Modes)
+ end.
+
+%% 'read' and 'list' mode are enabled unless disabled by another option, so
+%% we'll explicitly add them to avoid duplicating this logic in child layers.
+add_implicit_modes(Modes0) ->
+ Modes1 = add_unless_matched(Modes0, fun match_writable/1, read),
+ add_unless_matched(Modes1, fun match_binary/1, list).
+add_unless_matched(Modes, Match, Default) ->
+ case lists:any(Match, Modes) of
+ false -> [Default | Modes];
+ true -> Modes
+ end.
+
+match_list(list) -> true;
+match_list(_Other) -> false.
+
+match_compressed(compressed) -> true;
+match_compressed(_Other) -> false.
+
+match_delayed({delayed_write, _Size, _Timeout}) -> true;
+match_delayed(delayed_write) -> true;
+match_delayed(_Other) -> false.
+
+match_raw(raw) -> true;
+match_raw(_Other) -> false.
+
+match_writable(write) -> true;
+match_writable(append) -> true;
+match_writable(exclusive) -> true;
+match_writable(_Other) -> false.
+
+match_binary(binary) -> true;
+match_binary(_Other) -> false.
diff --git a/lib/kernel/src/raw_file_io_compressed.erl b/lib/kernel/src/raw_file_io_compressed.erl
new file mode 100644
index 0000000000..d5ab042d25
--- /dev/null
+++ b/lib/kernel/src/raw_file_io_compressed.erl
@@ -0,0 +1,134 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(raw_file_io_compressed).
+
+-export([close/1, sync/1, datasync/1, truncate/1, advise/4, allocate/3,
+ position/2, write/2, pwrite/2, pwrite/3,
+ read_line/1, read/2, pread/2, pread/3]).
+
+%% OTP internal.
+-export([ipread_s32bu_p32bu/3, sendfile/8]).
+
+-export([open_layer/3]).
+
+-include("file_int.hrl").
+
+open_layer(Filename, Modes, Options) ->
+ IsAppend = lists:member(append, Modes),
+ IsDeflate = lists:member(write, Modes),
+ IsInflate = lists:member(read, Modes),
+ if
+ IsDeflate, IsInflate; IsAppend ->
+ {error, einval};
+ IsDeflate, not IsInflate ->
+ start_server_module(raw_file_io_deflate, Filename, Modes, Options);
+ IsInflate ->
+ start_server_module(raw_file_io_inflate, Filename, Modes, Options)
+ end.
+
+start_server_module(Module, Filename, Modes, Options) ->
+ Secret = make_ref(),
+ case gen_statem:start(Module, {self(), Secret, Options}, []) of
+ {ok, Pid} -> open_next_layer(Pid, Secret, Filename, Modes);
+ Other -> Other
+ end.
+
+open_next_layer(Pid, Secret, Filename, Modes) ->
+ case gen_statem:call(Pid, {'$open', Secret, Filename, Modes}, infinity) of
+ ok ->
+ PublicFd = #file_descriptor{
+ module = raw_file_io_compressed, data = {self(), Pid} },
+ {ok, PublicFd};
+ Other -> Other
+ end.
+
+close(Fd) ->
+ wrap_call(Fd, [close]).
+
+sync(Fd) ->
+ wrap_call(Fd, [sync]).
+datasync(Fd) ->
+ wrap_call(Fd, [datasync]).
+
+truncate(Fd) ->
+ wrap_call(Fd, [truncate]).
+
+advise(Fd, Offset, Length, Advise) ->
+ wrap_call(Fd, [advise, Offset, Length, Advise]).
+allocate(Fd, Offset, Length) ->
+ wrap_call(Fd, [allocate, Offset, Length]).
+
+position(Fd, Mark) ->
+ wrap_call(Fd, [position, Mark]).
+
+write(Fd, IOData) ->
+ try
+ CompactedData = erlang:iolist_to_iovec(IOData),
+ wrap_call(Fd, [write, CompactedData])
+ catch
+ error:badarg -> {error, badarg}
+ end.
+
+pwrite(Fd, Offset, IOData) ->
+ try
+ CompactedData = erlang:iolist_to_iovec(IOData),
+ wrap_call(Fd, [pwrite, Offset, CompactedData])
+ catch
+ error:badarg -> {error, badarg}
+ end.
+pwrite(Fd, LocBytes) ->
+ try
+ CompactedLocBytes =
+ [ {Offset, erlang:iolist_to_iovec(IOData)} ||
+ {Offset, IOData} <- LocBytes ],
+ wrap_call(Fd, [pwrite, CompactedLocBytes])
+ catch
+ error:badarg -> {error, badarg}
+ end.
+
+read_line(Fd) ->
+ wrap_call(Fd, [read_line]).
+read(Fd, Size) ->
+ wrap_call(Fd, [read, Size]).
+pread(Fd, Offset, Size) ->
+ wrap_call(Fd, [pread, Offset, Size]).
+pread(Fd, LocNums) ->
+ wrap_call(Fd, [pread, LocNums]).
+
+ipread_s32bu_p32bu(Fd, Offset, MaxSize) ->
+ wrap_call(Fd, [ipread_s32bu_p32bu, Offset, MaxSize]).
+
+sendfile(_,_,_,_,_,_,_,_) ->
+ {error, enotsup}.
+
+wrap_call(Fd, Command) ->
+ {_Owner, Pid} = get_fd_data(Fd),
+ try gen_statem:call(Pid, Command, infinity) of
+ Result -> Result
+ catch
+ exit:{noproc, _StackTrace} -> {error, einval}
+ end.
+
+get_fd_data(#file_descriptor{ data = Data }) ->
+ {Owner, _ServerPid} = Data,
+ case self() of
+ Owner -> Data;
+ _ -> error(not_on_controlling_process)
+ end.
diff --git a/lib/kernel/src/raw_file_io_deflate.erl b/lib/kernel/src/raw_file_io_deflate.erl
new file mode 100644
index 0000000000..acfc546743
--- /dev/null
+++ b/lib/kernel/src/raw_file_io_deflate.erl
@@ -0,0 +1,159 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(raw_file_io_deflate).
+
+-behavior(gen_statem).
+
+-export([init/1, callback_mode/0, terminate/3]).
+-export([opening/3, opened/3]).
+
+-include("file_int.hrl").
+
+-define(GZIP_WBITS, 16 + 15).
+
+callback_mode() -> state_functions.
+
+init({Owner, Secret, [compressed]}) ->
+ Monitor = monitor(process, Owner),
+ Z = zlib:open(),
+ ok = zlib:deflateInit(Z, default, deflated, ?GZIP_WBITS, 8, default),
+ Data =
+ #{ owner => Owner,
+ monitor => Monitor,
+ secret => Secret,
+ position => 0,
+ zlib => Z },
+ {ok, opening, Data}.
+
+opening({call, From}, {'$open', Secret, Filename, Modes}, #{ secret := Secret } = Data) ->
+ case raw_file_io:open(Filename, Modes) of
+ {ok, PrivateFd} ->
+ NewData = Data#{ handle => PrivateFd },
+ {next_state, opened, NewData, [{reply, From, ok}]};
+ Other ->
+ {stop_and_reply, normal, [{reply, From, Other}]}
+ end;
+opening(_Event, _Contents, _Data) ->
+ {keep_state_and_data, [postpone]}.
+
+%%
+
+opened(info, {'DOWN', Monitor, process, _Owner, Reason}, #{ monitor := Monitor } = Data) ->
+ if
+ Reason =/= kill -> flush_deflate_state(Data);
+ Reason =:= kill -> ignored
+ end,
+ {stop, shutdown};
+
+opened(info, _Message, _Data) ->
+ keep_state_and_data;
+
+opened({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
+ #{ handle := PrivateFd } = Data,
+ Response =
+ case flush_deflate_state(Data) of
+ ok -> ?CALL_FD(PrivateFd, close, []);
+ Other -> Other
+ end,
+ {stop_and_reply, normal, [{reply, From, Response}]};
+
+opened({call, {Owner, _Tag} = From}, [position, Mark], #{ owner := Owner } = Data) ->
+ case position(Data, Mark) of
+ {ok, NewData, Result} ->
+ Response = {ok, Result},
+ {keep_state, NewData, [{reply, From, Response}]};
+ Other ->
+ {keep_state_and_data, [{reply, From, Other}]}
+ end;
+
+opened({call, {Owner, _Tag} = From}, [write, IOVec], #{ owner := Owner } = Data) ->
+ case write(Data, IOVec) of
+ {ok, NewData} -> {keep_state, NewData, [{reply, From, ok}]};
+ Other -> {keep_state_and_data, [{reply, From, Other}]}
+ end;
+
+opened({call, {Owner, _Tag} = From}, [read, _Size], #{ owner := Owner }) ->
+ Response = {error, ebadf},
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened({call, {Owner, _Tag} = From}, [read_line], #{ owner := Owner }) ->
+ Response = {error, ebadf},
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened({call, {Owner, _Tag} = From}, _Command, #{ owner := Owner }) ->
+ Response = {error, enotsup},
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened({call, _From}, _Command, _Data) ->
+ %% The client functions filter this out, so we'll crash if the user does
+ %% anything stupid on purpose.
+ {shutdown, protocol_violation};
+
+opened(_Event, _Request, _Data) ->
+ keep_state_and_data.
+
+write(Data, IOVec) ->
+ #{ handle := PrivateFd, position := Position, zlib := Z } = Data,
+ UncompressedSize = iolist_size(IOVec),
+ case ?CALL_FD(PrivateFd, write, [zlib:deflate(Z, IOVec)]) of
+ ok -> {ok, Data#{ position := (Position + UncompressedSize) }};
+ Other -> Other
+ end.
+
+%%
+%% We support "seeking" forward as long as it isn't relative to EOF.
+%%
+%% Seeking is a bit of a misnomer as it's really just compressing zeroes until
+%% we reach the desired point, but it has always behaved like this.
+%%
+
+position(Data, Mark) when is_atom(Mark) ->
+ position(Data, {Mark, 0});
+position(Data, Offset) when is_integer(Offset) ->
+ position(Data, {bof, Offset});
+position(Data, {bof, Offset}) when is_integer(Offset) ->
+ position_1(Data, Offset);
+position(Data, {cur, Offset}) when is_integer(Offset) ->
+ #{ position := Position } = Data,
+ position_1(Data, Position + Offset);
+position(_Data, {eof, Offset}) when is_integer(Offset) ->
+ {error, einval};
+position(_Data, _Any) ->
+ {error, badarg}.
+
+position_1(#{ position := Desired } = Data, Desired) ->
+ {ok, Data, Desired};
+position_1(#{ position := Current } = Data, Desired) when Current < Desired ->
+ BytesToWrite = min(Desired - Current, 4 bsl 20),
+ case write(Data, <<0:(BytesToWrite)/unit:8>>) of
+ {ok, NewData} -> position_1(NewData, Desired);
+ Other -> Other
+ end;
+position_1(#{ position := Current }, Desired) when Current > Desired ->
+ {error, einval}.
+
+flush_deflate_state(#{ handle := PrivateFd, zlib := Z }) ->
+ case ?CALL_FD(PrivateFd, write, [zlib:deflate(Z, [], finish)]) of
+ ok -> ok;
+ Other -> Other
+ end.
+
+terminate(_Reason, _State, _Data) ->
+ ok.
diff --git a/lib/kernel/src/raw_file_io_delayed.erl b/lib/kernel/src/raw_file_io_delayed.erl
new file mode 100644
index 0000000000..d2ad7550a1
--- /dev/null
+++ b/lib/kernel/src/raw_file_io_delayed.erl
@@ -0,0 +1,320 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(raw_file_io_delayed).
+
+-behavior(gen_statem).
+
+-export([close/1, sync/1, datasync/1, truncate/1, advise/4, allocate/3,
+ position/2, write/2, pwrite/2, pwrite/3,
+ read_line/1, read/2, pread/2, pread/3]).
+
+%% OTP internal.
+-export([ipread_s32bu_p32bu/3, sendfile/8]).
+
+-export([open_layer/3]).
+
+-export([init/1, callback_mode/0, terminate/3]).
+-export([opening/3, opened/3]).
+
+-include("file_int.hrl").
+
+open_layer(Filename, Modes, Options) ->
+ Secret = make_ref(),
+ case gen_statem:start(?MODULE, {self(), Secret, Options}, []) of
+ {ok, Pid} ->
+ gen_statem:call(Pid, {'$open', Secret, Filename, Modes}, infinity);
+ Other ->
+ Other
+ end.
+
+callback_mode() -> state_functions.
+
+init({Owner, Secret, Options}) ->
+ Monitor = monitor(process, Owner),
+ Defaults =
+ #{ owner => Owner,
+ monitor => Monitor,
+ secret => Secret,
+ timer => none,
+ pid => self(),
+ buffer => prim_buffer:new(),
+ delay_size => 64 bsl 10,
+ delay_time => 2000 },
+ Data = fill_delay_values(Defaults, Options),
+ {ok, opening, Data}.
+
+fill_delay_values(Data, []) ->
+ Data;
+fill_delay_values(Data, [{delayed_write, Size, Time} | Options]) ->
+ fill_delay_values(Data#{ delay_size => Size, delay_time => Time }, Options);
+fill_delay_values(Data, [_ | Options]) ->
+ fill_delay_values(Data, Options).
+
+opening({call, From}, {'$open', Secret, Filename, Modes}, #{ secret := Secret } = Data) ->
+ case raw_file_io:open(Filename, Modes) of
+ {ok, PrivateFd} ->
+ PublicData = maps:with([owner, buffer, delay_size, pid], Data),
+ PublicFd = #file_descriptor{ module = ?MODULE, data = PublicData },
+
+ NewData = Data#{ handle => PrivateFd },
+ Response = {ok, PublicFd},
+ {next_state, opened, NewData, [{reply, From, Response}]};
+ Other ->
+ {stop_and_reply, normal, [{reply, From, Other}]}
+ end;
+opening(_Event, _Contents, _Data) ->
+ {keep_state_and_data, [postpone]}.
+
+%%
+
+opened(info, {'$timed_out', Secret}, #{ secret := Secret } = Data) ->
+ %% If the user writes something at this exact moment, the flush will fail
+ %% and the timer won't reset on the next write since the buffer won't be
+ %% empty (Unless we collided on a flush). We therefore reset the timeout to
+ %% ensure that data won't sit idle for extended periods of time.
+ case try_flush_write_buffer(Data) of
+ busy -> gen_statem:cast(self(), '$reset_timeout');
+ ok -> ok
+ end,
+ {keep_state, Data#{ timer => none }, []};
+
+opened(info, {'DOWN', Monitor, process, _Owner, Reason}, #{ monitor := Monitor } = Data) ->
+ if
+ Reason =/= kill -> try_flush_write_buffer(Data);
+ Reason =:= kill -> ignored
+ end,
+ {stop, shutdown};
+
+opened(info, _Message, _Data) ->
+ keep_state_and_data;
+
+opened({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
+ case flush_write_buffer(Data) of
+ ok ->
+ #{ handle := PrivateFd } = Data,
+ Response = ?CALL_FD(PrivateFd, close, []),
+ {stop_and_reply, normal, [{reply, From, Response}]};
+ Other ->
+ {stop_and_reply, normal, [{reply, From, Other}]}
+ end;
+
+opened({call, {Owner, _Tag} = From}, '$wait', #{ owner := Owner }) ->
+ %% Used in write/2 to synchronize writes on lock conflicts.
+ {keep_state_and_data, [{reply, From, ok}]};
+
+opened({call, {Owner, _Tag} = From}, '$synchronous_flush', #{ owner := Owner } = Data) ->
+ cancel_flush_timeout(Data),
+ Response = flush_write_buffer(Data),
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened({call, {Owner, _Tag} = From}, Command, #{ owner := Owner } = Data) ->
+ Response =
+ case flush_write_buffer(Data) of
+ ok -> dispatch_command(Data, Command);
+ Other -> Other
+ end,
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened({call, _From}, _Command, _Data) ->
+ %% The client functions filter this out, so we'll crash if the user does
+ %% anything stupid on purpose.
+ {shutdown, protocol_violation};
+
+opened(cast, '$reset_timeout', #{ delay_time := Timeout, secret := Secret } = Data) ->
+ cancel_flush_timeout(Data),
+ Timer = erlang:send_after(Timeout, self(), {'$timed_out', Secret}),
+ {keep_state, Data#{ timer => Timer }, []};
+
+opened(cast, _Message, _Data) ->
+ {keep_state_and_data, []}.
+
+dispatch_command(Data, [Function | Args]) ->
+ #{ handle := Handle } = Data,
+ Module = Handle#file_descriptor.module,
+ apply(Module, Function, [Handle | Args]).
+
+cancel_flush_timeout(#{ timer := none }) ->
+ ok;
+cancel_flush_timeout(#{ timer := Timer }) ->
+ _ = erlang:cancel_timer(Timer, [{async, true}]),
+ ok.
+
+try_flush_write_buffer(#{ buffer := Buffer, handle := PrivateFd }) ->
+ case prim_buffer:try_lock(Buffer) of
+ acquired ->
+ flush_write_buffer_1(Buffer, PrivateFd),
+ prim_buffer:unlock(Buffer),
+ ok;
+ busy ->
+ busy
+ end.
+
+%% This is only safe to use when there is no chance of conflict with the owner
+%% process, or in other words, "during synchronous calls outside of the locked
+%% section of write/2"
+flush_write_buffer(#{ buffer := Buffer, handle := PrivateFd }) ->
+ acquired = prim_buffer:try_lock(Buffer),
+ Result = flush_write_buffer_1(Buffer, PrivateFd),
+ prim_buffer:unlock(Buffer),
+ Result.
+
+flush_write_buffer_1(Buffer, PrivateFd) ->
+ case prim_buffer:size(Buffer) of
+ Size when Size > 0 ->
+ ?CALL_FD(PrivateFd, write, [prim_buffer:read_iovec(Buffer, Size)]);
+ 0 ->
+ ok
+ end.
+
+terminate(_Reason, _State, _Data) ->
+ ok.
+
+%% Client functions
+
+write(Fd, IOData) ->
+ try
+ enqueue_write(Fd, erlang:iolist_to_iovec(IOData))
+ catch
+ error:badarg -> {error, badarg}
+ end.
+enqueue_write(_Fd, []) ->
+ ok;
+enqueue_write(Fd, IOVec) ->
+ %% get_fd_data will reject everyone except the process that opened the Fd,
+ %% so we can't race with anyone except the wrapper process.
+ #{ delay_size := DelaySize,
+ buffer := Buffer,
+ pid := Pid } = get_fd_data(Fd),
+ case prim_buffer:try_lock(Buffer) of
+ acquired ->
+ %% (The wrapper process will exit without flushing if we're killed
+ %% while holding the lock).
+ enqueue_write_locked(Pid, Buffer, DelaySize, IOVec);
+ busy ->
+ %% This can only happen while we're processing a timeout in the
+ %% wrapper process, so we perform a bogus call to get a completion
+ %% notification before trying again.
+ gen_statem:call(Pid, '$wait'),
+ enqueue_write(Fd, IOVec)
+ end.
+enqueue_write_locked(Pid, Buffer, DelaySize, IOVec) ->
+ %% The synchronous operations (write, forced flush) are safe since we're
+ %% running on the only process that can fill the buffer; a timeout being
+ %% processed just before $synchronous_flush will cause the flush to nop,
+ %% and a timeout sneaking in just before a synchronous write won't do
+ %% anything since the buffer is guaranteed to be empty at that point.
+ BufSize = prim_buffer:size(Buffer),
+ case is_iovec_smaller_than(IOVec, DelaySize - BufSize) of
+ true when BufSize > 0 ->
+ prim_buffer:write(Buffer, IOVec),
+ prim_buffer:unlock(Buffer);
+ true ->
+ prim_buffer:write(Buffer, IOVec),
+ prim_buffer:unlock(Buffer),
+ gen_statem:cast(Pid, '$reset_timeout');
+ false when BufSize > 0 ->
+ prim_buffer:write(Buffer, IOVec),
+ prim_buffer:unlock(Buffer),
+ gen_statem:call(Pid, '$synchronous_flush');
+ false ->
+ prim_buffer:unlock(Buffer),
+ gen_statem:call(Pid, [write, IOVec])
+ end.
+
+%% iolist_size/1 will always look through the entire list to get a precise
+%% amount, which is pretty inefficient since we only need to know whether we've
+%% hit the buffer threshold or not.
+%%
+%% We only handle the binary case since write/2 forcibly translates input to
+%% erlang:iovec().
+is_iovec_smaller_than(IOVec, Max) ->
+ is_iovec_smaller_than_1(IOVec, Max, 0).
+is_iovec_smaller_than_1(_IOVec, Max, Acc) when Acc >= Max ->
+ false;
+is_iovec_smaller_than_1([], _Max, _Acc) ->
+ true;
+is_iovec_smaller_than_1([Binary | Rest], Max, Acc) when is_binary(Binary) ->
+ is_iovec_smaller_than_1(Rest, Max, Acc + byte_size(Binary)).
+
+close(Fd) ->
+ wrap_call(Fd, [close]).
+
+sync(Fd) ->
+ wrap_call(Fd, [sync]).
+datasync(Fd) ->
+ wrap_call(Fd, [datasync]).
+
+truncate(Fd) ->
+ wrap_call(Fd, [truncate]).
+
+advise(Fd, Offset, Length, Advise) ->
+ wrap_call(Fd, [advise, Offset, Length, Advise]).
+allocate(Fd, Offset, Length) ->
+ wrap_call(Fd, [allocate, Offset, Length]).
+
+position(Fd, Mark) ->
+ wrap_call(Fd, [position, Mark]).
+
+pwrite(Fd, Offset, IOData) ->
+ try
+ CompactedData = erlang:iolist_to_iovec(IOData),
+ wrap_call(Fd, [pwrite, Offset, CompactedData])
+ catch
+ error:badarg -> {error, badarg}
+ end.
+pwrite(Fd, LocBytes) ->
+ try
+ CompactedLocBytes =
+ [ {Offset, erlang:iolist_to_iovec(IOData)} ||
+ {Offset, IOData} <- LocBytes ],
+ wrap_call(Fd, [pwrite, CompactedLocBytes])
+ catch
+ error:badarg -> {error, badarg}
+ end.
+
+read_line(Fd) ->
+ wrap_call(Fd, [read_line]).
+read(Fd, Size) ->
+ wrap_call(Fd, [read, Size]).
+pread(Fd, Offset, Size) ->
+ wrap_call(Fd, [pread, Offset, Size]).
+pread(Fd, LocNums) ->
+ wrap_call(Fd, [pread, LocNums]).
+
+ipread_s32bu_p32bu(Fd, Offset, MaxSize) ->
+ wrap_call(Fd, [ipread_s32bu_p32bu, Offset, MaxSize]).
+
+sendfile(_,_,_,_,_,_,_,_) ->
+ {error, enotsup}.
+
+wrap_call(Fd, Command) ->
+ #{ pid := Pid } = get_fd_data(Fd),
+ try gen_statem:call(Pid, Command, infinity) of
+ Result -> Result
+ catch
+ exit:{noproc, _StackTrace} -> {error, einval}
+ end.
+
+get_fd_data(#file_descriptor{ data = Data }) ->
+ #{ owner := Owner } = Data,
+ case self() of
+ Owner -> Data;
+ _ -> error(not_on_controlling_process)
+ end.
diff --git a/lib/kernel/src/raw_file_io_inflate.erl b/lib/kernel/src/raw_file_io_inflate.erl
new file mode 100644
index 0000000000..7e9780310c
--- /dev/null
+++ b/lib/kernel/src/raw_file_io_inflate.erl
@@ -0,0 +1,261 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(raw_file_io_inflate).
+
+-behavior(gen_statem).
+
+-export([init/1, callback_mode/0, terminate/3]).
+-export([opening/3, opened_gzip/3, opened_passthrough/3]).
+
+-include("file_int.hrl").
+
+-define(INFLATE_CHUNK_SIZE, (1 bsl 10)).
+-define(GZIP_WBITS, (16 + 15)).
+
+callback_mode() -> state_functions.
+
+init({Owner, Secret, [compressed]}) ->
+ Monitor = monitor(process, Owner),
+ %% We're using the undocumented inflateInit/3 to open the stream in
+ %% 'reset mode', which resets the inflate state at the end of every stream,
+ %% allowing us to read concatenated gzip files.
+ Z = zlib:open(),
+ ok = zlib:inflateInit(Z, ?GZIP_WBITS, reset),
+ Data =
+ #{ owner => Owner,
+ monitor => Monitor,
+ secret => Secret,
+ position => 0,
+ buffer => prim_buffer:new(),
+ zlib => Z },
+ {ok, opening, Data}.
+
+%% The old driver fell back to plain reads if the file didn't start with the
+%% magic gzip bytes.
+choose_decompression_state(PrivateFd) ->
+ State =
+ case ?CALL_FD(PrivateFd, read, [2]) of
+ {ok, <<16#1F, 16#8B>>} -> opened_gzip;
+ _Other -> opened_passthrough
+ end,
+ {ok, 0} = ?CALL_FD(PrivateFd, position, [0]),
+ State.
+
+opening({call, From}, {'$open', Secret, Filename, Modes}, #{ secret := Secret } = Data) ->
+ case raw_file_io:open(Filename, Modes) of
+ {ok, PrivateFd} ->
+ NextState = choose_decompression_state(PrivateFd),
+ NewData = Data#{ handle => PrivateFd },
+ {next_state, NextState, NewData, [{reply, From, ok}]};
+ Other ->
+ {stop_and_reply, normal, [{reply, From, Other}]}
+ end;
+opening(_Event, _Contents, _Data) ->
+ {keep_state_and_data, [postpone]}.
+
+internal_close(From, Data) ->
+ #{ handle := PrivateFd } = Data,
+ Response = ?CALL_FD(PrivateFd, close, []),
+ {stop_and_reply, normal, [{reply, From, Response}]}.
+
+opened_passthrough(info, {'DOWN', Monitor, process, _Owner, _Reason}, #{ monitor := Monitor }) ->
+ {stop, shutdown};
+
+opened_passthrough(info, _Message, _Data) ->
+ keep_state_and_data;
+
+opened_passthrough({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
+ internal_close(From, Data);
+
+opened_passthrough({call, {Owner, _Tag} = From}, [Method | Args], #{ owner := Owner } = Data) ->
+ #{ handle := PrivateFd } = Data,
+ Response = ?CALL_FD(PrivateFd, Method, Args),
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened_passthrough({call, _From}, _Command, _Data) ->
+ %% The client functions filter this out, so we'll crash if the user does
+ %% anything stupid on purpose.
+ {shutdown, protocol_violation};
+
+opened_passthrough(_Event, _Request, _Data) ->
+ keep_state_and_data.
+
+%%
+
+opened_gzip(info, {'DOWN', Monitor, process, _Owner, _Reason}, #{ monitor := Monitor }) ->
+ {stop, shutdown};
+
+opened_gzip(info, _Message, _Data) ->
+ keep_state_and_data;
+
+opened_gzip({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
+ internal_close(From, Data);
+
+opened_gzip({call, {Owner, _Tag} = From}, [position, Mark], #{ owner := Owner } = Data) ->
+ case position(Data, Mark) of
+ {ok, NewData, Result} ->
+ Response = {ok, Result},
+ {keep_state, NewData, [{reply, From, Response}]};
+ Other ->
+ {keep_state_and_data, [{reply, From, Other}]}
+ end;
+
+opened_gzip({call, {Owner, _Tag} = From}, [read, Size], #{ owner := Owner } = Data) ->
+ case read(Data, Size) of
+ {ok, NewData, Result} ->
+ Response = {ok, Result},
+ {keep_state, NewData, [{reply, From, Response}]};
+ Other ->
+ {keep_state_and_data, [{reply, From, Other}]}
+ end;
+
+opened_gzip({call, {Owner, _Tag} = From}, [read_line], #{ owner := Owner } = Data) ->
+ case read_line(Data) of
+ {ok, NewData, Result} ->
+ Response = {ok, Result},
+ {keep_state, NewData, [{reply, From, Response}]};
+ Other ->
+ {keep_state_and_data, [{reply, From, Other}]}
+ end;
+
+opened_gzip({call, {Owner, _Tag} = From}, [write, _IOData], #{ owner := Owner }) ->
+ Response = {error, ebadf},
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened_gzip({call, {Owner, _Tag} = From}, _Request, #{ owner := Owner }) ->
+ Response = {error, enotsup},
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened_gzip({call, _From}, _Request, _Data) ->
+ %% The client functions filter this out, so we'll crash if the user does
+ %% anything stupid on purpose.
+ {shutdown, protocol_violation};
+
+opened_gzip(_Event, _Request, _Data) ->
+ keep_state_and_data.
+
+%%
+
+read(#{ buffer := Buffer } = Data, Size) ->
+ try read_1(Data, Buffer, prim_buffer:size(Buffer), Size) of
+ Result -> Result
+ catch
+ error:badarg -> {error, badarg};
+ error:_ -> {error, eio}
+ end.
+read_1(Data, Buffer, BufferSize, ReadSize) when BufferSize >= ReadSize ->
+ #{ position := Position } = Data,
+ Decompressed = prim_buffer:read(Buffer, ReadSize),
+ {ok, Data#{ position => (Position + ReadSize) }, Decompressed};
+read_1(Data, Buffer, BufferSize, ReadSize) when BufferSize < ReadSize ->
+ #{ handle := PrivateFd } = Data,
+ case ?CALL_FD(PrivateFd, read, [?INFLATE_CHUNK_SIZE]) of
+ {ok, Compressed} ->
+ #{ zlib := Z } = Data,
+ Uncompressed = erlang:iolist_to_iovec(zlib:inflate(Z, Compressed)),
+ prim_buffer:write(Buffer, Uncompressed),
+ read_1(Data, Buffer, prim_buffer:size(Buffer), ReadSize);
+ eof when BufferSize > 0 ->
+ read_1(Data, Buffer, BufferSize, BufferSize);
+ Other ->
+ Other
+ end.
+
+read_line(#{ buffer := Buffer } = Data) ->
+ try read_line_1(Data, Buffer, prim_buffer:find_byte_index(Buffer, $\n)) of
+ {ok, NewData, Decompressed} -> {ok, NewData, Decompressed};
+ Other -> Other
+ catch
+ error:badarg -> {error, badarg};
+ error:_ -> {error, eio}
+ end.
+
+read_line_1(Data, Buffer, not_found) ->
+ #{ handle := PrivateFd, zlib := Z } = Data,
+ case ?CALL_FD(PrivateFd, read, [?INFLATE_CHUNK_SIZE]) of
+ {ok, Compressed} ->
+ Uncompressed = erlang:iolist_to_iovec(zlib:inflate(Z, Compressed)),
+ prim_buffer:write(Buffer, Uncompressed),
+ read_line_1(Data, Buffer, prim_buffer:find_byte_index(Buffer, $\n));
+ eof ->
+ case prim_buffer:size(Buffer) of
+ Size when Size > 0 -> {ok, prim_buffer:read(Buffer, Size)};
+ Size when Size =:= 0 -> eof
+ end;
+ Error ->
+ Error
+ end;
+read_line_1(Data, Buffer, {ok, LFIndex}) ->
+ %% Translate CRLF into just LF, completely ignoring which encoding is used,
+ %% but treat the file position as including CR.
+ #{ position := Position } = Data,
+ NewData = Data#{ position => (Position + LFIndex + 1) },
+ CRIndex = (LFIndex - 1),
+ TranslatedLine =
+ case prim_buffer:read(Buffer, LFIndex + 1) of
+ <<Line:CRIndex/binary, "\r\n">> -> <<Line/binary, "\n">>;
+ Line -> Line
+ end,
+ {ok, NewData, TranslatedLine}.
+
+%%
+%% We support seeking in both directions as long as it isn't relative to EOF.
+%%
+%% Seeking backwards is extremely inefficient since we have to seek to the very
+%% beginning and then decompress up to the desired point.
+%%
+
+position(Data, Mark) when is_atom(Mark) ->
+ position(Data, {Mark, 0});
+position(Data, Offset) when is_integer(Offset) ->
+ position(Data, {bof, Offset});
+position(Data, {bof, Offset}) when is_integer(Offset) ->
+ position_1(Data, Offset);
+position(Data, {cur, Offset}) when is_integer(Offset) ->
+ #{ position := Position } = Data,
+ position_1(Data, Position + Offset);
+position(_Data, {eof, Offset}) when is_integer(Offset) ->
+ {error, einval};
+position(_Data, _Other) ->
+ {error, badarg}.
+
+position_1(_Data, Desired) when Desired < 0 ->
+ {error, einval};
+position_1(#{ position := Desired } = Data, Desired) ->
+ {ok, Data, Desired};
+position_1(#{ position := Current } = Data, Desired) when Current < Desired ->
+ case read(Data, min(Desired - Current, ?INFLATE_CHUNK_SIZE)) of
+ {ok, NewData, _Data} -> position_1(NewData, Desired);
+ eof -> {ok, Data, Current};
+ Other -> Other
+ end;
+position_1(#{ position := Current } = Data, Desired) when Current > Desired ->
+ #{ handle := PrivateFd, buffer := Buffer, zlib := Z } = Data,
+ case ?CALL_FD(PrivateFd, position, [bof]) of
+ {ok, 0} ->
+ ok = zlib:inflateReset(Z),
+ prim_buffer:wipe(Buffer),
+ position_1(Data#{ position => 0 }, Desired);
+ Other ->
+ Other
+ end.
+
+terminate(_Reason, _State, _Data) ->
+ ok.
diff --git a/lib/kernel/src/raw_file_io_list.erl b/lib/kernel/src/raw_file_io_list.erl
new file mode 100644
index 0000000000..2e16e63f0e
--- /dev/null
+++ b/lib/kernel/src/raw_file_io_list.erl
@@ -0,0 +1,128 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(raw_file_io_list).
+
+-export([close/1, sync/1, datasync/1, truncate/1, advise/4, allocate/3,
+ position/2, write/2, pwrite/2, pwrite/3,
+ read_line/1, read/2, pread/2, pread/3]).
+
+%% OTP internal.
+-export([ipread_s32bu_p32bu/3, sendfile/8]).
+
+-export([open_layer/3]).
+
+-include("file_int.hrl").
+
+open_layer(Filename, Modes, [list]) ->
+ case raw_file_io:open(Filename, [binary | Modes]) of
+ {ok, PrivateFd} -> {ok, make_public_fd(PrivateFd, Modes)};
+ Other -> Other
+ end.
+
+%% We can skip wrapping the file if it's write-only since only read operations
+%% are affected by list mode. Since raw_file_io fills in all implicit options
+%% for us, all we need to do is check whether 'read' is among them.
+make_public_fd(PrivateFd, Modes) ->
+ case lists:member(read, Modes) of
+ true -> #file_descriptor{ module = ?MODULE, data = PrivateFd };
+ false -> PrivateFd
+ end.
+
+close(Fd) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, close, []).
+
+sync(Fd) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, sync, []).
+datasync(Fd) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, datasync, []).
+
+truncate(Fd) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, truncate, []).
+
+advise(Fd, Offset, Length, Advise) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, advise, [Offset, Length, Advise]).
+allocate(Fd, Offset, Length) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, allocate, [Offset, Length]).
+
+position(Fd, Mark) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, position, [Mark]).
+
+write(Fd, IOData) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, write, [IOData]).
+
+pwrite(Fd, Offset, IOData) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, pwrite, [Offset, IOData]).
+pwrite(Fd, LocBytes) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, pwrite, [LocBytes]).
+
+read_line(Fd) ->
+ PrivateFd = Fd#file_descriptor.data,
+ case ?CALL_FD(PrivateFd, read_line, []) of
+ {ok, Binary} -> {ok, binary_to_list(Binary)};
+ Other -> Other
+ end.
+read(Fd, Size) ->
+ PrivateFd = Fd#file_descriptor.data,
+ case ?CALL_FD(PrivateFd, read, [Size]) of
+ {ok, Binary} -> {ok, binary_to_list(Binary)};
+ Other -> Other
+ end.
+pread(Fd, Offset, Size) ->
+ PrivateFd = Fd#file_descriptor.data,
+ case ?CALL_FD(PrivateFd, pread, [Offset, Size]) of
+ {ok, Binary} -> {ok, binary_to_list(Binary)};
+ Other -> Other
+ end.
+pread(Fd, LocNums) ->
+ PrivateFd = Fd#file_descriptor.data,
+ case ?CALL_FD(PrivateFd, pread, [LocNums]) of
+ {ok, LocResults} ->
+ TranslatedResults =
+ [ case Result of
+ Result when is_binary(Result) -> binary_to_list(Result);
+ eof -> eof
+ end || Result <- LocResults ],
+ {ok, TranslatedResults};
+ Other -> Other
+ end.
+
+ipread_s32bu_p32bu(Fd, Offset, MaxSize) ->
+ PrivateFd = Fd#file_descriptor.data,
+ case ?CALL_FD(PrivateFd, ipread_s32bu_p32bu, [Offset, MaxSize]) of
+ {ok, {Size, Pointer, Binary}} when is_binary(Binary) ->
+ {ok, {Size, Pointer, binary_to_list(Binary)}};
+ Other ->
+ Other
+ end.
+
+sendfile(Fd, Dest, Offset, Bytes, ChunkSize, Headers, Trailers, Flags) ->
+ Args = [Dest, Offset, Bytes, ChunkSize, Headers, Trailers, Flags],
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, sendfile, Args).
diff --git a/lib/kernel/src/raw_file_io_raw.erl b/lib/kernel/src/raw_file_io_raw.erl
new file mode 100644
index 0000000000..9a9fe78eb1
--- /dev/null
+++ b/lib/kernel/src/raw_file_io_raw.erl
@@ -0,0 +1,25 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(raw_file_io_raw).
+
+-export([open_layer/3]).
+
+open_layer(Filename, Modes, [raw]) ->
+ prim_file:open(Filename, [raw | Modes]).
diff --git a/lib/kernel/src/rpc.erl b/lib/kernel/src/rpc.erl
index 0e0b7dffa3..d197de942f 100644
--- a/lib/kernel/src/rpc.erl
+++ b/lib/kernel/src/rpc.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1996-2017. All Rights Reserved.
+%% Copyright Ericsson AB 1996-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -418,10 +418,7 @@ abcast(Name, Mess) ->
abcast([Node|Tail], Name, Mess) ->
Dest = {Name,Node},
- case catch erlang:send(Dest, Mess, [noconnect]) of
- noconnect -> spawn(erlang, send, [Dest,Mess]), ok;
- _ -> ok
- end,
+ try erlang:send(Dest, Mess) catch error:_ -> ok end,
abcast(Tail, Name, Mess);
abcast([], _,_) -> abcast.
@@ -498,7 +495,7 @@ start_monitor(Node, Name) ->
Module :: module(),
Function :: atom(),
Args :: [term()],
- ResL :: [term()],
+ ResL :: [Res :: term() | {'badrpc', Reason :: term()}],
BadNodes :: [node()].
multicall(M, F, A) ->
@@ -509,14 +506,14 @@ multicall(M, F, A) ->
Module :: module(),
Function :: atom(),
Args :: [term()],
- ResL :: [term()],
+ ResL :: [Res :: term() | {'badrpc', Reason :: term()}],
BadNodes :: [node()];
(Module, Function, Args, Timeout) -> {ResL, BadNodes} when
Module :: module(),
Function :: atom(),
Args :: [term()],
Timeout :: timeout(),
- ResL :: [term()],
+ ResL :: [Res :: term() | {'badrpc', Reason :: term()}],
BadNodes :: [node()].
multicall(Nodes, M, F, A) when is_list(Nodes) ->
@@ -531,7 +528,7 @@ multicall(M, F, A, Timeout) ->
Function :: atom(),
Args :: [term()],
Timeout :: timeout(),
- ResL :: [term()],
+ ResL :: [Res :: term() | {'badrpc', Reason :: term()}],
BadNodes :: [node()].
multicall(Nodes, M, F, A, infinity)