aboutsummaryrefslogblamecommitdiffstats
path: root/lib/kernel/src/raw_file_io_delayed.erl
blob: 766467437e0beece5684815f70f0b2b89b805ee3 (plain) (tree)
























                                                                           

                                               























































































































































































































































































                                                                                           


                             













                                                     
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
-module(raw_file_io_delayed).

-behavior(gen_statem).

-export([close/1, sync/1, datasync/1, truncate/1, advise/4, allocate/3,
         position/2, write/2, pwrite/2, pwrite/3,
         read_line/1, read/2, pread/2, pread/3,
         read_handle_info/2]).

%% OTP internal.
-export([ipread_s32bu_p32bu/3, sendfile/8]).

-export([open_layer/3]).

-export([init/1, callback_mode/0, terminate/3]).
-export([opening/3, opened/3]).

-include("file_int.hrl").

open_layer(Filename, Modes, Options) ->
    Secret = make_ref(),
    case gen_statem:start(?MODULE, {self(), Secret, Options}, []) of
        {ok, Pid} ->
            gen_statem:call(Pid, {'$open', Secret, Filename, Modes}, infinity);
        Other ->
            Other
    end.

callback_mode() -> state_functions.

init({Owner, Secret, Options}) ->
    Monitor = monitor(process, Owner),
    Defaults =
        #{ owner => Owner,
           monitor => Monitor,
           secret => Secret,
           timer => none,
           pid => self(),
           buffer => prim_buffer:new(),
           delay_size => 64 bsl 10,
           delay_time => 2000 },
    Data = fill_delay_values(Defaults, Options),
    {ok, opening, Data}.

fill_delay_values(Data, []) ->
    Data;
