diff options
Diffstat (limited to 'lib/kernel/src')
-rw-r--r-- | lib/kernel/src/file.erl | 58 | ||||
-rw-r--r-- | lib/kernel/src/file_io_server.erl | 5 | ||||
-rw-r--r-- | lib/kernel/src/gen_tcp.erl | 153 |
3 files changed, 155 insertions, 61 deletions
diff --git a/lib/kernel/src/file.erl b/lib/kernel/src/file.erl index fba9a86e95..706c60caaf 100644 --- a/lib/kernel/src/file.erl +++ b/lib/kernel/src/file.erl @@ -41,7 +41,7 @@ pread/2, pread/3, pwrite/2, pwrite/3, read_line/1, position/2, truncate/1, datasync/1, sync/1, - copy/2, copy/3, sendfile/4, sendfile/2]). + copy/2, copy/3]). %% High level operations -export([consult/1, path_consult/2]). -export([eval/1, eval/2, path_eval/2, path_eval/3, path_open/3]). @@ -335,62 +335,6 @@ raw_write_file_info(Name, #file_info{} = Info) -> Error end. -%% sendfile/5 -%% TODO: add more guards? export sendfile/5? --spec sendfile(File, Sock, Offset, Bytes, ChunkSize) - -> {'ok', non_neg_integer()} | {'error', posix()} when - File::io_device(), Sock::port() | integer(), - Offset::non_neg_integer(), Bytes::non_neg_integer(), - ChunkSize::non_neg_integer(). -sendfile(File, Sock, Offset, Bytes, ChunkSize) when is_integer(Sock) - andalso is_pid(File) -> - R = file_request(File, {sendfile, Sock, Offset, Bytes, ChunkSize}), - wait_file_reply(File, R); -sendfile(File, Sock, Offset, Bytes, ChunkSize) when is_port(Sock) - andalso is_pid(File) -> - {ok, SockFD} = prim_inet:getfd(Sock), - sendfile(File, SockFD, Offset, Bytes, ChunkSize); -sendfile(#file_descriptor{module = Module} = Handle, Sock, - Offset, Bytes, ChunkSize) when is_integer(Sock) -> - Module:sendfile(Handle, Sock, Offset, Bytes, ChunkSize); -sendfile(#file_descriptor{module = _Module} = Handle, Sock, - Offset, Bytes, ChunkSize) when is_port(Sock) -> - {ok, SockFD} = prim_inet:getfd(Sock), - sendfile(Handle, SockFD, Offset, Bytes, ChunkSize); -sendfile(_, _, _, _, _) -> - {error, badarg}. - --define(SENDFILE_CHUNK_LIMIT, 2147483648). % 2GB - -%% Limit chunksize to work around 4 byte off_t/size_t limits -sendfile_chunksize(Bytes, Limit) -> - case Bytes >= Limit of - true -> Limit - 1; - false -> Bytes - end. - --spec sendfile(File, Sock, Offset, Bytes) - -> {'ok', non_neg_integer()} | {'error', posix()} when - File::io_device(), Sock::port() | integer(), - Offset::non_neg_integer(), Bytes::non_neg_integer(). -sendfile(File, Sock, Offset, Bytes) -> - ChunkSize = sendfile_chunksize(Bytes, ?SENDFILE_CHUNK_LIMIT), - sendfile(File, Sock, Offset, Bytes, ChunkSize). - -%% sendfile/2 -%% TODO: add guards? --spec sendfile(File, Sock) -> {'ok', non_neg_integer()} | {'error', posix()} - when File::name(), Sock::port(). -sendfile(File, Sock) -> - Offset = 0, - {ok, #file_info{size = Bytes}} = read_file_info(File), - %% TODO: use file:open/2 and file:read_file_info/1 instead of local calls? - {ok, Fd} = open(File, [read, raw, binary]), - ChunkSize = sendfile_chunksize(Bytes, ?SENDFILE_CHUNK_LIMIT), - Res = sendfile(Fd, Sock, Offset, Bytes, ChunkSize), - ok = close(Fd), - Res. - %%%----------------------------------------------------------------- %%% File io server functions. %%% They operate on a single open file. diff --git a/lib/kernel/src/file_io_server.erl b/lib/kernel/src/file_io_server.erl index 78d3d24394..7280635f53 100644 --- a/lib/kernel/src/file_io_server.erl +++ b/lib/kernel/src/file_io_server.erl @@ -249,9 +249,10 @@ file_request(close, file_request({position,At}, #state{handle=Handle,buf=Buf}=State) -> std_reply(position(Handle, At, Buf), State); -file_request({sendfile,DestFD,Offset,Bytes,ChunkSize}, +file_request({sendfile,DestSock,Offset,Bytes,Opts}, #state{handle=Handle}=State) -> - case ?PRIM_FILE:sendfile(Handle, DestFD, Offset, Bytes, ChunkSize) of + %% gen_tcp will call prim_file:sendfile with correct arguments + case gen_tcp:sendfile(Handle, DestSock, Offset, Bytes, Opts) of {error,_}=Reply -> {stop,normal,Reply,State}; Reply -> diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl index 8ab18c01b4..56eca4cda4 100644 --- a/lib/kernel/src/gen_tcp.erl +++ b/lib/kernel/src/gen_tcp.erl @@ -22,11 +22,12 @@ -export([connect/3, connect/4, listen/2, accept/1, accept/2, shutdown/2, close/1]). --export([send/2, recv/2, recv/3, unrecv/2]). +-export([send/2, recv/2, recv/3, unrecv/2, sendfile/2, sendfile/5]). -export([controlling_process/2]). -export([fdopen/2]). -include("inet_int.hrl"). +-include("file.hrl"). -type option() :: {active, true | false | once} | @@ -105,8 +106,13 @@ {tcp_module, module()} | option(). -type socket() :: port(). +-type sendfile_option() :: {chunk_size, non_neg_integer()} | + {headers, Hdrs :: list(iodata())} | + {trailers, Tlrs :: list(iodata())} | + sf_nodiskio | sf_mnowait | sf_sync. --export_type([option/0, option_name/0, connect_option/0, listen_option/0]). +-export_type([option/0, option_name/0, connect_option/0, listen_option/0, + sendfile_option/0]). %% %% Connect a socket @@ -305,6 +311,61 @@ unrecv(S, Data) when is_port(S) -> end. %% +%% Send data using sendfile +%% + +-define(MAX_CHUNK_SIZE, (1 bsl 30)*2-1). + +-spec sendfile(File, Sock, Offset, Bytes, Opts) -> + {'ok', non_neg_integer()} | {'error', inet:posix()} when + File :: file:io_device(), + Sock :: socket(), + Offset :: non_neg_integer() | undefined, + Bytes :: non_neg_integer() | undefined, + Opts :: [sendfile_option()]. +sendfile(File, Sock, Offset, Bytes, Opts) when is_pid(File) -> + Ref = erlang:monitor(process, File), + File ! {file_request,self(),File, + {sendfile,Sock,Offset,Bytes,Opts}}, + receive + {file_reply,File,Reply} -> + erlang:demonitor(Ref,[flush]), + Reply; + {'DOWN', Ref, _, _, _} -> + {error, terminated} + end; +sendfile(File, Sock, Offset, Bytes, []) -> + sendfile(File, Sock, Offset, Bytes, ?MAX_CHUNK_SIZE, undefined, undefined, + false, false, false); +sendfile(File, Sock, Offset, Bytes, Opts) -> + ChunkSize0 = proplists:get_value(chunk_size, Opts, ?MAX_CHUNK_SIZE), + ChunkSize = if ChunkSize0 > ?MAX_CHUNK_SIZE -> + ?MAX_CHUNK_SIZE; + true -> ChunkSize0 + end, + Headers = proplists:get_value(headers, Opts), + Trailers = proplists:get_value(trailers, Opts), + sendfile(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, + lists:member(sf_nodiskio,Opts),lists:member(sf_mnowait,Opts), + lists:member(sf_sync,Opts)). + +%% sendfile/2 +-spec sendfile(File, Sock) -> + {'ok', non_neg_integer()} | {'error', inet:posix() | badarg} + when File :: file:name(), + Sock :: socket(). +sendfile(File, Sock) -> + case file:open(File, [read, raw, binary]) of + {error, Reason} -> + {error, Reason}; + {ok, Fd} -> + {ok, #file_info{size = Bytes}} = file:read_file_info(File), + Res = sendfile(Fd, Sock, 0, Bytes, []), + file:close(Fd), + Res + end. + +%% %% Set controlling process %% @@ -354,3 +415,91 @@ mod([_|Opts], Address) -> mod(Opts, Address); mod([], Address) -> mod(Address). + + +%% Internal sendfile functions +sendfile(#file_descriptor{ module = Mod } = Fd, Sock, Offset, Bytes, + ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) + when is_port(Sock) -> + case Mod:sendfile(Fd, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, + Nodiskio, MNowait, Sync) of + {error, enotsup} -> + sendfile_fallback(Fd, Sock, Offset, Bytes, ChunkSize, + Headers, Trailers); + Else -> + Else + end; +sendfile(_,_,_,_,_,_,_,_,_,_) -> + {error, badarg}. + +%%% +%% Sendfile Fallback +%%% +sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, + Headers, Trailers) + when is_list(Headers) == false -> + case sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) of + {ok, BytesSent} when is_list(Trailers),is_integer(Headers) -> + sendfile_send(Sock, Trailers, BytesSent+Headers); + {ok, BytesSent} when is_list(Trailers) -> + sendfile_send(Sock, Trailers, BytesSent); + {ok, BytesSent} when is_integer(Headers) -> + {ok, BytesSent + Headers}; + Else -> + Else + end; +sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers) -> + case sendfile_send(Sock, Headers, 0) of + {ok, BytesSent} -> + sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, BytesSent, + Trailers); + Else -> + Else + end. + + +sendfile_fallback(File, Sock, undefined, Bytes, ChunkSize) -> + sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0); +sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) -> + {ok, CurrPos} = file:position(File, 0), + {ok, _NewPos} = file:position(File, {bof, Offset}), + Res = sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0), + file:position(File, {bof, CurrPos}), + Res. + + +sendfile_fallback_int(_File, _Sock, BytesSent, _ChunkSize, BytesSent) -> + {ok, BytesSent}; +sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent) + when Bytes > BytesSent; Bytes == undefined -> + Size = if Bytes == undefined -> + ChunkSize; + (Bytes - BytesSent + ChunkSize) > 0 -> + Bytes - BytesSent; + true -> + ChunkSize + end, + case file:read(File, Size) of + {ok, Data} -> + case sendfile_send(Sock, Data, BytesSent) of + {ok,NewBytesSent} -> + sendfile_fallback_int( + File, Sock, Bytes, ChunkSize, + NewBytesSent); + Error -> + Error + end; + eof -> + {ok, BytesSent}; + Error -> + Error + end. + +sendfile_send(Sock, Data, Old) -> + Len = iolist_size(Data), + case send(Sock, Data) of + ok -> + {ok, Len+Old}; + Else -> + Else + end. |