From 195e1f19b06095f39a4fb0da46dfab2ec5b10e9a Mon Sep 17 00:00:00 2001 From: Tuncer Ayaz Date: Thu, 13 Jan 2011 12:36:14 +0100 Subject: Implement file:sendfile Allow Erlang code to use sendfile() where available by wrapping it as file:sendfile/4 and file:sendfile/2. sendfile(2) - Linux man page: "sendfile() copies data between one file descriptor and another. Because this copying is done within the kernel, sendfile() is more efficient than the combination of read(2) and write(2), which would require transferring data to and from user space." --- erts/configure.in | 16 +++ erts/emulator/drivers/common/efile_drv.c | 234 ++++++++++++++++++++++++++++++- erts/emulator/drivers/common/erl_efile.h | 2 + erts/emulator/drivers/unix/unix_efile.c | 44 ++++++ erts/emulator/drivers/win32/win_efile.c | 24 ++++ erts/preloaded/src/prim_file.erl | 19 ++- lib/kernel/doc/src/file.xml | 22 +++ lib/kernel/src/file.erl | 58 +++++++- lib/kernel/src/file_io_server.erl | 8 ++ lib/kernel/test/file_SUITE.erl | 87 +++++++++++- 10 files changed, 509 insertions(+), 5 deletions(-) diff --git a/erts/configure.in b/erts/configure.in index e3eb6034e6..548e4cc9d5 100644 --- a/erts/configure.in +++ b/erts/configure.in @@ -1690,6 +1690,22 @@ dnl fdatasync requires linking against -lrt on SunOS <= 5.10. dnl OpenSolaris 2009.06 is SunOS 5.11 and does not require -lrt. AC_SEARCH_LIBS(fdatasync, [rt]) + +dnl sendfile syscall +case $host_os in + linux*|freebsd*|dragonfly*|darwin*) + AC_CHECK_FUNCS([sendfile]) + ;; + solaris*) + AC_SEARCH_LIBS(sendfile, sendfile, AC_DEFINE(HAVE_SENDFILE, 1)) + ;; + win32) + LIBS="$LIBS -lmswsock" + ;; + *) + ;; +esac + dnl ---------------------------------------------------------------------- dnl Checks for library functions. dnl ---------------------------------------------------------------------- diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 901d98c09d..509c4fe48c 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -55,6 +55,7 @@ #define FILE_READ_LINE 29 #define FILE_FDATASYNC 30 #define FILE_FADVISE 31 +#define FILE_SENDFILE 32 /* Return codes */ @@ -217,6 +218,7 @@ typedef unsigned char uchar; static ErlDrvData file_start(ErlDrvPort port, char* command); static int file_init(void); static void file_stop(ErlDrvData); +static void file_ready_output(ErlDrvData data, ErlDrvEvent event); static void file_output(ErlDrvData, char* buf, int len); static int file_control(ErlDrvData, unsigned int command, char* buf, int len, char **rbuf, int rlen); @@ -224,6 +226,7 @@ static void file_timeout(ErlDrvData); static void file_outputv(ErlDrvData, ErlIOVec*); static void file_async_ready(ErlDrvData, ErlDrvThreadData); static void file_flush(ErlDrvData); +static void file_stop_select(ErlDrvEvent event, void* _); @@ -253,6 +256,18 @@ typedef struct { ErlDrvPDL q_mtx; /* Mutex for the driver queue, known by the emulator. Also used for mutual exclusion when accessing field(s) below. */ size_t write_buffered; + ErlDrvTermData caller; /* recipient of sync reply */ + /* sendfile call state to retry/resume on event */ + int command; /* same as d->command. for sendfile. TODO: this seems wrong */ + struct { + int eagain; + int out_fd; + /* TODO: Use Sint64 instead? What about 32-bit off_t linux */ + off_t offset; + size_t count; + size_t chunksize; + ErlDrvSInt64 written; + } sendfile; } file_descriptor; @@ -264,7 +279,7 @@ struct erl_drv_entry efile_driver_entry = { file_stop, file_output, NULL, - NULL, + file_ready_output, "efile", NULL, NULL, @@ -279,7 +294,9 @@ struct erl_drv_entry efile_driver_entry = { ERL_DRV_EXTENDED_MAJOR_VERSION, ERL_DRV_EXTENDED_MINOR_VERSION, ERL_DRV_FLAG_USE_PORT_LOCKING, - NULL + NULL, + NULL, + file_stop_select }; @@ -613,6 +630,111 @@ static struct t_data *cq_deq(file_descriptor *desc) { } +/********************************************************************* + * Command queue functions + */ + +static ErlDrvTermData am_ok; +static ErlDrvTermData am_error; +static ErlDrvTermData am_efile_reply; + +#define INIT_ATOM(NAME) am_ ## NAME = driver_mk_atom(#NAME) + +#define LOAD_ATOM_CNT 2 +#define LOAD_ATOM(vec, i, atom) \ + (((vec)[(i)] = ERL_DRV_ATOM), \ + ((vec)[(i)+1] = (atom)), \ + ((i)+LOAD_ATOM_CNT)) + +#define LOAD_INT_CNT 2 +#define LOAD_INT(vec, i, val) \ + (((vec)[(i)] = ERL_DRV_INT), \ + ((vec)[(i)+1] = (ErlDrvTermData)(val)), \ + ((i)+LOAD_INT_CNT)) + +#define LOAD_INT64_CNT 2 +#define LOAD_INT64(vec, i, val) \ + (((vec)[(i)] = ERL_DRV_INT64), \ + ((vec)[(i)+1] = (ErlDrvTermData)(val)), \ + ((i)+LOAD_INT64_CNT)) + +#define LOAD_PORT_CNT 2 +#define LOAD_PORT(vec, i, port) \ + (((vec)[(i)] = ERL_DRV_PORT), \ + ((vec)[(i)+1] = (port)), \ + ((i)+LOAD_PORT_CNT)) + +#define LOAD_PID_CNT 2 +#define LOAD_PID(vec, i, pid) \ + (((vec)[(i)] = ERL_DRV_PID), \ + ((vec)[(i)+1] = (pid)), \ + ((i)+LOAD_PID_CNT)) + +#define LOAD_TUPLE_CNT 2 +#define LOAD_TUPLE(vec, i, size) \ + (((vec)[(i)] = ERL_DRV_TUPLE), \ + ((vec)[(i)+1] = (size)), \ + ((i)+LOAD_TUPLE_CNT)) + +/* send: +** {efile_reply, Pid, Port, {ok, int64()}} +*/ + +static int ef_send_ok_int64(file_descriptor *desc, ErlDrvTermData caller, + ErlDrvSInt64 *n) +{ + ErlDrvTermData spec[2*LOAD_ATOM_CNT + LOAD_PID_CNT + LOAD_PORT_CNT + + LOAD_INT64_CNT + 2*LOAD_TUPLE_CNT]; + int i = 0; + + i = LOAD_ATOM(spec, i, am_efile_reply); + i = LOAD_PID(spec, i, caller); + i = LOAD_PORT(spec, i, driver_mk_port(desc->port)); + i = LOAD_ATOM(spec, i, am_ok); + i = LOAD_INT64(spec, i, n); + i = LOAD_TUPLE(spec, i, 2); + i = LOAD_TUPLE(spec, i, 4); + ASSERT(i == sizeof(spec)/sizeof(*spec)); + + return driver_send_term(desc->port, caller, spec, i); +} + +static ErlDrvTermData error_atom(int err) +{ + char errstr[256]; + char* s; + char* t; + + for (s = erl_errno_id(err), t = errstr; *s; s++, t++) + *t = tolower(*s); + *t = '\0'; + return driver_mk_atom(errstr); +} + +/* send: +** {efile_reply, Pid, Port, {error, posix_error()} +*/ + +static int ef_send_posix_error(file_descriptor *desc, ErlDrvTermData caller, + int e) +{ + ErlDrvTermData spec[3*LOAD_ATOM_CNT + LOAD_PID_CNT + LOAD_PORT_CNT + + 2*LOAD_TUPLE_CNT]; + int i = 0; + + i = LOAD_ATOM(spec, i, am_efile_reply); + i = LOAD_PID(spec, i, caller); + i = LOAD_PORT(spec, i, driver_mk_port(desc->port)); + i = LOAD_ATOM(spec, i, am_error); + /* TODO: safe? set of error codes should be limited and safe */ + i = LOAD_ATOM(spec, i, error_atom(e)); + i = LOAD_TUPLE(spec, i, 2); + i = LOAD_TUPLE(spec, i, 4); + ASSERT(i == sizeof(spec)/sizeof(*spec)); + + desc->caller = 0; + return driver_send_term(desc->port, caller, spec, i); +} /********************************************************************* * Driver entry point -> init @@ -628,6 +750,11 @@ file_init(void) ? atoi(buf) : 0); driver_system_info(&sys_info, sizeof(ErlDrvSysInfo)); + + INIT_ATOM(ok); + INIT_ATOM(error); + INIT_ATOM(efile_reply); + return 0; } @@ -1694,6 +1821,74 @@ static void invoke_fadvise(void *data) d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise); } + + +static void do_sendfile(file_descriptor *desc); +static void file_ready_output(ErlDrvData data, ErlDrvEvent event) +{ + file_descriptor* d = (file_descriptor*) data; + + switch (d->command) { + case FILE_SENDFILE: + driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, + ERL_DRV_WRITE, 0); + do_sendfile(d); + break; + default: + break; + } +} + +static void file_stop_select(ErlDrvEvent event, void* _) +{ + /* TODO: close socket? */ +} + +static void invoke_sendfile(void *data) +{ + ((struct t_data *)data)->again = 0; +} + +static void do_sendfile(file_descriptor *d) +{ + int fd = d->fd; + int out_fd = d->sendfile.out_fd; + off_t offset = d->sendfile.offset; + size_t count = d->sendfile.count; + size_t chunksize = count < d->sendfile.chunksize + ? count : d->sendfile.chunksize; + int result_ok = 0; + Efile_error errInfo; + + result_ok = efile_sendfile(&errInfo, fd, out_fd, &offset, &chunksize); + + if (result_ok) { + d->sendfile.offset += chunksize; + d->sendfile.written += chunksize; + d->sendfile.count -= chunksize; + if (d->sendfile.count > 0) { + driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, + ERL_DRV_USE|ERL_DRV_WRITE, 1); + } else { + printf("==> sendfile DONE eagain=%d\n", d->sendfile.eagain); + ef_send_ok_int64(d, d->caller, &d->sendfile.written); + } + } else if (errInfo.posix_errno == EAGAIN || errInfo.posix_errno == EINTR) { + if (chunksize > 0) { + d->sendfile.offset += chunksize; + d->sendfile.written += chunksize; + d->sendfile.count -= chunksize; + } + d->sendfile.eagain++; + + driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, + ERL_DRV_USE|ERL_DRV_WRITE, 1); + } else { + printf("==> sendfile ERROR %s\n", erl_errno_id(errInfo.posix_errno)); + ef_send_posix_error(d, d->caller, errInfo.posix_errno); + } +} + static void free_readdir(void *data) { struct t_data *d = (struct t_data *) data; @@ -2105,6 +2300,13 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) } free_preadv(data); break; + case FILE_SENDFILE: + /* Return 'ok' and let prim_file:sendfile wait for message */ + reply_ok(desc); + driver_select(desc->port, (ErlDrvEvent)desc->sendfile.out_fd, + ERL_DRV_USE|ERL_DRV_WRITE, 1); + free_data(data); + break; default: abort(); } @@ -2452,6 +2654,34 @@ file_output(ErlDrvData e, char* buf, int count) goto done; } + case FILE_SENDFILE: + { + d = EF_SAFE_ALLOC(sizeof(struct t_data)); + d->fd = fd; + d->command = command; + d->invoke = invoke_sendfile; + d->free = free_data; + d->level = 2; + desc->sendfile.out_fd = get_int32((uchar*) buf); + /* TODO: are off_t and size_t 64bit on all platforms? + off_t is 32bit on win32 msvc. maybe configurable in msvc. + Maybe use '#if SIZEOF_SIZE_T == 4'? */ + desc->sendfile.offset = get_int64(((uchar*) buf) + + sizeof(Sint32)); + desc->sendfile.count = get_int64(((uchar*) buf) + + sizeof(Sint32) + + sizeof(Sint64)); + desc->sendfile.chunksize = get_int64(((uchar*) buf) + + sizeof(Sint32) + + 2*sizeof(Sint64)); + desc->sendfile.written = 0; + desc->sendfile.eagain = 0; + /* TODO: shouldn't d->command be enough? */ + desc->command = command; + desc->caller = driver_caller(desc->port); + goto done; + } + } /* diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index 3097ded3f1..3c6c2ec2db 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -162,3 +162,5 @@ int efile_symlink(Efile_error* errInfo, char* old, char* new); int efile_may_openfile(Efile_error* errInfo, char *name); int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); +int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t *offset, + size_t *count); diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 4b3934657c..5b001b3819 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -33,6 +33,9 @@ #include #include #endif +#if defined(__linux__) || (defined(__sun) && defined(__SVR4)) +#include +#endif #if defined(__APPLE__) && defined(__MACH__) && !defined(__DARWIN__) #define DARWIN 1 @@ -1464,3 +1467,44 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, return check_error(0, errInfo); #endif } + +#ifdef HAVE_SENDFILE +int +efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, + off_t *offset, size_t *count) +{ +#if defined(__linux__) || (defined(__sun) && defined(__SVR4)) + ssize_t retval = sendfile(out_fd, in_fd, offset, *count); + if (retval >= 0) { + if (retval != *count) { + *count = retval; + retval = -1; + errno = EAGAIN; + } else { + *count = retval; + } + } else if (retval == -1 && (errno == EINTR || errno == EAGAIN)) { + *count = 0; + } + return check_error(retval == -1 ? -1 : 0, errInfo); +#elif defined(DARWIN) + off_t len = *count; + int retval = sendfile(in_fd, out_fd, *offset, &len, NULL, 0); + *count = len; + return check_error(retval, errInfo); +#elif defined(__FreeBSD__) || defined(__DragonFly__) + off_t len = 0; + int retval = sendfile(in_fd, out_fd, *offset, *count, NULL, &len, 0); + *count = len; + return check_error(retval, errInfo); +#endif +} +#else /* no sendfile() */ +int +efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, + off_t *offset, size_t *count) +{ + errno = ENOTSUP; + return check_error(-1, errInfo); +} +#endif diff --git a/erts/emulator/drivers/win32/win_efile.c b/erts/emulator/drivers/win32/win_efile.c index 931bb196f1..0f41a09bf6 100644 --- a/erts/emulator/drivers/win32/win_efile.c +++ b/erts/emulator/drivers/win32/win_efile.c @@ -1581,3 +1581,27 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, errno = ERROR_SUCCESS; return check_error(0, errInfo); } + +int +efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, + off_t *offset, size_t *count) +{ + /* TODO: write proper Windows TransmitFile based implementation */ + /* use overlapped I/O and driver_select on the structure? */ + /* int res = efile_seek(errInfo, in_fd, *offset, EFILE_SEEK_SET, NULL); */ + /* if (res) { */ + /* /\* TODO: could in_fd be shared and require protecting/locking */ + /* efile_seek/SetFilePointerEx? *\/ */ + /* if (TransmitFile((SOCKET) out_fd, (HANDLE) in_fd, *count, */ + /* 0, NULL, NULL, 0)) { */ + /* return check_error(0, errInfo); */ + /* } else { */ + /* /\* TODO: correct error handling? *\/ */ + /* return set_error(errInfo); */ + /* } */ + /* } else { */ + /* return res; */ + /* } */ + errno = ENOTSUP; + return check_error(-1, errInfo); +} diff --git a/erts/preloaded/src/prim_file.erl b/erts/preloaded/src/prim_file.erl index 30b7a5246a..f3f977a30b 100644 --- a/erts/preloaded/src/prim_file.erl +++ b/erts/preloaded/src/prim_file.erl @@ -26,7 +26,8 @@ %% Generic file contents operations -export([open/2, close/1, datasync/1, sync/1, advise/4, position/2, truncate/1, - write/2, pwrite/2, pwrite/3, read/2, read_line/1, pread/2, pread/3, copy/3]). + write/2, pwrite/2, pwrite/3, read/2, read_line/1, pread/2, pread/3, + copy/3, sendfile/5]). %% Specialized file operations -export([open/1, open/3]). @@ -98,6 +99,7 @@ -define(FILE_READ_LINE, 29). -define(FILE_FDATASYNC, 30). -define(FILE_ADVISE, 31). +-define(FILE_SENDFILE, 32). %% Driver responses -define(FILE_RESP_OK, 0). @@ -539,6 +541,21 @@ write_file(_, _) -> {error, badarg}. +%% Returns {error, Reason} | {ok, BytesCopied} +sendfile(#file_descriptor{module = ?MODULE, data = {Port, _}}, + DestFD, Offset, Bytes, ChunkSize) -> + ok = drv_command(Port, <>), + Self = self(), + %% Should we use a ref()? + receive + {efile_reply, Self, Port, {ok, _Written}=OKRes}-> + OKRes; + {efile_reply, Self, Port, {error, _PosixError}=Error}-> + Error; + Unexpected -> + Unexpected + end. %%%----------------------------------------------------------------- %%% Functions operating on files without handle to the file. ?DRV. diff --git a/lib/kernel/doc/src/file.xml b/lib/kernel/doc/src/file.xml index 7db20e6343..78bf0aff45 100644 --- a/lib/kernel/doc/src/file.xml +++ b/lib/kernel/doc/src/file.xml @@ -1573,6 +1573,28 @@ otherwise {error, Reason}.

+ + + send a file to a socket + +

Sends Bytes in from the file + referenced by IoDevice beginning at Offset to + Socket. + Returns {ok, BytesSent} if successful, + otherwise {error, Reason}.

+

Available on Linux, FreeBSD, DragonflyBSD, Solaris, Darwin and Windows

+
+
+ + + send a file to a socket + +

Sends the file Filename to Socket. + Returns {ok, BytesSent} if successful, + otherwise {error, Reason}.

+

Available on Linux, FreeBSD, DragonflyBSD, Solaris, Darwin and Windows