fill_delay_values(Data, [{delayed_write, Size, Time} | Options]) ->
    fill_delay_values(Data#{ delay_size => Size, delay_time => Time }, Options);
fill_delay_values(Data, [_ | Options]) ->
    fill_delay_values(Data, Options).

opening({call, From}, {'$open', Secret, Filename, Modes}, #{ secret := Secret } = Data) ->
    case raw_file_io:open(Filename, Modes) of
        {ok, PrivateFd} ->
            PublicData = maps:with([owner, buffer, delay_size, pid], Data),
            PublicFd = #file_descriptor{ module = ?MODULE, data = PublicData },

            NewData = Data#{ handle => PrivateFd },
            Response = {ok, PublicFd},
            {next_state, opened, NewData, [{reply, From, Response}]};
        Other ->
            {stop_and_reply, normal, [{reply, From, Other}]}
    end;
opening(_Event, _Contents, _Data) ->
    {keep_state_and_data, [postpone]}.

%%

opened(info, {'$timed_out', Secret}, #{ secret := Secret } = Data) ->
    %% If the user writes something at this exact moment, the flush will fail
    %% and the timer won't reset on the next write since the buffer won't be
    %% empty (Unless we collided on a flush). We therefore reset the timeout to
    %% ensure that data won't sit idle for extended periods of time.
    case try_flush_write_buffer(Data) of
        busy -> gen_statem:cast(self(), '$reset_timeout');
        ok -> ok
    end,
    {keep_state, Data#{ timer => none }, []};

opened(info, {'DOWN', Monitor, process, _Owner, Reason}, #{ monitor := Monitor } = Data) ->
    if
        Reason =/= kill -> try_flush_write_buffer(Data);
        Reason =:= kill -> ignored
    end,
    {stop, shutdown};

opened(info, _Message, _Data) ->
    keep_state_and_data;

opened({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
    case flush_write_buffer(Data) of
        ok ->
            #{ handle := PrivateFd } = Data,
            Response = ?CALL_FD(PrivateFd, close, []),
            {stop_and_reply, normal, [{reply, From, Response}]};
        Other ->
            {stop_and_reply, normal, [{reply, From, Other}]}
    end;

opened({call, {Owner, _Tag} = From}, '$wait', #{ owner := Owner }) ->
    %% Used in write/2 to synchronize writes on lock conflicts.
    {keep_state_and_data, [{reply, From, ok}]};

opened({call, {Owner, _Tag} = From}, '$synchronous_flush', #{ owner := Owner } = Data) ->
    cancel_flush_timeout(Data),
    Response = flush_write_buffer(Data),
    {keep_state_and_data, [{reply, From, Response}]};

opened({call, {Owner, _Tag} = From}, Command, #{ owner := Owner } = Data) ->
    Response =
        case flush_write_buffer(Data) of
            ok -> dispatch_command(Data, Command);
            Other -> Other
        end,
    {keep_state_and_data, [{reply, From, Response}]};

opened({call, _From}, _Command, _Data) ->
    %% The client functions filter this out, so we'll crash if the user does
    %% anything stupid on purpose.
    {shutdown, protocol_violation};

opened(cast, '$reset_timeout', #{ delay_time := Timeout, secret := Secret } = Data) ->
    cancel_flush_timeout(Data),
    Timer = erlang:send_after(Timeout, self(), {'$timed_out', Secret}),
    {keep_state, Data#{ timer => Timer }, []};

opened(cast, _Message, _Data) ->
    {keep_state_and_data, []}.

dispatch_command(Data, [Function | Args]) ->
    #{ handle := Handle } = Data,
    Module = Handle#file_descriptor.module,
    apply(Module, Function, [Handle | Args]).

cancel_flush_timeout(#{ timer := none }) ->
    ok;
cancel_flush_timeout(#{ timer := Timer }) ->
    _ = erlang:cancel_timer(Timer, [{async, true}]),
    ok.

try_flush_write_buffer(#{ buffer := Buffer, handle := PrivateFd }) ->
    case prim_buffer:try_lock(Buffer) of
        acquired ->
            flush_write_buffer_1(Buffer, PrivateFd),
            prim_buffer:unlock(Buffer),
            ok;
        busy ->
            busy
    end.

%% This is only safe to use when there is no chance of conflict with the owner
%% process, or in other words, "during synchronous calls outside of the locked
%% section of write/2"
flush_write_buffer(#{ buffer := Buffer, handle := PrivateFd }) ->
    acquired = prim_buffer:try_lock(Buffer),
    Result = flush_write_buffer_1(Buffer, PrivateFd),
    prim_buffer:unlock(Buffer),
    Result.

flush_write_buffer_1(Buffer, PrivateFd) ->
    case prim_buffer:size(Buffer) of
        Size when Size > 0 ->
            ?CALL_FD(PrivateFd, write, [prim_buffer:read_iovec(Buffer, Size)]);
        0 ->
            ok
    end.

terminate(_Reason, _State, _Data) ->
    ok.

%% Client functions

write(Fd, IOData) ->
    try
        enqueue_write(Fd, erlang:iolist_to_iovec(IOData))
    catch
        error:badarg -> {error, badarg}
    end.
enqueue_write(_Fd, []) ->
    ok;
enqueue_write(Fd, IOVec) ->
    %% get_fd_data will reject everyone except the process that opened the Fd,
    %% so we can't race with anyone except the wrapper process.
    #{ delay_size := DelaySize,
       buffer := Buffer,
       pid := Pid } = get_fd_data(Fd),
    case prim_buffer:try_lock(Buffer) of
        acquired ->
            %% (The wrapper process will exit without flushing if we're killed
            %% while holding the lock).
            enqueue_write_locked(Pid, Buffer, DelaySize, IOVec);
        busy ->
            %% This can only happen while we're processing a timeout in the
            %% wrapper process, so we perform a bogus call to get a completion
            %% notification before trying again.
            gen_statem:call(Pid, '$wait'),
            enqueue_write(Fd, IOVec)
    end.
enqueue_write_locked(Pid, Buffer, DelaySize, IOVec) ->
    %% The synchronous operations (write, forced flush) are safe since we're
    %% running on the only process that can fill the buffer; a timeout being
    %% processed just before $synchronous_flush will cause the flush to nop,
    %% and a timeout sneaking in just before a synchronous write won't do
    %% anything since the buffer is guaranteed to be empty at that point.
    BufSize = prim_buffer:size(Buffer),
    case is_iovec_smaller_than(IOVec, DelaySize - BufSize) of
        true when BufSize > 0 ->
            prim_buffer:write(Buffer, IOVec),
            prim_buffer:unlock(Buffer);
        true ->
            prim_buffer:write(Buffer, IOVec),
            prim_buffer:unlock(Buffer),
            gen_statem:cast(Pid, '$reset_timeout');
        false when BufSize > 0 ->
            prim_buffer:write(Buffer, IOVec),
            prim_buffer:unlock(Buffer),
            gen_statem:call(Pid, '$synchronous_flush');
        false ->
            prim_buffer:unlock(Buffer),
            gen_statem:call(Pid, [write, IOVec])
    end.

%% iolist_size/1 will always look through the entire list to get a precise
%% amount, which is pretty inefficient since we only need to know whether we've
%% hit the buffer threshold or not.
%%
%% We only handle the binary case since write/2 forcibly translates input to
%% erlang:iovec().
is_iovec_smaller_than(IOVec, Max) ->
    is_iovec_smaller_than_1(IOVec, Max, 0).
is_iovec_smaller_than_1(_IOVec, Max, Acc) when Acc >= Max ->
    false;
is_iovec_smaller_than_1([], _Max, _Acc) ->
    true;
is_iovec_smaller_than_1([Binary | Rest], Max, Acc) when is_binary(Binary) ->
    is_iovec_smaller_than_1(Rest, Max, Acc + byte_size(Binary)).

close(Fd) ->
    wrap_call(Fd, [close]).

sync(Fd) ->
    wrap_call(Fd, [sync]).
datasync(Fd) ->
    wrap_call(Fd, [datasync]).

truncate(Fd) ->
    wrap_call(Fd, [truncate]).

advise(Fd, Offset, Length, Advise) ->
    wrap_call(Fd, [advise, Offset, Length, Advise]).
allocate(Fd, Offset, Length) ->
    wrap_call(Fd, [allocate, Offset, Length]).

position(Fd, Mark) ->
    wrap_call(Fd, [position, Mark]).

pwrite(Fd, Offset, IOData) ->
    try
        CompactedData = erlang:iolist_to_iovec(IOData),
        wrap_call(Fd, [pwrite, Offset, CompactedData])
    catch
        error:badarg -> {error, badarg}
    end.
pwrite(Fd, LocBytes) ->
    try
        CompactedLocBytes =
            [ {Offset, erlang:iolist_to_iovec(IOData)} ||
              {Offset, IOData} <- LocBytes ],
        wrap_call(Fd, [pwrite, CompactedLocBytes])
    catch
        error:badarg -> {error, badarg}
    end.

read_line(Fd) ->
    wrap_call(Fd, [read_line]).
read(Fd, Size) ->
    wrap_call(Fd, [read, Size]).
pread(Fd, Offset, Size) ->
    wrap_call(Fd, [pread, Offset, Size]).
pread(Fd, LocNums) ->
    wrap_call(Fd, [pread, LocNums]).

ipread_s32bu_p32bu(Fd, Offset, MaxSize) ->
    wrap_call(Fd, [ipread_s32bu_p32bu, Offset, MaxSize]).

sendfile(_,_,_,_,_,_,_,_) ->
    {error, enotsup}.

read_handle_info(Fd, Opts) ->
    wrap_call(Fd, [Opts]).

wrap_call(Fd, Command) ->
    #{ pid := Pid } = get_fd_data(Fd),
    try gen_statem:call(Pid, Command, infinity) of
        Result -> Result
    catch
        exit:{noproc, _StackTrace} -> {error, einval}
    end.

get_fd_data(#file_descriptor{ data = Data }) ->
    #{ owner := Owner } = Data,
    case self() of
        Owner -> Data;
        _ -> error(not_on_controlling_process)
    end.