aboutsummaryrefslogblamecommitdiffstats
path: root/lib/kernel/src/raw_file_io_inflate.erl
blob: d3ed02dd03068be668fad2c5cff5b4d0728c9da7 (plain) (tree)


























                                                                           
                                        






































































































































































































































                                                                                                 
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
-module(raw_file_io_inflate).

-behavior(gen_statem).

-export([init/1, callback_mode/0, terminate/3]).
-export([opening/3, opened_gzip/3, opened_passthrough/3]).

-include("file_int.hrl").

-define(INFLATE_CHUNK_SIZE, (8 bsl 10)).
-define(GZIP_WBITS, (16 + 15)).

%% gen_statem callback: selects the state_functions callback mode, so events
%% are dispatched to the function named after the current state.
callback_mode() ->
    state_functions.

%% gen_statem callback. Monitors the owning process, prepares a zlib inflate
%% stream and an empty decompression buffer, and starts in the 'opening' state.
%%
%% Only the exact argument shape {Owner, Secret, [compressed]} is accepted.
init({Owner, Secret, [compressed]}) ->
    OwnerMonitor = monitor(process, Owner),
    %% We're using the undocumented inflateInit/3 to open the stream in
    %% 'reset mode', which resets the inflate state at the end of every stream,
    %% allowing us to read concatenated gzip files.
    Stream = zlib:open(),
    ok = zlib:inflateInit(Stream, ?GZIP_WBITS, reset),
    DecompressedBuffer = prim_buffer:new(),
    %% 'position' tracks the offset in the *decompressed* data; it starts at
    %% zero and is advanced by the read functions.
    InitialData = #{ owner => Owner,
                     monitor => OwnerMonitor,
                     secret => Secret,
                     position => 0,
                     buffer => DecompressedBuffer,
                     zlib => Stream },
    {ok, opening, InitialData}.

