%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
-module(raw_file_io_delayed).

-behavior(gen_statem).

-export([close/1, sync/1, datasync/1, truncate/1, advise/4, allocate/3,
         position/2, write/2, pwrite/2, pwrite/3,
         read_line/1, read/2, pread/2, pread/3,
         read_handle_info/2]).

%% OTP internal.
-export([ipread_s32bu_p32bu/3, sendfile/8]).

-export([open_layer/3]).

-export([init/1, callback_mode/0, terminate/3]).
-export([opening/3, opened/3]).

-include("file_int.hrl").

%% Starts an (unlinked) wrapper process for the calling process and asks it to
%% open Filename with Modes. The freshly made reference is handed to the
%% wrapper at start-up and must accompany the '$open' request.
%%
%% Returns whatever the wrapper replies to '$open', or the start failure as-is.
open_layer(Filename, Modes, Options) ->
    Secret = make_ref(),
    StartArgs = {self(), Secret, Options},
    case gen_statem:start(?MODULE, StartArgs, []) of
        {ok, Wrapper} ->
            OpenRequest = {'$open', Secret, Filename, Modes},
            gen_statem:call(Wrapper, OpenRequest, infinity);
        StartFailure ->
            StartFailure
    end.

%% gen_statem callback: each state is handled by a function of the same name.
callback_mode() ->
    state_functions.

%% gen_statem callback: monitors the owner, allocates the shared write buffer
%% and starts out in the 'opening' state.
%%
%% The defaults are a delay_size of 64 bsl 10 bytes and a delay_time of 2000
%% (later used as the erlang:send_after/3 timeout); both can be overridden by
%% a {delayed_write, Size, Time} option.
init({Owner, Secret, Options}) ->
    OwnerMonitor = monitor(process, Owner),
    WriteBuffer = prim_buffer:new(),
    InitialData = #{ owner => Owner,
                     monitor => OwnerMonitor,
                     secret => Secret,
                     timer => none,
                     pid => self(),
                     buffer => WriteBuffer,
                     delay_size => 64 bsl 10,
                     delay_time => 2000 },
    {ok, opening, fill_delay_values(InitialData, Options)}.

%% Applies every {delayed_write, Size, Time} option found in the list to Data,
%% in list order (so the last one wins); all other options are skipped.
fill_delay_values(Data, Options) ->
    lists:foldl(
      fun({delayed_write, Size, Time}, Acc) ->
              Acc#{ delay_size => Size, delay_time => Time };
         (_OtherOption, Acc) ->
              Acc
      end, Data, Options).
