diff options
Diffstat (limited to 'lib/kernel')
-rw-r--r-- | lib/kernel/src/file.erl | 58 | ||||
-rw-r--r-- | lib/kernel/src/file_io_server.erl | 5 | ||||
-rw-r--r-- | lib/kernel/src/gen_tcp.erl | 153 | ||||
-rw-r--r-- | lib/kernel/test/gen_tcp_api_SUITE.erl | 194 |
4 files changed, 295 insertions, 115 deletions
diff --git a/lib/kernel/src/file.erl b/lib/kernel/src/file.erl index fba9a86e95..706c60caaf 100644 --- a/lib/kernel/src/file.erl +++ b/lib/kernel/src/file.erl @@ -41,7 +41,7 @@ pread/2, pread/3, pwrite/2, pwrite/3, read_line/1, position/2, truncate/1, datasync/1, sync/1, - copy/2, copy/3, sendfile/4, sendfile/2]). + copy/2, copy/3]). %% High level operations -export([consult/1, path_consult/2]). -export([eval/1, eval/2, path_eval/2, path_eval/3, path_open/3]). @@ -335,62 +335,6 @@ raw_write_file_info(Name, #file_info{} = Info) -> Error end. -%% sendfile/5 -%% TODO: add more guards? export sendfile/5? --spec sendfile(File, Sock, Offset, Bytes, ChunkSize) - -> {'ok', non_neg_integer()} | {'error', posix()} when - File::io_device(), Sock::port() | integer(), - Offset::non_neg_integer(), Bytes::non_neg_integer(), - ChunkSize::non_neg_integer(). -sendfile(File, Sock, Offset, Bytes, ChunkSize) when is_integer(Sock) - andalso is_pid(File) -> - R = file_request(File, {sendfile, Sock, Offset, Bytes, ChunkSize}), - wait_file_reply(File, R); -sendfile(File, Sock, Offset, Bytes, ChunkSize) when is_port(Sock) - andalso is_pid(File) -> - {ok, SockFD} = prim_inet:getfd(Sock), - sendfile(File, SockFD, Offset, Bytes, ChunkSize); -sendfile(#file_descriptor{module = Module} = Handle, Sock, - Offset, Bytes, ChunkSize) when is_integer(Sock) -> - Module:sendfile(Handle, Sock, Offset, Bytes, ChunkSize); -sendfile(#file_descriptor{module = _Module} = Handle, Sock, - Offset, Bytes, ChunkSize) when is_port(Sock) -> - {ok, SockFD} = prim_inet:getfd(Sock), - sendfile(Handle, SockFD, Offset, Bytes, ChunkSize); -sendfile(_, _, _, _, _) -> - {error, badarg}. - --define(SENDFILE_CHUNK_LIMIT, 2147483648). % 2GB - -%% Limit chunksize to work around 4 byte off_t/size_t limits -sendfile_chunksize(Bytes, Limit) -> - case Bytes >= Limit of - true -> Limit - 1; - false -> Bytes - end. - --spec sendfile(File, Sock, Offset, Bytes) - -> {'ok', non_neg_integer()} | {'error', posix()} when - File::io_device(), Sock::port() | integer(), - Offset::non_neg_integer(), Bytes::non_neg_integer(). -sendfile(File, Sock, Offset, Bytes) -> - ChunkSize = sendfile_chunksize(Bytes, ?SENDFILE_CHUNK_LIMIT), - sendfile(File, Sock, Offset, Bytes, ChunkSize). - -%% sendfile/2 -%% TODO: add guards? --spec sendfile(File, Sock) -> {'ok', non_neg_integer()} | {'error', posix()} - when File::name(), Sock::port(). -sendfile(File, Sock) -> - Offset = 0, - {ok, #file_info{size = Bytes}} = read_file_info(File), - %% TODO: use file:open/2 and file:read_file_info/1 instead of local calls? - {ok, Fd} = open(File, [read, raw, binary]), - ChunkSize = sendfile_chunksize(Bytes, ?SENDFILE_CHUNK_LIMIT), - Res = sendfile(Fd, Sock, Offset, Bytes, ChunkSize), - ok = close(Fd), - Res. - %%%----------------------------------------------------------------- %%% File io server functions. %%% They operate on a single open file. diff --git a/lib/kernel/src/file_io_server.erl b/lib/kernel/src/file_io_server.erl index 78d3d24394..7280635f53 100644 --- a/lib/kernel/src/file_io_server.erl +++ b/lib/kernel/src/file_io_server.erl @@ -249,9 +249,10 @@ file_request(close, file_request({position,At}, #state{handle=Handle,buf=Buf}=State) -> std_reply(position(Handle, At, Buf), State); -file_request({sendfile,DestFD,Offset,Bytes,ChunkSize}, +file_request({sendfile,DestSock,Offset,Bytes,Opts}, #state{handle=Handle}=State) -> - case ?PRIM_FILE:sendfile(Handle, DestFD, Offset, Bytes, ChunkSize) of + %% gen_tcp will call prim_file:sendfile with correct arguments + case gen_tcp:sendfile(Handle, DestSock, Offset, Bytes, Opts) of {error,_}=Reply -> {stop,normal,Reply,State}; Reply -> diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl index 8ab18c01b4..56eca4cda4 100644 --- a/lib/kernel/src/gen_tcp.erl +++ b/lib/kernel/src/gen_tcp.erl @@ -22,11 +22,12 @@ -export([connect/3, connect/4, listen/2, accept/1, accept/2, shutdown/2, close/1]). --export([send/2, recv/2, recv/3, unrecv/2]). +-export([send/2, recv/2, recv/3, unrecv/2, sendfile/2, sendfile/5]). -export([controlling_process/2]). -export([fdopen/2]). -include("inet_int.hrl"). +-include("file.hrl"). -type option() :: {active, true | false | once} | @@ -105,8 +106,13 @@ {tcp_module, module()} | option(). -type socket() :: port(). +-type sendfile_option() :: {chunk_size, non_neg_integer()} | + {headers, Hdrs :: list(iodata())} | + {trailers, Tlrs :: list(iodata())} | + sf_nodiskio | sf_mnowait | sf_sync. --export_type([option/0, option_name/0, connect_option/0, listen_option/0]). +-export_type([option/0, option_name/0, connect_option/0, listen_option/0, + sendfile_option/0]). %% %% Connect a socket @@ -305,6 +311,61 @@ unrecv(S, Data) when is_port(S) -> end. %% +%% Send data using sendfile +%% + +-define(MAX_CHUNK_SIZE, (1 bsl 30)*2-1). + +-spec sendfile(File, Sock, Offset, Bytes, Opts) -> + {'ok', non_neg_integer()} | {'error', inet:posix()} when + File :: file:io_device(), + Sock :: socket(), + Offset :: non_neg_integer() | undefined, + Bytes :: non_neg_integer() | undefined, + Opts :: [sendfile_option()]. +sendfile(File, Sock, Offset, Bytes, Opts) when is_pid(File) -> + Ref = erlang:monitor(process, File), + File ! {file_request,self(),File, + {sendfile,Sock,Offset,Bytes,Opts}}, + receive + {file_reply,File,Reply} -> + erlang:demonitor(Ref,[flush]), + Reply; + {'DOWN', Ref, _, _, _} -> + {error, terminated} + end; +sendfile(File, Sock, Offset, Bytes, []) -> + sendfile(File, Sock, Offset, Bytes, ?MAX_CHUNK_SIZE, undefined, undefined, + false, false, false); +sendfile(File, Sock, Offset, Bytes, Opts) -> + ChunkSize0 = proplists:get_value(chunk_size, Opts, ?MAX_CHUNK_SIZE), + ChunkSize = if ChunkSize0 > ?MAX_CHUNK_SIZE -> + ?MAX_CHUNK_SIZE; + true -> ChunkSize0 + end, + Headers = proplists:get_value(headers, Opts), + Trailers = proplists:get_value(trailers, Opts), + sendfile(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, + lists:member(sf_nodiskio,Opts),lists:member(sf_mnowait,Opts), + lists:member(sf_sync,Opts)). + +%% sendfile/2 +-spec sendfile(File, Sock) -> + {'ok', non_neg_integer()} | {'error', inet:posix() | badarg} + when File :: file:name(), + Sock :: socket(). +sendfile(File, Sock) -> + case file:open(File, [read, raw, binary]) of + {error, Reason} -> + {error, Reason}; + {ok, Fd} -> + {ok, #file_info{size = Bytes}} = file:read_file_info(File), + Res = sendfile(Fd, Sock, 0, Bytes, []), + file:close(Fd), + Res + end. + +%% %% Set controlling process %% @@ -354,3 +415,91 @@ mod([_|Opts], Address) -> mod(Opts, Address); mod([], Address) -> mod(Address). + + +%% Internal sendfile functions +sendfile(#file_descriptor{ module = Mod } = Fd, Sock, Offset, Bytes, + ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) + when is_port(Sock) -> + case Mod:sendfile(Fd, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, + Nodiskio, MNowait, Sync) of + {error, enotsup} -> + sendfile_fallback(Fd, Sock, Offset, Bytes, ChunkSize, + Headers, Trailers); + Else -> + Else + end; +sendfile(_,_,_,_,_,_,_,_,_,_) -> + {error, badarg}. + +%%% +%% Sendfile Fallback +%%% +sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, + Headers, Trailers) + when is_list(Headers) == false -> + case sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) of + {ok, BytesSent} when is_list(Trailers),is_integer(Headers) -> + sendfile_send(Sock, Trailers, BytesSent+Headers); + {ok, BytesSent} when is_list(Trailers) -> + sendfile_send(Sock, Trailers, BytesSent); + {ok, BytesSent} when is_integer(Headers) -> + {ok, BytesSent + Headers}; + Else -> + Else + end; +sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers) -> + case sendfile_send(Sock, Headers, 0) of + {ok, BytesSent} -> + sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, BytesSent, + Trailers); + Else -> + Else + end. + + +sendfile_fallback(File, Sock, undefined, Bytes, ChunkSize) -> + sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0); +sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) -> + {ok, CurrPos} = file:position(File, 0), + {ok, _NewPos} = file:position(File, {bof, Offset}), + Res = sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0), + file:position(File, {bof, CurrPos}), + Res. + + +sendfile_fallback_int(_File, _Sock, BytesSent, _ChunkSize, BytesSent) -> + {ok, BytesSent}; +sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent) + when Bytes > BytesSent; Bytes == undefined -> + Size = if Bytes == undefined -> + ChunkSize; + (Bytes - BytesSent + ChunkSize) > 0 -> + Bytes - BytesSent; + true -> + ChunkSize + end, + case file:read(File, Size) of + {ok, Data} -> + case sendfile_send(Sock, Data, BytesSent) of + {ok,NewBytesSent} -> + sendfile_fallback_int( + File, Sock, Bytes, ChunkSize, + NewBytesSent); + Error -> + Error + end; + eof -> + {ok, BytesSent}; + Error -> + Error + end. + +sendfile_send(Sock, Data, Old) -> + Len = iolist_size(Data), + case send(Sock, Data) of + ok -> + {ok, Len+Old}; + Else -> + Else + end. diff --git a/lib/kernel/test/gen_tcp_api_SUITE.erl b/lib/kernel/test/gen_tcp_api_SUITE.erl index b622e3c5bc..492c60f521 100644 --- a/lib/kernel/test/gen_tcp_api_SUITE.erl +++ b/lib/kernel/test/gen_tcp_api_SUITE.erl @@ -22,8 +22,9 @@ %% are not tested here, because they are tested indirectly in this and %% and other test suites. --include_lib("test_server/include/test_server.hrl"). +-include_lib("common_test/include/ct.hrl"). -include_lib("kernel/include/inet.hrl"). +-include_lib("kernel/include/file.hrl"). -export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1, init_per_group/2,end_per_group/2, @@ -32,20 +33,30 @@ t_connect_bad/1, t_recv_timeout/1, t_recv_eof/1, t_shutdown_write/1, t_shutdown_both/1, t_shutdown_error/1, - t_fdopen/1, t_implicit_inet6/1]). + t_fdopen/1, t_implicit_inet6/1, + t_sendfile/0, t_sendfile/1, t_sendfile_hdtl/1, t_sendfile_partial/1, + t_sendfile_offset/1]). + +-export([sendfile_server/1]). suite() -> [{ct_hooks,[ts_install_cth]}]. all() -> [{group, t_accept}, {group, t_connect}, {group, t_recv}, t_shutdown_write, t_shutdown_both, t_shutdown_error, - t_fdopen, t_implicit_inet6]. + t_fdopen, t_implicit_inet6,{group,t_sendfile}]. groups() -> [{t_accept, [], [t_accept_timeout]}, {t_connect, [], [t_connect_timeout, t_connect_bad]}, {t_recv, [], [t_recv_timeout, t_recv_eof]}, - {t_sendfile, [], [sendfile]}]. + {t_sendfile, [], [{group, t_sendfile_raw}, + {group, t_sendfile_ioserv}]}, + {t_sendfile_raw, [], sendfile_all()}, + {t_sendfile_ioserv, [], sendfile_all()}]. + +sendfile_all() -> + [t_sendfile,t_sendfile_hdtl, t_sendfile_partial, t_sendfile_offset]. init_per_suite(Config) -> Config. @@ -53,6 +64,16 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. +init_per_group(t_sendfile, Config) -> + Priv = ?config(priv_dir, Config), + Filename = filename:join(Priv, "sendfile_small.html"), + {ok, D} = file:open(Filename,[write]), + io:format(D,"yo baby yo",[]), + file:sync(D), + file:close(D), + [{small_file, Filename}|Config]; +init_per_group(t_sendfile_raw, Config) -> + [{file_opts, [raw]}|Config]; init_per_group(_GroupName, Config) -> Config. @@ -238,65 +259,137 @@ implicit_inet6(S, Addr) -> ?line ok = gen_tcp:close(S1). -sendfile() -> +t_sendfile() -> [{timetrap, {seconds, 5}}]. -sendfile_supported({unix,linux}) -> true; -sendfile_supported({unix,sunos}) -> true; -sendfile_supported({unix,freebsd}) -> true; -sendfile_supported({unix,dragonfly}) -> true; -sendfile_supported({unix,darwin}) -> true; -%% TODO: enable win32 once TransmitFile based implemenation written properly -%% sendfile_supported({win32,_}) -> true; -sendfile_supported(_) -> false. - -sendfile(Config) when is_list(Config) -> - case sendfile_supported(os:type()) of - true -> - ?line Data = ?config(data_dir, Config), - ?line Real = filename:join(Data, "realmen.html"), - Host = "localhost", - - %% TODO: find another way to test for {error, posix_error()}? - %% Disabled because with driver_select I cannot test for - %% invalid out_fd - %% ?line {error, Error} = file:sendfile(Real, -1), - %% ?line test_server:format("sendfile error = ~p", [Error]), - %% %% Unix ebadf, Windows eio - %% ?line true = Error =:= ebadf orelse Error =:= eio, - - ?line ok = sendfile_send(Host, Real); - false -> - {skip, "sendfile not supported on this platform"} - end. +t_sendfile(Config) when is_list(Config) -> + Filename = proplists:get_value(small_file, Config), + + Send = fun(Sock) -> + {Size, Data} = sendfile_file_info(Filename), + {ok, Size} = gen_tcp:sendfile(Filename, Sock), + Data + end, + + ok = sendfile_send(Send). + +t_sendfile_partial(Config) -> + Filename = proplists:get_value(small_file, Config), + FileOpts = proplists:get_value(file_opts, Config, []), + + SendSingle = fun(Sock) -> + {_Size, <<Data:5/binary,_/binary>>} = + sendfile_file_info(Filename), + {ok,D} = file:open(Filename,[read|FileOpts]), + {ok,5} = gen_tcp:sendfile(D,Sock,undefined,5,[]), + file:close(D), + Data + end, + ok = sendfile_send(SendSingle), + + {_Size, <<FData:5/binary,SData:3/binary,_/binary>>} = + sendfile_file_info(Filename), + {ok,D} = file:open(Filename,[read|FileOpts]), + FSend = fun(Sock) -> + {ok,5} = gen_tcp:sendfile(D,Sock,undefined,5,[]), + FData + end, + + ok = sendfile_send(FSend), + + SSend = fun(Sock) -> + {ok,3} = gen_tcp:sendfile(D,Sock,undefined,3,[]), + SData + end, + + ok = sendfile_send(SSend), + file:close(D). + +t_sendfile_offset(Config) -> + Filename = proplists:get_value(small_file, Config), + FileOpts = proplists:get_value(file_opts, Config, []), + + Send = fun(Sock) -> + {_Size, <<_:5/binary,Data:3/binary,_/binary>> = AllData} = + sendfile_file_info(Filename), + {ok,D} = file:open(Filename,[read,binary|FileOpts]), + {ok,3} = gen_tcp:sendfile(D,Sock,5,3,[]), + {ok, AllData} = file:read(D,100), + file:close(D), + Data + end, + ok = sendfile_send(Send). + + +t_sendfile_hdtl(Config) -> + Filename = proplists:get_value(small_file, Config), + FileOpts = proplists:get_value(file_opts, Config, []), + + Send = fun(Sock, Headers, Trailers, HdtlSize) -> + {Size, Data} = sendfile_file_info(Filename), + {ok,D} = file:open(Filename,[read|FileOpts]), + AllSize = Size+HdtlSize, + {ok, AllSize} = gen_tcp:sendfile( + D, Sock,undefined,undefined, + [{headers,Headers}, + {trailers,Trailers}]), + file:close(D), + Data + end, + + SendHdTl = fun(Sock) -> + Headers = [<<"header1">>,"header2"], + Trailers = [<<"trailer1">>,"trailer2"], + D = Send(Sock,Headers,Trailers, + iolist_size([Headers,Trailers])), + iolist_to_binary([Headers,D,Trailers]) + end, + ok = sendfile_send(SendHdTl), + + SendHd = fun(Sock) -> + Headers = [<<"header1">>,"header2"], + D = Send(Sock,Headers,undefined, + iolist_size([Headers])), + iolist_to_binary([Headers,D]) + end, + ok = sendfile_send(SendHd), + + SendTl = fun(Sock) -> + Trailers = [<<"trailer1">>,"trailer2"], + D = Send(Sock,Trailers,undefined, + iolist_size([Trailers])), + iolist_to_binary([Trailers,D]) + end, + ok = sendfile_send(SendTl). -%% TODO: consolidate tests and reduce code -sendfile_send(Host, File) -> - {Size, _Md5} = FileInfo = sendfile_file_info(File), +%% TODO: consolidate tests and reduce code +sendfile_send(Send) -> + sendfile_send("localhost",Send). +sendfile_send(Host, Send) -> spawn_link(?MODULE, sendfile_server, [self()]), receive {server, Port} -> - ?line {ok, Sock} = gen_tcp:connect(Host, Port, + {ok, Sock} = gen_tcp:connect(Host, Port, [binary,{packet,0}]), - ?line {ok, Size} = file:sendfile(File, Sock), - ?line ok = gen_tcp:close(Sock), + Data = Send(Sock), + ok = gen_tcp:close(Sock), receive {ok, Bin} -> - ?line FileInfo = sendfile_bin_info(Bin), + Data = Bin, ok end end. sendfile_server(ClientPid) -> - ?line {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 0}, + {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 0}, {active, false}, {reuseaddr, true}]), - ?line {ok, Port} = inet:port(LSock), + {ok, Port} = inet:port(LSock), ClientPid ! {server, Port}, - ?line {ok, Sock} = gen_tcp:accept(LSock), - ?line {ok, Bin} = sendfile_do_recv(Sock, []), - ?line ok = gen_tcp:close(Sock), + {ok, Sock} = gen_tcp:accept(LSock), + {ok, Bin} = sendfile_do_recv(Sock, []), + ok = gen_tcp:close(Sock), ClientPid ! {ok, Bin}. -define(SENDFILE_TIMEOUT, 5000). @@ -306,20 +399,13 @@ sendfile_do_recv(Sock, Bs) -> {ok, B} -> sendfile_do_recv(Sock, [B|Bs]); {error, closed} -> - {ok, lists:reverse(Bs)} + {ok, iolist_to_binary(lists:reverse(Bs))} end. sendfile_file_info(File) -> {ok, #file_info{size = Size}} = file:read_file_info(File), {ok, Data} = file:read_file(File), - Md5 = erlang:md5(Data), - {Size, Md5}. - -sendfile_bin_info(Data) -> - Size = lists:foldl(fun(E,Sum) -> size(E) + Sum end, 0, Data), - Md5 = erlang:md5(Data), - {Size, Md5}. - + {Size, Data}. %%% Utilities |