+
+
Write to a file diff --git a/lib/kernel/src/file.erl b/lib/kernel/src/file.erl index 706c60caaf..fba9a86e95 100644 --- a/lib/kernel/src/file.erl +++ b/lib/kernel/src/file.erl @@ -41,7 +41,7 @@ pread/2, pread/3, pwrite/2, pwrite/3, read_line/1, position/2, truncate/1, datasync/1, sync/1, - copy/2, copy/3]). + copy/2, copy/3, sendfile/4, sendfile/2]). %% High level operations -export([consult/1, path_consult/2]). -export([eval/1, eval/2, path_eval/2, path_eval/3, path_open/3]). @@ -335,6 +335,62 @@ raw_write_file_info(Name, #file_info{} = Info) -> Error end. +%% sendfile/5 +%% TODO: add more guards? export sendfile/5? +-spec sendfile(File, Sock, Offset, Bytes, ChunkSize) + -> {'ok', non_neg_integer()} | {'error', posix()} when + File::io_device(), Sock::port() | integer(), + Offset::non_neg_integer(), Bytes::non_neg_integer(), + ChunkSize::non_neg_integer(). +sendfile(File, Sock, Offset, Bytes, ChunkSize) when is_integer(Sock) + andalso is_pid(File) -> + R = file_request(File, {sendfile, Sock, Offset, Bytes, ChunkSize}), + wait_file_reply(File, R); +sendfile(File, Sock, Offset, Bytes, ChunkSize) when is_port(Sock) + andalso is_pid(File) -> + {ok, SockFD} = prim_inet:getfd(Sock), + sendfile(File, SockFD, Offset, Bytes, ChunkSize); +sendfile(#file_descriptor{module = Module} = Handle, Sock, + Offset, Bytes, ChunkSize) when is_integer(Sock) -> + Module:sendfile(Handle, Sock, Offset, Bytes, ChunkSize); +sendfile(#file_descriptor{module = _Module} = Handle, Sock, + Offset, Bytes, ChunkSize) when is_port(Sock) -> + {ok, SockFD} = prim_inet:getfd(Sock), + sendfile(Handle, SockFD, Offset, Bytes, ChunkSize); +sendfile(_, _, _, _, _) -> + {error, badarg}. + +-define(SENDFILE_CHUNK_LIMIT, 2147483648). % 2GB + +%% Limit chunksize to work around 4 byte off_t/size_t limits +sendfile_chunksize(Bytes, Limit) -> + case Bytes >= Limit of + true -> Limit - 1; + false -> Bytes + end. + +-spec sendfile(File, Sock, Offset, Bytes) + -> {'ok', non_neg_integer()} | {'error', posix()} when + File::io_device(), Sock::port() | integer(), + Offset::non_neg_integer(), Bytes::non_neg_integer(). +sendfile(File, Sock, Offset, Bytes) -> + ChunkSize = sendfile_chunksize(Bytes, ?SENDFILE_CHUNK_LIMIT), + sendfile(File, Sock, Offset, Bytes, ChunkSize). + +%% sendfile/2 +%% TODO: add guards? +-spec sendfile(File, Sock) -> {'ok', non_neg_integer()} | {'error', posix()} + when File::name(), Sock::port(). +sendfile(File, Sock) -> + Offset = 0, + {ok, #file_info{size = Bytes}} = read_file_info(File), + %% TODO: use file:open/2 and file:read_file_info/1 instead of local calls? + {ok, Fd} = open(File, [read, raw, binary]), + ChunkSize = sendfile_chunksize(Bytes, ?SENDFILE_CHUNK_LIMIT), + Res = sendfile(Fd, Sock, Offset, Bytes, ChunkSize), + ok = close(Fd), + Res. + %%%----------------------------------------------------------------- %%% File io server functions. %%% They operate on a single open file. diff --git a/lib/kernel/src/file_io_server.erl b/lib/kernel/src/file_io_server.erl index 14da9c1a55..78d3d24394 100644 --- a/lib/kernel/src/file_io_server.erl +++ b/lib/kernel/src/file_io_server.erl @@ -249,6 +249,14 @@ file_request(close, file_request({position,At}, #state{handle=Handle,buf=Buf}=State) -> std_reply(position(Handle, At, Buf), State); +file_request({sendfile,DestFD,Offset,Bytes,ChunkSize}, + #state{handle=Handle}=State) -> + case ?PRIM_FILE:sendfile(Handle, DestFD, Offset, Bytes, ChunkSize) of + {error,_}=Reply -> + {stop,normal,Reply,State}; + Reply -> + {reply,Reply,State} + end; file_request(truncate, #state{handle=Handle}=State) -> case ?PRIM_FILE:truncate(Handle) of diff --git a/lib/kernel/test/file_SUITE.erl b/lib/kernel/test/file_SUITE.erl index 77fc7e73f9..67458ae77d 100644 --- a/lib/kernel/test/file_SUITE.erl +++ b/lib/kernel/test/file_SUITE.erl @@ -86,6 +86,8 @@ -export([standard_io/1,mini_server/1]). +-export([sendfile/0, sendfile/1, sendfile_server/1]). + %% Debug exports -export([create_file_slow/2, create_file/2, create_bin/2]). -export([verify_file/2, verify_bin/3]). @@ -107,7 +109,7 @@ all() -> delayed_write, read_ahead, segment_read, segment_write, ipread, pid2name, interleaved_read_write, otp_5814, large_file, read_line_1, read_line_2, read_line_3, - read_line_4, standard_io]. + read_line_4, standard_io, sendfile]. groups() -> [{dirs, [], [make_del_dir, cur_dir_0, cur_dir_1]}, @@ -3950,3 +3952,86 @@ flush(Msgs) -> after 0 -> lists:reverse(Msgs) end. + + +sendfile() -> + [{timetrap, {seconds, 5}}]. + +sendfile_supported({unix,linux}) -> true; +sendfile_supported({unix,sunos}) -> true; +sendfile_supported({unix,freebsd}) -> true; +sendfile_supported({unix,dragonfly}) -> true; +sendfile_supported({unix,darwin}) -> true; +%% TODO: enable win32 once TransmitFile based implemenation written properly +%% sendfile_supported({win32,_}) -> true; +sendfile_supported(_) -> false. + +sendfile(Config) when is_list(Config) -> + case sendfile_supported(os:type()) of + true -> + ?line Data = ?config(data_dir, Config), + ?line Real = filename:join(Data, "realmen.html"), + Host = "localhost", + + %% TODO: find another way to test for {error, posix_error()}? + %% Disabled because with driver_select I cannot test for + %% invalid out_fd + %% ?line {error, Error} = file:sendfile(Real, -1), + %% ?line test_server:format("sendfile error = ~p", [Error]), + %% %% Unix ebadf, Windows eio + %% ?line true = Error =:= ebadf orelse Error =:= eio, + + ?line ok = sendfile_send(Host, Real); + false -> + {skip, "sendfile not supported on this platform"} + end. + +%% TODO: consolidate tests and reduce code + +sendfile_send(Host, File) -> + {Size, _Md5} = FileInfo = sendfile_file_info(File), + spawn_link(?MODULE, sendfile_server, [self()]), + receive + {server, Port} -> + ?line {ok, Sock} = gen_tcp:connect(Host, Port, + [binary,{packet,0}]), + ?line {ok, Size} = file:sendfile(File, Sock), + ?line ok = gen_tcp:close(Sock), + receive + {ok, Bin} -> + ?line FileInfo = sendfile_bin_info(Bin), + ok + end + end. + +sendfile_server(ClientPid) -> + ?line {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 0}, + {active, false}, + {reuseaddr, true}]), + ?line {ok, Port} = inet:port(LSock), + ClientPid ! {server, Port}, + ?line {ok, Sock} = gen_tcp:accept(LSock), + ?line {ok, Bin} = sendfile_do_recv(Sock, []), + ?line ok = gen_tcp:close(Sock), + ClientPid ! {ok, Bin}. + +-define(SENDFILE_TIMEOUT, 5000). + +sendfile_do_recv(Sock, Bs) -> + case gen_tcp:recv(Sock, 0, ?SENDFILE_TIMEOUT) of + {ok, B} -> + sendfile_do_recv(Sock, [B|Bs]); + {error, closed} -> + {ok, lists:reverse(Bs)} + end. + +sendfile_file_info(File) -> + {ok, #file_info{size = Size}} = file:read_file_info(File), + {ok, Data} = file:read_file(File), + Md5 = erlang:md5(Data), + {Size, Md5}. + +sendfile_bin_info(Data) -> + Size = lists:foldl(fun(E,Sum) -> size(E) + Sum end, 0, Data), + Md5 = erlang:md5(Data), + {Size, Md5}. -- cgit v1.2.3 From 8beda283543ca89052a5e7ca6491345cd9916eff Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Thu, 1 Dec 2011 14:09:37 +0100 Subject: Move sendfile tests to gen_tcp_api_SUITE --- lib/kernel/test/file_SUITE.erl | 87 +---------------------------------- lib/kernel/test/gen_tcp_api_SUITE.erl | 86 +++++++++++++++++++++++++++++++++- 2 files changed, 86 insertions(+), 87 deletions(-) diff --git a/lib/kernel/test/file_SUITE.erl b/lib/kernel/test/file_SUITE.erl index 67458ae77d..77fc7e73f9 100644 --- a/lib/kernel/test/file_SUITE.erl +++ b/lib/kernel/test/file_SUITE.erl @@ -86,8 +86,6 @@ -export([standard_io/1,mini_server/1]). --export([sendfile/0, sendfile/1, sendfile_server/1]). - %% Debug exports -export([create_file_slow/2, create_file/2, create_bin/2]). -export([verify_file/2, verify_bin/3]). @@ -109,7 +107,7 @@ all() -> delayed_write, read_ahead, segment_read, segment_write, ipread, pid2name, interleaved_read_write, otp_5814, large_file, read_line_1, read_line_2, read_line_3, - read_line_4, standard_io, sendfile]. + read_line_4, standard_io]. groups() -> [{dirs, [], [make_del_dir, cur_dir_0, cur_dir_1]}, @@ -3952,86 +3950,3 @@ flush(Msgs) -> after 0 -> lists:reverse(Msgs) end. - - -sendfile() -> - [{timetrap, {seconds, 5}}]. - -sendfile_supported({unix,linux}) -> true; -sendfile_supported({unix,sunos}) -> true; -sendfile_supported({unix,freebsd}) -> true; -sendfile_supported({unix,dragonfly}) -> true; -sendfile_supported({unix,darwin}) -> true; -%% TODO: enable win32 once TransmitFile based implemenation written properly -%% sendfile_supported({win32,_}) -> true; -sendfile_supported(_) -> false. - -sendfile(Config) when is_list(Config) -> - case sendfile_supported(os:type()) of - true -> - ?line Data = ?config(data_dir, Config), - ?line Real = filename:join(Data, "realmen.html"), - Host = "localhost", - - %% TODO: find another way to test for {error, posix_error()}? - %% Disabled because with driver_select I cannot test for - %% invalid out_fd - %% ?line {error, Error} = file:sendfile(Real, -1), - %% ?line test_server:format("sendfile error = ~p", [Error]), - %% %% Unix ebadf, Windows eio - %% ?line true = Error =:= ebadf orelse Error =:= eio, - - ?line ok = sendfile_send(Host, Real); - false -> - {skip, "sendfile not supported on this platform"} - end. - -%% TODO: consolidate tests and reduce code - -sendfile_send(Host, File) -> - {Size, _Md5} = FileInfo = sendfile_file_info(File), - spawn_link(?MODULE, sendfile_server, [self()]), - receive - {server, Port} -> - ?line {ok, Sock} = gen_tcp:connect(Host, Port, - [binary,{packet,0}]), - ?line {ok, Size} = file:sendfile(File, Sock), - ?line ok = gen_tcp:close(Sock), - receive - {ok, Bin} -> - ?line FileInfo = sendfile_bin_info(Bin), - ok - end - end. - -sendfile_server(ClientPid) -> - ?line {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 0}, - {active, false}, - {reuseaddr, true}]), - ?line {ok, Port} = inet:port(LSock), - ClientPid ! {server, Port}, - ?line {ok, Sock} = gen_tcp:accept(LSock), - ?line {ok, Bin} = sendfile_do_recv(Sock, []), - ?line ok = gen_tcp:close(Sock), - ClientPid ! {ok, Bin}. - --define(SENDFILE_TIMEOUT, 5000). - -sendfile_do_recv(Sock, Bs) -> - case gen_tcp:recv(Sock, 0, ?SENDFILE_TIMEOUT) of - {ok, B} -> - sendfile_do_recv(Sock, [B|Bs]); - {error, closed} -> - {ok, lists:reverse(Bs)} - end. - -sendfile_file_info(File) -> - {ok, #file_info{size = Size}} = file:read_file_info(File), - {ok, Data} = file:read_file(File), - Md5 = erlang:md5(Data), - {Size, Md5}. - -sendfile_bin_info(Data) -> - Size = lists:foldl(fun(E,Sum) -> size(E) + Sum end, 0, Data), - Md5 = erlang:md5(Data), - {Size, Md5}. diff --git a/lib/kernel/test/gen_tcp_api_SUITE.erl b/lib/kernel/test/gen_tcp_api_SUITE.erl index cbaec2d6dd..b622e3c5bc 100644 --- a/lib/kernel/test/gen_tcp_api_SUITE.erl +++ b/lib/kernel/test/gen_tcp_api_SUITE.erl @@ -44,7 +44,8 @@ all() -> groups() -> [{t_accept, [], [t_accept_timeout]}, {t_connect, [], [t_connect_timeout, t_connect_bad]}, - {t_recv, [], [t_recv_timeout, t_recv_eof]}]. + {t_recv, [], [t_recv_timeout, t_recv_eof]}, + {t_sendfile, [], [sendfile]}]. init_per_suite(Config) -> Config. @@ -237,6 +238,89 @@ implicit_inet6(S, Addr) -> ?line ok = gen_tcp:close(S1). +sendfile() -> + [{timetrap, {seconds, 5}}]. + +sendfile_supported({unix,linux}) -> true; +sendfile_supported({unix,sunos}) -> true; +sendfile_supported({unix,freebsd}) -> true; +sendfile_supported({unix,dragonfly}) -> true; +sendfile_supported({unix,darwin}) -> true; +%% TODO: enable win32 once TransmitFile based implemenation written properly +%% sendfile_supported({win32,_}) -> true; +sendfile_supported(_) -> false. + +sendfile(Config) when is_list(Config) -> + case sendfile_supported(os:type()) of + true -> + ?line Data = ?config(data_dir, Config), + ?line Real = filename:join(Data, "realmen.html"), + Host = "localhost", + + %% TODO: find another way to test for {error, posix_error()}? + %% Disabled because with driver_select I cannot test for + %% invalid out_fd + %% ?line {error, Error} = file:sendfile(Real, -1), + %% ?line test_server:format("sendfile error = ~p", [Error]), + %% %% Unix ebadf, Windows eio + %% ?line true = Error =:= ebadf orelse Error =:= eio, + + ?line ok = sendfile_send(Host, Real); + false -> + {skip, "sendfile not supported on this platform"} + end. + +%% TODO: consolidate tests and reduce code + +sendfile_send(Host, File) -> + {Size, _Md5} = FileInfo = sendfile_file_info(File), + spawn_link(?MODULE, sendfile_server, [self()]), + receive + {server, Port} -> + ?line {ok, Sock} = gen_tcp:connect(Host, Port, + [binary,{packet,0}]), + ?line {ok, Size} = file:sendfile(File, Sock), + ?line ok = gen_tcp:close(Sock), + receive + {ok, Bin} -> + ?line FileInfo = sendfile_bin_info(Bin), + ok + end + end. + +sendfile_server(ClientPid) -> + ?line {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 0}, + {active, false}, + {reuseaddr, true}]), + ?line {ok, Port} = inet:port(LSock), + ClientPid ! {server, Port}, + ?line {ok, Sock} = gen_tcp:accept(LSock), + ?line {ok, Bin} = sendfile_do_recv(Sock, []), + ?line ok = gen_tcp:close(Sock), + ClientPid ! {ok, Bin}. + +-define(SENDFILE_TIMEOUT, 5000). + +sendfile_do_recv(Sock, Bs) -> + case gen_tcp:recv(Sock, 0, ?SENDFILE_TIMEOUT) of + {ok, B} -> + sendfile_do_recv(Sock, [B|Bs]); + {error, closed} -> + {ok, lists:reverse(Bs)} + end. + +sendfile_file_info(File) -> + {ok, #file_info{size = Size}} = file:read_file_info(File), + {ok, Data} = file:read_file(File), + Md5 = erlang:md5(Data), + {Size, Md5}. + +sendfile_bin_info(Data) -> + Size = lists:foldl(fun(E,Sum) -> size(E) + Sum end, 0, Data), + Md5 = erlang:md5(Data), + {Size, Md5}. + + %%% Utilities -- cgit v1.2.3 From 5ba916ef7ac71bd1e7e23b4c87ae6a472f14fd6c Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 12:03:58 +0100 Subject: Create erlang fallback for sendfile Created erlang fallback for sendfile in gen_tcp and moved sendfile from file to gen_tcp. Also created testcases for testing all different options to sendfile. For info about how sendfile should work see the BSD man pages as they contain a more complete API than other *nixes. --- erts/preloaded/src/prim_file.erl | 40 +++++-- erts/preloaded/src/prim_inet.erl | 25 ++++- lib/kernel/src/file.erl | 58 +--------- lib/kernel/src/file_io_server.erl | 5 +- lib/kernel/src/gen_tcp.erl | 153 ++++++++++++++++++++++++++- lib/kernel/test/gen_tcp_api_SUITE.erl | 194 ++++++++++++++++++++++++---------- 6 files changed, 353 insertions(+), 122 deletions(-) diff --git a/erts/preloaded/src/prim_file.erl b/erts/preloaded/src/prim_file.erl index f3f977a30b..606d7d5aab 100644 --- a/erts/preloaded/src/prim_file.erl +++ b/erts/preloaded/src/prim_file.erl @@ -27,7 +27,7 @@ %% Generic file contents operations -export([open/2, close/1, datasync/1, sync/1, advise/4, position/2, truncate/1, write/2, pwrite/2, pwrite/3, read/2, read_line/1, pread/2, pread/3, - copy/3, sendfile/5]). + copy/3, sendfile/10]). %% Specialized file operations -export([open/1, open/3]). @@ -542,21 +542,49 @@ write_file(_, _) -> %% Returns {error, Reason} | {ok, BytesCopied} +sendfile(_,_,_,_,_,_,_,_,_,_) -> + {error, enotsup}; sendfile(#file_descriptor{module = ?MODULE, data = {Port, _}}, - DestFD, Offset, Bytes, ChunkSize) -> + DestFD, Offset, Bytes, ChunkSize, Headers, Trailers, + Nodiskio, MNowait, Sync) -> + ok = drv_command(Port, <>), + ChunkSize:64, + (get_bit(Nodiskio)):1, + (get_bit(MNowait)):1, + (get_bit(Sync)):1,0:5, + (encode_hdtl(Headers))/binary, + (encode_hdtl(Trailers))/binary>>), Self = self(), %% Should we use a ref()? receive {efile_reply, Self, Port, {ok, _Written}=OKRes}-> OKRes; {efile_reply, Self, Port, {error, _PosixError}=Error}-> - Error; - Unexpected -> - Unexpected + Error end. +get_bit(true) -> + 1; +get_bit(false) -> + 0. + +encode_hdtl(undefined) -> + <<0>>; +encode_hdtl([]) -> + <<0>>; +encode_hdtl(List) -> + encode_hdtl(List,<<>>,0). + +encode_hdtl([], Acc, Cnt) -> + <>; +encode_hdtl([Bin|T], Acc, Cnt) -> + encode_hdtl(T, <<(byte_size(Bin)):32, Bin/binary, Acc/binary>>,Cnt + 1). + + + + + %%%----------------------------------------------------------------- %%% Functions operating on files without handle to the file. ?DRV. %%% diff --git a/erts/preloaded/src/prim_inet.erl b/erts/preloaded/src/prim_inet.erl index f144f73d68..015930c0c0 100644 --- a/erts/preloaded/src/prim_inet.erl +++ b/erts/preloaded/src/prim_inet.erl @@ -36,7 +36,8 @@ -export([recvfrom/2, recvfrom/3]). -export([setopt/3, setopts/2, getopt/2, getopts/2, is_sockopt_val/2]). -export([chgopt/3, chgopts/2]). --export([getstat/2, getfd/1, getindex/1, getstatus/1, gettype/1, +-export([getstat/2, getfd/1, stealfd/1, returnfd/1, + getindex/1, getstatus/1, gettype/1, getifaddrs/1, getiflist/1, ifget/3, ifset/3, gethostname/1]). -export([getservbyname/3, getservbyport/3]). @@ -840,6 +841,28 @@ getfd(S) when is_port(S) -> {error,_}=Error -> Error end. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% +%% STEALFD(insock()) -> {ok,integer()} | {error, Reason} +%% +%% steal internal file descriptor +%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +stealfd(S) when is_port(S) -> + getfd(S). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% +%% RETURNFD(insock()) -> {ok,integer()} | {error, Reason} +%% +%% return internal file descriptor +%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +returnfd(S) when is_port(S) -> + ok. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% %% GETIX(insock()) -> {ok,integer()} | {error, Reason} diff --git a/lib/kernel/src/file.erl b/lib/kernel/src/file.erl index fba9a86e95..706c60caaf 100644 --- a/lib/kernel/src/file.erl +++ b/lib/kernel/src/file.erl @@ -41,7 +41,7 @@ pread/2, pread/3, pwrite/2, pwrite/3, read_line/1, position/2, truncate/1, datasync/1, sync/1, - copy/2, copy/3, sendfile/4, sendfile/2]). + copy/2, copy/3]). %% High level operations -export([consult/1, path_consult/2]). -export([eval/1, eval/2, path_eval/2, path_eval/3, path_open/3]). @@ -335,62 +335,6 @@ raw_write_file_info(Name, #file_info{} = Info) -> Error end. -%% sendfile/5 -%% TODO: add more guards? export sendfile/5? --spec sendfile(File, Sock, Offset, Bytes, ChunkSize) - -> {'ok', non_neg_integer()} | {'error', posix()} when - File::io_device(), Sock::port() | integer(), - Offset::non_neg_integer(), Bytes::non_neg_integer(), - ChunkSize::non_neg_integer(). -sendfile(File, Sock, Offset, Bytes, ChunkSize) when is_integer(Sock) - andalso is_pid(File) -> - R = file_request(File, {sendfile, Sock, Offset, Bytes, ChunkSize}), - wait_file_reply(File, R); -sendfile(File, Sock, Offset, Bytes, ChunkSize) when is_port(Sock) - andalso is_pid(File) -> - {ok, SockFD} = prim_inet:getfd(Sock), - sendfile(File, SockFD, Offset, Bytes, ChunkSize); -sendfile(#file_descriptor{module = Module} = Handle, Sock, - Offset, Bytes, ChunkSize) when is_integer(Sock) -> - Module:sendfile(Handle, Sock, Offset, Bytes, ChunkSize); -sendfile(#file_descriptor{module = _Module} = Handle, Sock, - Offset, Bytes, ChunkSize) when is_port(Sock) -> - {ok, SockFD} = prim_inet:getfd(Sock), - sendfile(Handle, SockFD, Offset, Bytes, ChunkSize); -sendfile(_, _, _, _, _) -> - {error, badarg}. - --define(SENDFILE_CHUNK_LIMIT, 2147483648). % 2GB - -%% Limit chunksize to work around 4 byte off_t/size_t limits -sendfile_chunksize(Bytes, Limit) -> - case Bytes >= Limit of - true -> Limit - 1; - false -> Bytes - end. - --spec sendfile(File, Sock, Offset, Bytes) - -> {'ok', non_neg_integer()} | {'error', posix()} when - File::io_device(), Sock::port() | integer(), - Offset::non_neg_integer(), Bytes::non_neg_integer(). -sendfile(File, Sock, Offset, Bytes) -> - ChunkSize = sendfile_chunksize(Bytes, ?SENDFILE_CHUNK_LIMIT), - sendfile(File, Sock, Offset, Bytes, ChunkSize). - -%% sendfile/2 -%% TODO: add guards? --spec sendfile(File, Sock) -> {'ok', non_neg_integer()} | {'error', posix()} - when File::name(), Sock::port(). -sendfile(File, Sock) -> - Offset = 0, - {ok, #file_info{size = Bytes}} = read_file_info(File), - %% TODO: use file:open/2 and file:read_file_info/1 instead of local calls? - {ok, Fd} = open(File, [read, raw, binary]), - ChunkSize = sendfile_chunksize(Bytes, ?SENDFILE_CHUNK_LIMIT), - Res = sendfile(Fd, Sock, Offset, Bytes, ChunkSize), - ok = close(Fd), - Res. - %%%----------------------------------------------------------------- %%% File io server functions. %%% They operate on a single open file. diff --git a/lib/kernel/src/file_io_server.erl b/lib/kernel/src/file_io_server.erl index 78d3d24394..7280635f53 100644 --- a/lib/kernel/src/file_io_server.erl +++ b/lib/kernel/src/file_io_server.erl @@ -249,9 +249,10 @@ file_request(close, file_request({position,At}, #state{handle=Handle,buf=Buf}=State) -> std_reply(position(Handle, At, Buf), State); -file_request({sendfile,DestFD,Offset,Bytes,ChunkSize}, +file_request({sendfile,DestSock,Offset,Bytes,Opts}, #state{handle=Handle}=State) -> - case ?PRIM_FILE:sendfile(Handle, DestFD, Offset, Bytes, ChunkSize) of + %% gen_tcp will call prim_file:sendfile with correct arguments + case gen_tcp:sendfile(Handle, DestSock, Offset, Bytes, Opts) of {error,_}=Reply -> {stop,normal,Reply,State}; Reply -> diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl index 8ab18c01b4..56eca4cda4 100644 --- a/lib/kernel/src/gen_tcp.erl +++ b/lib/kernel/src/gen_tcp.erl @@ -22,11 +22,12 @@ -export([connect/3, connect/4, listen/2, accept/1, accept/2, shutdown/2, close/1]). --export([send/2, recv/2, recv/3, unrecv/2]). +-export([send/2, recv/2, recv/3, unrecv/2, sendfile/2, sendfile/5]). -export([controlling_process/2]). -export([fdopen/2]). -include("inet_int.hrl"). +-include("file.hrl"). -type option() :: {active, true | false | once} | @@ -105,8 +106,13 @@ {tcp_module, module()} | option(). -type socket() :: port(). +-type sendfile_option() :: {chunk_size, non_neg_integer()} | + {headers, Hdrs :: list(iodata())} | + {trailers, Tlrs :: list(iodata())} | + sf_nodiskio | sf_mnowait | sf_sync. --export_type([option/0, option_name/0, connect_option/0, listen_option/0]). +-export_type([option/0, option_name/0, connect_option/0, listen_option/0, + sendfile_option/0]). %% %% Connect a socket @@ -304,6 +310,61 @@ unrecv(S, Data) when is_port(S) -> Error end. +%% +%% Send data using sendfile +%% + +-define(MAX_CHUNK_SIZE, (1 bsl 30)*2-1). + +-spec sendfile(File, Sock, Offset, Bytes, Opts) -> + {'ok', non_neg_integer()} | {'error', inet:posix()} when + File :: file:io_device(), + Sock :: socket(), + Offset :: non_neg_integer() | undefined, + Bytes :: non_neg_integer() | undefined, + Opts :: [sendfile_option()]. +sendfile(File, Sock, Offset, Bytes, Opts) when is_pid(File) -> + Ref = erlang:monitor(process, File), + File ! {file_request,self(),File, + {sendfile,Sock,Offset,Bytes,Opts}}, + receive + {file_reply,File,Reply} -> + erlang:demonitor(Ref,[flush]), + Reply; + {'DOWN', Ref, _, _, _} -> + {error, terminated} + end; +sendfile(File, Sock, Offset, Bytes, []) -> + sendfile(File, Sock, Offset, Bytes, ?MAX_CHUNK_SIZE, undefined, undefined, + false, false, false); +sendfile(File, Sock, Offset, Bytes, Opts) -> + ChunkSize0 = proplists:get_value(chunk_size, Opts, ?MAX_CHUNK_SIZE), + ChunkSize = if ChunkSize0 > ?MAX_CHUNK_SIZE -> + ?MAX_CHUNK_SIZE; + true -> ChunkSize0 + end, + Headers = proplists:get_value(headers, Opts), + Trailers = proplists:get_value(trailers, Opts), + sendfile(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, + lists:member(sf_nodiskio,Opts),lists:member(sf_mnowait,Opts), + lists:member(sf_sync,Opts)). + +%% sendfile/2 +-spec sendfile(File, Sock) -> + {'ok', non_neg_integer()} | {'error', inet:posix() | badarg} + when File :: file:name(), + Sock :: socket(). +sendfile(File, Sock) -> + case file:open(File, [read, raw, binary]) of + {error, Reason} -> + {error, Reason}; + {ok, Fd} -> + {ok, #file_info{size = Bytes}} = file:read_file_info(File), + Res = sendfile(Fd, Sock, 0, Bytes, []), + file:close(Fd), + Res + end. + %% %% Set controlling process %% @@ -354,3 +415,91 @@ mod([_|Opts], Address) -> mod(Opts, Address); mod([], Address) -> mod(Address). + + +%% Internal sendfile functions +sendfile(#file_descriptor{ module = Mod } = Fd, Sock, Offset, Bytes, + ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) + when is_port(Sock) -> + case Mod:sendfile(Fd, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, + Nodiskio, MNowait, Sync) of + {error, enotsup} -> + sendfile_fallback(Fd, Sock, Offset, Bytes, ChunkSize, + Headers, Trailers); + Else -> + Else + end; +sendfile(_,_,_,_,_,_,_,_,_,_) -> + {error, badarg}. + +%%% +%% Sendfile Fallback +%%% +sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, + Headers, Trailers) + when is_list(Headers) == false -> + case sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) of + {ok, BytesSent} when is_list(Trailers),is_integer(Headers) -> + sendfile_send(Sock, Trailers, BytesSent+Headers); + {ok, BytesSent} when is_list(Trailers) -> + sendfile_send(Sock, Trailers, BytesSent); + {ok, BytesSent} when is_integer(Headers) -> + {ok, BytesSent + Headers}; + Else -> + Else + end; +sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers) -> + case sendfile_send(Sock, Headers, 0) of + {ok, BytesSent} -> + sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, BytesSent, + Trailers); + Else -> + Else + end. + + +sendfile_fallback(File, Sock, undefined, Bytes, ChunkSize) -> + sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0); +sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) -> + {ok, CurrPos} = file:position(File, 0), + {ok, _NewPos} = file:position(File, {bof, Offset}), + Res = sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0), + file:position(File, {bof, CurrPos}), + Res. + + +sendfile_fallback_int(_File, _Sock, BytesSent, _ChunkSize, BytesSent) -> + {ok, BytesSent}; +sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent) + when Bytes > BytesSent; Bytes == undefined -> + Size = if Bytes == undefined -> + ChunkSize; + (Bytes - BytesSent + ChunkSize) > 0 -> + Bytes - BytesSent; + true -> + ChunkSize + end, + case file:read(File, Size) of + {ok, Data} -> + case sendfile_send(Sock, Data, BytesSent) of + {ok,NewBytesSent} -> + sendfile_fallback_int( + File, Sock, Bytes, ChunkSize, + NewBytesSent); + Error -> + Error + end; + eof -> + {ok, BytesSent}; + Error -> + Error + end. + +sendfile_send(Sock, Data, Old) -> + Len = iolist_size(Data), + case send(Sock, Data) of + ok -> + {ok, Len+Old}; + Else -> + Else + end. diff --git a/lib/kernel/test/gen_tcp_api_SUITE.erl b/lib/kernel/test/gen_tcp_api_SUITE.erl index b622e3c5bc..492c60f521 100644 --- a/lib/kernel/test/gen_tcp_api_SUITE.erl +++ b/lib/kernel/test/gen_tcp_api_SUITE.erl @@ -22,8 +22,9 @@ %% are not tested here, because they are tested indirectly in this and %% and other test suites. --include_lib("test_server/include/test_server.hrl"). +-include_lib("common_test/include/ct.hrl"). -include_lib("kernel/include/inet.hrl"). +-include_lib("kernel/include/file.hrl"). -export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1, init_per_group/2,end_per_group/2, @@ -32,20 +33,30 @@ t_connect_bad/1, t_recv_timeout/1, t_recv_eof/1, t_shutdown_write/1, t_shutdown_both/1, t_shutdown_error/1, - t_fdopen/1, t_implicit_inet6/1]). + t_fdopen/1, t_implicit_inet6/1, + t_sendfile/0, t_sendfile/1, t_sendfile_hdtl/1, t_sendfile_partial/1, + t_sendfile_offset/1]). + +-export([sendfile_server/1]). suite() -> [{ct_hooks,[ts_install_cth]}]. all() -> [{group, t_accept}, {group, t_connect}, {group, t_recv}, t_shutdown_write, t_shutdown_both, t_shutdown_error, - t_fdopen, t_implicit_inet6]. + t_fdopen, t_implicit_inet6,{group,t_sendfile}]. groups() -> [{t_accept, [], [t_accept_timeout]}, {t_connect, [], [t_connect_timeout, t_connect_bad]}, {t_recv, [], [t_recv_timeout, t_recv_eof]}, - {t_sendfile, [], [sendfile]}]. + {t_sendfile, [], [{group, t_sendfile_raw}, + {group, t_sendfile_ioserv}]}, + {t_sendfile_raw, [], sendfile_all()}, + {t_sendfile_ioserv, [], sendfile_all()}]. + +sendfile_all() -> + [t_sendfile,t_sendfile_hdtl, t_sendfile_partial, t_sendfile_offset]. init_per_suite(Config) -> Config. @@ -53,6 +64,16 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. +init_per_group(t_sendfile, Config) -> + Priv = ?config(priv_dir, Config), + Filename = filename:join(Priv, "sendfile_small.html"), + {ok, D} = file:open(Filename,[write]), + io:format(D,"yo baby yo",[]), + file:sync(D), + file:close(D), + [{small_file, Filename}|Config]; +init_per_group(t_sendfile_raw, Config) -> + [{file_opts, [raw]}|Config]; init_per_group(_GroupName, Config) -> Config. @@ -238,65 +259,137 @@ implicit_inet6(S, Addr) -> ?line ok = gen_tcp:close(S1). -sendfile() -> +t_sendfile() -> [{timetrap, {seconds, 5}}]. -sendfile_supported({unix,linux}) -> true; -sendfile_supported({unix,sunos}) -> true; -sendfile_supported({unix,freebsd}) -> true; -sendfile_supported({unix,dragonfly}) -> true; -sendfile_supported({unix,darwin}) -> true; -%% TODO: enable win32 once TransmitFile based implemenation written properly -%% sendfile_supported({win32,_}) -> true; -sendfile_supported(_) -> false. - -sendfile(Config) when is_list(Config) -> - case sendfile_supported(os:type()) of - true -> - ?line Data = ?config(data_dir, Config), - ?line Real = filename:join(Data, "realmen.html"), - Host = "localhost", - - %% TODO: find another way to test for {error, posix_error()}? - %% Disabled because with driver_select I cannot test for - %% invalid out_fd - %% ?line {error, Error} = file:sendfile(Real, -1), - %% ?line test_server:format("sendfile error = ~p", [Error]), - %% %% Unix ebadf, Windows eio - %% ?line true = Error =:= ebadf orelse Error =:= eio, - - ?line ok = sendfile_send(Host, Real); - false -> - {skip, "sendfile not supported on this platform"} - end. +t_sendfile(Config) when is_list(Config) -> + Filename = proplists:get_value(small_file, Config), + + Send = fun(Sock) -> + {Size, Data} = sendfile_file_info(Filename), + {ok, Size} = gen_tcp:sendfile(Filename, Sock), + Data + end, + + ok = sendfile_send(Send). + +t_sendfile_partial(Config) -> + Filename = proplists:get_value(small_file, Config), + FileOpts = proplists:get_value(file_opts, Config, []), + + SendSingle = fun(Sock) -> + {_Size, <>} = + sendfile_file_info(Filename), + {ok,D} = file:open(Filename,[read|FileOpts]), + {ok,5} = gen_tcp:sendfile(D,Sock,undefined,5,[]), + file:close(D), + Data + end, + ok = sendfile_send(SendSingle), + + {_Size, <>} = + sendfile_file_info(Filename), + {ok,D} = file:open(Filename,[read|FileOpts]), + FSend = fun(Sock) -> + {ok,5} = gen_tcp:sendfile(D,Sock,undefined,5,[]), + FData + end, + + ok = sendfile_send(FSend), + + SSend = fun(Sock) -> + {ok,3} = gen_tcp:sendfile(D,Sock,undefined,3,[]), + SData + end, + + ok = sendfile_send(SSend), + file:close(D). + +t_sendfile_offset(Config) -> + Filename = proplists:get_value(small_file, Config), + FileOpts = proplists:get_value(file_opts, Config, []), + + Send = fun(Sock) -> + {_Size, <<_:5/binary,Data:3/binary,_/binary>> = AllData} = + sendfile_file_info(Filename), + {ok,D} = file:open(Filename,[read,binary|FileOpts]), + {ok,3} = gen_tcp:sendfile(D,Sock,5,3,[]), + {ok, AllData} = file:read(D,100), + file:close(D), + Data + end, + ok = sendfile_send(Send). + + +t_sendfile_hdtl(Config) -> + Filename = proplists:get_value(small_file, Config), + FileOpts = proplists:get_value(file_opts, Config, []), + + Send = fun(Sock, Headers, Trailers, HdtlSize) -> + {Size, Data} = sendfile_file_info(Filename), + {ok,D} = file:open(Filename,[read|FileOpts]), + AllSize = Size+HdtlSize, + {ok, AllSize} = gen_tcp:sendfile( + D, Sock,undefined,undefined, + [{headers,Headers}, + {trailers,Trailers}]), + file:close(D), + Data + end, + + SendHdTl = fun(Sock) -> + Headers = [<<"header1">>,"header2"], + Trailers = [<<"trailer1">>,"trailer2"], + D = Send(Sock,Headers,Trailers, + iolist_size([Headers,Trailers])), + iolist_to_binary([Headers,D,Trailers]) + end, + ok = sendfile_send(SendHdTl), + + SendHd = fun(Sock) -> + Headers = [<<"header1">>,"header2"], + D = Send(Sock,Headers,undefined, + iolist_size([Headers])), + iolist_to_binary([Headers,D]) + end, + ok = sendfile_send(SendHd), + + SendTl = fun(Sock) -> + Trailers = [<<"trailer1">>,"trailer2"], + D = Send(Sock,Trailers,undefined, + iolist_size([Trailers])), + iolist_to_binary([Trailers,D]) + end, + ok = sendfile_send(SendTl). -%% TODO: consolidate tests and reduce code -sendfile_send(Host, File) -> - {Size, _Md5} = FileInfo = sendfile_file_info(File), +%% TODO: consolidate tests and reduce code +sendfile_send(Send) -> + sendfile_send("localhost",Send). +sendfile_send(Host, Send) -> spawn_link(?MODULE, sendfile_server, [self()]), receive {server, Port} -> - ?line {ok, Sock} = gen_tcp:connect(Host, Port, + {ok, Sock} = gen_tcp:connect(Host, Port, [binary,{packet,0}]), - ?line {ok, Size} = file:sendfile(File, Sock), - ?line ok = gen_tcp:close(Sock), + Data = Send(Sock), + ok = gen_tcp:close(Sock), receive {ok, Bin} -> - ?line FileInfo = sendfile_bin_info(Bin), + Data = Bin, ok end end. sendfile_server(ClientPid) -> - ?line {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 0}, + {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 0}, {active, false}, {reuseaddr, true}]), - ?line {ok, Port} = inet:port(LSock), + {ok, Port} = inet:port(LSock), ClientPid ! {server, Port}, - ?line {ok, Sock} = gen_tcp:accept(LSock), - ?line {ok, Bin} = sendfile_do_recv(Sock, []), - ?line ok = gen_tcp:close(Sock), + {ok, Sock} = gen_tcp:accept(LSock), + {ok, Bin} = sendfile_do_recv(Sock, []), + ok = gen_tcp:close(Sock), ClientPid ! {ok, Bin}. -define(SENDFILE_TIMEOUT, 5000). @@ -306,20 +399,13 @@ sendfile_do_recv(Sock, Bs) -> {ok, B} -> sendfile_do_recv(Sock, [B|Bs]); {error, closed} -> - {ok, lists:reverse(Bs)} + {ok, iolist_to_binary(lists:reverse(Bs))} end. sendfile_file_info(File) -> {ok, #file_info{size = Size}} = file:read_file_info(File), {ok, Data} = file:read_file(File), - Md5 = erlang:md5(Data), - {Size, Md5}. - -sendfile_bin_info(Data) -> - Size = lists:foldl(fun(E,Sum) -> size(E) + Sum end, 0, Data), - Md5 = erlang:md5(Data), - {Size, Md5}. - + {Size, Data}. %%% Utilities -- cgit v1.2.3 From 0348a9c9c0114ddf83d776adc3d01ac60dfcccfc Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 10:48:46 +0100 Subject: Implement sendfile using blocking io in asynch threads Move the command handling to outputv in preparation for header and trailer inclusion in the sendfile api. Use the standard efile communication functions for sendfile. --- erts/emulator/drivers/common/efile_drv.c | 171 ++++++------------------------- erts/emulator/drivers/common/erl_efile.h | 2 +- erts/emulator/drivers/unix/unix_efile.c | 27 ++--- erts/preloaded/src/prim_file.erl | 33 +++--- lib/kernel/src/gen_tcp.erl | 5 +- 5 files changed, 58 insertions(+), 180 deletions(-) diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 509c4fe48c..7e194a3787 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -629,113 +629,6 @@ static struct t_data *cq_deq(file_descriptor *desc) { return d; } - -/********************************************************************* - * Command queue functions - */ - -static ErlDrvTermData am_ok; -static ErlDrvTermData am_error; -static ErlDrvTermData am_efile_reply; - -#define INIT_ATOM(NAME) am_ ## NAME = driver_mk_atom(#NAME) - -#define LOAD_ATOM_CNT 2 -#define LOAD_ATOM(vec, i, atom) \ - (((vec)[(i)] = ERL_DRV_ATOM), \ - ((vec)[(i)+1] = (atom)), \ - ((i)+LOAD_ATOM_CNT)) - -#define LOAD_INT_CNT 2 -#define LOAD_INT(vec, i, val) \ - (((vec)[(i)] = ERL_DRV_INT), \ - ((vec)[(i)+1] = (ErlDrvTermData)(val)), \ - ((i)+LOAD_INT_CNT)) - -#define LOAD_INT64_CNT 2 -#define LOAD_INT64(vec, i, val) \ - (((vec)[(i)] = ERL_DRV_INT64), \ - ((vec)[(i)+1] = (ErlDrvTermData)(val)), \ - ((i)+LOAD_INT64_CNT)) - -#define LOAD_PORT_CNT 2 -#define LOAD_PORT(vec, i, port) \ - (((vec)[(i)] = ERL_DRV_PORT), \ - ((vec)[(i)+1] = (port)), \ - ((i)+LOAD_PORT_CNT)) - -#define LOAD_PID_CNT 2 -#define LOAD_PID(vec, i, pid) \ - (((vec)[(i)] = ERL_DRV_PID), \ - ((vec)[(i)+1] = (pid)), \ - ((i)+LOAD_PID_CNT)) - -#define LOAD_TUPLE_CNT 2 -#define LOAD_TUPLE(vec, i, size) \ - (((vec)[(i)] = ERL_DRV_TUPLE), \ - ((vec)[(i)+1] = (size)), \ - ((i)+LOAD_TUPLE_CNT)) - -/* send: -** {efile_reply, Pid, Port, {ok, int64()}} -*/ - -static int ef_send_ok_int64(file_descriptor *desc, ErlDrvTermData caller, - ErlDrvSInt64 *n) -{ - ErlDrvTermData spec[2*LOAD_ATOM_CNT + LOAD_PID_CNT + LOAD_PORT_CNT - + LOAD_INT64_CNT + 2*LOAD_TUPLE_CNT]; - int i = 0; - - i = LOAD_ATOM(spec, i, am_efile_reply); - i = LOAD_PID(spec, i, caller); - i = LOAD_PORT(spec, i, driver_mk_port(desc->port)); - i = LOAD_ATOM(spec, i, am_ok); - i = LOAD_INT64(spec, i, n); - i = LOAD_TUPLE(spec, i, 2); - i = LOAD_TUPLE(spec, i, 4); - ASSERT(i == sizeof(spec)/sizeof(*spec)); - - return driver_send_term(desc->port, caller, spec, i); -} - -static ErlDrvTermData error_atom(int err) -{ - char errstr[256]; - char* s; - char* t; - - for (s = erl_errno_id(err), t = errstr; *s; s++, t++) - *t = tolower(*s); - *t = '\0'; - return driver_mk_atom(errstr); -} - -/* send: -** {efile_reply, Pid, Port, {error, posix_error()} -*/ - -static int ef_send_posix_error(file_descriptor *desc, ErlDrvTermData caller, - int e) -{ - ErlDrvTermData spec[3*LOAD_ATOM_CNT + LOAD_PID_CNT + LOAD_PORT_CNT - + 2*LOAD_TUPLE_CNT]; - int i = 0; - - i = LOAD_ATOM(spec, i, am_efile_reply); - i = LOAD_PID(spec, i, caller); - i = LOAD_PORT(spec, i, driver_mk_port(desc->port)); - i = LOAD_ATOM(spec, i, am_error); - /* TODO: safe? set of error codes should be limited and safe */ - i = LOAD_ATOM(spec, i, error_atom(e)); - i = LOAD_TUPLE(spec, i, 2); - i = LOAD_TUPLE(spec, i, 4); - ASSERT(i == sizeof(spec)/sizeof(*spec)); - - desc->caller = 0; - return driver_send_term(desc->port, caller, spec, i); -} - /********************************************************************* * Driver entry point -> init */ @@ -751,10 +644,6 @@ file_init(void) : 0); driver_system_info(&sys_info, sizeof(ErlDrvSysInfo)); - INIT_ATOM(ok); - INIT_ATOM(error); - INIT_ATOM(efile_reply); - return 0; } @@ -1871,7 +1760,7 @@ static void do_sendfile(file_descriptor *d) ERL_DRV_USE|ERL_DRV_WRITE, 1); } else { printf("==> sendfile DONE eagain=%d\n", d->sendfile.eagain); - ef_send_ok_int64(d, d->caller, &d->sendfile.written); + reply_Uint(d, d->sendfile.written); } } else if (errInfo.posix_errno == EAGAIN || errInfo.posix_errno == EINTR) { if (chunksize > 0) { @@ -2301,8 +2190,6 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) free_preadv(data); break; case FILE_SENDFILE: - /* Return 'ok' and let prim_file:sendfile wait for message */ - reply_ok(desc); driver_select(desc->port, (ErlDrvEvent)desc->sendfile.out_fd, ERL_DRV_USE|ERL_DRV_WRITE, 1); free_data(data); @@ -2654,34 +2541,6 @@ file_output(ErlDrvData e, char* buf, int count) goto done; } - case FILE_SENDFILE: - { - d = EF_SAFE_ALLOC(sizeof(struct t_data)); - d->fd = fd; - d->command = command; - d->invoke = invoke_sendfile; - d->free = free_data; - d->level = 2; - desc->sendfile.out_fd = get_int32((uchar*) buf); - /* TODO: are off_t and size_t 64bit on all platforms? - off_t is 32bit on win32 msvc. maybe configurable in msvc. - Maybe use '#if SIZEOF_SIZE_T == 4'? */ - desc->sendfile.offset = get_int64(((uchar*) buf) - + sizeof(Sint32)); - desc->sendfile.count = get_int64(((uchar*) buf) - + sizeof(Sint32) - + sizeof(Sint64)); - desc->sendfile.chunksize = get_int64(((uchar*) buf) - + sizeof(Sint32) - + 2*sizeof(Sint64)); - desc->sendfile.written = 0; - desc->sendfile.eagain = 0; - /* TODO: shouldn't d->command be enough? */ - desc->command = command; - desc->caller = driver_caller(desc->port); - goto done; - } - } /* @@ -3475,6 +3334,34 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { goto done; } /* case FILE_OPT_DELAYED_WRITE: */ } ASSERT(0); goto done; /* case FILE_SETOPT: */ + case FILE_SENDFILE: + { + struct t_data *d; + d = EF_SAFE_ALLOC(sizeof(struct t_data)); + d->fd = desc->fd; + d->command = command; + d->invoke = invoke_sendfile; + d->free = free_data; + d->level = 2; + desc->sendfile.out_fd = get_int32((uchar*) buf); + /* TODO: are off_t and size_t 64bit on all platforms? + off_t is 32bit on win32 msvc. maybe configurable in msvc. + Maybe use '#if SIZEOF_SIZE_T == 4'? */ + desc->sendfile.offset = get_int64(((uchar*) buf) + + sizeof(Sint32)); + desc->sendfile.count = get_int64(((uchar*) buf) + + sizeof(Sint32) + + sizeof(Sint64)); + desc->sendfile.chunksize = get_int64(((uchar*) buf) + + sizeof(Sint32) + + 2*sizeof(Sint64)); + desc->sendfile.written = 0; + desc->sendfile.eagain = 0; + /* TODO: shouldn't d->command be enough? */ + desc->command = command; + desc->caller = driver_caller(desc->port); + goto done; + } } /* switch(command) */ diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index 3c6c2ec2db..864b867955 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -162,5 +162,5 @@ int efile_symlink(Efile_error* errInfo, char* old, char* new); int efile_may_openfile(Efile_error* errInfo, char *name); int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); -int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t *offset, +int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t offset, size_t *count); diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 5b001b3819..8b612164da 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1471,31 +1471,24 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, #ifdef HAVE_SENDFILE int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, size_t *count) + off_t offset, size_t *ret_nbytes) { #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) - ssize_t retval = sendfile(out_fd, in_fd, offset, *count); - if (retval >= 0) { - if (retval != *count) { - *count = retval; - retval = -1; - errno = EAGAIN; - } else { - *count = retval; - } - } else if (retval == -1 && (errno == EINTR || errno == EAGAIN)) { - *count = 0; - } + ssize_t retval; + do { + retval = sendfile(out_fd, in_fd, &offset, *ret_nbytes); + } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); + *ret_nbytes = retval; return check_error(retval == -1 ? -1 : 0, errInfo); #elif defined(DARWIN) - off_t len = *count; + off_t len = *ret_nbytes; int retval = sendfile(in_fd, out_fd, *offset, &len, NULL, 0); - *count = len; + *ret_nbytes = len; return check_error(retval, errInfo); #elif defined(__FreeBSD__) || defined(__DragonFly__) off_t len = 0; - int retval = sendfile(in_fd, out_fd, *offset, *count, NULL, &len, 0); - *count = len; + int retval = sendfile(in_fd, out_fd, *offset, *nbytes, NULL, &len, 0); + *nbytes = len; return check_error(retval, errInfo); #endif } diff --git a/erts/preloaded/src/prim_file.erl b/erts/preloaded/src/prim_file.erl index 606d7d5aab..0767067682 100644 --- a/erts/preloaded/src/prim_file.erl +++ b/erts/preloaded/src/prim_file.erl @@ -539,30 +539,22 @@ write_file(File, Bin) when (is_list(File) orelse is_binary(File)) -> end; write_file(_, _) -> {error, badarg}. - + %% Returns {error, Reason} | {ok, BytesCopied} -sendfile(_,_,_,_,_,_,_,_,_,_) -> - {error, enotsup}; +%sendfile(_,_,_,_,_,_,_,_,_,_) -> +% {error, enotsup}; sendfile(#file_descriptor{module = ?MODULE, data = {Port, _}}, DestFD, Offset, Bytes, ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) -> - ok = drv_command(Port, <>), - Self = self(), - %% Should we use a ref()? - receive - {efile_reply, Self, Port, {ok, _Written}=OKRes}-> - OKRes; - {efile_reply, Self, Port, {error, _PosixError}=Error}-> - Error - end. + drv_command(Port, <>). get_bit(true) -> 1; @@ -578,8 +570,11 @@ encode_hdtl(List) -> encode_hdtl([], Acc, Cnt) -> <>; +encode_hdtl([Bin|T], Acc, Cnt) when is_binary(Bin) -> + encode_hdtl(T, <<(byte_size(Bin)):32, Bin/binary, Acc/binary>>,Cnt + 1); encode_hdtl([Bin|T], Acc, Cnt) -> - encode_hdtl(T, <<(byte_size(Bin)):32, Bin/binary, Acc/binary>>,Cnt + 1). + encode_hdtl(T, <<(iolist_size(Bin)):32, (iolist_to_binary(Bin))/binary, + Acc/binary>>,Cnt + 1). diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl index 56eca4cda4..ea25dc3dc3 100644 --- a/lib/kernel/src/gen_tcp.erl +++ b/lib/kernel/src/gen_tcp.erl @@ -421,12 +421,15 @@ mod([], Address) -> sendfile(#file_descriptor{ module = Mod } = Fd, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) when is_port(Sock) -> - case Mod:sendfile(Fd, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, + {ok,SockFd} = prim_inet:stealfd(Sock), + case Mod:sendfile(Fd, SockFd, Offset, Bytes, ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) of {error, enotsup} -> + prim_inet:returnfd(Sock), sendfile_fallback(Fd, Sock, Offset, Bytes, ChunkSize, Headers, Trailers); Else -> + prim_inet:returnfd(Sock), Else end; sendfile(_,_,_,_,_,_,_,_,_,_) -> -- cgit v1.2.3 From bfa81856150b59ea4578e0eef79b97ab0decb8f7 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 10:52:16 +0100 Subject: Remove output from driver entry outputv will always be used so removed output to decrease confusion. --- erts/emulator/drivers/common/efile_drv.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 7e194a3787..b14f5844b2 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -277,7 +277,7 @@ struct erl_drv_entry efile_driver_entry = { file_init, file_start, file_stop, - file_output, + NULL, NULL, file_ready_output, "efile", -- cgit v1.2.3 From 54bdd9a15d2e130c76f76ca322af56b306d02078 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Wed, 9 Nov 2011 11:52:14 +0100 Subject: Implement blocking calls for sendfile Move sendfile data to invoke data instead of file_descr. Remove usage of ready_output when doing a send. If told to send 0 bytes, file_sendfile now sends the entire file for linux. --- erts/emulator/drivers/common/efile_drv.c | 206 +++++++++++++++---------------- erts/emulator/drivers/unix/unix_efile.c | 31 +++-- erts/preloaded/src/prim_file.erl | 7 +- lib/kernel/src/gen_tcp.erl | 23 ++-- lib/kernel/test/gen_tcp_api_SUITE.erl | 59 ++++++--- 5 files changed, 177 insertions(+), 149 deletions(-) diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index b14f5844b2..4ed6aa4891 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -76,6 +76,11 @@ #define FILE_OPT_DELAYED_WRITE 0 #define FILE_OPT_READ_AHEAD 1 +#define FILE_SENDFILE_OFFSET 0x10 +#define FILE_SENDFILE_NBYTES 0x08 +#define FILE_SENDFILE_NODISKIO 0x4 +#define FILE_SENDFILE_MNOWAIT 0x2 +#define FILE_SENDFILE_SYNC 0x1 /* IPREAD variants */ @@ -218,7 +223,6 @@ typedef unsigned char uchar; static ErlDrvData file_start(ErlDrvPort port, char* command); static int file_init(void); static void file_stop(ErlDrvData); -static void file_ready_output(ErlDrvData data, ErlDrvEvent event); static void file_output(ErlDrvData, char* buf, int len); static int file_control(ErlDrvData, unsigned int command, char* buf, int len, char **rbuf, int rlen); @@ -226,7 +230,6 @@ static void file_timeout(ErlDrvData); static void file_outputv(ErlDrvData, ErlIOVec*); static void file_async_ready(ErlDrvData, ErlDrvThreadData); static void file_flush(ErlDrvData); -static void file_stop_select(ErlDrvEvent event, void* _); @@ -256,18 +259,6 @@ typedef struct { ErlDrvPDL q_mtx; /* Mutex for the driver queue, known by the emulator. Also used for mutual exclusion when accessing field(s) below. */ size_t write_buffered; - ErlDrvTermData caller; /* recipient of sync reply */ - /* sendfile call state to retry/resume on event */ - int command; /* same as d->command. for sendfile. TODO: this seems wrong */ - struct { - int eagain; - int out_fd; - /* TODO: Use Sint64 instead? What about 32-bit off_t linux */ - off_t offset; - size_t count; - size_t chunksize; - ErlDrvSInt64 written; - } sendfile; } file_descriptor; @@ -279,7 +270,7 @@ struct erl_drv_entry efile_driver_entry = { file_stop, NULL, NULL, - file_ready_output, + NULL, "efile", NULL, NULL, @@ -296,7 +287,7 @@ struct erl_drv_entry efile_driver_entry = { ERL_DRV_FLAG_USE_PORT_LOCKING, NULL, NULL, - file_stop_select + NULL }; @@ -415,6 +406,18 @@ struct t_data Sint64 length; int advise; } fadvise; +#ifdef HAVE_SENDFILE + struct { + int out_fd; + off_t offset; + size_t nbytes; + int flags; + int hdr_cnt; /* number of header iovecs */ + struct iovec *headers; /* pointer to header iovecs */ + int trl_cnt; /* number of trailer iovecs */ + struct iovec *trailers; /* pointer to trailer iovecs */ + } sendfile; +#endif } c; char b[1]; }; @@ -1710,71 +1713,27 @@ static void invoke_fadvise(void *data) d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise); } - - -static void do_sendfile(file_descriptor *desc); -static void file_ready_output(ErlDrvData data, ErlDrvEvent event) -{ - file_descriptor* d = (file_descriptor*) data; - - switch (d->command) { - case FILE_SENDFILE: - driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, - ERL_DRV_WRITE, 0); - do_sendfile(d); - break; - default: - break; - } -} - -static void file_stop_select(ErlDrvEvent event, void* _) -{ - /* TODO: close socket? */ +static void free_sendfile(void *data) { + EF_FREE(data); } static void invoke_sendfile(void *data) { - ((struct t_data *)data)->again = 0; -} - -static void do_sendfile(file_descriptor *d) -{ - int fd = d->fd; - int out_fd = d->sendfile.out_fd; - off_t offset = d->sendfile.offset; - size_t count = d->sendfile.count; - size_t chunksize = count < d->sendfile.chunksize - ? count : d->sendfile.chunksize; - int result_ok = 0; - Efile_error errInfo; - - result_ok = efile_sendfile(&errInfo, fd, out_fd, &offset, &chunksize); - - if (result_ok) { - d->sendfile.offset += chunksize; - d->sendfile.written += chunksize; - d->sendfile.count -= chunksize; - if (d->sendfile.count > 0) { - driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, - ERL_DRV_USE|ERL_DRV_WRITE, 1); - } else { - printf("==> sendfile DONE eagain=%d\n", d->sendfile.eagain); - reply_Uint(d, d->sendfile.written); - } - } else if (errInfo.posix_errno == EAGAIN || errInfo.posix_errno == EINTR) { - if (chunksize > 0) { - d->sendfile.offset += chunksize; - d->sendfile.written += chunksize; - d->sendfile.count -= chunksize; - } - d->sendfile.eagain++; + struct t_data *d = (struct t_data *) data; + int fd = (int)d->fd; + int out_fd = (int) d->c.sendfile.out_fd; + off_t offset = (off_t) d->c.sendfile.offset; + size_t nbytes = (size_t) d->c.sendfile.nbytes; + + d->result_ok = efile_sendfile(&d->errInfo, fd, out_fd, offset, &nbytes); + d->c.sendfile.offset = offset; + d->c.sendfile.nbytes = nbytes; + d->again = 0; - driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, - ERL_DRV_USE|ERL_DRV_WRITE, 1); + if (d->result_ok) { + printf("==> sendfile DONE nbytes=%d\n", d->c.sendfile.nbytes); } else { - printf("==> sendfile ERROR %s\n", erl_errno_id(errInfo.posix_errno)); - ef_send_posix_error(d, d->caller, errInfo.posix_errno); + printf("==> sendfile ERROR %s\n", erl_errno_id(d->errInfo.posix_errno)); } } @@ -2190,9 +2149,12 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) free_preadv(data); break; case FILE_SENDFILE: - driver_select(desc->port, (ErlDrvEvent)desc->sendfile.out_fd, - ERL_DRV_USE|ERL_DRV_WRITE, 1); - free_data(data); + if (!d->result_ok) { + reply_error(desc, &d->errInfo); + } else { + reply_Sint64(desc, d->c.sendfile.nbytes); + } + free_sendfile(data); break; default: abort(); @@ -3334,37 +3296,71 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { goto done; } /* case FILE_OPT_DELAYED_WRITE: */ } ASSERT(0); goto done; /* case FILE_SETOPT: */ - case FILE_SENDFILE: - { + case FILE_SENDFILE: { + struct t_data *d; - d = EF_SAFE_ALLOC(sizeof(struct t_data)); - d->fd = desc->fd; - d->command = command; - d->invoke = invoke_sendfile; - d->free = free_data; - d->level = 2; - desc->sendfile.out_fd = get_int32((uchar*) buf); - /* TODO: are off_t and size_t 64bit on all platforms? - off_t is 32bit on win32 msvc. maybe configurable in msvc. - Maybe use '#if SIZEOF_SIZE_T == 4'? */ - desc->sendfile.offset = get_int64(((uchar*) buf) - + sizeof(Sint32)); - desc->sendfile.count = get_int64(((uchar*) buf) - + sizeof(Sint32) - + sizeof(Sint64)); - desc->sendfile.chunksize = get_int64(((uchar*) buf) - + sizeof(Sint32) - + 2*sizeof(Sint64)); - desc->sendfile.written = 0; - desc->sendfile.eagain = 0; - /* TODO: shouldn't d->command be enough? */ - desc->command = command; - desc->caller = driver_caller(desc->port); + Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; + char flags; + + /* DestFD:32, Offset:64, Bytes:64, + ChunkSize:64, + (get_bit(Nodiskio)):1, + (get_bit(MNowait)):1, + (get_bit(Sync)):1,0:5, + (encode_hdtl(Headers))/binary, + (encode_hdtl(Trailers))/binary */ + if (ev->size < 1 + 1 + 5 * sizeof(Uint32) + sizeof(char) + || !EV_GET_UINT32(ev, &out_fd, &p, &q) + || !EV_GET_CHAR(ev, &flags, &p, &q) + || !EV_GET_UINT32(ev, &offsetH, &p, &q) + || !EV_GET_UINT32(ev, &offsetL, &p, &q) + || !EV_GET_UINT32(ev, &nbytesH, &p, &q) + || !EV_GET_UINT32(ev, &nbytesL, &p, &q)) { + /* Buffer has wrong length to contain all the needed values */ + reply_posix_error(desc, EINVAL); goto done; } - + + d = EF_SAFE_ALLOC(sizeof(struct t_data)); + d->fd = desc->fd; + d->command = command; + d->invoke = invoke_sendfile; + d->free = free_sendfile; + d->level = 2; + + d->c.sendfile.out_fd = (int) out_fd; + d->c.sendfile.flags = (int) flags; + +#if SIZEOF_OFF_T == 4 + if (offsetH != 0) { + reply_posix_error(desc, EINVAL); + goto done; + } + d->c.sendfile.offset = (off_t) offsetT; +#else + d->c.sendfile.offset = ((off_t) offsetH << 32) | offsetL; +#endif + +#if SIZEOF_SIZE_T == 4 + if (nbytesH != 0) { + reply_posix_error(desc, EINVAL); + goto done; + } + d->c.sendfile.nbytes = (size_t) nbytesT; +#else + d->c.sendfile.nbytes = ((size_t) nbytesH << 32) | nbytesL; +#endif + + printf("sendfile(nbytes => %d, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags); + + /* Do HEADER TRAILER stuff by calculating pointer places, not by copying data! */ + + cq_enq(desc, d); + goto done; + } /* case FILE_SENDFILE: */ + } /* switch(command) */ - + if (lseek_flush_read(desc, &err) < 0) { reply_posix_error(desc, err); goto done; diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 8b612164da..05c2f1fce9 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1471,19 +1471,34 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, #ifdef HAVE_SENDFILE int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t offset, size_t *ret_nbytes) + off_t offset, size_t *nbytes) { #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) - ssize_t retval; - do { - retval = sendfile(out_fd, in_fd, &offset, *ret_nbytes); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - *ret_nbytes = retval; + ssize_t retval, nbytes_sent = 0; + if (*nbytes == 0) { + *nbytes = (1 << 20) - 1; + do { + retval = sendfile(out_fd, in_fd, &offset, *nbytes); + nbytes_sent += retval; + printf("retval: %d, errno: %d, offset: %d, nbytes: %d\r\n", retval, errno, offset,*nbytes); + } while ((retval == -1 && errno == EINTR) + || (retval > 0 && errno == EAGAIN)); + } else { + do { + retval = sendfile(out_fd, in_fd, &offset, *nbytes); + if (retval > 0) { + nbytes_sent += retval; + *nbytes -= retval; + } + } while ((retval == -1 && errno == EINTR) + || (*nbytes > 0 && errno == EAGAIN)); + } + *nbytes = nbytes_sent; return check_error(retval == -1 ? -1 : 0, errInfo); #elif defined(DARWIN) - off_t len = *ret_nbytes; + off_t len = *nbytes; int retval = sendfile(in_fd, out_fd, *offset, &len, NULL, 0); - *ret_nbytes = len; + *nbytes = len; return check_error(retval, errInfo); #elif defined(__FreeBSD__) || defined(__DragonFly__) off_t len = 0; diff --git a/erts/preloaded/src/prim_file.erl b/erts/preloaded/src/prim_file.erl index 0767067682..6bdf5f6e2e 100644 --- a/erts/preloaded/src/prim_file.erl +++ b/erts/preloaded/src/prim_file.erl @@ -547,12 +547,13 @@ write_file(_, _) -> sendfile(#file_descriptor{module = ?MODULE, data = {Port, _}}, DestFD, Offset, Bytes, ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) -> - - drv_command(Port, <>). diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl index ea25dc3dc3..26afed4ff9 100644 --- a/lib/kernel/src/gen_tcp.erl +++ b/lib/kernel/src/gen_tcp.erl @@ -314,14 +314,14 @@ unrecv(S, Data) when is_port(S) -> %% Send data using sendfile %% --define(MAX_CHUNK_SIZE, (1 bsl 30)*2-1). +-define(MAX_CHUNK_SIZE, (1 bsl 20)*20). %% 20 MB, has to fit in primary memory -spec sendfile(File, Sock, Offset, Bytes, Opts) -> {'ok', non_neg_integer()} | {'error', inet:posix()} when File :: file:io_device(), Sock :: socket(), - Offset :: non_neg_integer() | undefined, - Bytes :: non_neg_integer() | undefined, + Offset :: non_neg_integer(), + Bytes :: non_neg_integer(), Opts :: [sendfile_option()]. sendfile(File, Sock, Offset, Bytes, Opts) when is_pid(File) -> Ref = erlang:monitor(process, File), @@ -359,8 +359,7 @@ sendfile(File, Sock) -> {error, Reason} -> {error, Reason}; {ok, Fd} -> - {ok, #file_info{size = Bytes}} = file:read_file_info(File), - Res = sendfile(Fd, Sock, 0, Bytes, []), + Res = sendfile(Fd, Sock, 0, 0, []), file:close(Fd), Res end. @@ -461,21 +460,17 @@ sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers) -> end. -sendfile_fallback(File, Sock, undefined, Bytes, ChunkSize) -> - sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0); sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) -> - {ok, CurrPos} = file:position(File, 0), + {ok, CurrPos} = file:position(File, {cur, 0}), {ok, _NewPos} = file:position(File, {bof, Offset}), Res = sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0), file:position(File, {bof, CurrPos}), Res. -sendfile_fallback_int(_File, _Sock, BytesSent, _ChunkSize, BytesSent) -> - {ok, BytesSent}; sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent) - when Bytes > BytesSent; Bytes == undefined -> - Size = if Bytes == undefined -> + when Bytes > BytesSent; Bytes == 0 -> + Size = if Bytes == 0 -> ChunkSize; (Bytes - BytesSent + ChunkSize) > 0 -> Bytes - BytesSent; @@ -496,7 +491,9 @@ sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent) {ok, BytesSent}; Error -> Error - end. + end; +sendfile_fallback_int(_File, _Sock, BytesSent, _ChunkSize, BytesSent) -> + {ok, BytesSent}. sendfile_send(Sock, Data, Old) -> Len = iolist_size(Data), diff --git a/lib/kernel/test/gen_tcp_api_SUITE.erl b/lib/kernel/test/gen_tcp_api_SUITE.erl index 492c60f521..60a3dcb2f4 100644 --- a/lib/kernel/test/gen_tcp_api_SUITE.erl +++ b/lib/kernel/test/gen_tcp_api_SUITE.erl @@ -34,7 +34,8 @@ t_recv_timeout/1, t_recv_eof/1, t_shutdown_write/1, t_shutdown_both/1, t_shutdown_error/1, t_fdopen/1, t_implicit_inet6/1, - t_sendfile/0, t_sendfile/1, t_sendfile_hdtl/1, t_sendfile_partial/1, + t_sendfile_small/1, t_sendfile_big/1, + t_sendfile_hdtl/1, t_sendfile_partial/1, t_sendfile_offset/1]). -export([sendfile_server/1]). @@ -56,7 +57,8 @@ groups() -> {t_sendfile_ioserv, [], sendfile_all()}]. sendfile_all() -> - [t_sendfile,t_sendfile_hdtl, t_sendfile_partial, t_sendfile_offset]. +% [t_sendfile,t_sendfile_hdtl, t_sendfile_partial, t_sendfile_offset]. + [t_sendfile_big]. init_per_suite(Config) -> Config. @@ -66,19 +68,25 @@ end_per_suite(_Config) -> init_per_group(t_sendfile, Config) -> Priv = ?config(priv_dir, Config), - Filename = filename:join(Priv, "sendfile_small.html"), - {ok, D} = file:open(Filename,[write]), - io:format(D,"yo baby yo",[]), - file:sync(D), - file:close(D), - [{small_file, Filename}|Config]; + SFilename = filename:join(Priv, "sendfile_small.html"), + {ok, DS} = file:open(SFilename,[write]), + io:format(DS,"yo baby yo",[]), + file:sync(DS), + file:close(DS), + BFilename = filename:join(Priv, "sendfile_big.html"), + {ok, DB} = file:open(BFilename,[write,raw]), + [file:write(DB,[<<0:(10*8*1024*1024)>>]) || + _I <- lists:seq(1,51)], + file:sync(DB), + file:close(DB), + [{small_file, SFilename},{big_file, BFilename}|Config]; init_per_group(t_sendfile_raw, Config) -> [{file_opts, [raw]}|Config]; init_per_group(_GroupName, Config) -> Config. end_per_group(_GroupName, Config) -> - Config. + file:delete(proplists:get_value(big_file, Config)). init_per_testcase(_Func, Config) -> @@ -258,12 +266,19 @@ implicit_inet6(S, Addr) -> ?line ok = gen_tcp:close(S2), ?line ok = gen_tcp:close(S1). +t_sendfile_small(Config) when is_list(Config) -> + Filename = proplists:get_value(small_file, Config), + + Send = fun(Sock) -> + {Size, Data} = sendfile_file_info(Filename), + {ok, Size} = gen_tcp:sendfile(Filename, Sock), + Data + end, -t_sendfile() -> - [{timetrap, {seconds, 5}}]. + ok = sendfile_send(Send). -t_sendfile(Config) when is_list(Config) -> - Filename = proplists:get_value(small_file, Config), +t_sendfile_big(Config) when is_list(Config) -> + Filename = proplists:get_value(big_file, Config), Send = fun(Sock) -> {Size, Data} = sendfile_file_info(Filename), @@ -281,7 +296,7 @@ t_sendfile_partial(Config) -> {_Size, <>} = sendfile_file_info(Filename), {ok,D} = file:open(Filename,[read|FileOpts]), - {ok,5} = gen_tcp:sendfile(D,Sock,undefined,5,[]), + {ok,5} = gen_tcp:sendfile(D,Sock,0,5,[]), file:close(D), Data end, @@ -289,20 +304,24 @@ t_sendfile_partial(Config) -> {_Size, <>} = sendfile_file_info(Filename), - {ok,D} = file:open(Filename,[read|FileOpts]), + {ok,D} = file:open(Filename,[read,binary|FileOpts]), + {ok, <>} = file:read(D,5), FSend = fun(Sock) -> - {ok,5} = gen_tcp:sendfile(D,Sock,undefined,5,[]), + {ok,5} = gen_tcp:sendfile(D,Sock,0,5,[]), FData end, ok = sendfile_send(FSend), SSend = fun(Sock) -> - {ok,3} = gen_tcp:sendfile(D,Sock,undefined,3,[]), + {ok,3} = gen_tcp:sendfile(D,Sock,5,3,[]), SData end, ok = sendfile_send(SSend), + + {ok, <>} = file:read(D,3), + file:close(D). t_sendfile_offset(Config) -> @@ -330,7 +349,7 @@ t_sendfile_hdtl(Config) -> {ok,D} = file:open(Filename,[read|FileOpts]), AllSize = Size+HdtlSize, {ok, AllSize} = gen_tcp:sendfile( - D, Sock,undefined,undefined, + D, Sock,0,0, [{headers,Headers}, {trailers,Trailers}]), file:close(D), @@ -356,9 +375,9 @@ t_sendfile_hdtl(Config) -> SendTl = fun(Sock) -> Trailers = [<<"trailer1">>,"trailer2"], - D = Send(Sock,Trailers,undefined, + D = Send(Sock,undefined,Trailers, iolist_size([Trailers])), - iolist_to_binary([Trailers,D]) + iolist_to_binary([D,Trailers]) end, ok = sendfile_send(SendTl). -- cgit v1.2.3 From 020e988424cf0d15ebab8de50638492defb6f2b5 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 11 Nov 2011 16:19:07 +0100 Subject: Implement sendfile when there are no async threads When there are no async threads sendfile will use the ready_output select on the socket fd to know when to send data. The file_desc will also be put in the sending sendfile_state which buffers all other commands to that file until the sendfile is done. --- erts/emulator/drivers/common/efile_drv.c | 143 +++++++++++++++++++++---------- erts/emulator/drivers/common/erl_efile.h | 4 +- erts/emulator/drivers/unix/unix_efile.c | 30 +++---- lib/kernel/test/gen_tcp_api_SUITE.erl | 52 +++++++---- 4 files changed, 146 insertions(+), 83 deletions(-) diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 4ed6aa4891..a318384479 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -76,11 +76,6 @@ #define FILE_OPT_DELAYED_WRITE 0 #define FILE_OPT_READ_AHEAD 1 -#define FILE_SENDFILE_OFFSET 0x10 -#define FILE_SENDFILE_NBYTES 0x08 -#define FILE_SENDFILE_NODISKIO 0x4 -#define FILE_SENDFILE_MNOWAIT 0x2 -#define FILE_SENDFILE_SYNC 0x1 /* IPREAD variants */ @@ -223,6 +218,7 @@ typedef unsigned char uchar; static ErlDrvData file_start(ErlDrvPort port, char* command); static int file_init(void); static void file_stop(ErlDrvData); +static void file_ready_output(ErlDrvData data, ErlDrvEvent event); static void file_output(ErlDrvData, char* buf, int len); static int file_control(ErlDrvData, unsigned int command, char* buf, int len, char **rbuf, int rlen); @@ -230,10 +226,12 @@ static void file_timeout(ErlDrvData); static void file_outputv(ErlDrvData, ErlIOVec*); static void file_async_ready(ErlDrvData, ErlDrvThreadData); static void file_flush(ErlDrvData); +static void file_stop_select(ErlDrvEvent event, void* _); enum e_timer {timer_idle, timer_again, timer_write}; +enum e_sendfile {sending, not_sending}; struct t_data; @@ -248,6 +246,7 @@ typedef struct { struct t_data *cq_head; /* Queue of incoming commands */ struct t_data *cq_tail; /* -""- */ enum e_timer timer_state; + enum e_sendfile sendfile_state; size_t read_bufsize; ErlDrvBinary *read_binp; size_t read_offset; @@ -268,9 +267,9 @@ struct erl_drv_entry efile_driver_entry = { file_init, file_start, file_stop, + file_output, NULL, - NULL, - NULL, + file_ready_output, "efile", NULL, NULL, @@ -287,7 +286,7 @@ struct erl_drv_entry efile_driver_entry = { ERL_DRV_FLAG_USE_PORT_LOCKING, NULL, NULL, - NULL + file_stop_select }; @@ -339,6 +338,13 @@ struct t_readdir_buf { char buf[READDIR_BUFSIZE]; }; +struct t_sendfile_hdtl { + int hdr_cnt; /* number of header iovecs */ + struct iovec *headers; /* pointer to header iovecs */ + int trl_cnt; /* number of trailer iovecs */ + struct iovec *trailers; /* pointer to trailer iovecs */ +}; + struct t_data { struct t_data *next; @@ -406,18 +412,14 @@ struct t_data Sint64 length; int advise; } fadvise; -#ifdef HAVE_SENDFILE struct { int out_fd; off_t offset; size_t nbytes; - int flags; - int hdr_cnt; /* number of header iovecs */ - struct iovec *headers; /* pointer to header iovecs */ - int trl_cnt; /* number of trailer iovecs */ - struct iovec *trailers; /* pointer to trailer iovecs */ + Uint64 written; + short flags; + struct t_sendfile_hdtl *hdtl; } sendfile; -#endif } c; char b[1]; }; @@ -632,6 +634,7 @@ static struct t_data *cq_deq(file_descriptor *desc) { return d; } + /********************************************************************* * Driver entry point -> init */ @@ -674,6 +677,7 @@ file_start(ErlDrvPort port, char* command) desc->cq_head = NULL; desc->cq_tail = NULL; desc->timer_state = timer_idle; + desc->sendfile_state = not_sending; desc->read_bufsize = 0; desc->read_binp = NULL; desc->read_offset = 0; @@ -1713,30 +1717,65 @@ static void invoke_fadvise(void *data) d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise); } -static void free_sendfile(void *data) { - EF_FREE(data); -} - static void invoke_sendfile(void *data) { - struct t_data *d = (struct t_data *) data; - int fd = (int)d->fd; - int out_fd = (int) d->c.sendfile.out_fd; - off_t offset = (off_t) d->c.sendfile.offset; - size_t nbytes = (size_t) d->c.sendfile.nbytes; - - d->result_ok = efile_sendfile(&d->errInfo, fd, out_fd, offset, &nbytes); - d->c.sendfile.offset = offset; - d->c.sendfile.nbytes = nbytes; + struct t_data *d = (struct t_data *)data; + int fd = d->fd; + int out_fd = d->c.sendfile.out_fd; + size_t nbytes = d->c.sendfile.nbytes; + int result = 0; d->again = 0; - if (d->result_ok) { - printf("==> sendfile DONE nbytes=%d\n", d->c.sendfile.nbytes); + result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes); + + /* printf("sendfile: result: %d, errno: %d, offset: %d, nbytes: %d\r\n", + result, errno, d->c.sendfile.offset,nbytes);*/ + d->c.sendfile.written += nbytes; + + if (result == 1) { + if (d->c.sendfile.nbytes == 0 && nbytes != 0) { + d->result_ok = 1; + } else if ((d->c.sendfile.nbytes - nbytes) != 0) { + d->result_ok = 1; + d->c.sendfile.nbytes -= nbytes; + } else { + printf("==> sendfile DONE written=%ld\r\n", d->c.sendfile.written); + d->result_ok = 0; + } + } else if (result == 0 && (d->errInfo.posix_errno == EAGAIN + || d->errInfo.posix_errno == EINTR)) { + d->result_ok = 1; } else { - printf("==> sendfile ERROR %s\n", erl_errno_id(d->errInfo.posix_errno)); + d->result_ok = -1; + printf("==> sendfile ERROR %s\r\n", erl_errno_id(d->errInfo.posix_errno)); } } +static void free_sendfile(void *data) { + EF_FREE(data); +} + +static void file_ready_output(ErlDrvData data, ErlDrvEvent event) +{ + file_descriptor* fd = (file_descriptor*) data; + + switch (fd->d->command) { + case FILE_SENDFILE: + driver_select(fd->port, (ErlDrvEvent)fd->d->c.sendfile.out_fd, + (int)ERL_DRV_WRITE,(int) 0); + invoke_sendfile((void *)fd->d); + file_async_ready((ErlDrvData)fd, (ErlDrvThreadData)fd->d); + break; + default: + break; + } +} + +static void file_stop_select(ErlDrvEvent event, void* _) +{ + /* TODO: close socket? */ +} + static void free_readdir(void *data) { struct t_data *d = (struct t_data *) data; @@ -1796,7 +1835,8 @@ static int try_again(file_descriptor *desc, struct t_data *d) { static void cq_execute(file_descriptor *desc) { struct t_data *d; register void *void_ptr; /* Soft cast variable */ - if (desc->timer_state == timer_again) + if (desc->timer_state == timer_again || + desc->sendfile_state == sending) return; if (! (d = cq_deq(desc))) return; @@ -2149,12 +2189,21 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) free_preadv(data); break; case FILE_SENDFILE: - if (!d->result_ok) { + //printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); + if (d->result_ok == -1) { + desc->sendfile_state = not_sending; reply_error(desc, &d->errInfo); - } else { - reply_Sint64(desc, d->c.sendfile.nbytes); + free_sendfile(data); + } else if (d->result_ok == 0) { + desc->sendfile_state = not_sending; + reply_Sint64(desc, d->c.sendfile.written); + free_sendfile(data); + } else if (d->result_ok == 1) { // If we are using select to send the rest of the data + desc->sendfile_state = sending; + desc->d = d; + driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, + ERL_DRV_USE|ERL_DRV_WRITE, 1); } - free_sendfile(data); break; default: abort(); @@ -3296,11 +3345,12 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { goto done; } /* case FILE_OPT_DELAYED_WRITE: */ } ASSERT(0); goto done; /* case FILE_SETOPT: */ + case FILE_SENDFILE: { - struct t_data *d; - Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; - char flags; + struct t_data *d; + Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; + char flags; /* DestFD:32, Offset:64, Bytes:64, ChunkSize:64, @@ -3330,26 +3380,27 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { d->c.sendfile.out_fd = (int) out_fd; d->c.sendfile.flags = (int) flags; + d->c.sendfile.written = 0; -#if SIZEOF_OFF_T == 4 + #if SIZEOF_OFF_T == 4 if (offsetH != 0) { reply_posix_error(desc, EINVAL); goto done; } d->c.sendfile.offset = (off_t) offsetT; -#else + #else d->c.sendfile.offset = ((off_t) offsetH << 32) | offsetL; -#endif + #endif -#if SIZEOF_SIZE_T == 4 + #if SIZEOF_SIZE_T == 4 if (nbytesH != 0) { reply_posix_error(desc, EINVAL); goto done; } d->c.sendfile.nbytes = (size_t) nbytesT; -#else + #else d->c.sendfile.nbytes = ((size_t) nbytesH << 32) | nbytesL; -#endif + #endif printf("sendfile(nbytes => %d, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags); @@ -3357,7 +3408,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { cq_enq(desc, d); goto done; - } /* case FILE_SENDFILE: */ + } /* case FILE_SENDFILE: */ } /* switch(command) */ diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index 864b867955..e0b8cfca03 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -162,5 +162,5 @@ int efile_symlink(Efile_error* errInfo, char* old, char* new); int efile_may_openfile(Efile_error* errInfo, char *name); int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); -int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t offset, - size_t *count); +int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, + off_t *offset, size_t *nbytes); diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 05c2f1fce9..3a966757d9 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1469,31 +1469,27 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, } #ifdef HAVE_SENDFILE +#define SENDFILE_CHUNK_SIZE ((1 << 30) - 1) int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t offset, size_t *nbytes) + off_t *offset, size_t *nbytes) { #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) - ssize_t retval, nbytes_sent = 0; + ssize_t retval, written = 0; + // printf("sendfile(%d,%d,%d,%d)\r\n",out_fd,in_fd,*offset,*nbytes); if (*nbytes == 0) { - *nbytes = (1 << 20) - 1; do { - retval = sendfile(out_fd, in_fd, &offset, *nbytes); - nbytes_sent += retval; - printf("retval: %d, errno: %d, offset: %d, nbytes: %d\r\n", retval, errno, offset,*nbytes); - } while ((retval == -1 && errno == EINTR) - || (retval > 0 && errno == EAGAIN)); + *nbytes = SENDFILE_CHUNK_SIZE; // chunk size + retval = sendfile(out_fd, in_fd, offset, *nbytes); + if (retval > 0) + written += retval; + } while (retval == SENDFILE_CHUNK_SIZE); } else { - do { - retval = sendfile(out_fd, in_fd, &offset, *nbytes); - if (retval > 0) { - nbytes_sent += retval; - *nbytes -= retval; - } - } while ((retval == -1 && errno == EINTR) - || (*nbytes > 0 && errno == EAGAIN)); + retval = sendfile(out_fd, in_fd, offset, *nbytes); + if (retval > 0) + written = retval; } - *nbytes = nbytes_sent; + *nbytes = written; return check_error(retval == -1 ? -1 : 0, errInfo); #elif defined(DARWIN) off_t len = *nbytes; diff --git a/lib/kernel/test/gen_tcp_api_SUITE.erl b/lib/kernel/test/gen_tcp_api_SUITE.erl index 60a3dcb2f4..1a88b2e17f 100644 --- a/lib/kernel/test/gen_tcp_api_SUITE.erl +++ b/lib/kernel/test/gen_tcp_api_SUITE.erl @@ -38,7 +38,7 @@ t_sendfile_hdtl/1, t_sendfile_partial/1, t_sendfile_offset/1]). --export([sendfile_server/1]). +-export([sendfile_server/2]). suite() -> [{ct_hooks,[ts_install_cth]}]. @@ -57,8 +57,15 @@ groups() -> {t_sendfile_ioserv, [], sendfile_all()}]. sendfile_all() -> -% [t_sendfile,t_sendfile_hdtl, t_sendfile_partial, t_sendfile_offset]. - [t_sendfile_big]. + [ + t_sendfile_small + ,t_sendfile_big +% ,t_sendfile_hdtl + ,t_sendfile_partial + ,t_sendfile_offset + ]. +% [t_sendfile_big]. +% [t_sendfile_small]. init_per_suite(Config) -> Config. @@ -75,8 +82,7 @@ init_per_group(t_sendfile, Config) -> file:close(DS), BFilename = filename:join(Priv, "sendfile_big.html"), {ok, DB} = file:open(BFilename,[write,raw]), - [file:write(DB,[<<0:(10*8*1024*1024)>>]) || - _I <- lists:seq(1,51)], + [file:write(DB,[<<0:(10*8*1024*1024)>>]) || _I <- lists:seq(1,51)], file:sync(DB), file:close(DB), [{small_file, SFilename},{big_file, BFilename}|Config]; @@ -85,8 +91,11 @@ init_per_group(t_sendfile_raw, Config) -> init_per_group(_GroupName, Config) -> Config. -end_per_group(_GroupName, Config) -> - file:delete(proplists:get_value(big_file, Config)). +end_per_group(t_sendfile, Config) -> + file:delete(proplists:get_value(big_file, Config)); +end_per_group(_,_Config) -> + ok. + init_per_testcase(_Func, Config) -> @@ -281,12 +290,13 @@ t_sendfile_big(Config) when is_list(Config) -> Filename = proplists:get_value(big_file, Config), Send = fun(Sock) -> - {Size, Data} = sendfile_file_info(Filename), + {ok, #file_info{size = Size}} = + file:read_file_info(Filename), {ok, Size} = gen_tcp:sendfile(Filename, Sock), - Data + Size end, - ok = sendfile_send(Send). + ok = sendfile_send("localhost", Send, 0). t_sendfile_partial(Config) -> Filename = proplists:get_value(small_file, Config), @@ -386,7 +396,9 @@ t_sendfile_hdtl(Config) -> sendfile_send(Send) -> sendfile_send("localhost",Send). sendfile_send(Host, Send) -> - spawn_link(?MODULE, sendfile_server, [self()]), + sendfile_send(Host, Send, []). +sendfile_send(Host, Send, Orig) -> + spawn_link(?MODULE, sendfile_server, [self(), Orig]), receive {server, Port} -> {ok, Sock} = gen_tcp:connect(Host, Port, @@ -400,25 +412,29 @@ sendfile_send(Host, Send) -> end end. -sendfile_server(ClientPid) -> +sendfile_server(ClientPid, Orig) -> {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 0}, {active, false}, {reuseaddr, true}]), {ok, Port} = inet:port(LSock), ClientPid ! {server, Port}, {ok, Sock} = gen_tcp:accept(LSock), - {ok, Bin} = sendfile_do_recv(Sock, []), + {ok, Bin} = sendfile_do_recv(Sock, Orig), ok = gen_tcp:close(Sock), ClientPid ! {ok, Bin}. --define(SENDFILE_TIMEOUT, 5000). - +-define(SENDFILE_TIMEOUT, 10000). +%% f(),{ok, S} = gen_tcp:connect("localhost",7890,[binary]),gen_tcp:sendfile("/ldisk/lukas/otp/sendfiletest.dat",S). sendfile_do_recv(Sock, Bs) -> case gen_tcp:recv(Sock, 0, ?SENDFILE_TIMEOUT) of - {ok, B} -> + {ok, B} when is_list(Bs) -> sendfile_do_recv(Sock, [B|Bs]); - {error, closed} -> - {ok, iolist_to_binary(lists:reverse(Bs))} + {error, closed} when is_list(Bs) -> + {ok, iolist_to_binary(lists:reverse(Bs))}; + {ok, B} when is_integer(Bs) -> + sendfile_do_recv(Sock, byte_size(B) + Bs); + {error, closed} when is_integer(Bs) -> + {ok, Bs} end. sendfile_file_info(File) -> -- cgit v1.2.3 From 035279e92f9bb3f9601098f6a70ba6d398d6727f Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Mon, 14 Nov 2011 12:24:37 +0100 Subject: Make socket blocking if there are async threads --- erts/emulator/drivers/common/efile_drv.c | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index a318384479..70917119e2 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -99,7 +99,13 @@ # include "config.h" #endif #include + +// Need (NON)BLOCKING macros for sendfile +#ifndef WANT_NONBLOCKING +#define WANT_NONBLOCKING +#endif #include "sys.h" + #include "erl_driver.h" #include "erl_efile.h" #include "erl_threads.h" @@ -1728,12 +1734,14 @@ static void invoke_sendfile(void *data) result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes); - /* printf("sendfile: result: %d, errno: %d, offset: %d, nbytes: %d\r\n", - result, errno, d->c.sendfile.offset,nbytes);*/ + //printf("sendfile: result: %d, errno: %d, offset: %d, nbytes: %d\r\n",result, errno, d->c.sendfile.offset,nbytes); d->c.sendfile.written += nbytes; if (result == 1) { - if (d->c.sendfile.nbytes == 0 && nbytes != 0) { + if (sys_info.async_threads != 0) { + printf("==> sendfile DONE written=%ld\r\n", d->c.sendfile.written); + d->result_ok = 0; + } else if (d->c.sendfile.nbytes == 0 && nbytes != 0) { d->result_ok = 1; } else if ((d->c.sendfile.nbytes - nbytes) != 0) { d->result_ok = 1; @@ -2189,14 +2197,20 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) free_preadv(data); break; case FILE_SENDFILE: - //printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); + printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); if (d->result_ok == -1) { desc->sendfile_state = not_sending; reply_error(desc, &d->errInfo); + if (sys_info.async_threads != 0) { + SET_NONBLOCKING(d->c.sendfile.out_fd); + } free_sendfile(data); } else if (d->result_ok == 0) { desc->sendfile_state = not_sending; reply_Sint64(desc, d->c.sendfile.written); + if (sys_info.async_threads != 0) { + SET_NONBLOCKING(d->c.sendfile.out_fd); + } free_sendfile(data); } else if (d->result_ok == 1) { // If we are using select to send the rest of the data desc->sendfile_state = sending; @@ -3348,9 +3362,9 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { case FILE_SENDFILE: { - struct t_data *d; - Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; - char flags; + struct t_data *d; + Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; + char flags; /* DestFD:32, Offset:64, Bytes:64, ChunkSize:64, @@ -3406,6 +3420,10 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { /* Do HEADER TRAILER stuff by calculating pointer places, not by copying data! */ + if (sys_info.async_threads != 0) { + SET_BLOCKING(d->c.sendfile.out_fd); + } + cq_enq(desc, d); goto done; } /* case FILE_SENDFILE: */ -- cgit v1.2.3 From 8e653ead2f361ce37b6e00b85844a48bd0cab394 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 11:10:56 +0100 Subject: Fix 32 bit parameters --- erts/emulator/drivers/common/efile_drv.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 70917119e2..0364d0722a 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -3401,7 +3401,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { reply_posix_error(desc, EINVAL); goto done; } - d->c.sendfile.offset = (off_t) offsetT; + d->c.sendfile.offset = (off_t) offsetL; #else d->c.sendfile.offset = ((off_t) offsetH << 32) | offsetL; #endif @@ -3411,7 +3411,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { reply_posix_error(desc, EINVAL); goto done; } - d->c.sendfile.nbytes = (size_t) nbytesT; + d->c.sendfile.nbytes = (size_t) nbytesL; #else d->c.sendfile.nbytes = ((size_t) nbytesH << 32) | nbytesL; #endif -- cgit v1.2.3 From 1087d64af63d1fd5044d56e864557365ed40aab3 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 11:12:36 +0100 Subject: Fix cleanup for sendfile --- erts/emulator/drivers/common/efile_drv.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 0364d0722a..98da63f4e3 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -1772,7 +1772,7 @@ static void file_ready_output(ErlDrvData data, ErlDrvEvent event) driver_select(fd->port, (ErlDrvEvent)fd->d->c.sendfile.out_fd, (int)ERL_DRV_WRITE,(int) 0); invoke_sendfile((void *)fd->d); - file_async_ready((ErlDrvData)fd, (ErlDrvThreadData)fd->d); + file_async_ready(data, (ErlDrvThreadData)fd->d); break; default: break; @@ -2203,15 +2203,19 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) reply_error(desc, &d->errInfo); if (sys_info.async_threads != 0) { SET_NONBLOCKING(d->c.sendfile.out_fd); + } else { + driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, + ERL_DRV_USE, 0); } - free_sendfile(data); } else if (d->result_ok == 0) { desc->sendfile_state = not_sending; reply_Sint64(desc, d->c.sendfile.written); if (sys_info.async_threads != 0) { SET_NONBLOCKING(d->c.sendfile.out_fd); + } else { + driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, + ERL_DRV_USE, 0); } - free_sendfile(data); } else if (d->result_ok == 1) { // If we are using select to send the rest of the data desc->sendfile_state = sending; desc->d = d; @@ -3389,7 +3393,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { d->fd = desc->fd; d->command = command; d->invoke = invoke_sendfile; - d->free = free_sendfile; + d->free = NULL; d->level = 2; d->c.sendfile.out_fd = (int) out_fd; @@ -3422,6 +3426,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { if (sys_info.async_threads != 0) { SET_BLOCKING(d->c.sendfile.out_fd); + d->free = free_sendfile; } cq_enq(desc, d); -- cgit v1.2.3 From eccba49e87ad32248a678d90cfdf355ffd97015d Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 11:14:10 +0100 Subject: Fix offset calculation for darwin --- erts/emulator/drivers/unix/unix_efile.c | 1 + 1 file changed, 1 insertion(+) diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 3a966757d9..61df572a91 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1494,6 +1494,7 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, #elif defined(DARWIN) off_t len = *nbytes; int retval = sendfile(in_fd, out_fd, *offset, &len, NULL, 0); + *offset += len; *nbytes = len; return check_error(retval, errInfo); #elif defined(__FreeBSD__) || defined(__DragonFly__) -- cgit v1.2.3 From 59e7e345ba51b7c2d6c9e479ce4cbb7c745c7893 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 11:16:54 +0100 Subject: Implement ignorefd for TCP Ignore fd is a feature used by sendfile to temporarily remove all driver_select calls on that fd so that another driver can select on it. It also delays all actions which sends or receives data in that fd until in the fd is no longer ignored. Only the controlling_process should use the feature as it is otherwise possible that the ignore will never be cleaned up and hence create a memory leak in the driver. An ignored driver will not detect that an fd has been closed until it is unignored. --- erts/emulator/drivers/common/inet_drv.c | 67 +++++++++++++++++++++++++++++---- erts/preloaded/src/prim_inet.erl | 23 ++++------- lib/kernel/src/gen_tcp.erl | 7 ++-- lib/kernel/src/inet_int.hrl | 2 + 4 files changed, 74 insertions(+), 25 deletions(-) diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 1fe9e04341..56799555a9 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -445,6 +445,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) driver_select(port, e, mode | (on?ERL_DRV_USE:0), on) #define sock_select(d, flags, onoff) do { \ + ASSERT(!onoff || !(d)->is_ignored); \ (d)->event_mask = (onoff) ? \ ((d)->event_mask | (flags)) : \ ((d)->event_mask & ~(flags)); \ @@ -538,6 +539,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_REQ_GETIFADDRS 25 #define INET_REQ_ACCEPT 26 #define INET_REQ_LISTEN 27 +#define INET_REQ_IGNOREFD 28 + /* TCP requests */ /* #define TCP_REQ_ACCEPT 40 MOVED */ /* #define TCP_REQ_LISTEN 41 MERGED */ @@ -725,6 +728,11 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) /* Max interface name */ #define INET_IFNAMSIZ 16 +/* INET Ignore states */ +#define INET_IGNORE_NONE 0 +#define INET_IGNORE_READ 1 +#define INET_IGNORE_WRITE 1 << 1 + /* Max length of Erlang Term Buffer (for outputting structured terms): */ #ifdef HAVE_SCTP #define PACKET_ERL_DRV_TERM_DATA_LEN 512 @@ -864,6 +872,9 @@ typedef struct { double send_avg; /* average packet size sent */ subs_list empty_out_q_subs; /* Empty out queue subscribers */ + int is_ignored; /* if a fd is ignored by from the inet_drv, + this should be set to true when the fd is used + outside of inet_drv. */ } inet_descriptor; @@ -7302,6 +7313,8 @@ static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol) sys_memzero((char *)&desc->remote,sizeof(desc->remote)); + desc->is_ignored = 0; + return (ErlDrvData)desc; } @@ -7584,6 +7597,33 @@ static int inet_ctl(inet_descriptor* desc, int cmd, char* buf, int len, return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize); } + case INET_REQ_IGNOREFD: { + DEBUGF(("inet_ctl(%ld): IGNOREFD, IGNORED = %d\r\n", + (long)desc->port,(int)*buf)); + + /* + * FD can only be ignored for connected TCP connections for now, + * possible to add UDP and SCTP support if needed. + */ + if (!IS_CONNECTED(desc)) + return ctl_error(ENOTCONN, rbuf, rsize); + + if (!desc->stype == SOCK_STREAM) + return ctl_error(EINVAL, rbuf, rsize); + + if (*buf == 1 && !desc->is_ignored) { + desc->is_ignored = INET_IGNORE_READ; + sock_select(desc, (FD_READ|FD_WRITE|FD_CLOSE|ERL_DRV_USE_NO_CALLBACK), 0); + } else if (*buf == 0 && desc->is_ignored) { + int flags = (FD_READ|FD_CLOSE|((desc->is_ignored & INET_IGNORE_WRITE)?FD_WRITE:0)); + desc->is_ignored = INET_IGNORE_NONE; + sock_select(desc, flags, 1); + } else + return ctl_error(EINVAL, rbuf, rsize); + + return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); + } + #ifndef VXWORKS case INET_REQ_GETSERVBYNAME: { /* L1 Name-String L2 Proto-String */ @@ -7959,6 +7999,7 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, char** rbuf, int rsize) { tcp_descriptor* desc = (tcp_descriptor*)e; + switch(cmd) { case INET_REQ_OPEN: { /* open socket and return internal index */ int domain; @@ -8224,13 +8265,14 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, if (enq_async(INETP(desc), tbuf, TCP_REQ_RECV) < 0) return ctl_error(EALREADY, rbuf, rsize); - if (tcp_recv(desc, n) == 0) { + if (INETP(desc)->is_ignored || tcp_recv(desc, n) == 0) { if (timeout == 0) async_error_am(INETP(desc), am_timeout); else { if (timeout != INET_INFINITY) - driver_set_timer(desc->inet.port, timeout); - sock_select(INETP(desc),(FD_READ|FD_CLOSE),1); + driver_set_timer(desc->inet.port, timeout); + if (!INETP(desc)->is_ignored) + sock_select(INETP(desc),(FD_READ|FD_CLOSE),1); } } return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize); @@ -8970,6 +9012,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) #ifdef DEBUG long port = (long) desc->inet.port; /* Used after driver_exit() */ #endif + ASSERT(!INETP(desc)->is_ignored); DEBUGF(("tcp_inet_input(%ld) {s=%d\r\n", port, desc->inet.s)); if (desc->inet.state == INET_STATE_ACCEPTING) { SOCKET s; @@ -9231,7 +9274,11 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) DEBUGF(("tcp_sendv(%ld): s=%d, about to send %d,%d bytes\r\n", (long)desc->inet.port, desc->inet.s, h_len, len)); - if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { + + if (INETP(desc)->is_ignored) { + INETP(desc)->is_ignored |= INET_IGNORE_WRITE; + n = 0; + } else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { n = 0; } else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, ev->iov, vsize, &n, 0))) { @@ -9259,7 +9306,8 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) DEBUGF(("tcp_sendv(%ld): s=%d, Send failed, queuing\r\n", (long)desc->inet.port, desc->inet.s)); driver_enqv(ix, ev, n); - sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); + if (!INETP(desc)->is_ignored) + sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); } return 0; } @@ -9324,7 +9372,10 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, int len) DEBUGF(("tcp_send(%ld): s=%d, about to send %d,%d bytes\r\n", (long)desc->inet.port, desc->inet.s, h_len, len)); - if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { + if (INETP(desc)->is_ignored) { + INETP(desc)->is_ignored |= INET_IGNORE_WRITE; + n = 0; + } else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { sock_send(desc->inet.s, buf, 0, 0); n = 0; } else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s,iov,2,&n,0))) { @@ -9355,7 +9406,8 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, int len) n -= h_len; driver_enq(ix, ptr+n, len-n); } - sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); + if (!INETP(desc)->is_ignored) + sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); } return 0; } @@ -9379,6 +9431,7 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) int ret = 0; ErlDrvPort ix = desc->inet.port; + ASSERT(!INETP(desc)->is_ignored); DEBUGF(("tcp_inet_output(%ld) {s=%d\r\n", (long)desc->inet.port, desc->inet.s)); if (desc->inet.state == INET_STATE_CONNECTING) { diff --git a/erts/preloaded/src/prim_inet.erl b/erts/preloaded/src/prim_inet.erl index 015930c0c0..0cedd284db 100644 --- a/erts/preloaded/src/prim_inet.erl +++ b/erts/preloaded/src/prim_inet.erl @@ -36,7 +36,7 @@ -export([recvfrom/2, recvfrom/3]). -export([setopt/3, setopts/2, getopt/2, getopts/2, is_sockopt_val/2]). -export([chgopt/3, chgopts/2]). --export([getstat/2, getfd/1, stealfd/1, returnfd/1, +-export([getstat/2, getfd/1, ignorefd/2, getindex/1, getstatus/1, gettype/1, getifaddrs/1, getiflist/1, ifget/3, ifset/3, gethostname/1]). @@ -843,25 +843,18 @@ getfd(S) when is_port(S) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% -%% STEALFD(insock()) -> {ok,integer()} | {error, Reason} +%% IGNOREFD(insock(),boolean()) -> {ok,integer()} | {error, Reason} %% %% steal internal file descriptor %% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -stealfd(S) when is_port(S) -> - getfd(S). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% -%% RETURNFD(insock()) -> {ok,integer()} | {error, Reason} -%% -%% return internal file descriptor -%% -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -returnfd(S) when is_port(S) -> - ok. +ignorefd(S,Bool) when is_port(S) -> + Val = if Bool -> 1; true -> 0 end, + case ctl_cmd(S, ?INET_REQ_IGNOREFD, [Val]) of + {ok, _} -> ok; + Error -> Error + end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl index 26afed4ff9..3f1e432f7a 100644 --- a/lib/kernel/src/gen_tcp.erl +++ b/lib/kernel/src/gen_tcp.erl @@ -420,15 +420,16 @@ mod([], Address) -> sendfile(#file_descriptor{ module = Mod } = Fd, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) when is_port(Sock) -> - {ok,SockFd} = prim_inet:stealfd(Sock), + ok = prim_inet:ignorefd(Sock,true), + {ok, SockFd} = prim_inet:getfd(Sock), case Mod:sendfile(Fd, SockFd, Offset, Bytes, ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) of {error, enotsup} -> - prim_inet:returnfd(Sock), + ok = prim_inet:ignorefd(Sock,false), sendfile_fallback(Fd, Sock, Offset, Bytes, ChunkSize, Headers, Trailers); Else -> - prim_inet:returnfd(Sock), + ok = prim_inet:ignorefd(Sock,false), Else end; sendfile(_,_,_,_,_,_,_,_,_,_) -> diff --git a/lib/kernel/src/inet_int.hrl b/lib/kernel/src/inet_int.hrl index f8984b13fe..cf893c73eb 100644 --- a/lib/kernel/src/inet_int.hrl +++ b/lib/kernel/src/inet_int.hrl @@ -85,6 +85,8 @@ -define(INET_REQ_GETIFADDRS, 25). -define(INET_REQ_ACCEPT, 26). -define(INET_REQ_LISTEN, 27). +-define(INET_REQ_IGNOREFD, 28). + %% TCP requests %%-define(TCP_REQ_ACCEPT, 40). MOVED %%-define(TCP_REQ_LISTEN, 41). MERGED -- cgit v1.2.3 From 23d62043ebf4bfad900935c650e8fcb3f2e6f88c Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 11:19:54 +0100 Subject: Change nbytes to 64 bit --- erts/emulator/drivers/common/efile_drv.c | 26 ++++++---------- erts/emulator/drivers/common/erl_efile.h | 2 +- erts/emulator/drivers/unix/unix_efile.c | 53 ++++++++++++++++++++------------ 3 files changed, 44 insertions(+), 37 deletions(-) diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 98da63f4e3..5d785a75c4 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -421,7 +421,7 @@ struct t_data struct { int out_fd; off_t offset; - size_t nbytes; + Uint64 nbytes; Uint64 written; short flags; struct t_sendfile_hdtl *hdtl; @@ -1728,7 +1728,7 @@ static void invoke_sendfile(void *data) struct t_data *d = (struct t_data *)data; int fd = d->fd; int out_fd = d->c.sendfile.out_fd; - size_t nbytes = d->c.sendfile.nbytes; + Uint64 nbytes = d->c.sendfile.nbytes; int result = 0; d->again = 0; @@ -2197,12 +2197,13 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) free_preadv(data); break; case FILE_SENDFILE: - printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); + //printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); if (d->result_ok == -1) { desc->sendfile_state = not_sending; reply_error(desc, &d->errInfo); if (sys_info.async_threads != 0) { SET_NONBLOCKING(d->c.sendfile.out_fd); + free_sendfile(data); } else { driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, ERL_DRV_USE, 0); @@ -2212,6 +2213,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) reply_Sint64(desc, d->c.sendfile.written); if (sys_info.async_threads != 0) { SET_NONBLOCKING(d->c.sendfile.out_fd); + free_sendfile(data); } else { driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, ERL_DRV_USE, 0); @@ -3367,7 +3369,8 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { case FILE_SENDFILE: { struct t_data *d; - Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; + Uint32 out_fd, offsetH, offsetL; + Uint64 nbytes; char flags; /* DestFD:32, Offset:64, Bytes:64, @@ -3382,8 +3385,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { || !EV_GET_CHAR(ev, &flags, &p, &q) || !EV_GET_UINT32(ev, &offsetH, &p, &q) || !EV_GET_UINT32(ev, &offsetL, &p, &q) - || !EV_GET_UINT32(ev, &nbytesH, &p, &q) - || !EV_GET_UINT32(ev, &nbytesL, &p, &q)) { + || !EV_GET_UINT64(ev, &nbytes, &p, &q)) { /* Buffer has wrong length to contain all the needed values */ reply_posix_error(desc, EINVAL); goto done; @@ -3410,17 +3412,9 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { d->c.sendfile.offset = ((off_t) offsetH << 32) | offsetL; #endif - #if SIZEOF_SIZE_T == 4 - if (nbytesH != 0) { - reply_posix_error(desc, EINVAL); - goto done; - } - d->c.sendfile.nbytes = (size_t) nbytesL; - #else - d->c.sendfile.nbytes = ((size_t) nbytesH << 32) | nbytesL; - #endif + d->c.sendfile.nbytes = nbytes; - printf("sendfile(nbytes => %d, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags); + printf("sendfile(nbytes => %ld, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags); /* Do HEADER TRAILER stuff by calculating pointer places, not by copying data! */ diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index e0b8cfca03..8e79b3923a 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -163,4 +163,4 @@ int efile_may_openfile(Efile_error* errInfo, char *name); int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, size_t *nbytes); + off_t *offset, Uint64 *nbytes); diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 61df572a91..911ec63588 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1469,33 +1469,46 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, } #ifdef HAVE_SENDFILE -#define SENDFILE_CHUNK_SIZE ((1 << 30) - 1) int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, size_t *nbytes) + off_t *offset, Uint64 *nbytes) { + // printf("sendfile(%d,%d,%d,%d)\r\n",out_fd,in_fd,*offset,*nbytes); + Uint64 written = 0; #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) - ssize_t retval, written = 0; - // printf("sendfile(%d,%d,%d,%d)\r\n",out_fd,in_fd,*offset,*nbytes); - if (*nbytes == 0) { - do { - *nbytes = SENDFILE_CHUNK_SIZE; // chunk size - retval = sendfile(out_fd, in_fd, offset, *nbytes); - if (retval > 0) - written += retval; - } while (retval == SENDFILE_CHUNK_SIZE); - } else { - retval = sendfile(out_fd, in_fd, offset, *nbytes); - if (retval > 0) - written = retval; - } +#define SENDFILE_CHUNK_SIZE ((1 << (8*SIZEOF_SIZE_T)) - 1) + ssize_t retval; + do { + // check if *nbytes is 0 or greater than the largest size_t + if (*nbytes == 0 || *nbytes > SENDFILE_CHUNK_SIZE) + retval = sendfile(out_fd, in_fd, offset, SENDFILE_CHUNK_SIZE); + else + retval = sendfile(out_fd, in_fd, offset, *nbytes); + if (retval > 0) { + written += retval; + *nbytes -= retval; + } + } while (retval == SENDFILE_CHUNK_SIZE); *nbytes = written; return check_error(retval == -1 ? -1 : 0, errInfo); #elif defined(DARWIN) - off_t len = *nbytes; - int retval = sendfile(in_fd, out_fd, *offset, &len, NULL, 0); - *offset += len; - *nbytes = len; +#define SENDFILE_CHUNK_SIZE ((1 << (8*SIZEOF_OFF_T)) - 1) + int retval; + off_t len; + do { + // check if *nbytes is 0 or greater than the largest off_t + if(*nbytes > SENDFILE_CHUNK_SIZE) + len = SENDFILE_CHUNK_SIZE; + else + len = *nbytes; + retval = sendfile(in_fd, out_fd, *offset, &len, NULL, 0); + if (retval != -1 || errno == EAGAIN || errno == EINTR) { + *offset += len; + *nbytes -= len; + written += len; + } + } while (len == SENDFILE_CHUNK_SIZE); + *nbytes = written; return check_error(retval, errInfo); #elif defined(__FreeBSD__) || defined(__DragonFly__) off_t len = 0; -- cgit v1.2.3 From a508d712553761d3cdc69d5e14c09ba3a6530d7a Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Tue, 15 Nov 2011 10:32:03 +0100 Subject: Fix freebsd support for sendfile --- erts/emulator/drivers/unix/unix_efile.c | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 911ec63588..14b7a5cffa 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1511,9 +1511,22 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, *nbytes = written; return check_error(retval, errInfo); #elif defined(__FreeBSD__) || defined(__DragonFly__) - off_t len = 0; - int retval = sendfile(in_fd, out_fd, *offset, *nbytes, NULL, &len, 0); - *nbytes = len; +#define SENDFILE_CHUNK_SIZE ((1 << (8*SIZEOF_SIZE_T)) - 1) + off_t len; + int retval; + do { + if (*nbytes > SENDFILE_CHUNK_SIZE) + retval = sendfile(in_fd, out_fd, *offset, SENDFILE_CHUNK_SIZE, + NULL, &len, 0); + else + retval = sendfile(in_fd, out_fd, *offset, *nbytes, NULL, &len, 0); + if (retval != -1 || errno == EAGAIN || errno == EINTR) { + *offset += len; + *nbytes -= len; + written += len; + } + } while(len == SENDFILE_CHUNK_SIZE); + *nbytes = written; return check_error(retval, errInfo); #endif } -- cgit v1.2.3 From a0d3a833cbd70971aa0c79da9853502e6631524d Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Wed, 16 Nov 2011 16:02:57 +0100 Subject: Add ifdef's for HAVE_SENDFILE --- erts/emulator/drivers/common/efile_drv.c | 42 ++++++++++++++++++++++++++++---- erts/emulator/drivers/common/erl_efile.h | 2 ++ erts/emulator/drivers/unix/unix_efile.c | 12 ++------- 3 files changed, 41 insertions(+), 15 deletions(-) diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 5d785a75c4..b26e244312 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -224,7 +224,6 @@ typedef unsigned char uchar; static ErlDrvData file_start(ErlDrvPort port, char* command); static int file_init(void); static void file_stop(ErlDrvData); -static void file_ready_output(ErlDrvData data, ErlDrvEvent event); static void file_output(ErlDrvData, char* buf, int len); static int file_control(ErlDrvData, unsigned int command, char* buf, int len, char **rbuf, int rlen); @@ -232,12 +231,17 @@ static void file_timeout(ErlDrvData); static void file_outputv(ErlDrvData, ErlIOVec*); static void file_async_ready(ErlDrvData, ErlDrvThreadData); static void file_flush(ErlDrvData); -static void file_stop_select(ErlDrvEvent event, void* _); +#ifdef HAVE_SENDFILE +static void file_ready_output(ErlDrvData data, ErlDrvEvent event); +static void file_stop_select(ErlDrvEvent event, void* _); +#endif /* HAVE_SENDFILE */ enum e_timer {timer_idle, timer_again, timer_write}; +#ifdef HAVE_SENDFILE enum e_sendfile {sending, not_sending}; +#endif /* HAVE_SENDFILE */ struct t_data; @@ -252,7 +256,9 @@ typedef struct { struct t_data *cq_head; /* Queue of incoming commands */ struct t_data *cq_tail; /* -""- */ enum e_timer timer_state; +#ifdef HAVE_SENDFILE enum e_sendfile sendfile_state; +#endif /* HAVE_SENDFILE */ size_t read_bufsize; ErlDrvBinary *read_binp; size_t read_offset; @@ -275,7 +281,11 @@ struct erl_drv_entry efile_driver_entry = { file_stop, file_output, NULL, +#ifdef HAVE_SENDFILE file_ready_output, +#else + NULL, +#endif /* HAVE_SENDFILE */ "efile", NULL, NULL, @@ -292,7 +302,11 @@ struct erl_drv_entry efile_driver_entry = { ERL_DRV_FLAG_USE_PORT_LOCKING, NULL, NULL, +#ifdef HAVE_SENDFILE file_stop_select +#else + NULL +#endif /* HAVE_SENDFILE */ }; @@ -344,12 +358,14 @@ struct t_readdir_buf { char buf[READDIR_BUFSIZE]; }; +#ifdef HAVE_SENDFILE struct t_sendfile_hdtl { int hdr_cnt; /* number of header iovecs */ struct iovec *headers; /* pointer to header iovecs */ int trl_cnt; /* number of trailer iovecs */ struct iovec *trailers; /* pointer to trailer iovecs */ }; +#endif /* HAVE_SENDFILE */ struct t_data { @@ -418,6 +434,7 @@ struct t_data Sint64 length; int advise; } fadvise; +#ifdef HAVE_SENDFILE struct { int out_fd; off_t offset; @@ -426,6 +443,7 @@ struct t_data short flags; struct t_sendfile_hdtl *hdtl; } sendfile; +#endif /* HAVE_SENDFILE */ } c; char b[1]; }; @@ -683,7 +701,9 @@ file_start(ErlDrvPort port, char* command) desc->cq_head = NULL; desc->cq_tail = NULL; desc->timer_state = timer_idle; +#ifdef HAVE_SENDFILE desc->sendfile_state = not_sending; +#endif desc->read_bufsize = 0; desc->read_binp = NULL; desc->read_offset = 0; @@ -1723,6 +1743,7 @@ static void invoke_fadvise(void *data) d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise); } +#ifdef HAVE_SENDFILE static void invoke_sendfile(void *data) { struct t_data *d = (struct t_data *)data; @@ -1781,8 +1802,10 @@ static void file_ready_output(ErlDrvData data, ErlDrvEvent event) static void file_stop_select(ErlDrvEvent event, void* _) { - /* TODO: close socket? */ + } +#endif /* HAVE_SENDFILE */ + static void free_readdir(void *data) { @@ -1843,9 +1866,12 @@ static int try_again(file_descriptor *desc, struct t_data *d) { static void cq_execute(file_descriptor *desc) { struct t_data *d; register void *void_ptr; /* Soft cast variable */ - if (desc->timer_state == timer_again || - desc->sendfile_state == sending) + if (desc->timer_state == timer_again) return; +#ifdef HAVE_SENDFILE + if (desc->sendfile_state == sending) + return; +#endif if (! (d = cq_deq(desc))) return; TRACE_F(("x%i", (int) d->command)); @@ -2196,6 +2222,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) } free_preadv(data); break; +#ifdef HAVE_SENDFILE case FILE_SENDFILE: //printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); if (d->result_ok == -1) { @@ -2225,6 +2252,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) ERL_DRV_USE|ERL_DRV_WRITE, 1); } break; +#endif default: abort(); } @@ -3368,6 +3396,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { case FILE_SENDFILE: { +#ifdef HAVE_SENDFILE struct t_data *d; Uint32 out_fd, offsetH, offsetL; Uint64 nbytes; @@ -3424,6 +3453,9 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { } cq_enq(desc, d); +#else + reply_posix_error(desc, ENOTSUP); +#endif goto done; } /* case FILE_SENDFILE: */ diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index 8e79b3923a..fd6dc94755 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -162,5 +162,7 @@ int efile_symlink(Efile_error* errInfo, char* old, char* new); int efile_may_openfile(Efile_error* errInfo, char *name); int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); +#ifdef HAVE_SENDFILE int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t *offset, Uint64 *nbytes); +#endif /* HAVE_SENDFILE */ diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 14b7a5cffa..dc118c9b9f 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -33,7 +33,7 @@ #include #include #endif -#if defined(__linux__) || (defined(__sun) && defined(__SVR4)) +#if defined(HAVE_SENDFILE) && (defined(__linux__) || (defined(__sun) && defined(__SVR4))) #include #endif @@ -1530,12 +1530,4 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, return check_error(retval, errInfo); #endif } -#else /* no sendfile() */ -int -efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, size_t *count) -{ - errno = ENOTSUP; - return check_error(-1, errInfo); -} -#endif +#endif /* HAVE_SENDFILE */ -- cgit v1.2.3 From ce8fb42d7e92a95666e40614684232d476509cbe Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Wed, 16 Nov 2011 16:04:59 +0100 Subject: Change type of fd to be ErlDrvEvent --- erts/emulator/drivers/common/efile_drv.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index b26e244312..c4f92cc318 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -1748,7 +1748,7 @@ static void invoke_sendfile(void *data) { struct t_data *d = (struct t_data *)data; int fd = d->fd; - int out_fd = d->c.sendfile.out_fd; + int out_fd = (int)d->c.sendfile.out_fd; Uint64 nbytes = d->c.sendfile.nbytes; int result = 0; d->again = 0; @@ -1790,7 +1790,7 @@ static void file_ready_output(ErlDrvData data, ErlDrvEvent event) switch (fd->d->command) { case FILE_SENDFILE: - driver_select(fd->port, (ErlDrvEvent)fd->d->c.sendfile.out_fd, + driver_select(fd->port, event, (int)ERL_DRV_WRITE,(int) 0); invoke_sendfile((void *)fd->d); file_async_ready(data, (ErlDrvThreadData)fd->d); @@ -2232,8 +2232,8 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) SET_NONBLOCKING(d->c.sendfile.out_fd); free_sendfile(data); } else { - driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, - ERL_DRV_USE, 0); + driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, + ERL_DRV_USE, 0); } } else if (d->result_ok == 0) { desc->sendfile_state = not_sending; @@ -2242,13 +2242,12 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) SET_NONBLOCKING(d->c.sendfile.out_fd); free_sendfile(data); } else { - driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, - ERL_DRV_USE, 0); + driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, ERL_DRV_USE, 0); } } else if (d->result_ok == 1) { // If we are using select to send the rest of the data desc->sendfile_state = sending; desc->d = d; - driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, + driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, ERL_DRV_USE|ERL_DRV_WRITE, 1); } break; -- cgit v1.2.3 From 16b395a11ddc45ee8a36324ed0fb543f4065fc76 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Wed, 16 Nov 2011 16:05:34 +0100 Subject: Set chunk size to 3 GB It is not possible to use the maximum size_t/off_t for the chunks as that causes sendfile to return einval. 3GB seems to work on all *nix platforms. --- erts/emulator/drivers/unix/unix_efile.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index dc118c9b9f..8db7f2336e 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1469,6 +1469,8 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, } #ifdef HAVE_SENDFILE +#define SENDFILE_CHUNK_SIZE ((1 << 30) -1) + int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t *offset, Uint64 *nbytes) @@ -1476,7 +1478,6 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, // printf("sendfile(%d,%d,%d,%d)\r\n",out_fd,in_fd,*offset,*nbytes); Uint64 written = 0; #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) -#define SENDFILE_CHUNK_SIZE ((1 << (8*SIZEOF_SIZE_T)) - 1) ssize_t retval; do { // check if *nbytes is 0 or greater than the largest size_t @@ -1488,11 +1489,10 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, written += retval; *nbytes -= retval; } - } while (retval == SENDFILE_CHUNK_SIZE); + } while (retval != -1 && retval == SENDFILE_CHUNK_SIZE); *nbytes = written; return check_error(retval == -1 ? -1 : 0, errInfo); #elif defined(DARWIN) -#define SENDFILE_CHUNK_SIZE ((1 << (8*SIZEOF_OFF_T)) - 1) int retval; off_t len; do { @@ -1511,7 +1511,6 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, *nbytes = written; return check_error(retval, errInfo); #elif defined(__FreeBSD__) || defined(__DragonFly__) -#define SENDFILE_CHUNK_SIZE ((1 << (8*SIZEOF_SIZE_T)) - 1) off_t len; int retval; do { -- cgit v1.2.3 From c68746bda431c5a068e6bb4a93bfe5ae77ce2d9a Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Wed, 16 Nov 2011 16:07:48 +0100 Subject: Remove support for file_server, sendfile has to be raw Because the sending process has to be the controlling process of the tcp socket used to send data it is not possible to use the file_server --- lib/kernel/src/file_io_server.erl | 9 --------- lib/kernel/src/gen_tcp.erl | 18 +++++------------- 2 files changed, 5 insertions(+), 22 deletions(-) diff --git a/lib/kernel/src/file_io_server.erl b/lib/kernel/src/file_io_server.erl index 7280635f53..14da9c1a55 100644 --- a/lib/kernel/src/file_io_server.erl +++ b/lib/kernel/src/file_io_server.erl @@ -249,15 +249,6 @@ file_request(close, file_request({position,At}, #state{handle=Handle,buf=Buf}=State) -> std_reply(position(Handle, At, Buf), State); -file_request({sendfile,DestSock,Offset,Bytes,Opts}, - #state{handle=Handle}=State) -> - %% gen_tcp will call prim_file:sendfile with correct arguments - case gen_tcp:sendfile(Handle, DestSock, Offset, Bytes, Opts) of - {error,_}=Reply -> - {stop,normal,Reply,State}; - Reply -> - {reply,Reply,State} - end; file_request(truncate, #state{handle=Handle}=State) -> case ?PRIM_FILE:truncate(Handle) of diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl index 3f1e432f7a..9dd70ce5cc 100644 --- a/lib/kernel/src/gen_tcp.erl +++ b/lib/kernel/src/gen_tcp.erl @@ -317,23 +317,15 @@ unrecv(S, Data) when is_port(S) -> -define(MAX_CHUNK_SIZE, (1 bsl 20)*20). %% 20 MB, has to fit in primary memory -spec sendfile(File, Sock, Offset, Bytes, Opts) -> - {'ok', non_neg_integer()} | {'error', inet:posix()} when - File :: file:io_device(), + {'ok', non_neg_integer()} | {'error', inet:posix() } | + {'error', not_owner} when + File :: file:fd(), Sock :: socket(), Offset :: non_neg_integer(), Bytes :: non_neg_integer(), Opts :: [sendfile_option()]. -sendfile(File, Sock, Offset, Bytes, Opts) when is_pid(File) -> - Ref = erlang:monitor(process, File), - File ! {file_request,self(),File, - {sendfile,Sock,Offset,Bytes,Opts}}, - receive - {file_reply,File,Reply} -> - erlang:demonitor(Ref,[flush]), - Reply; - {'DOWN', Ref, _, _, _} -> - {error, terminated} - end; +sendfile(File, _Sock, _Offet, _Bytes, _Opts) when is_pid(File) -> + {error, badarg}; sendfile(File, Sock, Offset, Bytes, []) -> sendfile(File, Sock, Offset, Bytes, ?MAX_CHUNK_SIZE, undefined, undefined, false, false, false); -- cgit v1.2.3 From 06e77d9baa4c631bc329aa1fafb29ee55f66d906 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Wed, 16 Nov 2011 16:09:32 +0100 Subject: sendfile caller now has to be the controlling_process This is needed because otherwise there could be scenarios when the efile driver blocks a fd and then crashes without the inet driver ever finding out. Now when the process crashes the port will close and we can cleanup in the inet driver. --- lib/kernel/src/gen_tcp.erl | 6 +++--- lib/kernel/src/inet.erl | 15 +++++++++++++++ 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl index 9dd70ce5cc..2eaa44b966 100644 --- a/lib/kernel/src/gen_tcp.erl +++ b/lib/kernel/src/gen_tcp.erl @@ -412,16 +412,16 @@ mod([], Address) -> sendfile(#file_descriptor{ module = Mod } = Fd, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) when is_port(Sock) -> - ok = prim_inet:ignorefd(Sock,true), + ok = inet:lock_socket(Sock,true), {ok, SockFd} = prim_inet:getfd(Sock), case Mod:sendfile(Fd, SockFd, Offset, Bytes, ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) of {error, enotsup} -> - ok = prim_inet:ignorefd(Sock,false), + ok = inet:lock_socket(Sock,false), sendfile_fallback(Fd, Sock, Offset, Bytes, ChunkSize, Headers, Trailers); Else -> - ok = prim_inet:ignorefd(Sock,false), + ok = inet:lock_socket(Sock,false), Else end; sendfile(_,_,_,_,_,_,_,_,_,_) -> diff --git a/lib/kernel/src/inet.erl b/lib/kernel/src/inet.erl index b60c68e3a1..49f64a9236 100644 --- a/lib/kernel/src/inet.erl +++ b/lib/kernel/src/inet.erl @@ -40,6 +40,10 @@ -export([tcp_controlling_process/2, udp_controlling_process/2, tcp_close/1, udp_close/1]). + +%% used by sendfile +-export([lock_socket/2]). + %% used by socks5 -export([setsockname/2, setpeername/2]). @@ -1353,3 +1357,14 @@ stop_timer(Timer) -> end; T -> T end. + + +lock_socket(S,Val) -> + case erlang:port_info(S, connected) of + {connected, Pid} when Pid =/= self() -> + {error, not_owner}; + undefined -> + {error, einval}; + _ -> + prim_inet:ignorefd(S,Val) + end. -- cgit v1.2.3 From abbf1a7b6825f0d62a1efd71e03942cb5170e9d1 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Wed, 16 Nov 2011 16:12:22 +0100 Subject: Remove tests for file_server sendfile --- lib/kernel/test/gen_tcp_api_SUITE.erl | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/lib/kernel/test/gen_tcp_api_SUITE.erl b/lib/kernel/test/gen_tcp_api_SUITE.erl index 1a88b2e17f..9e53a900d7 100644 --- a/lib/kernel/test/gen_tcp_api_SUITE.erl +++ b/lib/kernel/test/gen_tcp_api_SUITE.erl @@ -51,10 +51,7 @@ groups() -> [{t_accept, [], [t_accept_timeout]}, {t_connect, [], [t_connect_timeout, t_connect_bad]}, {t_recv, [], [t_recv_timeout, t_recv_eof]}, - {t_sendfile, [], [{group, t_sendfile_raw}, - {group, t_sendfile_ioserv}]}, - {t_sendfile_raw, [], sendfile_all()}, - {t_sendfile_ioserv, [], sendfile_all()}]. + {t_sendfile, [], [{group, sendfile_all()}]}]. sendfile_all() -> [ @@ -76,8 +73,8 @@ end_per_suite(_Config) -> init_per_group(t_sendfile, Config) -> Priv = ?config(priv_dir, Config), SFilename = filename:join(Priv, "sendfile_small.html"), - {ok, DS} = file:open(SFilename,[write]), - io:format(DS,"yo baby yo",[]), + {ok, DS} = file:open(SFilename,[write,raw]), + file:write(DS,"yo baby yo"), file:sync(DS), file:close(DS), BFilename = filename:join(Priv, "sendfile_big.html"), @@ -85,9 +82,9 @@ init_per_group(t_sendfile, Config) -> [file:write(DB,[<<0:(10*8*1024*1024)>>]) || _I <- lists:seq(1,51)], file:sync(DB), file:close(DB), - [{small_file, SFilename},{big_file, BFilename}|Config]; -init_per_group(t_sendfile_raw, Config) -> - [{file_opts, [raw]}|Config]; + [{small_file, SFilename}, + {file_opts,[raw,binary]}, + {big_file, BFilename}|Config]; init_per_group(_GroupName, Config) -> Config. @@ -314,7 +311,7 @@ t_sendfile_partial(Config) -> {_Size, <>} = sendfile_file_info(Filename), - {ok,D} = file:open(Filename,[read,binary|FileOpts]), + {ok,D} = file:open(Filename,[read|FileOpts]), {ok, <>} = file:read(D,5), FSend = fun(Sock) -> {ok,5} = gen_tcp:sendfile(D,Sock,0,5,[]), @@ -341,7 +338,7 @@ t_sendfile_offset(Config) -> Send = fun(Sock) -> {_Size, <<_:5/binary,Data:3/binary,_/binary>> = AllData} = sendfile_file_info(Filename), - {ok,D} = file:open(Filename,[read,binary|FileOpts]), + {ok,D} = file:open(Filename,[read|FileOpts]), {ok,3} = gen_tcp:sendfile(D,Sock,5,3,[]), {ok, AllData} = file:read(D,100), file:close(D), -- cgit v1.2.3 From 16338730fe6400f536c0b816abbf75b27f07eb00 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Wed, 16 Nov 2011 16:13:06 +0100 Subject: Add tests for send/recv/sendfile interactions Tests for when a gen_tcp:send/recv is ordered while a sendfile is in progress. --- lib/kernel/test/gen_tcp_api_SUITE.erl | 100 +++++++++++++++++++++++++++++----- 1 file changed, 87 insertions(+), 13 deletions(-) diff --git a/lib/kernel/test/gen_tcp_api_SUITE.erl b/lib/kernel/test/gen_tcp_api_SUITE.erl index 9e53a900d7..876d266c4b 100644 --- a/lib/kernel/test/gen_tcp_api_SUITE.erl +++ b/lib/kernel/test/gen_tcp_api_SUITE.erl @@ -36,7 +36,9 @@ t_fdopen/1, t_implicit_inet6/1, t_sendfile_small/1, t_sendfile_big/1, t_sendfile_hdtl/1, t_sendfile_partial/1, - t_sendfile_offset/1]). + t_sendfile_offset/1, t_sendfile_sendafter/1, + t_sendfile_recvafter/1, t_sendfile_sendduring/1, + t_sendfile_recvduring/1]). -export([sendfile_server/2]). @@ -60,6 +62,10 @@ sendfile_all() -> % ,t_sendfile_hdtl ,t_sendfile_partial ,t_sendfile_offset + ,t_sendfile_sendafter + ,t_sendfile_recvafter + ,t_sendfile_sendduring + ,t_sendfile_recvduring ]. % [t_sendfile_big]. % [t_sendfile_small]. @@ -388,6 +394,64 @@ t_sendfile_hdtl(Config) -> end, ok = sendfile_send(SendTl). +t_sendfile_sendafter(Config) -> + Filename = proplists:get_value(small_file, Config), + + Send = fun(Sock) -> + {Size, Data} = sendfile_file_info(Filename), + {ok, Size} = gen_tcp:sendfile(Filename, Sock), + ok = gen_tcp:send(Sock, <<2>>), + <> + end, + + ok = sendfile_send(Send). + +t_sendfile_recvafter(Config) -> + Filename = proplists:get_value(small_file, Config), + + Send = fun(Sock) -> + {Size, Data} = sendfile_file_info(Filename), + {ok, Size} = gen_tcp:sendfile(Filename, Sock), + ok = gen_tcp:send(Sock, <<1>>), + {ok,<<1>>} = gen_tcp:recv(Sock, 1), + <> + end, + + ok = sendfile_send(Send). + +t_sendfile_sendduring(Config) -> + Filename = proplists:get_value(big_file, Config), + + Send = fun(Sock) -> + {ok, #file_info{size = Size}} = + file:read_file_info(Filename), + spawn_link(fun() -> + timer:sleep(10), + ok = gen_tcp:send(Sock, <<2>>) + end), + {ok, Size} = gen_tcp:sendfile(Filename, Sock), + Size+1 + end, + + ok = sendfile_send("localhost", Send, 0). + +t_sendfile_recvduring(Config) -> + Filename = proplists:get_value(big_file, Config), + + Send = fun(Sock) -> + {ok, #file_info{size = Size}} = + file:read_file_info(Filename), + spawn_link(fun() -> + timer:sleep(10), + ok = gen_tcp:send(Sock, <<1>>), + {ok,<<1>>} = gen_tcp:recv(Sock, 1) + end), + {ok, Size} = gen_tcp:sendfile(Filename, Sock), + timer:sleep(1000), + Size+1 + end, + + ok = sendfile_send("localhost", Send, 0). %% TODO: consolidate tests and reduce code sendfile_send(Send) -> @@ -399,7 +463,8 @@ sendfile_send(Host, Send, Orig) -> receive {server, Port} -> {ok, Sock} = gen_tcp:connect(Host, Port, - [binary,{packet,0}]), + [binary,{packet,0}, + {active,false}]), Data = Send(Sock), ok = gen_tcp:close(Sock), receive @@ -411,27 +476,36 @@ sendfile_send(Host, Send, Orig) -> sendfile_server(ClientPid, Orig) -> {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 0}, - {active, false}, - {reuseaddr, true}]), + {active, true}, + {reuseaddr, true}]), {ok, Port} = inet:port(LSock), ClientPid ! {server, Port}, {ok, Sock} = gen_tcp:accept(LSock), {ok, Bin} = sendfile_do_recv(Sock, Orig), - ok = gen_tcp:close(Sock), - ClientPid ! {ok, Bin}. + ClientPid ! {ok, Bin}, + gen_tcp:send(Sock, <<1>>). -define(SENDFILE_TIMEOUT, 10000). %% f(),{ok, S} = gen_tcp:connect("localhost",7890,[binary]),gen_tcp:sendfile("/ldisk/lukas/otp/sendfiletest.dat",S). sendfile_do_recv(Sock, Bs) -> - case gen_tcp:recv(Sock, 0, ?SENDFILE_TIMEOUT) of - {ok, B} when is_list(Bs) -> - sendfile_do_recv(Sock, [B|Bs]); - {error, closed} when is_list(Bs) -> + receive + {tcp, Sock, B} -> + case binary:match(B,<<1>>) of + nomatch when is_list(Bs) -> + sendfile_do_recv(Sock, [B|Bs]); + nomatch when is_integer(Bs) -> + sendfile_do_recv(Sock, byte_size(B) + Bs); + _ when is_list(Bs) -> + {ok, iolist_to_binary(lists:reverse([B|Bs]))}; + _ when is_integer(Bs) -> + {ok, byte_size(B) + Bs} + end; + {tcp_closed, Sock} when is_list(Bs) -> {ok, iolist_to_binary(lists:reverse(Bs))}; - {ok, B} when is_integer(Bs) -> - sendfile_do_recv(Sock, byte_size(B) + Bs); - {error, closed} when is_integer(Bs) -> + {tcp_closed, Sock} when is_integer(Bs) -> {ok, Bs} + after ?SENDFILE_TIMEOUT -> + timeout end. sendfile_file_info(File) -> -- cgit v1.2.3 From 82ed2e3d5d959b0dce61056c648bead385c77d65 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 12:20:51 +0100 Subject: Remove debug printouts --- erts/emulator/drivers/common/efile_drv.c | 7 ------- erts/emulator/drivers/unix/unix_efile.c | 1 - 2 files changed, 8 deletions(-) diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index c4f92cc318..b807f110eb 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -1755,12 +1755,10 @@ static void invoke_sendfile(void *data) result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes); - //printf("sendfile: result: %d, errno: %d, offset: %d, nbytes: %d\r\n",result, errno, d->c.sendfile.offset,nbytes); d->c.sendfile.written += nbytes; if (result == 1) { if (sys_info.async_threads != 0) { - printf("==> sendfile DONE written=%ld\r\n", d->c.sendfile.written); d->result_ok = 0; } else if (d->c.sendfile.nbytes == 0 && nbytes != 0) { d->result_ok = 1; @@ -1768,7 +1766,6 @@ static void invoke_sendfile(void *data) d->result_ok = 1; d->c.sendfile.nbytes -= nbytes; } else { - printf("==> sendfile DONE written=%ld\r\n", d->c.sendfile.written); d->result_ok = 0; } } else if (result == 0 && (d->errInfo.posix_errno == EAGAIN @@ -1776,7 +1773,6 @@ static void invoke_sendfile(void *data) d->result_ok = 1; } else { d->result_ok = -1; - printf("==> sendfile ERROR %s\r\n", erl_errno_id(d->errInfo.posix_errno)); } } @@ -2224,7 +2220,6 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) break; #ifdef HAVE_SENDFILE case FILE_SENDFILE: - //printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); if (d->result_ok == -1) { desc->sendfile_state = not_sending; reply_error(desc, &d->errInfo); @@ -3442,8 +3437,6 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { d->c.sendfile.nbytes = nbytes; - printf("sendfile(nbytes => %ld, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags); - /* Do HEADER TRAILER stuff by calculating pointer places, not by copying data! */ if (sys_info.async_threads != 0) { diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 8db7f2336e..01de088b0f 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1475,7 +1475,6 @@ int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t *offset, Uint64 *nbytes) { - // printf("sendfile(%d,%d,%d,%d)\r\n",out_fd,in_fd,*offset,*nbytes); Uint64 written = 0; #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) ssize_t retval; -- cgit v1.2.3 From 9322161952ed25a96578f163cc383be605e7f75c Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 12:21:08 +0100 Subject: Use free_sendfile explicitly for non-async This is needed because aync job will not call free_sendfile if there is an async_ready callback. --- erts/emulator/drivers/common/efile_drv.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index b807f110eb..1f13a65350 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -2229,6 +2229,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) } else { driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, ERL_DRV_USE, 0); + free_sendfile(data); } } else if (d->result_ok == 0) { desc->sendfile_state = not_sending; @@ -2238,6 +2239,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) free_sendfile(data); } else { driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, ERL_DRV_USE, 0); + free_sendfile(data); } } else if (d->result_ok == 1) { // If we are using select to send the rest of the data desc->sendfile_state = sending; @@ -3441,7 +3443,6 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { if (sys_info.async_threads != 0) { SET_BLOCKING(d->c.sendfile.out_fd); - d->free = free_sendfile; } cq_enq(desc, d); -- cgit v1.2.3 From a5b3d81936ab85edb8713f29baf85307ae0b25b8 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 20:29:59 +0100 Subject: Preliminary work on header/trailer Have to figure out how to represent progress in header writing when using non-blocking, not sure how to do this. --- erts/emulator/drivers/common/efile_drv.c | 68 +++++++++++++++++++++++++------- erts/emulator/drivers/common/erl_efile.h | 15 ++++++- erts/emulator/drivers/unix/unix_efile.c | 2 +- erts/preloaded/src/prim_file.erl | 35 +++++----------- lib/kernel/src/gen_tcp.erl | 14 ++++--- lib/kernel/test/gen_tcp_api_SUITE.erl | 2 +- 6 files changed, 87 insertions(+), 49 deletions(-) diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 1f13a65350..7eaafd5af1 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -358,15 +358,6 @@ struct t_readdir_buf { char buf[READDIR_BUFSIZE]; }; -#ifdef HAVE_SENDFILE -struct t_sendfile_hdtl { - int hdr_cnt; /* number of header iovecs */ - struct iovec *headers; /* pointer to header iovecs */ - int trl_cnt; /* number of trailer iovecs */ - struct iovec *trailers; /* pointer to trailer iovecs */ -}; -#endif /* HAVE_SENDFILE */ - struct t_data { struct t_data *next; @@ -530,6 +521,8 @@ static void *ef_safe_realloc(void *op, Uint s) !0) \ : 0) +/* int EV_GET_SYSIOVEC(ErlIoVec *ev, Uint32, int *cnt, SysIOVec **target, int *pp, int *qp) */ +#define EV_GET_SYSIOVEC ev_get_sysiovec #if 0 @@ -943,6 +936,41 @@ static int reply_eof(file_descriptor *desc) { return 0; } +static int ev_get_sysiovec(ErlIOVec *ev, Uint32 len, int *cnt, SysIOVec **target, int *pp, int *qp) { + int tmp_p = *pp, tmp_q = *qp, tmp_len = len, i; + SysIOVec *tmp_target; + while (tmp_len != 0) { + if (tmp_len + tmp_p > ev->iov[tmp_q].iov_len) { + + tmp_len -= ev->iov[tmp_q].iov_len - tmp_p; + tmp_q++; + tmp_p = 0; + if (tmp_q == ev->vsize) + return 0; + } else break; + } + *cnt = tmp_q - *qp + 1; + tmp_target = EF_SAFE_ALLOC(sizeof(SysIOVec)* (*cnt)); + *target = tmp_target; + for (i = 0; i < *cnt; i++) { + tmp_target[i].iov_base = ev->iov[*qp].iov_base+*pp; + if (len + *pp <= ev->iov[*qp].iov_len) { + tmp_target[i].iov_len = len; + if (len + *pp == ev->iov[*qp].iov_len) { + *pp = 0; + (*qp)++; + } else + *pp += len; + } else { + tmp_target[i].iov_len = ev->iov[*qp].iov_len - *pp; + len -= ev->iov[*qp].iov_len - *pp; + *pp = 0; + (*qp)++; + } + } + return 1; +} + static void invoke_name(void *data, int (*f)(Efile_error *, char *)) @@ -1753,7 +1781,7 @@ static void invoke_sendfile(void *data) int result = 0; d->again = 0; - result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes); + result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes, &d->c.sendfile.hdtl); d->c.sendfile.written += nbytes; @@ -3394,7 +3422,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { #ifdef HAVE_SENDFILE struct t_data *d; - Uint32 out_fd, offsetH, offsetL; + Uint32 out_fd, offsetH, offsetL, hd_len, tl_len; Uint64 nbytes; char flags; @@ -3410,7 +3438,9 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { || !EV_GET_CHAR(ev, &flags, &p, &q) || !EV_GET_UINT32(ev, &offsetH, &p, &q) || !EV_GET_UINT32(ev, &offsetL, &p, &q) - || !EV_GET_UINT64(ev, &nbytes, &p, &q)) { + || !EV_GET_UINT64(ev, &nbytes, &p, &q) + || !EV_GET_UINT32(ev, &hd_len, &p, &q) + || !EV_GET_UINT32(ev, &tl_len, &p, &q)) { /* Buffer has wrong length to contain all the needed values */ reply_posix_error(desc, EINVAL); goto done; @@ -3438,8 +3468,18 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { #endif d->c.sendfile.nbytes = nbytes; - - /* Do HEADER TRAILER stuff by calculating pointer places, not by copying data! */ + if (hd_len == 0 && tl_len == 0) + d->c.sendfile.hdtl = NULL; + else { + d->c.sendfile.hdtl = EF_SAFE_ALLOC(sizeof(struct t_sendfile_hdtl)); + if (!EV_GET_SYSIOVEC(ev, hd_len, &d->c.sendfile.hdtl->hdr_cnt, &d->c.sendfile.hdtl->headers, &p, &q) + || !EV_GET_SYSIOVEC(ev, tl_len, &d->c.sendfile.hdtl->trl_cnt, &d->c.sendfile.hdtl->trailers, &p, &q)) { + EF_FREE(d->c.sendfile.hdtl); + EF_FREE(d); + reply_posix_error(desc, EINVAL); + goto done; + } + } if (sys_info.async_threads != 0) { SET_BLOCKING(d->c.sendfile.out_fd); diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index fd6dc94755..b73fb35120 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -118,6 +118,19 @@ typedef struct _Efile_info { */ } Efile_info; + +#ifdef HAVE_SENDFILE +/* + * Described the structure of header/trailers for sendfile + */ +struct t_sendfile_hdtl { + SysIOVec *headers; + int hdr_cnt; + SysIOVec *trailers; + int trl_cnt; +}; +#endif /* HAVE_SENDFILE */ + /* * Functions. */ @@ -164,5 +177,5 @@ int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); #ifdef HAVE_SENDFILE int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, Uint64 *nbytes); + off_t *offset, Uint64 *nbytes, struct t_sendfile_hdtl **hdtl); #endif /* HAVE_SENDFILE */ diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 01de088b0f..138c550fdd 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1473,7 +1473,7 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, Uint64 *nbytes) + off_t *offset, Uint64 *nbytes, struct t_sendfile_hdtl** hdtl) { Uint64 written = 0; #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) diff --git a/erts/preloaded/src/prim_file.erl b/erts/preloaded/src/prim_file.erl index 6bdf5f6e2e..fb19521382 100644 --- a/erts/preloaded/src/prim_file.erl +++ b/erts/preloaded/src/prim_file.erl @@ -547,38 +547,21 @@ write_file(_, _) -> sendfile(#file_descriptor{module = ?MODULE, data = {Port, _}}, DestFD, Offset, Bytes, ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) -> - drv_command(Port, <>). + drv_command(Port, [<>, + Headers,Trailers]). get_bit(true) -> 1; get_bit(false) -> 0. -encode_hdtl(undefined) -> - <<0>>; -encode_hdtl([]) -> - <<0>>; -encode_hdtl(List) -> - encode_hdtl(List,<<>>,0). - -encode_hdtl([], Acc, Cnt) -> - <>; -encode_hdtl([Bin|T], Acc, Cnt) when is_binary(Bin) -> - encode_hdtl(T, <<(byte_size(Bin)):32, Bin/binary, Acc/binary>>,Cnt + 1); -encode_hdtl([Bin|T], Acc, Cnt) -> - encode_hdtl(T, <<(iolist_size(Bin)):32, (iolist_to_binary(Bin))/binary, - Acc/binary>>,Cnt + 1). - - - %%%----------------------------------------------------------------- diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl index 2eaa44b966..78e3ab3697 100644 --- a/lib/kernel/src/gen_tcp.erl +++ b/lib/kernel/src/gen_tcp.erl @@ -327,7 +327,7 @@ unrecv(S, Data) when is_port(S) -> sendfile(File, _Sock, _Offet, _Bytes, _Opts) when is_pid(File) -> {error, badarg}; sendfile(File, Sock, Offset, Bytes, []) -> - sendfile(File, Sock, Offset, Bytes, ?MAX_CHUNK_SIZE, undefined, undefined, + sendfile(File, Sock, Offset, Bytes, ?MAX_CHUNK_SIZE, [], [], false, false, false); sendfile(File, Sock, Offset, Bytes, Opts) -> ChunkSize0 = proplists:get_value(chunk_size, Opts, ?MAX_CHUNK_SIZE), @@ -335,8 +335,8 @@ sendfile(File, Sock, Offset, Bytes, Opts) -> ?MAX_CHUNK_SIZE; true -> ChunkSize0 end, - Headers = proplists:get_value(headers, Opts), - Trailers = proplists:get_value(trailers, Opts), + Headers = proplists:get_value(headers, Opts, []), + Trailers = proplists:get_value(trailers, Opts, []), sendfile(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, lists:member(sf_nodiskio,Opts),lists:member(sf_mnowait,Opts), lists:member(sf_sync,Opts)). @@ -432,11 +432,13 @@ sendfile(_,_,_,_,_,_,_,_,_,_) -> %%% sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers) - when is_list(Headers) == false -> + when Headers == []; is_integer(Headers) -> case sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) of - {ok, BytesSent} when is_list(Trailers),is_integer(Headers) -> + {ok, BytesSent} when is_list(Trailers), + Trailers =/= [], + is_integer(Headers) -> sendfile_send(Sock, Trailers, BytesSent+Headers); - {ok, BytesSent} when is_list(Trailers) -> + {ok, BytesSent} when is_list(Trailers), Trailers =/= [] -> sendfile_send(Sock, Trailers, BytesSent); {ok, BytesSent} when is_integer(Headers) -> {ok, BytesSent + Headers}; diff --git a/lib/kernel/test/gen_tcp_api_SUITE.erl b/lib/kernel/test/gen_tcp_api_SUITE.erl index 876d266c4b..d66caad2d8 100644 --- a/lib/kernel/test/gen_tcp_api_SUITE.erl +++ b/lib/kernel/test/gen_tcp_api_SUITE.erl @@ -370,7 +370,7 @@ t_sendfile_hdtl(Config) -> end, SendHdTl = fun(Sock) -> - Headers = [<<"header1">>,"header2"], + Headers = [<<"header1">>,<<0:(1024*8)>>,"header2"], Trailers = [<<"trailer1">>,"trailer2"], D = Send(Sock,Headers,Trailers, iolist_size([Headers,Trailers])), -- cgit v1.2.3 From 836410f5c9e092be4b77b26b3fc9f7abde0c89de Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Sun, 27 Nov 2011 17:22:05 +0100 Subject: Move sendfile api to file module Since sendfile could in theory be used to send to any type of file descriptor in *nix, it is a better fit to have it in file. --- lib/kernel/src/file.erl | 142 ++++++++++++++++- lib/kernel/src/gen_tcp.erl | 146 +---------------- lib/kernel/test/Makefile | 3 +- lib/kernel/test/gen_tcp_api_SUITE.erl | 286 +-------------------------------- lib/kernel/test/sendfile_SUITE.erl | 291 ++++++++++++++++++++++++++++++++++ 5 files changed, 441 insertions(+), 427 deletions(-) create mode 100644 lib/kernel/test/sendfile_SUITE.erl diff --git a/lib/kernel/src/file.erl b/lib/kernel/src/file.erl index 706c60caaf..ef1d20b53b 100644 --- a/lib/kernel/src/file.erl +++ b/lib/kernel/src/file.erl @@ -51,6 +51,9 @@ -export([pid2name/1]). +%% Sendfile functions +-export([sendfile/2,sendfile/5]). + %%% Obsolete exported functions -export([raw_read_file_info/1, raw_write_file_info/2]). @@ -103,7 +106,10 @@ -type date_time() :: calendar:datetime(). -type posix_file_advise() :: 'normal' | 'sequential' | 'random' | 'no_reuse' | 'will_need' | 'dont_need'. - +-type sendfile_option() :: {chunk_size, non_neg_integer()} | + {headers, Hdrs :: list(iodata())} | + {trailers, Tlrs :: list(iodata())} | + sf_nodiskio | sf_mnowait | sf_sync. %%%----------------------------------------------------------------- %%% General functions @@ -1114,6 +1120,140 @@ change_time(Name, Atime, Mtime) when is_tuple(Atime), is_tuple(Mtime) -> write_file_info(Name, #file_info{atime=Atime, mtime=Mtime}). +%% +%% Send data using sendfile +%% + +-define(MAX_CHUNK_SIZE, (1 bsl 20)*20). %% 20 MB, has to fit in primary memory + +-spec sendfile(File, Sock, Offset, Bytes, Opts) -> + {'ok', non_neg_integer()} | {'error', inet:posix() | badarg | not_owner} when + File :: file:fd(), + Sock :: inet:socket(), + Offset :: non_neg_integer(), + Bytes :: non_neg_integer(), + Opts :: [sendfile_option()]. +sendfile(File, _Sock, _Offet, _Bytes, _Opts) when is_pid(File) -> + {error, badarg}; +sendfile(File, Sock, Offset, Bytes, []) -> + sendfile(File, Sock, Offset, Bytes, ?MAX_CHUNK_SIZE, [], [], + false, false, false); +sendfile(File, Sock, Offset, Bytes, Opts) -> + ChunkSize0 = proplists:get_value(chunk_size, Opts, ?MAX_CHUNK_SIZE), + ChunkSize = if ChunkSize0 > ?MAX_CHUNK_SIZE -> + ?MAX_CHUNK_SIZE; + true -> ChunkSize0 + end, + Headers = proplists:get_value(headers, Opts, []), + Trailers = proplists:get_value(trailers, Opts, []), + sendfile(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, + lists:member(sf_nodiskio,Opts),lists:member(sf_mnowait,Opts), + lists:member(sf_sync,Opts)). + +%% sendfile/2 +-spec sendfile(File, Sock) -> + {'ok', non_neg_integer()} | {'error', inet:posix() | badarg | not_owner} + when File :: file:name(), + Sock :: inet:socket(). +sendfile(File, Sock) -> + case file:open(File, [read, raw, binary]) of + {error, Reason} -> + {error, Reason}; + {ok, Fd} -> + Res = sendfile(Fd, Sock, 0, 0, []), + file:close(Fd), + Res + end. + +%% Internal sendfile functions +sendfile(#file_descriptor{ module = Mod } = Fd, Sock, Offset, Bytes, + ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) + when is_port(Sock) -> + case Mod:sendfile(Fd, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, + Nodiskio, MNowait, Sync) of + {error, enotsup} -> + sendfile_fallback(Fd, Sock, Offset, Bytes, ChunkSize, + Headers, Trailers); + Else -> + Else + end; +sendfile(_,_,_,_,_,_,_,_,_,_) -> + {error, badarg}. + +%%% +%% Sendfile Fallback +%%% +sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, + Headers, Trailers) + when Headers == []; is_integer(Headers) -> + case sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) of + {ok, BytesSent} when is_list(Trailers), + Trailers =/= [], + is_integer(Headers) -> + sendfile_send(Sock, Trailers, BytesSent+Headers); + {ok, BytesSent} when is_list(Trailers), Trailers =/= [] -> + sendfile_send(Sock, Trailers, BytesSent); + {ok, BytesSent} when is_integer(Headers) -> + {ok, BytesSent + Headers}; + Else -> + Else + end; +sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers) -> + case sendfile_send(Sock, Headers, 0) of + {ok, BytesSent} -> + sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, BytesSent, + Trailers); + Else -> + Else + end. + + +sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) -> + {ok, CurrPos} = file:position(File, {cur, 0}), + {ok, _NewPos} = file:position(File, {bof, Offset}), + Res = sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0), + file:position(File, {bof, CurrPos}), + Res. + + +sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent) + when Bytes > BytesSent; Bytes == 0 -> + Size = if Bytes == 0 -> + ChunkSize; + (Bytes - BytesSent + ChunkSize) > 0 -> + Bytes - BytesSent; + true -> + ChunkSize + end, + case file:read(File, Size) of + {ok, Data} -> + case sendfile_send(Sock, Data, BytesSent) of + {ok,NewBytesSent} -> + sendfile_fallback_int( + File, Sock, Bytes, ChunkSize, + NewBytesSent); + Error -> + Error + end; + eof -> + {ok, BytesSent}; + Error -> + Error + end; +sendfile_fallback_int(_File, _Sock, BytesSent, _ChunkSize, BytesSent) -> + {ok, BytesSent}. + +sendfile_send(Sock, Data, Old) -> + Len = iolist_size(Data), + case gen_tcp:send(Sock, Data) of + ok -> + {ok, Len+Old}; + Else -> + Else + end. + + + %%%----------------------------------------------------------------- %%% Helpers diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl index 78e3ab3697..4d6c7f5f1d 100644 --- a/lib/kernel/src/gen_tcp.erl +++ b/lib/kernel/src/gen_tcp.erl @@ -22,7 +22,7 @@ -export([connect/3, connect/4, listen/2, accept/1, accept/2, shutdown/2, close/1]). --export([send/2, recv/2, recv/3, unrecv/2, sendfile/2, sendfile/5]). +-export([send/2, recv/2, recv/3, unrecv/2]). -export([controlling_process/2]). -export([fdopen/2]). @@ -106,13 +106,8 @@ {tcp_module, module()} | option(). -type socket() :: port(). --type sendfile_option() :: {chunk_size, non_neg_integer()} | - {headers, Hdrs :: list(iodata())} | - {trailers, Tlrs :: list(iodata())} | - sf_nodiskio | sf_mnowait | sf_sync. --export_type([option/0, option_name/0, connect_option/0, listen_option/0, - sendfile_option/0]). +-export_type([option/0, option_name/0, connect_option/0, listen_option/0]). %% %% Connect a socket @@ -308,52 +303,6 @@ unrecv(S, Data) when is_port(S) -> Mod:unrecv(S, Data); Error -> Error - end. - -%% -%% Send data using sendfile -%% - --define(MAX_CHUNK_SIZE, (1 bsl 20)*20). %% 20 MB, has to fit in primary memory - --spec sendfile(File, Sock, Offset, Bytes, Opts) -> - {'ok', non_neg_integer()} | {'error', inet:posix() } | - {'error', not_owner} when - File :: file:fd(), - Sock :: socket(), - Offset :: non_neg_integer(), - Bytes :: non_neg_integer(), - Opts :: [sendfile_option()]. -sendfile(File, _Sock, _Offet, _Bytes, _Opts) when is_pid(File) -> - {error, badarg}; -sendfile(File, Sock, Offset, Bytes, []) -> - sendfile(File, Sock, Offset, Bytes, ?MAX_CHUNK_SIZE, [], [], - false, false, false); -sendfile(File, Sock, Offset, Bytes, Opts) -> - ChunkSize0 = proplists:get_value(chunk_size, Opts, ?MAX_CHUNK_SIZE), - ChunkSize = if ChunkSize0 > ?MAX_CHUNK_SIZE -> - ?MAX_CHUNK_SIZE; - true -> ChunkSize0 - end, - Headers = proplists:get_value(headers, Opts, []), - Trailers = proplists:get_value(trailers, Opts, []), - sendfile(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, - lists:member(sf_nodiskio,Opts),lists:member(sf_mnowait,Opts), - lists:member(sf_sync,Opts)). - -%% sendfile/2 --spec sendfile(File, Sock) -> - {'ok', non_neg_integer()} | {'error', inet:posix() | badarg} - when File :: file:name(), - Sock :: socket(). -sendfile(File, Sock) -> - case file:open(File, [read, raw, binary]) of - {error, Reason} -> - {error, Reason}; - {ok, Fd} -> - Res = sendfile(Fd, Sock, 0, 0, []), - file:close(Fd), - Res end. %% @@ -407,94 +356,3 @@ mod([_|Opts], Address) -> mod([], Address) -> mod(Address). - -%% Internal sendfile functions -sendfile(#file_descriptor{ module = Mod } = Fd, Sock, Offset, Bytes, - ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) - when is_port(Sock) -> - ok = inet:lock_socket(Sock,true), - {ok, SockFd} = prim_inet:getfd(Sock), - case Mod:sendfile(Fd, SockFd, Offset, Bytes, ChunkSize, Headers, Trailers, - Nodiskio, MNowait, Sync) of - {error, enotsup} -> - ok = inet:lock_socket(Sock,false), - sendfile_fallback(Fd, Sock, Offset, Bytes, ChunkSize, - Headers, Trailers); - Else -> - ok = inet:lock_socket(Sock,false), - Else - end; -sendfile(_,_,_,_,_,_,_,_,_,_) -> - {error, badarg}. - -%%% -%% Sendfile Fallback -%%% -sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, - Headers, Trailers) - when Headers == []; is_integer(Headers) -> - case sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) of - {ok, BytesSent} when is_list(Trailers), - Trailers =/= [], - is_integer(Headers) -> - sendfile_send(Sock, Trailers, BytesSent+Headers); - {ok, BytesSent} when is_list(Trailers), Trailers =/= [] -> - sendfile_send(Sock, Trailers, BytesSent); - {ok, BytesSent} when is_integer(Headers) -> - {ok, BytesSent + Headers}; - Else -> - Else - end; -sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers) -> - case sendfile_send(Sock, Headers, 0) of - {ok, BytesSent} -> - sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, BytesSent, - Trailers); - Else -> - Else - end. - - -sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) -> - {ok, CurrPos} = file:position(File, {cur, 0}), - {ok, _NewPos} = file:position(File, {bof, Offset}), - Res = sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0), - file:position(File, {bof, CurrPos}), - Res. - - -sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent) - when Bytes > BytesSent; Bytes == 0 -> - Size = if Bytes == 0 -> - ChunkSize; - (Bytes - BytesSent + ChunkSize) > 0 -> - Bytes - BytesSent; - true -> - ChunkSize - end, - case file:read(File, Size) of - {ok, Data} -> - case sendfile_send(Sock, Data, BytesSent) of - {ok,NewBytesSent} -> - sendfile_fallback_int( - File, Sock, Bytes, ChunkSize, - NewBytesSent); - Error -> - Error - end; - eof -> - {ok, BytesSent}; - Error -> - Error - end; -sendfile_fallback_int(_File, _Sock, BytesSent, _ChunkSize, BytesSent) -> - {ok, BytesSent}. - -sendfile_send(Sock, Data, Old) -> - Len = iolist_size(Data), - case send(Sock, Data) of - ok -> - {ok, Len+Old}; - Else -> - Else - end. diff --git a/lib/kernel/test/Makefile b/lib/kernel/test/Makefile index 82bc3fc6d1..5dcaad3f5e 100644 --- a/lib/kernel/test/Makefile +++ b/lib/kernel/test/Makefile @@ -74,7 +74,8 @@ MODULES= \ wrap_log_reader_SUITE \ cleanup \ zlib_SUITE \ - loose_node + loose_node \ + sendfile_SUITE APP_FILES = \ appinc.app \ diff --git a/lib/kernel/test/gen_tcp_api_SUITE.erl b/lib/kernel/test/gen_tcp_api_SUITE.erl index d66caad2d8..a7af00c12a 100644 --- a/lib/kernel/test/gen_tcp_api_SUITE.erl +++ b/lib/kernel/test/gen_tcp_api_SUITE.erl @@ -24,7 +24,6 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("kernel/include/inet.hrl"). --include_lib("kernel/include/file.hrl"). -export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1, init_per_group/2,end_per_group/2, @@ -33,42 +32,21 @@ t_connect_bad/1, t_recv_timeout/1, t_recv_eof/1, t_shutdown_write/1, t_shutdown_both/1, t_shutdown_error/1, - t_fdopen/1, t_implicit_inet6/1, - t_sendfile_small/1, t_sendfile_big/1, - t_sendfile_hdtl/1, t_sendfile_partial/1, - t_sendfile_offset/1, t_sendfile_sendafter/1, - t_sendfile_recvafter/1, t_sendfile_sendduring/1, - t_sendfile_recvduring/1]). - --export([sendfile_server/2]). + t_fdopen/1, t_implicit_inet6/1]). suite() -> [{ct_hooks,[ts_install_cth]}]. all() -> [{group, t_accept}, {group, t_connect}, {group, t_recv}, t_shutdown_write, t_shutdown_both, t_shutdown_error, - t_fdopen, t_implicit_inet6,{group,t_sendfile}]. + t_fdopen, t_implicit_inet6]. groups() -> [{t_accept, [], [t_accept_timeout]}, {t_connect, [], [t_connect_timeout, t_connect_bad]}, - {t_recv, [], [t_recv_timeout, t_recv_eof]}, - {t_sendfile, [], [{group, sendfile_all()}]}]. - -sendfile_all() -> - [ - t_sendfile_small - ,t_sendfile_big -% ,t_sendfile_hdtl - ,t_sendfile_partial - ,t_sendfile_offset - ,t_sendfile_sendafter - ,t_sendfile_recvafter - ,t_sendfile_sendduring - ,t_sendfile_recvduring - ]. -% [t_sendfile_big]. -% [t_sendfile_small]. + {t_recv, [], [t_recv_timeout, t_recv_eof]}]. + + init_per_suite(Config) -> Config. @@ -76,31 +54,12 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. -init_per_group(t_sendfile, Config) -> - Priv = ?config(priv_dir, Config), - SFilename = filename:join(Priv, "sendfile_small.html"), - {ok, DS} = file:open(SFilename,[write,raw]), - file:write(DS,"yo baby yo"), - file:sync(DS), - file:close(DS), - BFilename = filename:join(Priv, "sendfile_big.html"), - {ok, DB} = file:open(BFilename,[write,raw]), - [file:write(DB,[<<0:(10*8*1024*1024)>>]) || _I <- lists:seq(1,51)], - file:sync(DB), - file:close(DB), - [{small_file, SFilename}, - {file_opts,[raw,binary]}, - {big_file, BFilename}|Config]; init_per_group(_GroupName, Config) -> Config. -end_per_group(t_sendfile, Config) -> - file:delete(proplists:get_value(big_file, Config)); end_per_group(_,_Config) -> ok. - - init_per_testcase(_Func, Config) -> Dog = test_server:timetrap(test_server:seconds(60)), [{watchdog, Dog}|Config]. @@ -278,241 +237,6 @@ implicit_inet6(S, Addr) -> ?line ok = gen_tcp:close(S2), ?line ok = gen_tcp:close(S1). -t_sendfile_small(Config) when is_list(Config) -> - Filename = proplists:get_value(small_file, Config), - - Send = fun(Sock) -> - {Size, Data} = sendfile_file_info(Filename), - {ok, Size} = gen_tcp:sendfile(Filename, Sock), - Data - end, - - ok = sendfile_send(Send). - -t_sendfile_big(Config) when is_list(Config) -> - Filename = proplists:get_value(big_file, Config), - - Send = fun(Sock) -> - {ok, #file_info{size = Size}} = - file:read_file_info(Filename), - {ok, Size} = gen_tcp:sendfile(Filename, Sock), - Size - end, - - ok = sendfile_send("localhost", Send, 0). - -t_sendfile_partial(Config) -> - Filename = proplists:get_value(small_file, Config), - FileOpts = proplists:get_value(file_opts, Config, []), - - SendSingle = fun(Sock) -> - {_Size, <>} = - sendfile_file_info(Filename), - {ok,D} = file:open(Filename,[read|FileOpts]), - {ok,5} = gen_tcp:sendfile(D,Sock,0,5,[]), - file:close(D), - Data - end, - ok = sendfile_send(SendSingle), - - {_Size, <>} = - sendfile_file_info(Filename), - {ok,D} = file:open(Filename,[read|FileOpts]), - {ok, <>} = file:read(D,5), - FSend = fun(Sock) -> - {ok,5} = gen_tcp:sendfile(D,Sock,0,5,[]), - FData - end, - - ok = sendfile_send(FSend), - - SSend = fun(Sock) -> - {ok,3} = gen_tcp:sendfile(D,Sock,5,3,[]), - SData - end, - - ok = sendfile_send(SSend), - - {ok, <>} = file:read(D,3), - - file:close(D). - -t_sendfile_offset(Config) -> - Filename = proplists:get_value(small_file, Config), - FileOpts = proplists:get_value(file_opts, Config, []), - - Send = fun(Sock) -> - {_Size, <<_:5/binary,Data:3/binary,_/binary>> = AllData} = - sendfile_file_info(Filename), - {ok,D} = file:open(Filename,[read|FileOpts]), - {ok,3} = gen_tcp:sendfile(D,Sock,5,3,[]), - {ok, AllData} = file:read(D,100), - file:close(D), - Data - end, - ok = sendfile_send(Send). - - -t_sendfile_hdtl(Config) -> - Filename = proplists:get_value(small_file, Config), - FileOpts = proplists:get_value(file_opts, Config, []), - - Send = fun(Sock, Headers, Trailers, HdtlSize) -> - {Size, Data} = sendfile_file_info(Filename), - {ok,D} = file:open(Filename,[read|FileOpts]), - AllSize = Size+HdtlSize, - {ok, AllSize} = gen_tcp:sendfile( - D, Sock,0,0, - [{headers,Headers}, - {trailers,Trailers}]), - file:close(D), - Data - end, - - SendHdTl = fun(Sock) -> - Headers = [<<"header1">>,<<0:(1024*8)>>,"header2"], - Trailers = [<<"trailer1">>,"trailer2"], - D = Send(Sock,Headers,Trailers, - iolist_size([Headers,Trailers])), - iolist_to_binary([Headers,D,Trailers]) - end, - ok = sendfile_send(SendHdTl), - - SendHd = fun(Sock) -> - Headers = [<<"header1">>,"header2"], - D = Send(Sock,Headers,undefined, - iolist_size([Headers])), - iolist_to_binary([Headers,D]) - end, - ok = sendfile_send(SendHd), - - SendTl = fun(Sock) -> - Trailers = [<<"trailer1">>,"trailer2"], - D = Send(Sock,undefined,Trailers, - iolist_size([Trailers])), - iolist_to_binary([D,Trailers]) - end, - ok = sendfile_send(SendTl). - -t_sendfile_sendafter(Config) -> - Filename = proplists:get_value(small_file, Config), - - Send = fun(Sock) -> - {Size, Data} = sendfile_file_info(Filename), - {ok, Size} = gen_tcp:sendfile(Filename, Sock), - ok = gen_tcp:send(Sock, <<2>>), - <> - end, - - ok = sendfile_send(Send). - -t_sendfile_recvafter(Config) -> - Filename = proplists:get_value(small_file, Config), - - Send = fun(Sock) -> - {Size, Data} = sendfile_file_info(Filename), - {ok, Size} = gen_tcp:sendfile(Filename, Sock), - ok = gen_tcp:send(Sock, <<1>>), - {ok,<<1>>} = gen_tcp:recv(Sock, 1), - <> - end, - - ok = sendfile_send(Send). - -t_sendfile_sendduring(Config) -> - Filename = proplists:get_value(big_file, Config), - - Send = fun(Sock) -> - {ok, #file_info{size = Size}} = - file:read_file_info(Filename), - spawn_link(fun() -> - timer:sleep(10), - ok = gen_tcp:send(Sock, <<2>>) - end), - {ok, Size} = gen_tcp:sendfile(Filename, Sock), - Size+1 - end, - - ok = sendfile_send("localhost", Send, 0). - -t_sendfile_recvduring(Config) -> - Filename = proplists:get_value(big_file, Config), - - Send = fun(Sock) -> - {ok, #file_info{size = Size}} = - file:read_file_info(Filename), - spawn_link(fun() -> - timer:sleep(10), - ok = gen_tcp:send(Sock, <<1>>), - {ok,<<1>>} = gen_tcp:recv(Sock, 1) - end), - {ok, Size} = gen_tcp:sendfile(Filename, Sock), - timer:sleep(1000), - Size+1 - end, - - ok = sendfile_send("localhost", Send, 0). - -%% TODO: consolidate tests and reduce code -sendfile_send(Send) -> - sendfile_send("localhost",Send). -sendfile_send(Host, Send) -> - sendfile_send(Host, Send, []). -sendfile_send(Host, Send, Orig) -> - spawn_link(?MODULE, sendfile_server, [self(), Orig]), - receive - {server, Port} -> - {ok, Sock} = gen_tcp:connect(Host, Port, - [binary,{packet,0}, - {active,false}]), - Data = Send(Sock), - ok = gen_tcp:close(Sock), - receive - {ok, Bin} -> - Data = Bin, - ok - end - end. - -sendfile_server(ClientPid, Orig) -> - {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 0}, - {active, true}, - {reuseaddr, true}]), - {ok, Port} = inet:port(LSock), - ClientPid ! {server, Port}, - {ok, Sock} = gen_tcp:accept(LSock), - {ok, Bin} = sendfile_do_recv(Sock, Orig), - ClientPid ! {ok, Bin}, - gen_tcp:send(Sock, <<1>>). - --define(SENDFILE_TIMEOUT, 10000). -%% f(),{ok, S} = gen_tcp:connect("localhost",7890,[binary]),gen_tcp:sendfile("/ldisk/lukas/otp/sendfiletest.dat",S). -sendfile_do_recv(Sock, Bs) -> - receive - {tcp, Sock, B} -> - case binary:match(B,<<1>>) of - nomatch when is_list(Bs) -> - sendfile_do_recv(Sock, [B|Bs]); - nomatch when is_integer(Bs) -> - sendfile_do_recv(Sock, byte_size(B) + Bs); - _ when is_list(Bs) -> - {ok, iolist_to_binary(lists:reverse([B|Bs]))}; - _ when is_integer(Bs) -> - {ok, byte_size(B) + Bs} - end; - {tcp_closed, Sock} when is_list(Bs) -> - {ok, iolist_to_binary(lists:reverse(Bs))}; - {tcp_closed, Sock} when is_integer(Bs) -> - {ok, Bs} - after ?SENDFILE_TIMEOUT -> - timeout - end. - -sendfile_file_info(File) -> - {ok, #file_info{size = Size}} = file:read_file_info(File), - {ok, Data} = file:read_file(File), - {Size, Data}. - %%% Utilities diff --git a/lib/kernel/test/sendfile_SUITE.erl b/lib/kernel/test/sendfile_SUITE.erl new file mode 100644 index 0000000000..cddb783fe7 --- /dev/null +++ b/lib/kernel/test/sendfile_SUITE.erl @@ -0,0 +1,291 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2011-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +-module(sendfile_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("kernel/include/file.hrl"). + +-compile(export_all). + +all() -> + [t_sendfile_small + ,t_sendfile_big +% ,t_sendfile_hdtl + ,t_sendfile_partial + ,t_sendfile_offset + ,t_sendfile_sendafter + ,t_sendfile_recvafter + ,t_sendfile_sendduring + ,t_sendfile_recvduring + ]. + +init_per_suite(Config) -> + Priv = ?config(priv_dir, Config), + SFilename = filename:join(Priv, "sendfile_small.html"), + {ok, DS} = file:open(SFilename,[write,raw]), + file:write(DS,"yo baby yo"), + file:sync(DS), + file:close(DS), + BFilename = filename:join(Priv, "sendfile_big.html"), + {ok, DB} = file:open(BFilename,[write,raw]), + [file:write(DB,[<<0:(10*8*1024*1024)>>]) || _I <- lists:seq(1,51)], + file:sync(DB), + file:close(DB), + [{small_file, SFilename}, + {file_opts,[raw,binary]}, + {big_file, BFilename}|Config]. + +end_per_suite(Config) -> + file:delete(proplists:get_value(big_file, Config)). + +t_sendfile_small(Config) when is_list(Config) -> + Filename = proplists:get_value(small_file, Config), + + Send = fun(Sock) -> + {Size, Data} = sendfile_file_info(Filename), + {ok, Size} = file:sendfile(Filename, Sock), + Data + end, + + ok = sendfile_send(Send). + +t_sendfile_big(Config) when is_list(Config) -> + Filename = proplists:get_value(big_file, Config), + + Send = fun(Sock) -> + {ok, #file_info{size = Size}} = + file:read_file_info(Filename), + {ok, Size} = file:sendfile(Filename, Sock), + Size + end, + + ok = sendfile_send("localhost", Send, 0). + +t_sendfile_partial(Config) -> + Filename = proplists:get_value(small_file, Config), + FileOpts = proplists:get_value(file_opts, Config, []), + + SendSingle = fun(Sock) -> + {_Size, <>} = + sendfile_file_info(Filename), + {ok,D} = file:open(Filename,[read|FileOpts]), + {ok,5} = file:sendfile(D,Sock,0,5,[]), + file:close(D), + Data + end, + ok = sendfile_send(SendSingle), + + {_Size, <>} = + sendfile_file_info(Filename), + {ok,D} = file:open(Filename,[read|FileOpts]), + {ok, <>} = file:read(D,5), + FSend = fun(Sock) -> + {ok,5} = file:sendfile(D,Sock,0,5,[]), + FData + end, + + ok = sendfile_send(FSend), + + SSend = fun(Sock) -> + {ok,3} = file:sendfile(D,Sock,5,3,[]), + SData + end, + + ok = sendfile_send(SSend), + + {ok, <>} = file:read(D,3), + + file:close(D). + +t_sendfile_offset(Config) -> + Filename = proplists:get_value(small_file, Config), + FileOpts = proplists:get_value(file_opts, Config, []), + + Send = fun(Sock) -> + {_Size, <<_:5/binary,Data:3/binary,_/binary>> = AllData} = + sendfile_file_info(Filename), + {ok,D} = file:open(Filename,[read|FileOpts]), + {ok,3} = file:sendfile(D,Sock,5,3,[]), + {ok, AllData} = file:read(D,100), + file:close(D), + Data + end, + ok = sendfile_send(Send). + + +t_sendfile_hdtl(Config) -> + Filename = proplists:get_value(small_file, Config), + FileOpts = proplists:get_value(file_opts, Config, []), + + Send = fun(Sock, Headers, Trailers, HdtlSize) -> + {Size, Data} = sendfile_file_info(Filename), + {ok,D} = file:open(Filename,[read|FileOpts]), + AllSize = Size+HdtlSize, + {ok, AllSize} = file:sendfile( + D, Sock,0,0, + [{headers,Headers}, + {trailers,Trailers}]), + file:close(D), + Data + end, + + SendHdTl = fun(Sock) -> + Headers = [<<"header1">>,<<0:(1024*8)>>,"header2"], + Trailers = [<<"trailer1">>,"trailer2"], + D = Send(Sock,Headers,Trailers, + iolist_size([Headers,Trailers])), + iolist_to_binary([Headers,D,Trailers]) + end, + ok = sendfile_send(SendHdTl), + + SendHd = fun(Sock) -> + Headers = [<<"header1">>,"header2"], + D = Send(Sock,Headers,undefined, + iolist_size([Headers])), + iolist_to_binary([Headers,D]) + end, + ok = sendfile_send(SendHd), + + SendTl = fun(Sock) -> + Trailers = [<<"trailer1">>,"trailer2"], + D = Send(Sock,undefined,Trailers, + iolist_size([Trailers])), + iolist_to_binary([D,Trailers]) + end, + ok = sendfile_send(SendTl). + +t_sendfile_sendafter(Config) -> + Filename = proplists:get_value(small_file, Config), + + Send = fun(Sock) -> + {Size, Data} = sendfile_file_info(Filename), + {ok, Size} = file:sendfile(Filename, Sock), + ok = gen_tcp:send(Sock, <<2>>), + <> + end, + + ok = sendfile_send(Send). + +t_sendfile_recvafter(Config) -> + Filename = proplists:get_value(small_file, Config), + + Send = fun(Sock) -> + {Size, Data} = sendfile_file_info(Filename), + {ok, Size} = file:sendfile(Filename, Sock), + ok = gen_tcp:send(Sock, <<1>>), + {ok,<<1>>} = gen_tcp:recv(Sock, 1), + <> + end, + + ok = sendfile_send(Send). + +t_sendfile_sendduring(Config) -> + Filename = proplists:get_value(big_file, Config), + + Send = fun(Sock) -> + {ok, #file_info{size = Size}} = + file:read_file_info(Filename), + spawn_link(fun() -> + timer:sleep(10), + ok = gen_tcp:send(Sock, <<2>>) + end), + {ok, Size} = file:sendfile(Filename, Sock), + Size+1 + end, + + ok = sendfile_send("localhost", Send, 0). + +t_sendfile_recvduring(Config) -> + Filename = proplists:get_value(big_file, Config), + + Send = fun(Sock) -> + {ok, #file_info{size = Size}} = + file:read_file_info(Filename), + spawn_link(fun() -> + timer:sleep(10), + ok = gen_tcp:send(Sock, <<1>>), + {ok,<<1>>} = gen_tcp:recv(Sock, 1) + end), + {ok, Size} = file:sendfile(Filename, Sock), + timer:sleep(1000), + Size+1 + end, + + ok = sendfile_send("localhost", Send, 0). + +%% TODO: consolidate tests and reduce code +sendfile_send(Send) -> + sendfile_send("localhost",Send). +sendfile_send(Host, Send) -> + sendfile_send(Host, Send, []). +sendfile_send(Host, Send, Orig) -> + spawn_link(?MODULE, sendfile_server, [self(), Orig]), + receive + {server, Port} -> + {ok, Sock} = gen_tcp:connect(Host, Port, + [binary,{packet,0}, + {active,false}]), + Data = Send(Sock), + ok = gen_tcp:close(Sock), + receive + {ok, Bin} -> + Data = Bin, + ok + end + end. + +sendfile_server(ClientPid, Orig) -> + {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 0}, + {active, true}, + {reuseaddr, true}]), + {ok, Port} = inet:port(LSock), + ClientPid ! {server, Port}, + {ok, Sock} = gen_tcp:accept(LSock), + {ok, Bin} = sendfile_do_recv(Sock, Orig), + ClientPid ! {ok, Bin}, + gen_tcp:send(Sock, <<1>>). + +-define(SENDFILE_TIMEOUT, 10000). +%% f(),{ok, S} = gen_tcp:connect("localhost",7890,[binary]),file:sendfile("/ldisk/lukas/otp/sendfiletest.dat",S). +sendfile_do_recv(Sock, Bs) -> + receive + {tcp, Sock, B} -> + case binary:match(B,<<1>>) of + nomatch when is_list(Bs) -> + sendfile_do_recv(Sock, [B|Bs]); + nomatch when is_integer(Bs) -> + sendfile_do_recv(Sock, byte_size(B) + Bs); + _ when is_list(Bs) -> + {ok, iolist_to_binary(lists:reverse([B|Bs]))}; + _ when is_integer(Bs) -> + {ok, byte_size(B) + Bs} + end; + {tcp_closed, Sock} when is_list(Bs) -> + {ok, iolist_to_binary(lists:reverse(Bs))}; + {tcp_closed, Sock} when is_integer(Bs) -> + {ok, Bs} + after ?SENDFILE_TIMEOUT -> + timeout + end. + +sendfile_file_info(File) -> + {ok, #file_info{size = Size}} = file:read_file_info(File), + {ok, Data} = file:read_file(File), + {Size, Data}. -- cgit v1.2.3 From 4d198cb07025a8fc341a1e7fd7f9906b5fb714d6 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Sun, 27 Nov 2011 17:24:04 +0100 Subject: Only allow tcp sockets as target for sendfile --- erts/preloaded/src/prim_file.erl | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/erts/preloaded/src/prim_file.erl b/erts/preloaded/src/prim_file.erl index fb19521382..6f35162feb 100644 --- a/erts/preloaded/src/prim_file.erl +++ b/erts/preloaded/src/prim_file.erl @@ -545,17 +545,24 @@ write_file(_, _) -> %sendfile(_,_,_,_,_,_,_,_,_,_) -> % {error, enotsup}; sendfile(#file_descriptor{module = ?MODULE, data = {Port, _}}, - DestFD, Offset, Bytes, ChunkSize, Headers, Trailers, - Nodiskio, MNowait, Sync) -> - drv_command(Port, [<>, - Headers,Trailers]). + Dest, Offset, Bytes, _ChunkSize, _Headers, _Trailers, + _Nodiskio, _MNowait, _Sync) -> + case erlang:port_get_data(Dest) of + Data when Data == inet_tcp; Data == inet6_tcp -> + ok = inet:lock_socket(Dest,true), + {ok, DestFD} = prim_inet:getfd(Dest), + try drv_command(Port, [<>]) + after + ok = inet:lock_socket(Dest,false) + end; + _Else -> + {error,badarg} + end. get_bit(true) -> 1; -- cgit v1.2.3 From 758fab8895755b22ce02f0a6026d4d286c8a9c5a Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Sun, 27 Nov 2011 17:30:06 +0100 Subject: Expand sendfile documentation --- lib/kernel/doc/src/file.xml | 66 ++++++++++++++++++++++++++++++++++++++------- lib/kernel/src/file.erl | 18 ++++++------- 2 files changed, 66 insertions(+), 18 deletions(-) diff --git a/lib/kernel/doc/src/file.xml b/lib/kernel/doc/src/file.xml index 78bf0aff45..7fcc354176 100644 --- a/lib/kernel/doc/src/file.xml +++ b/lib/kernel/doc/src/file.xml @@ -149,6 +149,9 @@ + + + @@ -1574,25 +1577,70 @@ - + send a file to a socket -

Sends Bytes in from the file - referenced by IoDevice beginning at Offset to - Socket. +

Sends the file Filename to Socket. Returns {ok, BytesSent} if successful, otherwise {error, Reason}.

-

Available on Linux, FreeBSD, DragonflyBSD, Solaris, Darwin and Windows

- + send a file to a socket -

Sends the file Filename to Socket. +

Sends Bytes from the file + referenced by RawFile beginning at Offset to + Socket. Returns {ok, BytesSent} if successful, - otherwise {error, Reason}.

-

Available on Linux, FreeBSD, DragonflyBSD, Solaris, Darwin and Windows

+ otherwise {error, Reason}. If Bytes is set to + 0 all data after the given Offset is sent.

+

The file used must be opened using the raw flag, and the process + calling sendfile must be the controlling process of the socket. + See gen_tcp:controlling_process/2

+

If the OS used does not support sendfile, an Erlang fallback + using file:read and gen_tcp:send is used.

+

The option list can contain the following options: + + headers + A list containing data which is to be sent before + the file is sent. If headers is used, Bytes specifies + the total bytes in the header and/or file to be sent. If + Bytes is set to 0, the all headers, the file and + possible trailers will be sent. + trailers + A list containing data which is to be after before + the file is sent. All the trailers will be sent after the + file is sent, no matter what Bytes is set to. + chunk_size + The chunk size used by the erlang fallback to send + data. If using the fallback, this should be set to a value + which comfortably fits in the systems memory. Default is 20 MB. + sf_nodiskio + This flag causes any sendfile() call which would + block on disk I/O to instead return EBUSY. Busy servers may bene- + fit by transferring requests that would block to a separate I/O + worker thread. + sf_mnowait + Do not wait for some kernel resource to become avail- + able, in particular, mbuf and sf_buf. The flag does not make the + sendfile() syscall truly non-blocking, since other resources are + still allocated in a blocking fashion. + sf_sync + sendfile sleeps until the network stack no longer refer- + ences the VM pages of the file, making subsequent modifications to + it safe. Please note that this is not a guarantee that the data + has actually been sent. + +

+

On operating systems with thread support, it is recommended to use + async threads. See the command line flag + +A in erl(1). If it is not + possible to use async threads for sendfile, it is recommended to use + a relatively small value for the send buffer on the socket. Otherwise + the Erlang VM might loose some of its soft realtime guarantees. + Which size to use depends on the OS/hardware and the requirements + of the application.

diff --git a/lib/kernel/src/file.erl b/lib/kernel/src/file.erl index ef1d20b53b..19eaa7bfcc 100644 --- a/lib/kernel/src/file.erl +++ b/lib/kernel/src/file.erl @@ -106,7 +106,7 @@ -type date_time() :: calendar:datetime(). -type posix_file_advise() :: 'normal' | 'sequential' | 'random' | 'no_reuse' | 'will_need' | 'dont_need'. --type sendfile_option() :: {chunk_size, non_neg_integer()} | +-type sendfile_option() :: {chunk_size, pos_integer()} | {headers, Hdrs :: list(iodata())} | {trailers, Tlrs :: list(iodata())} | sf_nodiskio | sf_mnowait | sf_sync. @@ -1126,10 +1126,10 @@ change_time(Name, Atime, Mtime) -define(MAX_CHUNK_SIZE, (1 bsl 20)*20). %% 20 MB, has to fit in primary memory --spec sendfile(File, Sock, Offset, Bytes, Opts) -> +-spec sendfile(RawFile, Socket, Offset, Bytes, Opts) -> {'ok', non_neg_integer()} | {'error', inet:posix() | badarg | not_owner} when - File :: file:fd(), - Sock :: inet:socket(), + RawFile :: file:fd(), + Socket :: inet:socket(), Offset :: non_neg_integer(), Bytes :: non_neg_integer(), Opts :: [sendfile_option()]. @@ -1151,12 +1151,12 @@ sendfile(File, Sock, Offset, Bytes, Opts) -> lists:member(sf_sync,Opts)). %% sendfile/2 --spec sendfile(File, Sock) -> +-spec sendfile(Filename, Socket) -> {'ok', non_neg_integer()} | {'error', inet:posix() | badarg | not_owner} - when File :: file:name(), - Sock :: inet:socket(). -sendfile(File, Sock) -> - case file:open(File, [read, raw, binary]) of + when Filename :: file:name(), + Socket :: inet:socket(). +sendfile(Filename, Sock) -> + case file:open(Filename, [read, raw, binary]) of {error, Reason} -> {error, Reason}; {ok, Fd} -> -- cgit v1.2.3 From 27faa34693f35b6aa41fa67cbfe365bd082a5757 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Sun, 27 Nov 2011 17:33:19 +0100 Subject: Remove windows implementation --- erts/emulator/drivers/win32/win_efile.c | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/erts/emulator/drivers/win32/win_efile.c b/erts/emulator/drivers/win32/win_efile.c index 0f41a09bf6..931bb196f1 100644 --- a/erts/emulator/drivers/win32/win_efile.c +++ b/erts/emulator/drivers/win32/win_efile.c @@ -1581,27 +1581,3 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, errno = ERROR_SUCCESS; return check_error(0, errInfo); } - -int -efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, size_t *count) -{ - /* TODO: write proper Windows TransmitFile based implementation */ - /* use overlapped I/O and driver_select on the structure? */ - /* int res = efile_seek(errInfo, in_fd, *offset, EFILE_SEEK_SET, NULL); */ - /* if (res) { */ - /* /\* TODO: could in_fd be shared and require protecting/locking */ - /* efile_seek/SetFilePointerEx? *\/ */ - /* if (TransmitFile((SOCKET) out_fd, (HANDLE) in_fd, *count, */ - /* 0, NULL, NULL, 0)) { */ - /* return check_error(0, errInfo); */ - /* } else { */ - /* /\* TODO: correct error handling? *\/ */ - /* return set_error(errInfo); */ - /* } */ - /* } else { */ - /* return res; */ - /* } */ - errno = ENOTSUP; - return check_error(-1, errInfo); -} -- cgit v1.2.3 From 1bbf8cee44b8836d66d289cc0b5b314ed83de821 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Tue, 29 Nov 2011 11:05:37 +0100 Subject: Remove header/trailer support Since the API for headers/trailers seem to be very awkward to work with when using non-blocking io the feature is dropped for now. See unix_efile.c for more details. --- erts/emulator/drivers/common/efile_drv.c | 72 ++++---------------------------- erts/emulator/drivers/common/erl_efile.h | 2 +- erts/emulator/drivers/unix/unix_efile.c | 14 ++++++- erts/preloaded/src/prim_file.erl | 13 ++---- lib/kernel/doc/src/file.xml | 25 ----------- lib/kernel/src/file.erl | 15 +++---- lib/kernel/test/sendfile_SUITE.erl | 42 ------------------- 7 files changed, 32 insertions(+), 151 deletions(-) diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 7eaafd5af1..5c52b99348 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -431,8 +431,6 @@ struct t_data off_t offset; Uint64 nbytes; Uint64 written; - short flags; - struct t_sendfile_hdtl *hdtl; } sendfile; #endif /* HAVE_SENDFILE */ } c; @@ -521,9 +519,6 @@ static void *ef_safe_realloc(void *op, Uint s) !0) \ : 0) -/* int EV_GET_SYSIOVEC(ErlIoVec *ev, Uint32, int *cnt, SysIOVec **target, int *pp, int *qp) */ -#define EV_GET_SYSIOVEC ev_get_sysiovec - #if 0 @@ -935,43 +930,6 @@ static int reply_eof(file_descriptor *desc) { driver_output2(desc->port, &c, 1, NULL, 0); return 0; } - -static int ev_get_sysiovec(ErlIOVec *ev, Uint32 len, int *cnt, SysIOVec **target, int *pp, int *qp) { - int tmp_p = *pp, tmp_q = *qp, tmp_len = len, i; - SysIOVec *tmp_target; - while (tmp_len != 0) { - if (tmp_len + tmp_p > ev->iov[tmp_q].iov_len) { - - tmp_len -= ev->iov[tmp_q].iov_len - tmp_p; - tmp_q++; - tmp_p = 0; - if (tmp_q == ev->vsize) - return 0; - } else break; - } - *cnt = tmp_q - *qp + 1; - tmp_target = EF_SAFE_ALLOC(sizeof(SysIOVec)* (*cnt)); - *target = tmp_target; - for (i = 0; i < *cnt; i++) { - tmp_target[i].iov_base = ev->iov[*qp].iov_base+*pp; - if (len + *pp <= ev->iov[*qp].iov_len) { - tmp_target[i].iov_len = len; - if (len + *pp == ev->iov[*qp].iov_len) { - *pp = 0; - (*qp)++; - } else - *pp += len; - } else { - tmp_target[i].iov_len = ev->iov[*qp].iov_len - *pp; - len -= ev->iov[*qp].iov_len - *pp; - *pp = 0; - (*qp)++; - } - } - return 1; -} - - static void invoke_name(void *data, int (*f)(Efile_error *, char *)) { @@ -1781,7 +1739,7 @@ static void invoke_sendfile(void *data) int result = 0; d->again = 0; - result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes, &d->c.sendfile.hdtl); + result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes, NULL); d->c.sendfile.written += nbytes; @@ -3426,14 +3384,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { Uint64 nbytes; char flags; - /* DestFD:32, Offset:64, Bytes:64, - ChunkSize:64, - (get_bit(Nodiskio)):1, - (get_bit(MNowait)):1, - (get_bit(Sync)):1,0:5, - (encode_hdtl(Headers))/binary, - (encode_hdtl(Trailers))/binary */ - if (ev->size < 1 + 1 + 5 * sizeof(Uint32) + sizeof(char) + if (ev->size < 1 + 7 * sizeof(Uint32) + sizeof(char) || !EV_GET_UINT32(ev, &out_fd, &p, &q) || !EV_GET_CHAR(ev, &flags, &p, &q) || !EV_GET_UINT32(ev, &offsetH, &p, &q) @@ -3446,6 +3397,12 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { goto done; } + if (hd_len != 0 || tl_len != 0 || flags != 0) { + // We do not allow header, trailers and/or flags right now + reply_posix_error(desc, EINVAL); + goto done; + } + d = EF_SAFE_ALLOC(sizeof(struct t_data)); d->fd = desc->fd; d->command = command; @@ -3454,7 +3411,6 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { d->level = 2; d->c.sendfile.out_fd = (int) out_fd; - d->c.sendfile.flags = (int) flags; d->c.sendfile.written = 0; #if SIZEOF_OFF_T == 4 @@ -3468,18 +3424,6 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { #endif d->c.sendfile.nbytes = nbytes; - if (hd_len == 0 && tl_len == 0) - d->c.sendfile.hdtl = NULL; - else { - d->c.sendfile.hdtl = EF_SAFE_ALLOC(sizeof(struct t_sendfile_hdtl)); - if (!EV_GET_SYSIOVEC(ev, hd_len, &d->c.sendfile.hdtl->hdr_cnt, &d->c.sendfile.hdtl->headers, &p, &q) - || !EV_GET_SYSIOVEC(ev, tl_len, &d->c.sendfile.hdtl->trl_cnt, &d->c.sendfile.hdtl->trailers, &p, &q)) { - EF_FREE(d->c.sendfile.hdtl); - EF_FREE(d); - reply_posix_error(desc, EINVAL); - goto done; - } - } if (sys_info.async_threads != 0) { SET_BLOCKING(d->c.sendfile.out_fd); diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index b73fb35120..349ab0e17b 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -177,5 +177,5 @@ int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); #ifdef HAVE_SENDFILE int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, Uint64 *nbytes, struct t_sendfile_hdtl **hdtl); + off_t *offset, Uint64 *nbytes, struct t_sendfile_hdtl *hdtl); #endif /* HAVE_SENDFILE */ diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 138c550fdd..72911641d3 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1471,9 +1471,21 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, #ifdef HAVE_SENDFILE #define SENDFILE_CHUNK_SIZE ((1 << 30) -1) +/* + * sendfile: The implementation of the sendfile system call varies + * a lot on different *nix platforms so to make the api similar in all + * we have to emulate some things in linux and play with variables on + * bsd/darwin. + * + * It could be possible to implement header/trailer in sendfile, though + * you would have to emulate it in linux and on BSD/Darwin some complex + * calculations have to be made when using a non blocking socket to figure + * out how much of the header/file/trailer was sent in each command. + */ + int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, Uint64 *nbytes, struct t_sendfile_hdtl** hdtl) + off_t *offset, Uint64 *nbytes, struct t_sendfile_hdtl* hdtl) { Uint64 written = 0; #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) diff --git a/erts/preloaded/src/prim_file.erl b/erts/preloaded/src/prim_file.erl index 6f35162feb..7316e0be99 100644 --- a/erts/preloaded/src/prim_file.erl +++ b/erts/preloaded/src/prim_file.erl @@ -545,7 +545,7 @@ write_file(_, _) -> %sendfile(_,_,_,_,_,_,_,_,_,_) -> % {error, enotsup}; sendfile(#file_descriptor{module = ?MODULE, data = {Port, _}}, - Dest, Offset, Bytes, _ChunkSize, _Headers, _Trailers, + Dest, Offset, Bytes, _ChunkSize, Headers, Trailers, _Nodiskio, _MNowait, _Sync) -> case erlang:port_get_data(Dest) of Data when Data == inet_tcp; Data == inet6_tcp -> @@ -555,8 +555,9 @@ sendfile(#file_descriptor{module = ?MODULE, data = {Port, _}}, 0:8, Offset:64/unsigned, Bytes:64/unsigned, - 0:32/unsigned, - 0:32/unsigned>>]) + (iolist_size(Headers)):32/unsigned, + (iolist_size(Trailers)):32/unsigned>>, + Headers,Trailers]) after ok = inet:lock_socket(Dest,false) end; @@ -564,12 +565,6 @@ sendfile(#file_descriptor{module = ?MODULE, data = {Port, _}}, {error,badarg} end. -get_bit(true) -> - 1; -get_bit(false) -> - 0. - - %%%----------------------------------------------------------------- %%% Functions operating on files without handle to the file. ?DRV. diff --git a/lib/kernel/doc/src/file.xml b/lib/kernel/doc/src/file.xml index 7fcc354176..c6a1f25dd9 100644 --- a/lib/kernel/doc/src/file.xml +++ b/lib/kernel/doc/src/file.xml @@ -1602,35 +1602,10 @@ using file:read and gen_tcp:send is used.

