aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kernel
diff options
context:
space:
mode:
Diffstat (limited to 'lib/kernel')
-rw-r--r--lib/kernel/src/Makefile17
-rw-r--r--lib/kernel/src/file.erl40
-rw-r--r--lib/kernel/src/file_int.hrl33
-rw-r--r--lib/kernel/src/file_io_server.erl52
-rw-r--r--lib/kernel/src/file_server.erl137
-rw-r--r--lib/kernel/src/inet_int.hrl8
-rw-r--r--lib/kernel/src/kernel.app.src7
-rw-r--r--lib/kernel/src/raw_file_io.erl75
-rw-r--r--lib/kernel/src/raw_file_io_compressed.erl134
-rw-r--r--lib/kernel/src/raw_file_io_deflate.erl159
-rw-r--r--lib/kernel/src/raw_file_io_delayed.erl320
-rw-r--r--lib/kernel/src/raw_file_io_inflate.erl261
-rw-r--r--lib/kernel/src/raw_file_io_list.erl128
-rw-r--r--lib/kernel/src/raw_file_io_raw.erl25
14 files changed, 1265 insertions, 131 deletions
diff --git a/lib/kernel/src/Makefile b/lib/kernel/src/Makefile
index 5946620f0f..4a713b2a99 100644
--- a/lib/kernel/src/Makefile
+++ b/lib/kernel/src/Makefile
@@ -120,6 +120,13 @@ MODULES = \
user \
user_drv \
user_sup \
+ raw_file_io \
+ raw_file_io_compressed \
+ raw_file_io_inflate \
+ raw_file_io_deflate \
+ raw_file_io_delayed \
+ raw_file_io_list \
+ raw_file_io_raw \
wrap_log_reader
HRL_FILES= ../include/file.hrl ../include/inet.hrl ../include/inet_sctp.hrl \
@@ -226,7 +233,8 @@ $(EBIN)/disk_log_server.beam: disk_log.hrl
$(EBIN)/dist_util.beam: ../include/dist_util.hrl ../include/dist.hrl
$(EBIN)/erl_boot_server.beam: inet_boot.hrl
$(EBIN)/erl_epmd.beam: inet_int.hrl erl_epmd.hrl
-$(EBIN)/file.beam: ../include/file.hrl
+$(EBIN)/file.beam: ../include/file.hrl file_int.hrl
+$(EBIN)/file_io_server.beam: ../include/file.hrl file_int.hrl
$(EBIN)/gen_tcp.beam: inet_int.hrl
$(EBIN)/gen_udp.beam: inet_int.hrl
$(EBIN)/gen_sctp.beam: ../include/inet_sctp.hrl
@@ -254,3 +262,10 @@ $(EBIN)/net_kernel.beam: ../include/net_address.hrl
$(EBIN)/os.beam: ../include/file.hrl
$(EBIN)/ram_file.beam: ../include/file.hrl
$(EBIN)/wrap_log_reader.beam: disk_log.hrl ../include/file.hrl
+$(EBIN)/raw_file_io.beam: ../include/file.hrl file_int.hrl
+$(EBIN)/raw_file_io_compressed.beam: ../include/file.hrl file_int.hrl
+$(EBIN)/raw_file_io_inflate.beam: ../include/file.hrl file_int.hrl
+$(EBIN)/raw_file_io_deflate.beam: ../include/file.hrl file_int.hrl
+$(EBIN)/raw_file_io_delayed.beam: ../include/file.hrl file_int.hrl
+$(EBIN)/raw_file_io_list.beam: ../include/file.hrl file_int.hrl
+$(EBIN)/raw_file_io_raw.beam: ../include/file.hrl file_int.hrl
diff --git a/lib/kernel/src/file.erl b/lib/kernel/src/file.erl
index 933f2d5f65..b5a51c3410 100644
--- a/lib/kernel/src/file.erl
+++ b/lib/kernel/src/file.erl
@@ -72,7 +72,7 @@
io_device/0, name/0, name_all/0, posix/0]).
%%% Includes and defines
--include("file.hrl").
+-include("file_int.hrl").
-define(FILE_IO_SERVER_TABLE, file_io_servers).
@@ -454,41 +454,23 @@ raw_write_file_info(Name, #file_info{} = Info) ->
Reason :: posix() | badarg | system_limit.
open(Item, ModeList) when is_list(ModeList) ->
- case lists:member(raw, ModeList) of
- %% Raw file, use ?PRIM_FILE to handle this file
- true ->
+ case {lists:member(raw, ModeList), lists:member(ram, ModeList)} of
+ {false, false} ->
+ %% File server file
Args = [file_name(Item) | ModeList],
case check_args(Args) of
ok ->
[FileName | _] = Args,
- %% We rely on the returned Handle (in {ok, Handle})
- %% being a pid() or a #file_descriptor{}
- ?PRIM_FILE:open(FileName, ModeList);
+ call(open, [FileName, ModeList]);
Error ->
Error
- end;
- false ->
- case lists:member(ram, ModeList) of
- %% RAM file, use ?RAM_FILE to handle this file
- true ->
- case check_args(ModeList) of
- ok ->
- ?RAM_FILE:open(Item, ModeList);
- Error ->
- Error
- end;
- %% File server file
- false ->
- Args = [file_name(Item) | ModeList],
- case check_args(Args) of
- ok ->
- [FileName | _] = Args,
- call(open, [FileName, ModeList]);
- Error ->
- Error
- end
- end
+ end;
+ {true, _Either} ->
+ raw_file_io:open(file_name(Item), ModeList);
+ {false, true} ->
+ ram_file:open(Item, ModeList)
end;
+
%% Old obsolete mode specification in atom or 2-tuple format
open(Item, Mode) ->
open(Item, mode_list(Mode)).
diff --git a/lib/kernel/src/file_int.hrl b/lib/kernel/src/file_int.hrl
new file mode 100644
index 0000000000..bafc330c04
--- /dev/null
+++ b/lib/kernel/src/file_int.hrl
@@ -0,0 +1,33 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% Internal definitions for the 'file' module and friends.
+%%
+
+-ifndef(FILE_INTERNAL_HRL_).
+-define(FILE_INTERNAL_HRL_, 1).
+
+-include("file.hrl").
+
+-define(CALL_FD(Fd, Method, Args),
+ apply(Fd#file_descriptor.module, Method, [Fd | Args])).
+
+-endif.
diff --git a/lib/kernel/src/file_io_server.erl b/lib/kernel/src/file_io_server.erl
index deb7b315b1..2b35d2acfb 100644
--- a/lib/kernel/src/file_io_server.erl
+++ b/lib/kernel/src/file_io_server.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2000-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2000-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -28,7 +28,8 @@
-record(state, {handle,owner,mref,buf,read_mode,unic}).
--define(PRIM_FILE, prim_file).
+-include("file_int.hrl").
+
-define(READ_SIZE_LIST, 128).
-define(READ_SIZE_BINARY, (8*1024)).
@@ -68,7 +69,7 @@ do_start(Spawn, Owner, FileName, ModeList) ->
%% process_flag(trap_exit, true),
case parse_options(ModeList) of
{ReadMode, UnicodeMode, Opts} ->
- case ?PRIM_FILE:open(FileName, Opts) of
+ case raw_file_io:open(FileName, [raw | Opts]) of
{error, Reason} = Error ->
Self ! {Ref, Error},
exit(Reason);
@@ -205,7 +206,7 @@ io_reply(From, ReplyAs, Reply) ->
file_request({advise,Offset,Length,Advise},
#state{handle=Handle}=State) ->
- case ?PRIM_FILE:advise(Handle, Offset, Length, Advise) of
+ case ?CALL_FD(Handle, advise, [Offset, Length, Advise]) of
{error,Reason}=Reply ->
{stop,Reason,Reply,State};
Reply ->
@@ -213,7 +214,7 @@ file_request({advise,Offset,Length,Advise},
end;
file_request({allocate, Offset, Length},
#state{handle = Handle} = State) ->
- Reply = ?PRIM_FILE:allocate(Handle, Offset, Length),
+ Reply = ?CALL_FD(Handle, allocate, [Offset, Length]),
{reply, Reply, State};
file_request({pread,At,Sz}, State)
when At =:= cur;
@@ -256,7 +257,7 @@ file_request({pwrite,At,Data},
end;
file_request(datasync,
#state{handle=Handle}=State) ->
- case ?PRIM_FILE:datasync(Handle) of
+ case ?CALL_FD(Handle, datasync, []) of
{error,Reason}=Reply ->
{stop,Reason,Reply,State};
Reply ->
@@ -264,7 +265,7 @@ file_request(datasync,
end;
file_request(sync,
#state{handle=Handle}=State) ->
- case ?PRIM_FILE:sync(Handle) of
+ case ?CALL_FD(Handle, sync, []) of
{error,Reason}=Reply ->
{stop,Reason,Reply,State};
Reply ->
@@ -272,7 +273,7 @@ file_request(sync,
end;
file_request(close,
#state{handle=Handle}=State) ->
- case ?PRIM_FILE:close(Handle) of
+ case ?CALL_FD(Handle, close, []) of
{error,Reason}=Reply ->
{stop,Reason,Reply,State#state{buf= <<>>}};
Reply ->
@@ -288,7 +289,7 @@ file_request({position,At},
end;
file_request(truncate,
#state{handle=Handle}=State) ->
- case ?PRIM_FILE:truncate(Handle) of
+ case ?CALL_FD(Handle, truncate, []) of
{error,Reason}=Reply ->
{stop,Reason,Reply,State#state{buf= <<>>}};
Reply ->
@@ -398,7 +399,7 @@ io_request_loop([Request|Tail],
%%
put_chars(Chars, latin1, #state{handle=Handle, unic=latin1}=State) ->
NewState = State#state{buf = <<>>},
- case ?PRIM_FILE:write(Handle, Chars) of
+ case ?CALL_FD(Handle, write, [Chars]) of
{error,Reason}=Reply ->
{stop,Reason,Reply,NewState};
Reply ->
@@ -408,7 +409,7 @@ put_chars(Chars, InEncoding, #state{handle=Handle, unic=OutEncoding}=State) ->
NewState = State#state{buf = <<>>},
case unicode:characters_to_binary(Chars,InEncoding,OutEncoding) of
Bin when is_binary(Bin) ->
- case ?PRIM_FILE:write(Handle, Bin) of
+ case ?CALL_FD(Handle, write, [Bin]) of
{error,Reason}=Reply ->
{stop,Reason,Reply,NewState};
Reply ->
@@ -422,7 +423,7 @@ put_chars(Chars, InEncoding, #state{handle=Handle, unic=OutEncoding}=State) ->
get_line(S, {<<>>, Cont}, OutEnc,
#state{handle=Handle, read_mode=Mode, unic=InEnc}=State) ->
- case ?PRIM_FILE:read(Handle, read_size(Mode)) of
+ case ?CALL_FD(Handle, read, [read_size(Mode)]) of
{ok,Bin} ->
get_line(S, convert_enc([Cont, Bin], InEnc, OutEnc), OutEnc, State);
eof ->
@@ -472,7 +473,7 @@ get_chars(N, OutEnc,#state{handle=Handle,buf=Buf,read_mode=ReadMode,unic=latin1}
BufSize = byte_size(Buf),
NeedSize = N-BufSize,
Size = erlang:max(NeedSize, ?READ_SIZE_BINARY),
- case ?PRIM_FILE:read(Handle, Size) of
+ case ?CALL_FD(Handle, read, [Size]) of
{ok, B} ->
if BufSize+byte_size(B) < N ->
std_reply(cat(Buf, B, ReadMode,latin1,OutEnc), State);
@@ -504,7 +505,7 @@ get_chars(N, OutEnc,#state{handle=Handle,buf=Buf,read_mode=ReadMode,unic=InEncod
%% Need more, Try to read 4*needed in bytes...
NeedSize = (N - BufCount) * 4,
Size = erlang:max(NeedSize, ?READ_SIZE_BINARY),
- case ?PRIM_FILE:read(Handle, Size) of
+ case ?CALL_FD(Handle, read, [Size]) of
{ok, B} ->
NewBuf = list_to_binary([Buf,B]),
{NewCount,NewSplit} = count_and_find(NewBuf,N,InEncoding),
@@ -544,7 +545,7 @@ get_chars(Mod, Func, XtraArg, OutEnc, #state{buf=Buf}=State) ->
get_chars_empty(Mod, Func, XtraArg, S, latin1,
#state{handle=Handle,read_mode=ReadMode, unic=latin1}=State) ->
- case ?PRIM_FILE:read(Handle, read_size(ReadMode)) of
+ case ?CALL_FD(Handle, read, [read_size(ReadMode)]) of
{ok,Bin} ->
get_chars_apply(Mod, Func, XtraArg, S, latin1, State, Bin);
eof ->
@@ -554,7 +555,7 @@ get_chars_empty(Mod, Func, XtraArg, S, latin1,
end;
get_chars_empty(Mod, Func, XtraArg, S, OutEnc,
#state{handle=Handle,read_mode=ReadMode}=State) ->
- case ?PRIM_FILE:read(Handle, read_size(ReadMode)) of
+ case ?CALL_FD(Handle, read, [read_size(ReadMode)]) of
{ok,Bin} ->
get_chars_apply(Mod, Func, XtraArg, S, OutEnc, State, Bin);
eof ->
@@ -564,7 +565,7 @@ get_chars_empty(Mod, Func, XtraArg, S, OutEnc,
end.
get_chars_notempty(Mod, Func, XtraArg, S, OutEnc,
#state{handle=Handle,read_mode=ReadMode,buf = B}=State) ->
- case ?PRIM_FILE:read(Handle, read_size(ReadMode)) of
+ case ?CALL_FD(Handle, read, [read_size(ReadMode)]) of
{ok,Bin} ->
get_chars_apply(Mod, Func, XtraArg, S, OutEnc, State, list_to_binary([B,Bin]));
eof ->
@@ -918,13 +919,10 @@ cbv({utf32,little},_) ->
%% Compensates ?PRIM_FILE:position/2 for the number of bytes
%% we have buffered
position(Handle, At, Buf) ->
- ?PRIM_FILE:position(
- Handle,
- case At of
- cur ->
- {cur, -byte_size(Buf)};
- {cur, Offs} ->
- {cur, Offs-byte_size(Buf)};
- _ ->
- At
- end).
+ SeekTo =
+ case At of
+ {cur, Offs} -> {cur, Offs-byte_size(Buf)};
+ cur -> {cur, -byte_size(Buf)};
+ _ -> At
+ end,
+ ?CALL_FD(Handle, position, [SeekTo]).
diff --git a/lib/kernel/src/file_server.erl b/lib/kernel/src/file_server.erl
index 6e8f64d932..ecc1ffbdd6 100644
--- a/lib/kernel/src/file_server.erl
+++ b/lib/kernel/src/file_server.erl
@@ -63,7 +63,7 @@ stop() ->
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------
--type state() :: port(). % Internal type
+-type state() :: term(). % Internal type
%%----------------------------------------------------------------------
%% Func: init/1
@@ -77,14 +77,8 @@ stop() ->
init([]) ->
process_flag(trap_exit, true),
- case ?PRIM_FILE:start() of
- {ok, Handle} ->
- ?FILE_IO_SERVER_TABLE =
- ets:new(?FILE_IO_SERVER_TABLE, [named_table]),
- {ok, Handle};
- {error, Reason} ->
- {stop, Reason}
- end.
+ ?FILE_IO_SERVER_TABLE = ets:new(?FILE_IO_SERVER_TABLE, [named_table]),
+ {ok, undefined}.
%%----------------------------------------------------------------------
%% Func: handle_call/3
@@ -101,7 +95,7 @@ init([]) ->
{'reply', 'eof' | 'ok' | {'error', term()} | {'ok', term()}, state()} |
{'stop', 'normal', 'stopped', state()}.
-handle_call({open, Name, ModeList}, {Pid, _Tag} = _From, Handle)
+handle_call({open, Name, ModeList}, {Pid, _Tag} = _From, State)
when is_list(ModeList) ->
Child = ?FILE_IO_SERVER:start_link(Pid, Name, ModeList),
case Child of
@@ -110,78 +104,78 @@ handle_call({open, Name, ModeList}, {Pid, _Tag} = _From, Handle)
_ ->
ok
end,
- {reply, Child, Handle};
+ {reply, Child, State};
-handle_call({open, _Name, _Mode}, _From, Handle) ->
- {reply, {error, einval}, Handle};
+handle_call({open, _Name, _Mode}, _From, State) ->
+ {reply, {error, einval}, State};
-handle_call({read_file, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:read_file(Name), Handle};
+handle_call({read_file, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:read_file(Name), State};
-handle_call({write_file, Name, Bin}, _From, Handle) ->
- {reply, ?PRIM_FILE:write_file(Name, Bin), Handle};
+handle_call({write_file, Name, Bin}, _From, State) ->
+ {reply, ?PRIM_FILE:write_file(Name, Bin), State};
-handle_call({set_cwd, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:set_cwd(Handle, Name), Handle};
+handle_call({set_cwd, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:set_cwd(Name), State};
-handle_call({delete, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:delete(Handle, Name), Handle};
+handle_call({delete, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:delete(Name), State};
-handle_call({rename, Fr, To}, _From, Handle) ->
- {reply, ?PRIM_FILE:rename(Handle, Fr, To), Handle};
+handle_call({rename, Fr, To}, _From, State) ->
+ {reply, ?PRIM_FILE:rename(Fr, To), State};
-handle_call({make_dir, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:make_dir(Handle, Name), Handle};
+handle_call({make_dir, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:make_dir(Name), State};
-handle_call({del_dir, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:del_dir(Handle, Name), Handle};
+handle_call({del_dir, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:del_dir(Name), State};
-handle_call({list_dir, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:list_dir(Handle, Name), Handle};
-handle_call({list_dir_all, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:list_dir_all(Handle, Name), Handle};
+handle_call({list_dir, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:list_dir(Name), State};
+handle_call({list_dir_all, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:list_dir_all(Name), State};
-handle_call(get_cwd, _From, Handle) ->
- {reply, ?PRIM_FILE:get_cwd(Handle), Handle};
-handle_call({get_cwd}, _From, Handle) ->
- {reply, ?PRIM_FILE:get_cwd(Handle), Handle};
-handle_call({get_cwd, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:get_cwd(Handle, Name), Handle};
+handle_call(get_cwd, _From, State) ->
+ {reply, ?PRIM_FILE:get_cwd(), State};
+handle_call({get_cwd}, _From, State) ->
+ {reply, ?PRIM_FILE:get_cwd(), State};
+handle_call({get_cwd, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:get_cwd(Name), State};
-handle_call({read_file_info, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:read_file_info(Handle, Name), Handle};
+handle_call({read_file_info, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:read_file_info(Name), State};
-handle_call({read_file_info, Name, Opts}, _From, Handle) ->
- {reply, ?PRIM_FILE:read_file_info(Handle, Name, Opts), Handle};
+handle_call({read_file_info, Name, Opts}, _From, State) ->
+ {reply, ?PRIM_FILE:read_file_info(Name, Opts), State};
-handle_call({altname, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:altname(Handle, Name), Handle};
+handle_call({altname, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:altname(Name), State};
-handle_call({write_file_info, Name, Info}, _From, Handle) ->
- {reply, ?PRIM_FILE:write_file_info(Handle, Name, Info), Handle};
+handle_call({write_file_info, Name, Info}, _From, State) ->
+ {reply, ?PRIM_FILE:write_file_info(Name, Info), State};
-handle_call({write_file_info, Name, Info, Opts}, _From, Handle) ->
- {reply, ?PRIM_FILE:write_file_info(Handle, Name, Info, Opts), Handle};
+handle_call({write_file_info, Name, Info, Opts}, _From, State) ->
+ {reply, ?PRIM_FILE:write_file_info(Name, Info, Opts), State};
-handle_call({read_link_info, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:read_link_info(Handle, Name), Handle};
+handle_call({read_link_info, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:read_link_info(Name), State};
-handle_call({read_link_info, Name, Opts}, _From, Handle) ->
- {reply, ?PRIM_FILE:read_link_info(Handle, Name, Opts), Handle};
+handle_call({read_link_info, Name, Opts}, _From, State) ->
+ {reply, ?PRIM_FILE:read_link_info(Name, Opts), State};
-handle_call({read_link, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:read_link(Handle, Name), Handle};
-handle_call({read_link_all, Name}, _From, Handle) ->
- {reply, ?PRIM_FILE:read_link_all(Handle, Name), Handle};
+handle_call({read_link, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:read_link(Name), State};
+handle_call({read_link_all, Name}, _From, State) ->
+ {reply, ?PRIM_FILE:read_link_all(Name), State};
-handle_call({make_link, Old, New}, _From, Handle) ->
- {reply, ?PRIM_FILE:make_link(Handle, Old, New), Handle};
+handle_call({make_link, Old, New}, _From, State) ->
+ {reply, ?PRIM_FILE:make_link(Old, New), State};
-handle_call({make_symlink, Old, New}, _From, Handle) ->
- {reply, ?PRIM_FILE:make_symlink(Handle, Old, New), Handle};
+handle_call({make_symlink, Old, New}, _From, State) ->
+ {reply, ?PRIM_FILE:make_symlink(Old, New), State};
handle_call({copy, SourceName, SourceOpts, DestName, DestOpts, Length},
- _From, Handle) ->
+ _From, State) ->
Reply =
case ?PRIM_FILE:open(SourceName, [read, binary | SourceOpts]) of
{ok, Source} ->
@@ -201,14 +195,14 @@ handle_call({copy, SourceName, SourceOpts, DestName, DestOpts, Length},
{error, _} = Error ->
Error
end,
- {reply, Reply, Handle};
+ {reply, Reply, State};
-handle_call(stop, _From, Handle) ->
- {stop, normal, stopped, Handle};
+handle_call(stop, _From, State) ->
+ {stop, normal, stopped, State};
-handle_call(Request, From, Handle) ->
+handle_call(Request, From, State) ->
error_logger:error_msg("handle_call(~tp, ~tp, _)", [Request, From]),
- {noreply, Handle}.
+ {noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_cast/2
@@ -233,14 +227,9 @@ handle_cast(Msg, State) ->
-spec handle_info(term(), state()) ->
{'noreply', state()} | {'stop', 'normal', state()}.
-handle_info({'EXIT', Pid, _Reason}, Handle) when is_pid(Pid) ->
+handle_info({'EXIT', Pid, _Reason}, State) when is_pid(Pid) ->
ets:delete(?FILE_IO_SERVER_TABLE, Pid),
- {noreply, Handle};
-
-handle_info({'EXIT', Handle, _Reason}, Handle) ->
- error_logger:error_msg("Port controlling ~w terminated in ~w",
- [?FILE_SERVER, ?MODULE]),
- {stop, normal, Handle};
+ {noreply, State};
handle_info(Info, State) ->
error_logger:error_msg("handle_Info(~tp, _)", [Info]),
@@ -254,8 +243,8 @@ handle_info(Info, State) ->
-spec terminate(term(), state()) -> 'ok'.
-terminate(_Reason, Handle) ->
- ?PRIM_FILE:stop(Handle).
+terminate(_Reason, _State) ->
+ ok.
%%----------------------------------------------------------------------
%% Func: code_change/3
diff --git a/lib/kernel/src/inet_int.hrl b/lib/kernel/src/inet_int.hrl
index bc5b67f7bf..357e27826c 100644
--- a/lib/kernel/src/inet_int.hrl
+++ b/lib/kernel/src/inet_int.hrl
@@ -100,6 +100,8 @@
-define(TCP_REQ_RECV, 42).
-define(TCP_REQ_UNRECV, 43).
-define(TCP_REQ_SHUTDOWN, 44).
+-define(TCP_REQ_SENDFILE, 45).
+
%% UDP and SCTP requests
-define(PACKET_REQ_RECV, 60).
%%-define(SCTP_REQ_LISTEN, 61). MERGED
@@ -319,6 +321,12 @@
[((X) bsr 24) band 16#ff, ((X) bsr 16) band 16#ff,
((X) bsr 8) band 16#ff, (X) band 16#ff]).
+-define(int64(X),
+ [((X) bsr 56) band 16#ff, ((X) bsr 48) band 16#ff,
+ ((X) bsr 40) band 16#ff, ((X) bsr 32) band 16#ff,
+ ((X) bsr 24) band 16#ff, ((X) bsr 16) band 16#ff,
+ ((X) bsr 8) band 16#ff, (X) band 16#ff]).
+
-define(intAID(X), % For SCTP AssocID
?int32(X)).
diff --git a/lib/kernel/src/kernel.app.src b/lib/kernel/src/kernel.app.src
index 080b11fc4d..e4852a6e75 100644
--- a/lib/kernel/src/kernel.app.src
+++ b/lib/kernel/src/kernel.app.src
@@ -88,6 +88,13 @@
inet_udp,
inet_sctp,
pg2,
+ raw_file_io,
+ raw_file_io_compressed,
+ raw_file_io_deflate,
+ raw_file_io_delayed,
+ raw_file_io_inflate,
+ raw_file_io_list,
+ raw_file_io_raw,
seq_trace,
standard_error,
wrap_log_reader]},
diff --git a/lib/kernel/src/raw_file_io.erl b/lib/kernel/src/raw_file_io.erl
new file mode 100644
index 0000000000..e3c07c8f78
--- /dev/null
+++ b/lib/kernel/src/raw_file_io.erl
@@ -0,0 +1,75 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(raw_file_io).
+
+-export([open/2]).
+
+open(Filename, Modes) ->
+ %% Layers are applied in this order, and the listed modules will call this
+ %% function again as necessary. eg. a raw compressed delayed file in list
+ %% mode will walk through [_list -> _compressed -> _delayed -> _raw].
+ ModuleOrder = [{raw_file_io_list, fun match_list/1},
+ {raw_file_io_compressed, fun match_compressed/1},
+ {raw_file_io_delayed, fun match_delayed/1},
+ {raw_file_io_raw, fun match_raw/1}],
+ open_1(ModuleOrder, Filename, add_implicit_modes(Modes)).
+open_1([], _Filename, _Modes) ->
+ error(badarg);
+open_1([{Module, Match} | Rest], Filename, Modes) ->
+ case lists:any(Match, Modes) of
+ true ->
+ {Options, ChildModes} =
+ lists:partition(fun(Mode) -> Match(Mode) end, Modes),
+ Module:open_layer(Filename, ChildModes, Options);
+ false ->
+ open_1(Rest, Filename, Modes)
+ end.
+
+%% 'read' and 'list' mode are enabled unless disabled by another option, so
+%% we'll explicitly add them to avoid duplicating this logic in child layers.
+add_implicit_modes(Modes0) ->
+ Modes1 = add_unless_matched(Modes0, fun match_writable/1, read),
+ add_unless_matched(Modes1, fun match_binary/1, list).
+add_unless_matched(Modes, Match, Default) ->
+ case lists:any(Match, Modes) of
+ false -> [Default | Modes];
+ true -> Modes
+ end.
+
+match_list(list) -> true;
+match_list(_Other) -> false.
+
+match_compressed(compressed) -> true;
+match_compressed(_Other) -> false.
+
+match_delayed({delayed_write, _Size, _Timeout}) -> true;
+match_delayed(delayed_write) -> true;
+match_delayed(_Other) -> false.
+
+match_raw(raw) -> true;
+match_raw(_Other) -> false.
+
+match_writable(write) -> true;
+match_writable(append) -> true;
+match_writable(exclusive) -> true;
+match_writable(_Other) -> false.
+
+match_binary(binary) -> true;
+match_binary(_Other) -> false.
diff --git a/lib/kernel/src/raw_file_io_compressed.erl b/lib/kernel/src/raw_file_io_compressed.erl
new file mode 100644
index 0000000000..d5ab042d25
--- /dev/null
+++ b/lib/kernel/src/raw_file_io_compressed.erl
@@ -0,0 +1,134 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(raw_file_io_compressed).
+
+-export([close/1, sync/1, datasync/1, truncate/1, advise/4, allocate/3,
+ position/2, write/2, pwrite/2, pwrite/3,
+ read_line/1, read/2, pread/2, pread/3]).
+
+%% OTP internal.
+-export([ipread_s32bu_p32bu/3, sendfile/8]).
+
+-export([open_layer/3]).
+
+-include("file_int.hrl").
+
+open_layer(Filename, Modes, Options) ->
+ IsAppend = lists:member(append, Modes),
+ IsDeflate = lists:member(write, Modes),
+ IsInflate = lists:member(read, Modes),
+ if
+ IsDeflate, IsInflate; IsAppend ->
+ {error, einval};
+ IsDeflate, not IsInflate ->
+ start_server_module(raw_file_io_deflate, Filename, Modes, Options);
+ IsInflate ->
+ start_server_module(raw_file_io_inflate, Filename, Modes, Options)
+ end.
+
+start_server_module(Module, Filename, Modes, Options) ->
+ Secret = make_ref(),
+ case gen_statem:start(Module, {self(), Secret, Options}, []) of
+ {ok, Pid} -> open_next_layer(Pid, Secret, Filename, Modes);
+ Other -> Other
+ end.
+
+open_next_layer(Pid, Secret, Filename, Modes) ->
+ case gen_statem:call(Pid, {'$open', Secret, Filename, Modes}, infinity) of
+ ok ->
+ PublicFd = #file_descriptor{
+ module = raw_file_io_compressed, data = {self(), Pid} },
+ {ok, PublicFd};
+ Other -> Other
+ end.
+
+close(Fd) ->
+ wrap_call(Fd, [close]).
+
+sync(Fd) ->
+ wrap_call(Fd, [sync]).
+datasync(Fd) ->
+ wrap_call(Fd, [datasync]).
+
+truncate(Fd) ->
+ wrap_call(Fd, [truncate]).
+
+advise(Fd, Offset, Length, Advise) ->
+ wrap_call(Fd, [advise, Offset, Length, Advise]).
+allocate(Fd, Offset, Length) ->
+ wrap_call(Fd, [allocate, Offset, Length]).
+
+position(Fd, Mark) ->
+ wrap_call(Fd, [position, Mark]).
+
+write(Fd, IOData) ->
+ try
+ CompactedData = erlang:iolist_to_iovec(IOData),
+ wrap_call(Fd, [write, CompactedData])
+ catch
+ error:badarg -> {error, badarg}
+ end.
+
+pwrite(Fd, Offset, IOData) ->
+ try
+ CompactedData = erlang:iolist_to_iovec(IOData),
+ wrap_call(Fd, [pwrite, Offset, CompactedData])
+ catch
+ error:badarg -> {error, badarg}
+ end.
+pwrite(Fd, LocBytes) ->
+ try
+ CompactedLocBytes =
+ [ {Offset, erlang:iolist_to_iovec(IOData)} ||
+ {Offset, IOData} <- LocBytes ],
+ wrap_call(Fd, [pwrite, CompactedLocBytes])
+ catch
+ error:badarg -> {error, badarg}
+ end.
+
+read_line(Fd) ->
+ wrap_call(Fd, [read_line]).
+read(Fd, Size) ->
+ wrap_call(Fd, [read, Size]).
+pread(Fd, Offset, Size) ->
+ wrap_call(Fd, [pread, Offset, Size]).
+pread(Fd, LocNums) ->
+ wrap_call(Fd, [pread, LocNums]).
+
+ipread_s32bu_p32bu(Fd, Offset, MaxSize) ->
+ wrap_call(Fd, [ipread_s32bu_p32bu, Offset, MaxSize]).
+
+sendfile(_,_,_,_,_,_,_,_) ->
+ {error, enotsup}.
+
+wrap_call(Fd, Command) ->
+ {_Owner, Pid} = get_fd_data(Fd),
+ try gen_statem:call(Pid, Command, infinity) of
+ Result -> Result
+ catch
+ exit:{noproc, _StackTrace} -> {error, einval}
+ end.
+
+get_fd_data(#file_descriptor{ data = Data }) ->
+ {Owner, _ServerPid} = Data,
+ case self() of
+ Owner -> Data;
+ _ -> error(not_on_controlling_process)
+ end.
diff --git a/lib/kernel/src/raw_file_io_deflate.erl b/lib/kernel/src/raw_file_io_deflate.erl
new file mode 100644
index 0000000000..acfc546743
--- /dev/null
+++ b/lib/kernel/src/raw_file_io_deflate.erl
@@ -0,0 +1,159 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(raw_file_io_deflate).
+
+-behavior(gen_statem).
+
+-export([init/1, callback_mode/0, terminate/3]).
+-export([opening/3, opened/3]).
+
+-include("file_int.hrl").
+
+-define(GZIP_WBITS, 16 + 15).
+
+callback_mode() -> state_functions.
+
+init({Owner, Secret, [compressed]}) ->
+ Monitor = monitor(process, Owner),
+ Z = zlib:open(),
+ ok = zlib:deflateInit(Z, default, deflated, ?GZIP_WBITS, 8, default),
+ Data =
+ #{ owner => Owner,
+ monitor => Monitor,
+ secret => Secret,
+ position => 0,
+ zlib => Z },
+ {ok, opening, Data}.
+
+opening({call, From}, {'$open', Secret, Filename, Modes}, #{ secret := Secret } = Data) ->
+ case raw_file_io:open(Filename, Modes) of
+ {ok, PrivateFd} ->
+ NewData = Data#{ handle => PrivateFd },
+ {next_state, opened, NewData, [{reply, From, ok}]};
+ Other ->
+ {stop_and_reply, normal, [{reply, From, Other}]}
+ end;
+opening(_Event, _Contents, _Data) ->
+ {keep_state_and_data, [postpone]}.
+
+%%
+
+opened(info, {'DOWN', Monitor, process, _Owner, Reason}, #{ monitor := Monitor } = Data) ->
+ if
+ Reason =/= kill -> flush_deflate_state(Data);
+ Reason =:= kill -> ignored
+ end,
+ {stop, shutdown};
+
+opened(info, _Message, _Data) ->
+ keep_state_and_data;
+
+opened({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
+ #{ handle := PrivateFd } = Data,
+ Response =
+ case flush_deflate_state(Data) of
+ ok -> ?CALL_FD(PrivateFd, close, []);
+ Other -> Other
+ end,
+ {stop_and_reply, normal, [{reply, From, Response}]};
+
+opened({call, {Owner, _Tag} = From}, [position, Mark], #{ owner := Owner } = Data) ->
+ case position(Data, Mark) of
+ {ok, NewData, Result} ->
+ Response = {ok, Result},
+ {keep_state, NewData, [{reply, From, Response}]};
+ Other ->
+ {keep_state_and_data, [{reply, From, Other}]}
+ end;
+
+opened({call, {Owner, _Tag} = From}, [write, IOVec], #{ owner := Owner } = Data) ->
+ case write(Data, IOVec) of
+ {ok, NewData} -> {keep_state, NewData, [{reply, From, ok}]};
+ Other -> {keep_state_and_data, [{reply, From, Other}]}
+ end;
+
+opened({call, {Owner, _Tag} = From}, [read, _Size], #{ owner := Owner }) ->
+ Response = {error, ebadf},
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened({call, {Owner, _Tag} = From}, [read_line], #{ owner := Owner }) ->
+ Response = {error, ebadf},
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened({call, {Owner, _Tag} = From}, _Command, #{ owner := Owner }) ->
+ Response = {error, enotsup},
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened({call, _From}, _Command, _Data) ->
+ %% The client functions filter this out, so we'll crash if the user does
+ %% anything stupid on purpose.
+ {shutdown, protocol_violation};
+
+opened(_Event, _Request, _Data) ->
+ keep_state_and_data.
+
+write(Data, IOVec) ->
+ #{ handle := PrivateFd, position := Position, zlib := Z } = Data,
+ UncompressedSize = iolist_size(IOVec),
+ case ?CALL_FD(PrivateFd, write, [zlib:deflate(Z, IOVec)]) of
+ ok -> {ok, Data#{ position := (Position + UncompressedSize) }};
+ Other -> Other
+ end.
+
+%%
+%% We support "seeking" forward as long as it isn't relative to EOF.
+%%
+%% Seeking is a bit of a misnomer as it's really just compressing zeroes until
+%% we reach the desired point, but it has always behaved like this.
+%%
+
+position(Data, Mark) when is_atom(Mark) ->
+ position(Data, {Mark, 0});
+position(Data, Offset) when is_integer(Offset) ->
+ position(Data, {bof, Offset});
+position(Data, {bof, Offset}) when is_integer(Offset) ->
+ position_1(Data, Offset);
+position(Data, {cur, Offset}) when is_integer(Offset) ->
+ #{ position := Position } = Data,
+ position_1(Data, Position + Offset);
+position(_Data, {eof, Offset}) when is_integer(Offset) ->
+ {error, einval};
+position(_Data, _Any) ->
+ {error, badarg}.
+
+position_1(#{ position := Desired } = Data, Desired) ->
+ {ok, Data, Desired};
+position_1(#{ position := Current } = Data, Desired) when Current < Desired ->
+ BytesToWrite = min(Desired - Current, 4 bsl 20),
+ case write(Data, <<0:(BytesToWrite)/unit:8>>) of
+ {ok, NewData} -> position_1(NewData, Desired);
+ Other -> Other
+ end;
+position_1(#{ position := Current }, Desired) when Current > Desired ->
+ {error, einval}.
+
+flush_deflate_state(#{ handle := PrivateFd, zlib := Z }) ->
+ case ?CALL_FD(PrivateFd, write, [zlib:deflate(Z, [], finish)]) of
+ ok -> ok;
+ Other -> Other
+ end.
+
+terminate(_Reason, _State, _Data) ->
+ ok.
diff --git a/lib/kernel/src/raw_file_io_delayed.erl b/lib/kernel/src/raw_file_io_delayed.erl
new file mode 100644
index 0000000000..d2ad7550a1
--- /dev/null
+++ b/lib/kernel/src/raw_file_io_delayed.erl
@@ -0,0 +1,320 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(raw_file_io_delayed).
+
+-behavior(gen_statem).
+
+-export([close/1, sync/1, datasync/1, truncate/1, advise/4, allocate/3,
+ position/2, write/2, pwrite/2, pwrite/3,
+ read_line/1, read/2, pread/2, pread/3]).
+
+%% OTP internal.
+-export([ipread_s32bu_p32bu/3, sendfile/8]).
+
+-export([open_layer/3]).
+
+-export([init/1, callback_mode/0, terminate/3]).
+-export([opening/3, opened/3]).
+
+-include("file_int.hrl").
+
+open_layer(Filename, Modes, Options) ->
+ Secret = make_ref(),
+ case gen_statem:start(?MODULE, {self(), Secret, Options}, []) of
+ {ok, Pid} ->
+ gen_statem:call(Pid, {'$open', Secret, Filename, Modes}, infinity);
+ Other ->
+ Other
+ end.
+
+callback_mode() -> state_functions.
+
+init({Owner, Secret, Options}) ->
+ Monitor = monitor(process, Owner),
+ Defaults =
+ #{ owner => Owner,
+ monitor => Monitor,
+ secret => Secret,
+ timer => none,
+ pid => self(),
+ buffer => prim_buffer:new(),
+ delay_size => 64 bsl 10,
+ delay_time => 2000 },
+ Data = fill_delay_values(Defaults, Options),
+ {ok, opening, Data}.
+
+fill_delay_values(Data, []) ->
+ Data;
+fill_delay_values(Data, [{delayed_write, Size, Time} | Options]) ->
+ fill_delay_values(Data#{ delay_size => Size, delay_time => Time }, Options);
+fill_delay_values(Data, [_ | Options]) ->
+ fill_delay_values(Data, Options).
+
+opening({call, From}, {'$open', Secret, Filename, Modes}, #{ secret := Secret } = Data) ->
+ case raw_file_io:open(Filename, Modes) of
+ {ok, PrivateFd} ->
+ PublicData = maps:with([owner, buffer, delay_size, pid], Data),
+ PublicFd = #file_descriptor{ module = ?MODULE, data = PublicData },
+
+ NewData = Data#{ handle => PrivateFd },
+ Response = {ok, PublicFd},
+ {next_state, opened, NewData, [{reply, From, Response}]};
+ Other ->
+ {stop_and_reply, normal, [{reply, From, Other}]}
+ end;
+opening(_Event, _Contents, _Data) ->
+ {keep_state_and_data, [postpone]}.
+
+%%
+
+opened(info, {'$timed_out', Secret}, #{ secret := Secret } = Data) ->
+ %% If the user writes something at this exact moment, the flush will fail
+ %% and the timer won't reset on the next write since the buffer won't be
+ %% empty (Unless we collided on a flush). We therefore reset the timeout to
+ %% ensure that data won't sit idle for extended periods of time.
+ case try_flush_write_buffer(Data) of
+ busy -> gen_statem:cast(self(), '$reset_timeout');
+ ok -> ok
+ end,
+ {keep_state, Data#{ timer => none }, []};
+
+opened(info, {'DOWN', Monitor, process, _Owner, Reason}, #{ monitor := Monitor } = Data) ->
+ if
+ Reason =/= kill -> try_flush_write_buffer(Data);
+ Reason =:= kill -> ignored
+ end,
+ {stop, shutdown};
+
+opened(info, _Message, _Data) ->
+ keep_state_and_data;
+
+opened({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
+ case flush_write_buffer(Data) of
+ ok ->
+ #{ handle := PrivateFd } = Data,
+ Response = ?CALL_FD(PrivateFd, close, []),
+ {stop_and_reply, normal, [{reply, From, Response}]};
+ Other ->
+ {stop_and_reply, normal, [{reply, From, Other}]}
+ end;
+
+opened({call, {Owner, _Tag} = From}, '$wait', #{ owner := Owner }) ->
+ %% Used in write/2 to synchronize writes on lock conflicts.
+ {keep_state_and_data, [{reply, From, ok}]};
+
+opened({call, {Owner, _Tag} = From}, '$synchronous_flush', #{ owner := Owner } = Data) ->
+ cancel_flush_timeout(Data),
+ Response = flush_write_buffer(Data),
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened({call, {Owner, _Tag} = From}, Command, #{ owner := Owner } = Data) ->
+ Response =
+ case flush_write_buffer(Data) of
+ ok -> dispatch_command(Data, Command);
+ Other -> Other
+ end,
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened({call, _From}, _Command, _Data) ->
+ %% The client functions filter this out, so we'll crash if the user does
+ %% anything stupid on purpose.
+ {shutdown, protocol_violation};
+
+opened(cast, '$reset_timeout', #{ delay_time := Timeout, secret := Secret } = Data) ->
+ cancel_flush_timeout(Data),
+ Timer = erlang:send_after(Timeout, self(), {'$timed_out', Secret}),
+ {keep_state, Data#{ timer => Timer }, []};
+
+opened(cast, _Message, _Data) ->
+ {keep_state_and_data, []}.
+
+dispatch_command(Data, [Function | Args]) ->
+ #{ handle := Handle } = Data,
+ Module = Handle#file_descriptor.module,
+ apply(Module, Function, [Handle | Args]).
+
+cancel_flush_timeout(#{ timer := none }) ->
+ ok;
+cancel_flush_timeout(#{ timer := Timer }) ->
+ _ = erlang:cancel_timer(Timer, [{async, true}]),
+ ok.
+
+try_flush_write_buffer(#{ buffer := Buffer, handle := PrivateFd }) ->
+ case prim_buffer:try_lock(Buffer) of
+ acquired ->
+ flush_write_buffer_1(Buffer, PrivateFd),
+ prim_buffer:unlock(Buffer),
+ ok;
+ busy ->
+ busy
+ end.
+
+%% This is only safe to use when there is no chance of conflict with the owner
+%% process, or in other words, "during synchronous calls outside of the locked
+%% section of write/2"
+flush_write_buffer(#{ buffer := Buffer, handle := PrivateFd }) ->
+ acquired = prim_buffer:try_lock(Buffer),
+ Result = flush_write_buffer_1(Buffer, PrivateFd),
+ prim_buffer:unlock(Buffer),
+ Result.
+
+flush_write_buffer_1(Buffer, PrivateFd) ->
+ case prim_buffer:size(Buffer) of
+ Size when Size > 0 ->
+ ?CALL_FD(PrivateFd, write, [prim_buffer:read_iovec(Buffer, Size)]);
+ 0 ->
+ ok
+ end.
+
+terminate(_Reason, _State, _Data) ->
+ ok.
+
+%% Client functions
+
+write(Fd, IOData) ->
+ try
+ enqueue_write(Fd, erlang:iolist_to_iovec(IOData))
+ catch
+ error:badarg -> {error, badarg}
+ end.
+enqueue_write(_Fd, []) ->
+ ok;
+enqueue_write(Fd, IOVec) ->
+ %% get_fd_data will reject everyone except the process that opened the Fd,
+ %% so we can't race with anyone except the wrapper process.
+ #{ delay_size := DelaySize,
+ buffer := Buffer,
+ pid := Pid } = get_fd_data(Fd),
+ case prim_buffer:try_lock(Buffer) of
+ acquired ->
+ %% (The wrapper process will exit without flushing if we're killed
+ %% while holding the lock).
+ enqueue_write_locked(Pid, Buffer, DelaySize, IOVec);
+ busy ->
+ %% This can only happen while we're processing a timeout in the
+ %% wrapper process, so we perform a bogus call to get a completion
+ %% notification before trying again.
+ gen_statem:call(Pid, '$wait'),
+ enqueue_write(Fd, IOVec)
+ end.
+enqueue_write_locked(Pid, Buffer, DelaySize, IOVec) ->
+ %% The synchronous operations (write, forced flush) are safe since we're
+ %% running on the only process that can fill the buffer; a timeout being
+ %% processed just before $synchronous_flush will cause the flush to nop,
+ %% and a timeout sneaking in just before a synchronous write won't do
+ %% anything since the buffer is guaranteed to be empty at that point.
+ BufSize = prim_buffer:size(Buffer),
+ case is_iovec_smaller_than(IOVec, DelaySize - BufSize) of
+ true when BufSize > 0 ->
+ prim_buffer:write(Buffer, IOVec),
+ prim_buffer:unlock(Buffer);
+ true ->
+ prim_buffer:write(Buffer, IOVec),
+ prim_buffer:unlock(Buffer),
+ gen_statem:cast(Pid, '$reset_timeout');
+ false when BufSize > 0 ->
+ prim_buffer:write(Buffer, IOVec),
+ prim_buffer:unlock(Buffer),
+ gen_statem:call(Pid, '$synchronous_flush');
+ false ->
+ prim_buffer:unlock(Buffer),
+ gen_statem:call(Pid, [write, IOVec])
+ end.
+
+%% iolist_size/1 will always look through the entire list to get a precise
+%% amount, which is pretty inefficient since we only need to know whether we've
+%% hit the buffer threshold or not.
+%%
+%% We only handle the binary case since write/2 forcibly translates input to
+%% erlang:iovec().
+is_iovec_smaller_than(IOVec, Max) ->
+ is_iovec_smaller_than_1(IOVec, Max, 0).
+is_iovec_smaller_than_1(_IOVec, Max, Acc) when Acc >= Max ->
+ false;
+is_iovec_smaller_than_1([], _Max, _Acc) ->
+ true;
+is_iovec_smaller_than_1([Binary | Rest], Max, Acc) when is_binary(Binary) ->
+ is_iovec_smaller_than_1(Rest, Max, Acc + byte_size(Binary)).
+
+close(Fd) ->
+ wrap_call(Fd, [close]).
+
+sync(Fd) ->
+ wrap_call(Fd, [sync]).
+datasync(Fd) ->
+ wrap_call(Fd, [datasync]).
+
+truncate(Fd) ->
+ wrap_call(Fd, [truncate]).
+
+advise(Fd, Offset, Length, Advise) ->
+ wrap_call(Fd, [advise, Offset, Length, Advise]).
+allocate(Fd, Offset, Length) ->
+ wrap_call(Fd, [allocate, Offset, Length]).
+
+position(Fd, Mark) ->
+ wrap_call(Fd, [position, Mark]).
+
+pwrite(Fd, Offset, IOData) ->
+ try
+ CompactedData = erlang:iolist_to_iovec(IOData),
+ wrap_call(Fd, [pwrite, Offset, CompactedData])
+ catch
+ error:badarg -> {error, badarg}
+ end.
+pwrite(Fd, LocBytes) ->
+ try
+ CompactedLocBytes =
+ [ {Offset, erlang:iolist_to_iovec(IOData)} ||
+ {Offset, IOData} <- LocBytes ],
+ wrap_call(Fd, [pwrite, CompactedLocBytes])
+ catch
+ error:badarg -> {error, badarg}
+ end.
+
+read_line(Fd) ->
+ wrap_call(Fd, [read_line]).
+read(Fd, Size) ->
+ wrap_call(Fd, [read, Size]).
+pread(Fd, Offset, Size) ->
+ wrap_call(Fd, [pread, Offset, Size]).
+pread(Fd, LocNums) ->
+ wrap_call(Fd, [pread, LocNums]).
+
+ipread_s32bu_p32bu(Fd, Offset, MaxSize) ->
+ wrap_call(Fd, [ipread_s32bu_p32bu, Offset, MaxSize]).
+
+sendfile(_,_,_,_,_,_,_,_) ->
+ {error, enotsup}.
+
+wrap_call(Fd, Command) ->
+ #{ pid := Pid } = get_fd_data(Fd),
+ try gen_statem:call(Pid, Command, infinity) of
+ Result -> Result
+ catch
+ exit:{noproc, _StackTrace} -> {error, einval}
+ end.
+
+get_fd_data(#file_descriptor{ data = Data }) ->
+ #{ owner := Owner } = Data,
+ case self() of
+ Owner -> Data;
+ _ -> error(not_on_controlling_process)
+ end.
diff --git a/lib/kernel/src/raw_file_io_inflate.erl b/lib/kernel/src/raw_file_io_inflate.erl
new file mode 100644
index 0000000000..7e9780310c
--- /dev/null
+++ b/lib/kernel/src/raw_file_io_inflate.erl
@@ -0,0 +1,261 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(raw_file_io_inflate).
+
+-behavior(gen_statem).
+
+-export([init/1, callback_mode/0, terminate/3]).
+-export([opening/3, opened_gzip/3, opened_passthrough/3]).
+
+-include("file_int.hrl").
+
+-define(INFLATE_CHUNK_SIZE, (1 bsl 10)).
+-define(GZIP_WBITS, (16 + 15)).
+
+callback_mode() -> state_functions.
+
+init({Owner, Secret, [compressed]}) ->
+ Monitor = monitor(process, Owner),
+ %% We're using the undocumented inflateInit/3 to open the stream in
+ %% 'reset mode', which resets the inflate state at the end of every stream,
+ %% allowing us to read concatenated gzip files.
+ Z = zlib:open(),
+ ok = zlib:inflateInit(Z, ?GZIP_WBITS, reset),
+ Data =
+ #{ owner => Owner,
+ monitor => Monitor,
+ secret => Secret,
+ position => 0,
+ buffer => prim_buffer:new(),
+ zlib => Z },
+ {ok, opening, Data}.
+
+%% The old driver fell back to plain reads if the file didn't start with the
+%% magic gzip bytes.
+choose_decompression_state(PrivateFd) ->
+ State =
+ case ?CALL_FD(PrivateFd, read, [2]) of
+ {ok, <<16#1F, 16#8B>>} -> opened_gzip;
+ _Other -> opened_passthrough
+ end,
+ {ok, 0} = ?CALL_FD(PrivateFd, position, [0]),
+ State.
+
+opening({call, From}, {'$open', Secret, Filename, Modes}, #{ secret := Secret } = Data) ->
+ case raw_file_io:open(Filename, Modes) of
+ {ok, PrivateFd} ->
+ NextState = choose_decompression_state(PrivateFd),
+ NewData = Data#{ handle => PrivateFd },
+ {next_state, NextState, NewData, [{reply, From, ok}]};
+ Other ->
+ {stop_and_reply, normal, [{reply, From, Other}]}
+ end;
+opening(_Event, _Contents, _Data) ->
+ {keep_state_and_data, [postpone]}.
+
+internal_close(From, Data) ->
+ #{ handle := PrivateFd } = Data,
+ Response = ?CALL_FD(PrivateFd, close, []),
+ {stop_and_reply, normal, [{reply, From, Response}]}.
+
+opened_passthrough(info, {'DOWN', Monitor, process, _Owner, _Reason}, #{ monitor := Monitor }) ->
+ {stop, shutdown};
+
+opened_passthrough(info, _Message, _Data) ->
+ keep_state_and_data;
+
+opened_passthrough({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
+ internal_close(From, Data);
+
+opened_passthrough({call, {Owner, _Tag} = From}, [Method | Args], #{ owner := Owner } = Data) ->
+ #{ handle := PrivateFd } = Data,
+ Response = ?CALL_FD(PrivateFd, Method, Args),
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened_passthrough({call, _From}, _Command, _Data) ->
+ %% The client functions filter this out, so we'll crash if the user does
+ %% anything stupid on purpose.
+ {shutdown, protocol_violation};
+
+opened_passthrough(_Event, _Request, _Data) ->
+ keep_state_and_data.
+
+%%
+
+opened_gzip(info, {'DOWN', Monitor, process, _Owner, _Reason}, #{ monitor := Monitor }) ->
+ {stop, shutdown};
+
+opened_gzip(info, _Message, _Data) ->
+ keep_state_and_data;
+
+opened_gzip({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
+ internal_close(From, Data);
+
+opened_gzip({call, {Owner, _Tag} = From}, [position, Mark], #{ owner := Owner } = Data) ->
+ case position(Data, Mark) of
+ {ok, NewData, Result} ->
+ Response = {ok, Result},
+ {keep_state, NewData, [{reply, From, Response}]};
+ Other ->
+ {keep_state_and_data, [{reply, From, Other}]}
+ end;
+
+opened_gzip({call, {Owner, _Tag} = From}, [read, Size], #{ owner := Owner } = Data) ->
+ case read(Data, Size) of
+ {ok, NewData, Result} ->
+ Response = {ok, Result},
+ {keep_state, NewData, [{reply, From, Response}]};
+ Other ->
+ {keep_state_and_data, [{reply, From, Other}]}
+ end;
+
+opened_gzip({call, {Owner, _Tag} = From}, [read_line], #{ owner := Owner } = Data) ->
+ case read_line(Data) of
+ {ok, NewData, Result} ->
+ Response = {ok, Result},
+ {keep_state, NewData, [{reply, From, Response}]};
+ Other ->
+ {keep_state_and_data, [{reply, From, Other}]}
+ end;
+
+opened_gzip({call, {Owner, _Tag} = From}, [write, _IOData], #{ owner := Owner }) ->
+ Response = {error, ebadf},
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened_gzip({call, {Owner, _Tag} = From}, _Request, #{ owner := Owner }) ->
+ Response = {error, enotsup},
+ {keep_state_and_data, [{reply, From, Response}]};
+
+opened_gzip({call, _From}, _Request, _Data) ->
+ %% The client functions filter this out, so we'll crash if the user does
+ %% anything stupid on purpose.
+ {shutdown, protocol_violation};
+
+opened_gzip(_Event, _Request, _Data) ->
+ keep_state_and_data.
+
+%%
+
+read(#{ buffer := Buffer } = Data, Size) ->
+ try read_1(Data, Buffer, prim_buffer:size(Buffer), Size) of
+ Result -> Result
+ catch
+ error:badarg -> {error, badarg};
+ error:_ -> {error, eio}
+ end.
+read_1(Data, Buffer, BufferSize, ReadSize) when BufferSize >= ReadSize ->
+ #{ position := Position } = Data,
+ Decompressed = prim_buffer:read(Buffer, ReadSize),
+ {ok, Data#{ position => (Position + ReadSize) }, Decompressed};
+read_1(Data, Buffer, BufferSize, ReadSize) when BufferSize < ReadSize ->
+ #{ handle := PrivateFd } = Data,
+ case ?CALL_FD(PrivateFd, read, [?INFLATE_CHUNK_SIZE]) of
+ {ok, Compressed} ->
+ #{ zlib := Z } = Data,
+ Uncompressed = erlang:iolist_to_iovec(zlib:inflate(Z, Compressed)),
+ prim_buffer:write(Buffer, Uncompressed),
+ read_1(Data, Buffer, prim_buffer:size(Buffer), ReadSize);
+ eof when BufferSize > 0 ->
+ read_1(Data, Buffer, BufferSize, BufferSize);
+ Other ->
+ Other
+ end.
+
+read_line(#{ buffer := Buffer } = Data) ->
+ try read_line_1(Data, Buffer, prim_buffer:find_byte_index(Buffer, $\n)) of
+ {ok, NewData, Decompressed} -> {ok, NewData, Decompressed};
+ Other -> Other
+ catch
+ error:badarg -> {error, badarg};
+ error:_ -> {error, eio}
+ end.
+
+read_line_1(Data, Buffer, not_found) ->
+ #{ handle := PrivateFd, zlib := Z } = Data,
+ case ?CALL_FD(PrivateFd, read, [?INFLATE_CHUNK_SIZE]) of
+ {ok, Compressed} ->
+ Uncompressed = erlang:iolist_to_iovec(zlib:inflate(Z, Compressed)),
+ prim_buffer:write(Buffer, Uncompressed),
+ read_line_1(Data, Buffer, prim_buffer:find_byte_index(Buffer, $\n));
+ eof ->
+ case prim_buffer:size(Buffer) of
+ Size when Size > 0 -> {ok, prim_buffer:read(Buffer, Size)};
+ Size when Size =:= 0 -> eof
+ end;
+ Error ->
+ Error
+ end;
+read_line_1(Data, Buffer, {ok, LFIndex}) ->
+ %% Translate CRLF into just LF, completely ignoring which encoding is used,
+ %% but treat the file position as including CR.
+ #{ position := Position } = Data,
+ NewData = Data#{ position => (Position + LFIndex + 1) },
+ CRIndex = (LFIndex - 1),
+ TranslatedLine =
+ case prim_buffer:read(Buffer, LFIndex + 1) of
+ <<Line:CRIndex/binary, "\r\n">> -> <<Line/binary, "\n">>;
+ Line -> Line
+ end,
+ {ok, NewData, TranslatedLine}.
+
+%%
+%% We support seeking in both directions as long as it isn't relative to EOF.
+%%
+%% Seeking backwards is extremely inefficient since we have to seek to the very
+%% beginning and then decompress up to the desired point.
+%%
+
+position(Data, Mark) when is_atom(Mark) ->
+ position(Data, {Mark, 0});
+position(Data, Offset) when is_integer(Offset) ->
+ position(Data, {bof, Offset});
+position(Data, {bof, Offset}) when is_integer(Offset) ->
+ position_1(Data, Offset);
+position(Data, {cur, Offset}) when is_integer(Offset) ->
+ #{ position := Position } = Data,
+ position_1(Data, Position + Offset);
+position(_Data, {eof, Offset}) when is_integer(Offset) ->
+ {error, einval};
+position(_Data, _Other) ->
+ {error, badarg}.
+
+position_1(_Data, Desired) when Desired < 0 ->
+ {error, einval};
+position_1(#{ position := Desired } = Data, Desired) ->
+ {ok, Data, Desired};
+position_1(#{ position := Current } = Data, Desired) when Current < Desired ->
+ case read(Data, min(Desired - Current, ?INFLATE_CHUNK_SIZE)) of
+ {ok, NewData, _Data} -> position_1(NewData, Desired);
+ eof -> {ok, Data, Current};
+ Other -> Other
+ end;
+position_1(#{ position := Current } = Data, Desired) when Current > Desired ->
+ #{ handle := PrivateFd, buffer := Buffer, zlib := Z } = Data,
+ case ?CALL_FD(PrivateFd, position, [bof]) of
+ {ok, 0} ->
+ ok = zlib:inflateReset(Z),
+ prim_buffer:wipe(Buffer),
+ position_1(Data#{ position => 0 }, Desired);
+ Other ->
+ Other
+ end.
+
+terminate(_Reason, _State, _Data) ->
+ ok.
diff --git a/lib/kernel/src/raw_file_io_list.erl b/lib/kernel/src/raw_file_io_list.erl
new file mode 100644
index 0000000000..2e16e63f0e
--- /dev/null
+++ b/lib/kernel/src/raw_file_io_list.erl
@@ -0,0 +1,128 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(raw_file_io_list).
+
+-export([close/1, sync/1, datasync/1, truncate/1, advise/4, allocate/3,
+ position/2, write/2, pwrite/2, pwrite/3,
+ read_line/1, read/2, pread/2, pread/3]).
+
+%% OTP internal.
+-export([ipread_s32bu_p32bu/3, sendfile/8]).
+
+-export([open_layer/3]).
+
+-include("file_int.hrl").
+
+open_layer(Filename, Modes, [list]) ->
+ case raw_file_io:open(Filename, [binary | Modes]) of
+ {ok, PrivateFd} -> {ok, make_public_fd(PrivateFd, Modes)};
+ Other -> Other
+ end.
+
+%% We can skip wrapping the file if it's write-only since only read operations
+%% are affected by list mode. Since raw_file_io fills in all implicit options
+%% for us, all we need to do is check whether 'read' is among them.
+make_public_fd(PrivateFd, Modes) ->
+ case lists:member(read, Modes) of
+ true -> #file_descriptor{ module = ?MODULE, data = PrivateFd };
+ false -> PrivateFd
+ end.
+
+close(Fd) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, close, []).
+
+sync(Fd) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, sync, []).
+datasync(Fd) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, datasync, []).
+
+truncate(Fd) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, truncate, []).
+
+advise(Fd, Offset, Length, Advise) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, advise, [Offset, Length, Advise]).
+allocate(Fd, Offset, Length) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, allocate, [Offset, Length]).
+
+position(Fd, Mark) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, position, [Mark]).
+
+write(Fd, IOData) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, write, [IOData]).
+
+pwrite(Fd, Offset, IOData) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, pwrite, [Offset, IOData]).
+pwrite(Fd, LocBytes) ->
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, pwrite, [LocBytes]).
+
+read_line(Fd) ->
+ PrivateFd = Fd#file_descriptor.data,
+ case ?CALL_FD(PrivateFd, read_line, []) of
+ {ok, Binary} -> {ok, binary_to_list(Binary)};
+ Other -> Other
+ end.
+read(Fd, Size) ->
+ PrivateFd = Fd#file_descriptor.data,
+ case ?CALL_FD(PrivateFd, read, [Size]) of
+ {ok, Binary} -> {ok, binary_to_list(Binary)};
+ Other -> Other
+ end.
+pread(Fd, Offset, Size) ->
+ PrivateFd = Fd#file_descriptor.data,
+ case ?CALL_FD(PrivateFd, pread, [Offset, Size]) of
+ {ok, Binary} -> {ok, binary_to_list(Binary)};
+ Other -> Other
+ end.
+pread(Fd, LocNums) ->
+ PrivateFd = Fd#file_descriptor.data,
+ case ?CALL_FD(PrivateFd, pread, [LocNums]) of
+ {ok, LocResults} ->
+ TranslatedResults =
+ [ case Result of
+ Result when is_binary(Result) -> binary_to_list(Result);
+ eof -> eof
+ end || Result <- LocResults ],
+ {ok, TranslatedResults};
+ Other -> Other
+ end.
+
+ipread_s32bu_p32bu(Fd, Offset, MaxSize) ->
+ PrivateFd = Fd#file_descriptor.data,
+ case ?CALL_FD(PrivateFd, ipread_s32bu_p32bu, [Offset, MaxSize]) of
+ {ok, {Size, Pointer, Binary}} when is_binary(Binary) ->
+ {ok, {Size, Pointer, binary_to_list(Binary)}};
+ Other ->
+ Other
+ end.
+
+sendfile(Fd, Dest, Offset, Bytes, ChunkSize, Headers, Trailers, Flags) ->
+ Args = [Dest, Offset, Bytes, ChunkSize, Headers, Trailers, Flags],
+ PrivateFd = Fd#file_descriptor.data,
+ ?CALL_FD(PrivateFd, sendfile, Args).
diff --git a/lib/kernel/src/raw_file_io_raw.erl b/lib/kernel/src/raw_file_io_raw.erl
new file mode 100644
index 0000000000..9a9fe78eb1
--- /dev/null
+++ b/lib/kernel/src/raw_file_io_raw.erl
@@ -0,0 +1,25 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(raw_file_io_raw).
+
+-export([open_layer/3]).
+
+open_layer(Filename, Modes, [raw]) ->
+ prim_file:open(Filename, [raw | Modes]).