diff options
author | Lukas Larsson <[email protected]> | 2011-11-09 11:52:14 +0100 |
---|---|---|
committer | Lukas Larsson <[email protected]> | 2011-12-01 14:10:02 +0100 |
commit | 54bdd9a15d2e130c76f76ca322af56b306d02078 (patch) | |
tree | 13a2f1d7c3d38291464d94b805b63a6feeeaee82 | |
parent | bfa81856150b59ea4578e0eef79b97ab0decb8f7 (diff) | |
download | otp-54bdd9a15d2e130c76f76ca322af56b306d02078.tar.gz otp-54bdd9a15d2e130c76f76ca322af56b306d02078.tar.bz2 otp-54bdd9a15d2e130c76f76ca322af56b306d02078.zip |
Implement blocking calls for sendfile
Move sendfile data to invoke data instead of file_descr.
Remove usage of ready_output when doing a send.
If told to send 0 bytes, file_sendfile now sends the entire file
for linux.
-rw-r--r-- | erts/emulator/drivers/common/efile_drv.c | 206 | ||||
-rw-r--r-- | erts/emulator/drivers/unix/unix_efile.c | 31 | ||||
-rw-r--r-- | erts/preloaded/src/prim_file.erl | 7 | ||||
-rw-r--r-- | lib/kernel/src/gen_tcp.erl | 23 | ||||
-rw-r--r-- | lib/kernel/test/gen_tcp_api_SUITE.erl | 59 |
5 files changed, 177 insertions, 149 deletions
diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index b14f5844b2..4ed6aa4891 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -76,6 +76,11 @@ #define FILE_OPT_DELAYED_WRITE 0 #define FILE_OPT_READ_AHEAD 1 +#define FILE_SENDFILE_OFFSET 0x10 +#define FILE_SENDFILE_NBYTES 0x08 +#define FILE_SENDFILE_NODISKIO 0x4 +#define FILE_SENDFILE_MNOWAIT 0x2 +#define FILE_SENDFILE_SYNC 0x1 /* IPREAD variants */ @@ -218,7 +223,6 @@ typedef unsigned char uchar; static ErlDrvData file_start(ErlDrvPort port, char* command); static int file_init(void); static void file_stop(ErlDrvData); -static void file_ready_output(ErlDrvData data, ErlDrvEvent event); static void file_output(ErlDrvData, char* buf, int len); static int file_control(ErlDrvData, unsigned int command, char* buf, int len, char **rbuf, int rlen); @@ -226,7 +230,6 @@ static void file_timeout(ErlDrvData); static void file_outputv(ErlDrvData, ErlIOVec*); static void file_async_ready(ErlDrvData, ErlDrvThreadData); static void file_flush(ErlDrvData); -static void file_stop_select(ErlDrvEvent event, void* _); @@ -256,18 +259,6 @@ typedef struct { ErlDrvPDL q_mtx; /* Mutex for the driver queue, known by the emulator. Also used for mutual exclusion when accessing field(s) below. */ size_t write_buffered; - ErlDrvTermData caller; /* recipient of sync reply */ - /* sendfile call state to retry/resume on event */ - int command; /* same as d->command. for sendfile. TODO: this seems wrong */ - struct { - int eagain; - int out_fd; - /* TODO: Use Sint64 instead? What about 32-bit off_t linux */ - off_t offset; - size_t count; - size_t chunksize; - ErlDrvSInt64 written; - } sendfile; } file_descriptor; @@ -279,7 +270,7 @@ struct erl_drv_entry efile_driver_entry = { file_stop, NULL, NULL, - file_ready_output, + NULL, "efile", NULL, NULL, @@ -296,7 +287,7 @@ struct erl_drv_entry efile_driver_entry = { ERL_DRV_FLAG_USE_PORT_LOCKING, NULL, NULL, - file_stop_select + NULL }; @@ -415,6 +406,18 @@ struct t_data Sint64 length; int advise; } fadvise; +#ifdef HAVE_SENDFILE + struct { + int out_fd; + off_t offset; + size_t nbytes; + int flags; + int hdr_cnt; /* number of header iovecs */ + struct iovec *headers; /* pointer to header iovecs */ + int trl_cnt; /* number of trailer iovecs */ + struct iovec *trailers; /* pointer to trailer iovecs */ + } sendfile; +#endif } c; char b[1]; }; @@ -1710,71 +1713,27 @@ static void invoke_fadvise(void *data) d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise); } - - -static void do_sendfile(file_descriptor *desc); -static void file_ready_output(ErlDrvData data, ErlDrvEvent event) -{ - file_descriptor* d = (file_descriptor*) data; - - switch (d->command) { - case FILE_SENDFILE: - driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, - ERL_DRV_WRITE, 0); - do_sendfile(d); - break; - default: - break; - } -} - -static void file_stop_select(ErlDrvEvent event, void* _) -{ - /* TODO: close socket? */ +static void free_sendfile(void *data) { + EF_FREE(data); } static void invoke_sendfile(void *data) { - ((struct t_data *)data)->again = 0; -} - -static void do_sendfile(file_descriptor *d) -{ - int fd = d->fd; - int out_fd = d->sendfile.out_fd; - off_t offset = d->sendfile.offset; - size_t count = d->sendfile.count; - size_t chunksize = count < d->sendfile.chunksize - ? count : d->sendfile.chunksize; - int result_ok = 0; - Efile_error errInfo; - - result_ok = efile_sendfile(&errInfo, fd, out_fd, &offset, &chunksize); - - if (result_ok) { - d->sendfile.offset += chunksize; - d->sendfile.written += chunksize; - d->sendfile.count -= chunksize; - if (d->sendfile.count > 0) { - driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, - ERL_DRV_USE|ERL_DRV_WRITE, 1); - } else { - printf("==> sendfile DONE eagain=%d\n", d->sendfile.eagain); - reply_Uint(d, d->sendfile.written); - } - } else if (errInfo.posix_errno == EAGAIN || errInfo.posix_errno == EINTR) { - if (chunksize > 0) { - d->sendfile.offset += chunksize; - d->sendfile.written += chunksize; - d->sendfile.count -= chunksize; - } - d->sendfile.eagain++; + struct t_data *d = (struct t_data *) data; + int fd = (int)d->fd; + int out_fd = (int) d->c.sendfile.out_fd; + off_t offset = (off_t) d->c.sendfile.offset; + size_t nbytes = (size_t) d->c.sendfile.nbytes; + + d->result_ok = efile_sendfile(&d->errInfo, fd, out_fd, offset, &nbytes); + d->c.sendfile.offset = offset; + d->c.sendfile.nbytes = nbytes; + d->again = 0; - driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, - ERL_DRV_USE|ERL_DRV_WRITE, 1); + if (d->result_ok) { + printf("==> sendfile DONE nbytes=%d\n", d->c.sendfile.nbytes); } else { - printf("==> sendfile ERROR %s\n", erl_errno_id(errInfo.posix_errno)); - ef_send_posix_error(d, d->caller, errInfo.posix_errno); + printf("==> sendfile ERROR %s\n", erl_errno_id(d->errInfo.posix_errno)); } } @@ -2190,9 +2149,12 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) free_preadv(data); break; case FILE_SENDFILE: - driver_select(desc->port, (ErlDrvEvent)desc->sendfile.out_fd, - ERL_DRV_USE|ERL_DRV_WRITE, 1); - free_data(data); + if (!d->result_ok) { + reply_error(desc, &d->errInfo); + } else { + reply_Sint64(desc, d->c.sendfile.nbytes); + } + free_sendfile(data); break; default: abort(); @@ -3334,37 +3296,71 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { goto done; } /* case FILE_OPT_DELAYED_WRITE: */ } ASSERT(0); goto done; /* case FILE_SETOPT: */ - case FILE_SENDFILE: - { + case FILE_SENDFILE: { + struct t_data *d; - d = EF_SAFE_ALLOC(sizeof(struct t_data)); - d->fd = desc->fd; - d->command = command; - d->invoke = invoke_sendfile; - d->free = free_data; - d->level = 2; - desc->sendfile.out_fd = get_int32((uchar*) buf); - /* TODO: are off_t and size_t 64bit on all platforms? - off_t is 32bit on win32 msvc. maybe configurable in msvc. - Maybe use '#if SIZEOF_SIZE_T == 4'? */ - desc->sendfile.offset = get_int64(((uchar*) buf) - + sizeof(Sint32)); - desc->sendfile.count = get_int64(((uchar*) buf) - + sizeof(Sint32) - + sizeof(Sint64)); - desc->sendfile.chunksize = get_int64(((uchar*) buf) - + sizeof(Sint32) - + 2*sizeof(Sint64)); - desc->sendfile.written = 0; - desc->sendfile.eagain = 0; - /* TODO: shouldn't d->command be enough? */ - desc->command = command; - desc->caller = driver_caller(desc->port); + Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; + char flags; + + /* DestFD:32, Offset:64, Bytes:64, + ChunkSize:64, + (get_bit(Nodiskio)):1, + (get_bit(MNowait)):1, + (get_bit(Sync)):1,0:5, + (encode_hdtl(Headers))/binary, + (encode_hdtl(Trailers))/binary */ + if (ev->size < 1 + 1 + 5 * sizeof(Uint32) + sizeof(char) + || !EV_GET_UINT32(ev, &out_fd, &p, &q) + || !EV_GET_CHAR(ev, &flags, &p, &q) + || !EV_GET_UINT32(ev, &offsetH, &p, &q) + || !EV_GET_UINT32(ev, &offsetL, &p, &q) + || !EV_GET_UINT32(ev, &nbytesH, &p, &q) + || !EV_GET_UINT32(ev, &nbytesL, &p, &q)) { + /* Buffer has wrong length to contain all the needed values */ + reply_posix_error(desc, EINVAL); goto done; } - + + d = EF_SAFE_ALLOC(sizeof(struct t_data)); + d->fd = desc->fd; + d->command = command; + d->invoke = invoke_sendfile; + d->free = free_sendfile; + d->level = 2; + + d->c.sendfile.out_fd = (int) out_fd; + d->c.sendfile.flags = (int) flags; + +#if SIZEOF_OFF_T == 4 + if (offsetH != 0) { + reply_posix_error(desc, EINVAL); + goto done; + } + d->c.sendfile.offset = (off_t) offsetT; +#else + d->c.sendfile.offset = ((off_t) offsetH << 32) | offsetL; +#endif + +#if SIZEOF_SIZE_T == 4 + if (nbytesH != 0) { + reply_posix_error(desc, EINVAL); + goto done; + } + d->c.sendfile.nbytes = (size_t) nbytesT; +#else + d->c.sendfile.nbytes = ((size_t) nbytesH << 32) | nbytesL; +#endif + + printf("sendfile(nbytes => %d, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags); + + /* Do HEADER TRAILER stuff by calculating pointer places, not by copying data! */ + + cq_enq(desc, d); + goto done; + } /* case FILE_SENDFILE: */ + } /* switch(command) */ - + if (lseek_flush_read(desc, &err) < 0) { reply_posix_error(desc, err); goto done; diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 8b612164da..05c2f1fce9 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1471,19 +1471,34 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, #ifdef HAVE_SENDFILE int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t offset, size_t *ret_nbytes) + off_t offset, size_t *nbytes) { #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) - ssize_t retval; - do { - retval = sendfile(out_fd, in_fd, &offset, *ret_nbytes); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - *ret_nbytes = retval; + ssize_t retval, nbytes_sent = 0; + if (*nbytes == 0) { + *nbytes = (1 << 20) - 1; + do { + retval = sendfile(out_fd, in_fd, &offset, *nbytes); + nbytes_sent += retval; + printf("retval: %d, errno: %d, offset: %d, nbytes: %d\r\n", retval, errno, offset,*nbytes); + } while ((retval == -1 && errno == EINTR) + || (retval > 0 && errno == EAGAIN)); + } else { + do { + retval = sendfile(out_fd, in_fd, &offset, *nbytes); + if (retval > 0) { + nbytes_sent += retval; + *nbytes -= retval; + } + } while ((retval == -1 && errno == EINTR) + || (*nbytes > 0 && errno == EAGAIN)); + } + *nbytes = nbytes_sent; return check_error(retval == -1 ? -1 : 0, errInfo); #elif defined(DARWIN) - off_t len = *ret_nbytes; + off_t len = *nbytes; int retval = sendfile(in_fd, out_fd, *offset, &len, NULL, 0); - *ret_nbytes = len; + *nbytes = len; return check_error(retval, errInfo); #elif defined(__FreeBSD__) || defined(__DragonFly__) off_t len = 0; diff --git a/erts/preloaded/src/prim_file.erl b/erts/preloaded/src/prim_file.erl index 0767067682..6bdf5f6e2e 100644 --- a/erts/preloaded/src/prim_file.erl +++ b/erts/preloaded/src/prim_file.erl @@ -547,12 +547,13 @@ write_file(_, _) -> sendfile(#file_descriptor{module = ?MODULE, data = {Port, _}}, DestFD, Offset, Bytes, ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) -> - - drv_command(Port, <<?FILE_SENDFILE, DestFD:32, Offset:64, Bytes:64, - ChunkSize:64, + drv_command(Port, <<?FILE_SENDFILE, DestFD:32, (get_bit(Nodiskio)):1, (get_bit(MNowait)):1, (get_bit(Sync)):1,0:5, + Offset:64/unsigned, + Bytes:64/unsigned, + ChunkSize:64, (encode_hdtl(Headers))/binary, (encode_hdtl(Trailers))/binary>>). diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl index ea25dc3dc3..26afed4ff9 100644 --- a/lib/kernel/src/gen_tcp.erl +++ b/lib/kernel/src/gen_tcp.erl @@ -314,14 +314,14 @@ unrecv(S, Data) when is_port(S) -> %% Send data using sendfile %% --define(MAX_CHUNK_SIZE, (1 bsl 30)*2-1). +-define(MAX_CHUNK_SIZE, (1 bsl 20)*20). %% 20 MB, has to fit in primary memory -spec sendfile(File, Sock, Offset, Bytes, Opts) -> {'ok', non_neg_integer()} | {'error', inet:posix()} when File :: file:io_device(), Sock :: socket(), - Offset :: non_neg_integer() | undefined, - Bytes :: non_neg_integer() | undefined, + Offset :: non_neg_integer(), + Bytes :: non_neg_integer(), Opts :: [sendfile_option()]. sendfile(File, Sock, Offset, Bytes, Opts) when is_pid(File) -> Ref = erlang:monitor(process, File), @@ -359,8 +359,7 @@ sendfile(File, Sock) -> {error, Reason} -> {error, Reason}; {ok, Fd} -> - {ok, #file_info{size = Bytes}} = file:read_file_info(File), - Res = sendfile(Fd, Sock, 0, Bytes, []), + Res = sendfile(Fd, Sock, 0, 0, []), file:close(Fd), Res end. @@ -461,21 +460,17 @@ sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers) -> end. -sendfile_fallback(File, Sock, undefined, Bytes, ChunkSize) -> - sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0); sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) -> - {ok, CurrPos} = file:position(File, 0), + {ok, CurrPos} = file:position(File, {cur, 0}), {ok, _NewPos} = file:position(File, {bof, Offset}), Res = sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0), file:position(File, {bof, CurrPos}), Res. -sendfile_fallback_int(_File, _Sock, BytesSent, _ChunkSize, BytesSent) -> - {ok, BytesSent}; sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent) - when Bytes > BytesSent; Bytes == undefined -> - Size = if Bytes == undefined -> + when Bytes > BytesSent; Bytes == 0 -> + Size = if Bytes == 0 -> ChunkSize; (Bytes - BytesSent + ChunkSize) > 0 -> Bytes - BytesSent; @@ -496,7 +491,9 @@ sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent) {ok, BytesSent}; Error -> Error - end. + end; +sendfile_fallback_int(_File, _Sock, BytesSent, _ChunkSize, BytesSent) -> + {ok, BytesSent}. sendfile_send(Sock, Data, Old) -> Len = iolist_size(Data), diff --git a/lib/kernel/test/gen_tcp_api_SUITE.erl b/lib/kernel/test/gen_tcp_api_SUITE.erl index 492c60f521..60a3dcb2f4 100644 --- a/lib/kernel/test/gen_tcp_api_SUITE.erl +++ b/lib/kernel/test/gen_tcp_api_SUITE.erl @@ -34,7 +34,8 @@ t_recv_timeout/1, t_recv_eof/1, t_shutdown_write/1, t_shutdown_both/1, t_shutdown_error/1, t_fdopen/1, t_implicit_inet6/1, - t_sendfile/0, t_sendfile/1, t_sendfile_hdtl/1, t_sendfile_partial/1, + t_sendfile_small/1, t_sendfile_big/1, + t_sendfile_hdtl/1, t_sendfile_partial/1, t_sendfile_offset/1]). -export([sendfile_server/1]). @@ -56,7 +57,8 @@ groups() -> {t_sendfile_ioserv, [], sendfile_all()}]. sendfile_all() -> - [t_sendfile,t_sendfile_hdtl, t_sendfile_partial, t_sendfile_offset]. +% [t_sendfile,t_sendfile_hdtl, t_sendfile_partial, t_sendfile_offset]. + [t_sendfile_big]. init_per_suite(Config) -> Config. @@ -66,19 +68,25 @@ end_per_suite(_Config) -> init_per_group(t_sendfile, Config) -> Priv = ?config(priv_dir, Config), - Filename = filename:join(Priv, "sendfile_small.html"), - {ok, D} = file:open(Filename,[write]), - io:format(D,"yo baby yo",[]), - file:sync(D), - file:close(D), - [{small_file, Filename}|Config]; + SFilename = filename:join(Priv, "sendfile_small.html"), + {ok, DS} = file:open(SFilename,[write]), + io:format(DS,"yo baby yo",[]), + file:sync(DS), + file:close(DS), + BFilename = filename:join(Priv, "sendfile_big.html"), + {ok, DB} = file:open(BFilename,[write,raw]), + [file:write(DB,[<<0:(10*8*1024*1024)>>]) || + _I <- lists:seq(1,51)], + file:sync(DB), + file:close(DB), + [{small_file, SFilename},{big_file, BFilename}|Config]; init_per_group(t_sendfile_raw, Config) -> [{file_opts, [raw]}|Config]; init_per_group(_GroupName, Config) -> Config. end_per_group(_GroupName, Config) -> - Config. + file:delete(proplists:get_value(big_file, Config)). init_per_testcase(_Func, Config) -> @@ -258,12 +266,19 @@ implicit_inet6(S, Addr) -> ?line ok = gen_tcp:close(S2), ?line ok = gen_tcp:close(S1). +t_sendfile_small(Config) when is_list(Config) -> + Filename = proplists:get_value(small_file, Config), + + Send = fun(Sock) -> + {Size, Data} = sendfile_file_info(Filename), + {ok, Size} = gen_tcp:sendfile(Filename, Sock), + Data + end, -t_sendfile() -> - [{timetrap, {seconds, 5}}]. + ok = sendfile_send(Send). -t_sendfile(Config) when is_list(Config) -> - Filename = proplists:get_value(small_file, Config), +t_sendfile_big(Config) when is_list(Config) -> + Filename = proplists:get_value(big_file, Config), Send = fun(Sock) -> {Size, Data} = sendfile_file_info(Filename), @@ -281,7 +296,7 @@ t_sendfile_partial(Config) -> {_Size, <<Data:5/binary,_/binary>>} = sendfile_file_info(Filename), {ok,D} = file:open(Filename,[read|FileOpts]), - {ok,5} = gen_tcp:sendfile(D,Sock,undefined,5,[]), + {ok,5} = gen_tcp:sendfile(D,Sock,0,5,[]), file:close(D), Data end, @@ -289,20 +304,24 @@ t_sendfile_partial(Config) -> {_Size, <<FData:5/binary,SData:3/binary,_/binary>>} = sendfile_file_info(Filename), - {ok,D} = file:open(Filename,[read|FileOpts]), + {ok,D} = file:open(Filename,[read,binary|FileOpts]), + {ok, <<FData/binary>>} = file:read(D,5), FSend = fun(Sock) -> - {ok,5} = gen_tcp:sendfile(D,Sock,undefined,5,[]), + {ok,5} = gen_tcp:sendfile(D,Sock,0,5,[]), FData end, ok = sendfile_send(FSend), SSend = fun(Sock) -> - {ok,3} = gen_tcp:sendfile(D,Sock,undefined,3,[]), + {ok,3} = gen_tcp:sendfile(D,Sock,5,3,[]), SData end, ok = sendfile_send(SSend), + + {ok, <<SData/binary>>} = file:read(D,3), + file:close(D). t_sendfile_offset(Config) -> @@ -330,7 +349,7 @@ t_sendfile_hdtl(Config) -> {ok,D} = file:open(Filename,[read|FileOpts]), AllSize = Size+HdtlSize, {ok, AllSize} = gen_tcp:sendfile( - D, Sock,undefined,undefined, + D, Sock,0,0, [{headers,Headers}, {trailers,Trailers}]), file:close(D), @@ -356,9 +375,9 @@ t_sendfile_hdtl(Config) -> SendTl = fun(Sock) -> Trailers = [<<"trailer1">>,"trailer2"], - D = Send(Sock,Trailers,undefined, + D = Send(Sock,undefined,Trailers, iolist_size([Trailers])), - iolist_to_binary([Trailers,D]) + iolist_to_binary([D,Trailers]) end, ok = sendfile_send(SendTl). |