The option list can contain the following options: - headers - A list containing data which is to be sent before - the file is sent. If headers is used, Bytes specifies - the total bytes in the header and/or file to be sent. If - Bytes is set to 0, the all headers, the file and - possible trailers will be sent. - trailers - A list containing data which is to be after before - the file is sent. All the trailers will be sent after the - file is sent, no matter what Bytes is set to. chunk_size The chunk size used by the erlang fallback to send data. If using the fallback, this should be set to a value which comfortably fits in the systems memory. Default is 20 MB. - sf_nodiskio - This flag causes any sendfile() call which would - block on disk I/O to instead return EBUSY. Busy servers may bene- - fit by transferring requests that would block to a separate I/O - worker thread. - sf_mnowait - Do not wait for some kernel resource to become avail- - able, in particular, mbuf and sf_buf. The flag does not make the - sendfile() syscall truly non-blocking, since other resources are - still allocated in a blocking fashion. - sf_sync - sendfile sleeps until the network stack no longer refer- - ences the VM pages of the file, making subsequent modifications to - it safe. Please note that this is not a guarantee that the data - has actually been sent.

On operating systems with thread support, it is recommended to use diff --git a/lib/kernel/src/file.erl b/lib/kernel/src/file.erl index 19eaa7bfcc..0b0f91d86a 100644 --- a/lib/kernel/src/file.erl +++ b/lib/kernel/src/file.erl @@ -106,10 +106,7 @@ -type date_time() :: calendar:datetime(). -type posix_file_advise() :: 'normal' | 'sequential' | 'random' | 'no_reuse' | 'will_need' | 'dont_need'. --type sendfile_option() :: {chunk_size, pos_integer()} | - {headers, Hdrs :: list(iodata())} | - {trailers, Tlrs :: list(iodata())} | - sf_nodiskio | sf_mnowait | sf_sync. +-type sendfile_option() :: {chunk_size, non_neg_integer()}. %%%----------------------------------------------------------------- %%% General functions @@ -1144,11 +1141,11 @@ sendfile(File, Sock, Offset, Bytes, Opts) -> ?MAX_CHUNK_SIZE; true -> ChunkSize0 end, - Headers = proplists:get_value(headers, Opts, []), - Trailers = proplists:get_value(trailers, Opts, []), - sendfile(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, - lists:member(sf_nodiskio,Opts),lists:member(sf_mnowait,Opts), - lists:member(sf_sync,Opts)). + %% Support for headers, trailers and options has been removed because the + %% Darwin and BSD API for using it does not play nice with + %% non-blocking sockets. See unix_efile.c for more info. + sendfile(File, Sock, Offset, Bytes, ChunkSize, [], [], + false,false,false). %% sendfile/2 -spec sendfile(Filename, Socket) -> diff --git a/lib/kernel/test/sendfile_SUITE.erl b/lib/kernel/test/sendfile_SUITE.erl index cddb783fe7..5312508918 100644 --- a/lib/kernel/test/sendfile_SUITE.erl +++ b/lib/kernel/test/sendfile_SUITE.erl @@ -27,7 +27,6 @@ all() -> [t_sendfile_small ,t_sendfile_big -% ,t_sendfile_hdtl ,t_sendfile_partial ,t_sendfile_offset ,t_sendfile_sendafter @@ -130,47 +129,6 @@ t_sendfile_offset(Config) -> ok = sendfile_send(Send). -t_sendfile_hdtl(Config) -> - Filename = proplists:get_value(small_file, Config), - FileOpts = proplists:get_value(file_opts, Config, []), - - Send = fun(Sock, Headers, Trailers, HdtlSize) -> - {Size, Data} = sendfile_file_info(Filename), - {ok,D} = file:open(Filename,[read|FileOpts]), - AllSize = Size+HdtlSize, - {ok, AllSize} = file:sendfile( - D, Sock,0,0, - [{headers,Headers}, - {trailers,Trailers}]), - file:close(D), - Data - end, - - SendHdTl = fun(Sock) -> - Headers = [<<"header1">>,<<0:(1024*8)>>,"header2"], - Trailers = [<<"trailer1">>,"trailer2"], - D = Send(Sock,Headers,Trailers, - iolist_size([Headers,Trailers])), - iolist_to_binary([Headers,D,Trailers]) - end, - ok = sendfile_send(SendHdTl), - - SendHd = fun(Sock) -> - Headers = [<<"header1">>,"header2"], - D = Send(Sock,Headers,undefined, - iolist_size([Headers])), - iolist_to_binary([Headers,D]) - end, - ok = sendfile_send(SendHd), - - SendTl = fun(Sock) -> - Trailers = [<<"trailer1">>,"trailer2"], - D = Send(Sock,undefined,Trailers, - iolist_size([Trailers])), - iolist_to_binary([D,Trailers]) - end, - ok = sendfile_send(SendTl). - t_sendfile_sendafter(Config) -> Filename = proplists:get_value(small_file, Config), -- cgit v1.2.3 From fb8839de7ee499dfb4b534ad615114e01105e52c Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Tue, 29 Nov 2011 11:07:15 +0100 Subject: Skip recv/send during tests for fallback platforms --- lib/kernel/test/sendfile_SUITE.erl | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/lib/kernel/test/sendfile_SUITE.erl b/lib/kernel/test/sendfile_SUITE.erl index 5312508918..f7bb6fb85f 100644 --- a/lib/kernel/test/sendfile_SUITE.erl +++ b/lib/kernel/test/sendfile_SUITE.erl @@ -54,6 +54,30 @@ init_per_suite(Config) -> end_per_suite(Config) -> file:delete(proplists:get_value(big_file, Config)). +init_per_testcase(TC,Config) when TC == t_sendfile_recvduring; + TC == t_sendfile_sendduring -> + Filename = proplists:get_value(small_file, Config), + + Send = fun(Sock) -> + {_Size, Data} = sendfile_file_info(Filename), + {ok,D} = file:open(Filename, [raw,binary,read]), + prim_file:sendfile(D, Sock, 0, 0, 0, + [],[],false,false,false), + Data + end, + + %% Check if sendfile is supported on this platform + case catch sendfile_send(Send) of + ok -> + Config; + Error -> + ct:log("Error: ~p",[Error]), + {skip,"Not supported"} + end; +init_per_testcase(_Tc,Config) -> + Config. + + t_sendfile_small(Config) when is_list(Config) -> Filename = proplists:get_value(small_file, Config), -- cgit v1.2.3 From 62fffa75e2003b3f19eb7614307942028a400fd1 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Wed, 30 Nov 2011 11:09:29 +0100 Subject: Add sendfile server printouts --- lib/kernel/test/sendfile_SUITE.erl | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/kernel/test/sendfile_SUITE.erl b/lib/kernel/test/sendfile_SUITE.erl index f7bb6fb85f..04af16a6b9 100644 --- a/lib/kernel/test/sendfile_SUITE.erl +++ b/lib/kernel/test/sendfile_SUITE.erl @@ -255,15 +255,20 @@ sendfile_do_recv(Sock, Bs) -> nomatch when is_integer(Bs) -> sendfile_do_recv(Sock, byte_size(B) + Bs); _ when is_list(Bs) -> + ct:log("Stopped due to a 1"), {ok, iolist_to_binary(lists:reverse([B|Bs]))}; _ when is_integer(Bs) -> + ct:log("Stopped due to a 1"), {ok, byte_size(B) + Bs} end; {tcp_closed, Sock} when is_list(Bs) -> + ct:log("Stopped due to close"), {ok, iolist_to_binary(lists:reverse(Bs))}; {tcp_closed, Sock} when is_integer(Bs) -> + ct:log("Stopped due to close"), {ok, Bs} after ?SENDFILE_TIMEOUT -> + ct:log("Sendfile timeout"), timeout end. -- cgit v1.2.3