%% Picks the state to enter once the underlying file has been opened.
%%
%% The old driver fell back to plain reads if the file didn't start with the
%% magic gzip bytes, so we peek at the first two bytes: 16#1F 16#8B selects
%% 'opened_gzip', anything else (including a failed or short read) selects
%% 'opened_passthrough'. The file is rewound to offset 0 afterwards; a failed
%% rewind crashes with badmatch.
choose_decompression_state(PrivateFd) ->
    Header = ?CALL_FD(PrivateFd, read, [2]),
    {ok, 0} = ?CALL_FD(PrivateFd, position, [0]),
    case Header of
        {ok, <<16#1F, 16#8B>>} -> opened_gzip;
        _NotGzip -> opened_passthrough
    end.

%% State function for the initial state, before the file has been opened.
%%
%% An '$open' call carrying the same secret that was passed to init/1 opens the
%% file. On success the handle is stored under 'handle' and we move to the
%% state chosen by choose_decompression_state/1; on failure the result of
%% raw_file_io:open/2 is passed back to the caller and the process stops
%% normally.
opening({call, Caller}, {'$open', Secret, Filename, Modes}, #{ secret := Secret } = Data) ->
    case raw_file_io:open(Filename, Modes) of
        {ok, PrivateFd} ->
            OpenedState = choose_decompression_state(PrivateFd),
            OpenedData = Data#{ handle => PrivateFd },
            Actions = [{reply, Caller, ok}],
            {next_state, OpenedState, OpenedData, Actions};
        Failure ->
            {stop_and_reply, normal, [{reply, Caller, Failure}]}
    end;
%% Everything else is postponed until we have left this state.
opening(_Event, _Contents, _Data) ->
    {keep_state_and_data, [postpone]}.

%% Closes the underlying file handle, replies to From with whatever the close
%% call returned, and stops the process with reason 'normal'.
internal_close(From, Data) ->
    #{ handle := Fd } = Data,
    CloseReply = {reply, From, ?CALL_FD(Fd, close, [])},
    {stop_and_reply, normal, [CloseReply]}.

%% State function for files that are read without decompression: every request
%% from the owner is forwarded verbatim to the underlying file handle.

%% The monitored owner went away; stop.
opened_passthrough(info, {'DOWN', Ref, process, _Pid, _Why}, #{ monitor := Ref }) ->
    {stop, shutdown};

%% Any other stray message is dropped.
opened_passthrough(info, _Unexpected, _Data) ->
    keep_state_and_data;

opened_passthrough({call, {Owner, _} = Caller}, [close], #{ owner := Owner } = Data) ->
    internal_close(Caller, Data);

%% Generic forwarding: the request is [Method | Args] and the reply is whatever
%% the underlying handle returns for it.
opened_passthrough({call, {Owner, _} = Caller}, [Method | Args], #{ owner := Owner } = Data) ->
    #{ handle := Fd } = Data,
    Reply = {reply, Caller, ?CALL_FD(Fd, Method, Args)},
    {keep_state_and_data, [Reply]};

opened_passthrough({call, _Caller}, _Command, _Data) ->
    %% Calls that did not match the owner clauses above end up here. The client
    %% functions filter this out, so we'll crash if the user does anything
    %% stupid on purpose.
    %% NOTE(review): this tuple is not one of gen_statem's documented return
    %% values, so the crash presumably comes from a bad-return error - confirm
    %% that this is the intended way to fail.
    {shutdown, protocol_violation};

opened_passthrough(_Event, _Request, _Data) ->
    keep_state_and_data.

%%

%% State function for gzip-compressed files. Supports close, position, read and
%% read_line from the owner; writes are answered with {error, ebadf} and any
%% other owner request with {error, enotsup}.

%% The monitored owner went away; stop.
opened_gzip(info, {'DOWN', Ref, process, _Pid, _Why}, #{ monitor := Ref }) ->
    {stop, shutdown};

%% Any other stray message is dropped.
opened_gzip(info, _Unexpected, _Data) ->
    keep_state_and_data;

opened_gzip({call, {Owner, _} = Caller}, [close], #{ owner := Owner } = Data) ->
    internal_close(Caller, Data);

opened_gzip({call, {Owner, _} = Caller}, [position, Mark], #{ owner := Owner } = Data) ->
    reply_gzip_result(Caller, position(Data, Mark));

opened_gzip({call, {Owner, _} = Caller}, [read, Size], #{ owner := Owner } = Data) ->
    reply_gzip_result(Caller, read(Data, Size));

opened_gzip({call, {Owner, _} = Caller}, [read_line], #{ owner := Owner } = Data) ->
    reply_gzip_result(Caller, read_line(Data));

opened_gzip({call, {Owner, _} = Caller}, [write, _IOData], #{ owner := Owner }) ->
    {keep_state_and_data, [{reply, Caller, {error, ebadf}}]};

opened_gzip({call, {Owner, _} = Caller}, _Request, #{ owner := Owner }) ->
    {keep_state_and_data, [{reply, Caller, {error, enotsup}}]};

opened_gzip({call, _Caller}, _Request, _Data) ->
    %% Calls that did not match the owner clauses above end up here. The client
    %% functions filter this out, so we'll crash if the user does anything
    %% stupid on purpose.
    %% NOTE(review): this tuple is not one of gen_statem's documented return
    %% values, so the crash presumably comes from a bad-return error - confirm
    %% that this is the intended way to fail.
    {shutdown, protocol_violation};

opened_gzip(_Event, _Request, _Data) ->
    keep_state_and_data.

%% Turns the result of position/2, read/2 or read_line/1 into a gen_statem
%% return value. A three-element {ok, NewData, Result} stores NewData and
%% replies {ok, Result}; anything else is replied as-is with the state data
%% left untouched.
reply_gzip_result(Caller, {ok, NewData, Result}) ->
    {keep_state, NewData, [{reply, Caller, {ok, Result}}]};
reply_gzip_result(Caller, Other) ->
    {keep_state_and_data, [{reply, Caller, Other}]}.

%%

%% Reads up to Size bytes of decompressed data.
%%
%% Returns {ok, NewData, Binary} with 'position' advanced by the number of
%% bytes handed out, 'eof' when nothing at all is left, or whatever error the
%% underlying read reported. Exceptions of class 'error' raised along the way
%% are mapped to {error, badarg} (for badarg) or {error, eio} (anything else).
read(#{ buffer := Buffer } = Data, Size) ->
    try
        read_1(Data, Buffer, prim_buffer:size(Buffer), Size)
    catch
        error:badarg -> {error, badarg};
        error:_ -> {error, eio}
    end.

%% Not enough decompressed data buffered yet: pull another compressed chunk
%% from the file, inflate it into the buffer and try again.
read_1(Data, Buffer, Available, Wanted) when Available < Wanted ->
    #{ handle := PrivateFd } = Data,
    case ?CALL_FD(PrivateFd, read, [?INFLATE_CHUNK_SIZE]) of
        {ok, Compressed} ->
            #{ zlib := Z } = Data,
            Inflated = zlib:inflate(Z, Compressed),
            prim_buffer:write(Buffer, erlang:iolist_to_iovec(Inflated)),
            read_1(Data, Buffer, prim_buffer:size(Buffer), Wanted);
        eof when Available > 0 ->
            %% The file is exhausted; hand out whatever is still buffered.
            read_1(Data, Buffer, Available, Available);
        EofOrError ->
            EofOrError
    end;
%% The buffer can satisfy the request: consume the bytes and advance the
%% decompressed position accordingly.
read_1(Data, Buffer, Available, Wanted) when Available >= Wanted ->
    #{ position := Position } = Data,
    Chunk = prim_buffer:read(Buffer, Wanted),
    {ok, Data#{ position => (Position + Wanted) }, Chunk}.

%% Reads one line of decompressed data.
%%
%% Returns {ok, NewData, Line} with 'position' advanced by the number of
%% decompressed bytes consumed, 'eof' when nothing at all is left, or whatever
%% error the underlying read reported. Exceptions of class 'error' raised along
%% the way are mapped to {error, badarg} (for badarg) or {error, eio} (anything
%% else).
read_line(#{ buffer := Buffer } = Data) ->
    try
        read_line_1(Data, Buffer, prim_buffer:find_byte_index(Buffer, $\n))
    catch
        error:badarg -> {error, badarg};
        error:_ -> {error, eio}
    end.

%% No LF buffered yet: inflate another chunk into the buffer and look again.
read_line_1(Data, Buffer, not_found) ->
    #{ handle := PrivateFd, zlib := Z } = Data,
    case ?CALL_FD(PrivateFd, read, [?INFLATE_CHUNK_SIZE]) of
        {ok, Compressed} ->
            Uncompressed = erlang:iolist_to_iovec(zlib:inflate(Z, Compressed)),
            prim_buffer:write(Buffer, Uncompressed),
            read_line_1(Data, Buffer, prim_buffer:find_byte_index(Buffer, $\n));
        eof ->
            case prim_buffer:size(Buffer) of
                Size when Size > 0 ->
                    %% The file ends with a line that has no trailing LF. Hand
                    %% out the remainder and advance the position by what was
                    %% consumed, just like read/2 does at end of file;
                    %% otherwise later relative seeks would start from a stale
                    %% offset.
                    #{ position := Position } = Data,
                    LastLine = prim_buffer:read(Buffer, Size),
                    {ok, Data#{ position => (Position + Size) }, LastLine};
                Size when Size =:= 0 ->
                    eof
            end;
        Error ->
            Error
    end;
read_line_1(Data, Buffer, {ok, LFIndex}) ->
    %% Translate CRLF into just LF, completely ignoring which encoding is used,
    %% but treat the file position as including CR.
    #{ position := Position } = Data,
    NewData = Data#{ position => (Position + LFIndex + 1) },
    %% When the LF is the very first byte CRIndex is -1, which simply makes the
    %% CRLF pattern below fail to match.
    CRIndex = (LFIndex - 1),
    TranslatedLine =
        case prim_buffer:read(Buffer, LFIndex + 1) of
            <<Line:CRIndex/binary, "\r\n">> -> <<Line/binary, "\n">>;
            Line -> Line
        end,
    {ok, NewData, TranslatedLine}.

%%
%% We support seeking in both directions as long as it isn't relative to EOF.
%%
%% Seeking backwards is extremely inefficient since we have to seek to the very
%% beginning and then decompress up to the desired point.
%%

%% Normalizes the caller-supplied mark and seeks to the resulting absolute
%% offset in the decompressed data.
%%
%% Accepted marks are a bare atom (treated as {Atom, 0}), a bare integer
%% (treated as an offset from the beginning), {bof, Offset} and {cur, Offset}.
%% {eof, Offset} yields {error, einval} and anything else {error, badarg}.
%% On success the result is {ok, NewData, NewPosition}.
position(Data, Mark) when is_atom(Mark) ->
    position(Data, {Mark, 0});
position(Data, Offset) when is_integer(Offset) ->
    position_1(Data, Offset);
position(Data, {Base, Offset}) when is_integer(Offset) ->
    case Base of
        bof ->
            position_1(Data, Offset);
        cur ->
            #{ position := Current } = Data,
            position_1(Data, Current + Offset);
        eof ->
            {error, einval};
        _Unknown ->
            {error, badarg}
    end;
position(_Data, _Other) ->
    {error, badarg}.

%% Moves the decompressed position to the absolute offset Desired.
%%
%% Returns {ok, NewData, Position}; Position is smaller than Desired if the end
%% of the data is reached first. Negative offsets yield {error, einval}, and
%% failures from reading or rewinding the underlying file are passed through.
position_1(_Data, Desired) when Desired < 0 ->
    {error, einval};
position_1(#{ position := Desired } = Data, Desired) ->
    %% Already there.
    {ok, Data, Desired};
position_1(#{ position := Current } = Data, Desired) when Current > Desired ->
    %% Backwards: rewind the file, reset the inflate stream, drop everything
    %% that is buffered and then approach the target from offset 0.
    #{ handle := PrivateFd, buffer := Buffer, zlib := Z } = Data,
    case ?CALL_FD(PrivateFd, position, [bof]) of
        {ok, 0} ->
            ok = zlib:inflateReset(Z),
            prim_buffer:wipe(Buffer),
            Rewound = Data#{ position => 0 },
            position_1(Rewound, Desired);
        Failure ->
            Failure
    end;
position_1(#{ position := Current } = Data, Desired) when Current < Desired ->
    %% Forwards: read and discard, at most one chunk at a time.
    StepSize = min(Desired - Current, ?INFLATE_CHUNK_SIZE),
    case read(Data, StepSize) of
        {ok, Advanced, _Skipped} -> position_1(Advanced, Desired);
        eof -> {ok, Data, Current};
        Failure -> Failure
    end.

%% gen_statem callback. Performs no explicit cleanup and always returns ok.
terminate(_Reason, _State, _Data) -> ok.