%% State 'opening': waits for the '$open' request carrying the secret that
%% was handed to init/1, then opens the underlying file.
%%
%% On success the caller gets {ok, PublicFd}, where the public descriptor
%% only exposes the owner, buffer, delay_size and pid entries of our data, and
%% we move on to 'opened'. On failure the error is passed on and we stop.
opening({call, From}, {'$open', Secret, Filename, Modes},
        #{ secret := Secret } = Data) ->
    case raw_file_io:open(Filename, Modes) of
        {ok, PrivateFd} ->
            PublicData = maps:with([owner, buffer, delay_size, pid], Data),
            PublicFd = #file_descriptor{ module = ?MODULE, data = PublicData },

            NewData = Data#{ handle => PrivateFd },
            Response = {ok, PublicFd},
            {next_state, opened, NewData, [{reply, From, Response}]};
        Other ->
            {stop_and_reply, normal, [{reply, From, Other}]}
    end;
opening(_Event, _Contents, _Data) ->
    %% Anything else is deferred until the file has been opened.
    {keep_state_and_data, [postpone]}.

%%

%% State 'opened': serves the owner's requests and flushes the write buffer,
%% either when the delay timer fires or before any synchronous command.
opened(info, {'$timed_out', Secret}, #{ secret := Secret } = Data) ->
    %% If the user writes something at this exact moment, the flush will fail
    %% and the timer won't reset on the next write since the buffer won't be
    %% empty (Unless we collided on a flush). We therefore reset the timeout to
    %% ensure that data won't sit idle for extended periods of time.
    case try_flush_write_buffer(Data) of
        busy -> gen_statem:cast(self(), '$reset_timeout');
        ok -> ok
    end,
    {keep_state, Data#{ timer => none }, []};

opened(info, {'DOWN', Monitor, process, _Owner, Reason},
       #{ monitor := Monitor } = Data) ->
    %% The owner is gone; make a best-effort attempt to write out what it left
    %% in the buffer. The flush is skipped if the buffer is still locked.
    %% NOTE(review): a process terminated through exit(Pid, kill) is reported
    %% with the reason 'killed' rather than 'kill' -- confirm which atom is
    %% intended here.
    if
        Reason =/= kill -> try_flush_write_buffer(Data);
        Reason =:= kill -> ignored
    end,
    {stop, shutdown};

opened(info, _Message, _Data) ->
    keep_state_and_data;

opened({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
    %% Flush before closing; the wrapper stops whether or not that succeeds.
    case flush_write_buffer(Data) of
        ok ->
            #{ handle := PrivateFd } = Data,
            Response = ?CALL_FD(PrivateFd, close, []),
            {stop_and_reply, normal, [{reply, From, Response}]};
        Other ->
            {stop_and_reply, normal, [{reply, From, Other}]}
    end;

opened({call, {Owner, _Tag} = From}, '$wait', #{ owner := Owner }) ->
    %% Used in write/2 to synchronize writes on lock conflicts.
    {keep_state_and_data, [{reply, From, ok}]};

opened({call, {Owner, _Tag} = From}, '$synchronous_flush',
       #{ owner := Owner } = Data) ->
    cancel_flush_timeout(Data),
    Response = flush_write_buffer(Data),
    {keep_state_and_data, [{reply, From, Response}]};

opened({call, {Owner, _Tag} = From}, Command, #{ owner := Owner } = Data) ->
    %% Every other command runs against the private handle, but only after
    %% the buffered data has been written out successfully.
    Response =
        case flush_write_buffer(Data) of
            ok -> dispatch_command(Data, Command);
            Other -> Other
        end,
    {keep_state_and_data, [{reply, From, Response}]};

opened({call, _From}, _Command, _Data) ->
    %% The client functions filter this out, so we'll crash if the user does
    %% anything stupid on purpose.
    %%
    %% This has to be a proper 'stop' tuple; a bare {shutdown, Reason} is not
    %% a valid state callback result and would terminate the process with
    %% bad_return_from_state_function instead of the reason below.
    {stop, protocol_violation};

opened(cast, '$reset_timeout',
       #{ delay_time := Timeout, secret := Secret } = Data) ->
    cancel_flush_timeout(Data),
    Timer = erlang:send_after(Timeout, self(), {'$timed_out', Secret}),
    {keep_state, Data#{ timer => Timer }, []};

opened(cast, _Message, _Data) ->
    {keep_state_and_data, []}.

%% Runs Function on the private handle by calling the handle's own module
%% with the handle prepended to Args.
dispatch_command(Data, [Function | Args]) ->
    #{ handle := Handle } = Data,
    Module = Handle#file_descriptor.module,
    apply(Module, Function, [Handle | Args]).

%% Cancels the pending flush timer, if there is one. The cancellation is
%% asynchronous, so this never blocks.
cancel_flush_timeout(#{ timer := none }) ->
    ok;
cancel_flush_timeout(#{ timer := Timer }) ->
    _ = erlang:cancel_timer(Timer, [{async, true}]),
    ok.

%% Flushes the buffer if its lock can be taken right away. Returns 'busy'
%% without touching the buffer otherwise; the result of the write itself is
%% not reported.
try_flush_write_buffer(#{ buffer := Buffer, handle := PrivateFd }) ->
    case prim_buffer:try_lock(Buffer) of
        acquired ->
            flush_write_buffer_1(Buffer, PrivateFd),
            prim_buffer:unlock(Buffer),
            ok;
        busy ->
            busy
    end.

%% This is only safe to use when there is no chance of conflict with the owner
%% process, or in other words, "during synchronous calls outside of the locked
%% section of write/2"
%%
%% Crashes with a badmatch if the lock is taken; returns the result of the
%% flush otherwise.
flush_write_buffer(#{ buffer := Buffer, handle := PrivateFd }) ->
    acquired = prim_buffer:try_lock(Buffer),
    Result = flush_write_buffer_1(Buffer, PrivateFd),
    prim_buffer:unlock(Buffer),
    Result.
%% Writes the entire contents of Buffer to PrivateFd. The caller must hold the
%% buffer lock. Returns 'ok' when the buffer is empty, and the result of the
%% underlying write otherwise.
flush_write_buffer_1(Buffer, PrivateFd) ->
    case prim_buffer:size(Buffer) of
        Size when Size > 0 ->
            ?CALL_FD(PrivateFd, write,
                     [prim_buffer:read_iovec(Buffer, Size)]);
        0 ->
            ok
    end.

%% gen_statem callback; there is nothing to clean up here.
terminate(_Reason, _State, _Data) ->
    ok.

%% Client functions

%% Queues IOData for a delayed write, or writes it synchronously through the
%% wrapper process once the delay_size threshold is reached.
%%
%% Returns {error, badarg} if a badarg error is raised along the way, for
%% instance when IOData isn't valid iodata.
write(Fd, IOData) ->
    try
        enqueue_write(Fd, erlang:iolist_to_iovec(IOData))
    catch
        error:badarg -> {error, badarg}
    end.

enqueue_write(_Fd, []) ->
    ok;
enqueue_write(Fd, IOVec) ->
    %% get_fd_data will reject everyone except the process that opened the Fd,
    %% so we can't race with anyone except the wrapper process.
    #{ delay_size := DelaySize,
       buffer := Buffer,
       pid := Pid } = get_fd_data(Fd),
    case prim_buffer:try_lock(Buffer) of
        acquired ->
            %% (The wrapper process will exit without flushing if we're killed
            %% while holding the lock).
            enqueue_write_locked(Pid, Buffer, DelaySize, IOVec);
        busy ->
            %% This can only happen while we're processing a timeout in the
            %% wrapper process, so we perform a bogus call to get a completion
            %% notification before trying again.
            %%
            %% The wrapper may be in the middle of a slow flush, so we wait
            %% without a timeout, just like wrap_call/2 does for every other
            %% operation.
            gen_statem:call(Pid, '$wait', infinity),
            enqueue_write(Fd, IOVec)
    end.

%% Must be called with the buffer lock held; the lock is released on every
%% path before we talk to the wrapper process.
enqueue_write_locked(Pid, Buffer, DelaySize, IOVec) ->
    %% The synchronous operations (write, forced flush) are safe since we're
    %% running on the only process that can fill the buffer; a timeout being
    %% processed just before $synchronous_flush will cause the flush to nop,
    %% and a timeout sneaking in just before a synchronous write won't do
    %% anything since the buffer is guaranteed to be empty at that point.
    %%
    %% The synchronous calls below wait indefinitely: they perform file I/O in
    %% the wrapper process, and the default gen_statem:call/2 timeout would
    %% make us exit with 'timeout' whenever the write takes longer than that.
    BufSize = prim_buffer:size(Buffer),
    case is_iovec_smaller_than(IOVec, DelaySize - BufSize) of
        true when BufSize > 0 ->
            %% Still below the threshold and the timer is already running.
            prim_buffer:write(Buffer, IOVec),
            prim_buffer:unlock(Buffer);
        true ->
            %% First data in an empty buffer; (re)start the flush timer.
            prim_buffer:write(Buffer, IOVec),
            prim_buffer:unlock(Buffer),
            gen_statem:cast(Pid, '$reset_timeout');
        false when BufSize > 0 ->
            %% Threshold reached; append and flush everything in one go.
            prim_buffer:write(Buffer, IOVec),
            prim_buffer:unlock(Buffer),
            gen_statem:call(Pid, '$synchronous_flush', infinity);
        false ->
            %% Nothing is buffered, so the data can bypass the buffer.
            prim_buffer:unlock(Buffer),
            gen_statem:call(Pid, [write, IOVec], infinity)
    end.
%% iolist_size/1 will always look through the entire list to get a precise
%% amount, which is pretty inefficient since we only need to know whether we've
%% hit the buffer threshold or not.
%%
%% We only handle the binary case since write/2 forcibly translates input to
%% erlang:iovec().
%%
%% Returns 'true' if the total size of IOVec is strictly less than Max, and
%% 'false' as soon as the running total reaches it.
is_iovec_smaller_than(IOVec, Max) ->
    is_iovec_smaller_than_1(IOVec, Max, 0).

is_iovec_smaller_than_1(_IOVec, Max, Acc) when Acc >= Max ->
    false;
is_iovec_smaller_than_1([], _Max, _Acc) ->
    true;
is_iovec_smaller_than_1([Binary | Rest], Max, Acc) when is_binary(Binary) ->
    is_iovec_smaller_than_1(Rest, Max, Acc + byte_size(Binary)).

%% The functions below forward the operation to the wrapper process as a
%% [Function | Args] command. The wrapper flushes the write buffer and then
%% applies Function to its private handle with the given arguments.

close(Fd) ->
    wrap_call(Fd, [close]).

sync(Fd) ->
    wrap_call(Fd, [sync]).
datasync(Fd) ->
    wrap_call(Fd, [datasync]).

truncate(Fd) ->
    wrap_call(Fd, [truncate]).

advise(Fd, Offset, Length, Advise) ->
    wrap_call(Fd, [advise, Offset, Length, Advise]).
allocate(Fd, Offset, Length) ->
    wrap_call(Fd, [allocate, Offset, Length]).

position(Fd, Mark) ->
    wrap_call(Fd, [position, Mark]).

%% The pwrite variants translate their data to erlang:iovec() up front, and
%% return {error, badarg} if a badarg error is raised along the way.
pwrite(Fd, Offset, IOData) ->
    try
        CompactedData = erlang:iolist_to_iovec(IOData),
        wrap_call(Fd, [pwrite, Offset, CompactedData])
    catch
        error:badarg -> {error, badarg}
    end.
pwrite(Fd, LocBytes) ->
    try
        CompactedLocBytes =
            [ {Offset, erlang:iolist_to_iovec(IOData)} ||
              {Offset, IOData} <- LocBytes ],
        wrap_call(Fd, [pwrite, CompactedLocBytes])
    catch
        error:badarg -> {error, badarg}
    end.

read_line(Fd) ->
    wrap_call(Fd, [read_line]).
read(Fd, Size) ->
    wrap_call(Fd, [read, Size]).
pread(Fd, Offset, Size) ->
    wrap_call(Fd, [pread, Offset, Size]).
pread(Fd, LocNums) ->
    wrap_call(Fd, [pread, LocNums]).

ipread_s32bu_p32bu(Fd, Offset, MaxSize) ->
    wrap_call(Fd, [ipread_s32bu_p32bu, Offset, MaxSize]).

%% sendfile is not supported on delayed-write descriptors.
sendfile(_,_,_,_,_,_,_,_) ->
    {error, enotsup}.

%% The command has to start with the function name like all the others;
%% passing [Opts] alone would make the wrapper use the option list as the
%% name of the function to apply.
read_handle_info(Fd, Opts) ->
    wrap_call(Fd, [read_handle_info, Opts]).

%% Sends Command to the wrapper process and waits (indefinitely) for the
%% reply. Returns {error, einval} if the wrapper process no longer exists.
%% Raises not_on_controlling_process unless called by the owner of Fd.
wrap_call(Fd, Command) ->
    #{ pid := Pid } = get_fd_data(Fd),
    try gen_statem:call(Pid, Command, infinity) of
        Result -> Result
    catch
        exit:{noproc, _CallInfo} -> {error, einval}
    end.
%% Returns the data map stored in the descriptor, provided that the caller is
%% the process recorded under its 'owner' key. Any other process gets a
%% not_on_controlling_process error.
get_fd_data(#file_descriptor{ data = FdData }) ->
    #{ owner := Controller } = FdData,
    if
        Controller =:= self() -> FdData;
        true -> error(not_on_controlling_process)
    end.