aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLukas Larsson <[email protected]>2011-11-09 11:52:14 +0100
committerLukas Larsson <[email protected]>2011-12-01 14:10:02 +0100
commit54bdd9a15d2e130c76f76ca322af56b306d02078 (patch)
tree13a2f1d7c3d38291464d94b805b63a6feeeaee82
parentbfa81856150b59ea4578e0eef79b97ab0decb8f7 (diff)
downloadotp-54bdd9a15d2e130c76f76ca322af56b306d02078.tar.gz
otp-54bdd9a15d2e130c76f76ca322af56b306d02078.tar.bz2
otp-54bdd9a15d2e130c76f76ca322af56b306d02078.zip
Implement blocking calls for sendfile
Move sendfile data to invoke data instead of file_descr. Remove usage of ready_output when doing a send. If told to send 0 bytes, file_sendfile now sends the entire file for linux.
-rw-r--r--erts/emulator/drivers/common/efile_drv.c206
-rw-r--r--erts/emulator/drivers/unix/unix_efile.c31
-rw-r--r--erts/preloaded/src/prim_file.erl7
-rw-r--r--lib/kernel/src/gen_tcp.erl23
-rw-r--r--lib/kernel/test/gen_tcp_api_SUITE.erl59
5 files changed, 177 insertions, 149 deletions
diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c
index b14f5844b2..4ed6aa4891 100644
--- a/erts/emulator/drivers/common/efile_drv.c
+++ b/erts/emulator/drivers/common/efile_drv.c
@@ -76,6 +76,11 @@
#define FILE_OPT_DELAYED_WRITE 0
#define FILE_OPT_READ_AHEAD 1
+#define FILE_SENDFILE_OFFSET 0x10
+#define FILE_SENDFILE_NBYTES 0x08
+#define FILE_SENDFILE_NODISKIO 0x4
+#define FILE_SENDFILE_MNOWAIT 0x2
+#define FILE_SENDFILE_SYNC 0x1
/* IPREAD variants */
@@ -218,7 +223,6 @@ typedef unsigned char uchar;
static ErlDrvData file_start(ErlDrvPort port, char* command);
static int file_init(void);
static void file_stop(ErlDrvData);
-static void file_ready_output(ErlDrvData data, ErlDrvEvent event);
static void file_output(ErlDrvData, char* buf, int len);
static int file_control(ErlDrvData, unsigned int command,
char* buf, int len, char **rbuf, int rlen);
@@ -226,7 +230,6 @@ static void file_timeout(ErlDrvData);
static void file_outputv(ErlDrvData, ErlIOVec*);
static void file_async_ready(ErlDrvData, ErlDrvThreadData);
static void file_flush(ErlDrvData);
-static void file_stop_select(ErlDrvEvent event, void* _);
@@ -256,18 +259,6 @@ typedef struct {
ErlDrvPDL q_mtx; /* Mutex for the driver queue, known by the emulator. Also used for
mutual exclusion when accessing field(s) below. */
size_t write_buffered;
- ErlDrvTermData caller; /* recipient of sync reply */
- /* sendfile call state to retry/resume on event */
- int command; /* same as d->command. for sendfile. TODO: this seems wrong */
- struct {
- int eagain;
- int out_fd;
- /* TODO: Use Sint64 instead? What about 32-bit off_t linux */
- off_t offset;
- size_t count;
- size_t chunksize;
- ErlDrvSInt64 written;
- } sendfile;
} file_descriptor;
@@ -279,7 +270,7 @@ struct erl_drv_entry efile_driver_entry = {
file_stop,
NULL,
NULL,
- file_ready_output,
+ NULL,
"efile",
NULL,
NULL,
@@ -296,7 +287,7 @@ struct erl_drv_entry efile_driver_entry = {
ERL_DRV_FLAG_USE_PORT_LOCKING,
NULL,
NULL,
- file_stop_select
+ NULL
};
@@ -415,6 +406,18 @@ struct t_data
Sint64 length;
int advise;
} fadvise;
+#ifdef HAVE_SENDFILE
+ struct {
+ int out_fd;
+ off_t offset;
+ size_t nbytes;
+ int flags;
+ int hdr_cnt; /* number of header iovecs */
+ struct iovec *headers; /* pointer to header iovecs */
+ int trl_cnt; /* number of trailer iovecs */
+ struct iovec *trailers; /* pointer to trailer iovecs */
+ } sendfile;
+#endif
} c;
char b[1];
};
@@ -1710,71 +1713,27 @@ static void invoke_fadvise(void *data)
d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise);
}
-
-
-static void do_sendfile(file_descriptor *desc);
-static void file_ready_output(ErlDrvData data, ErlDrvEvent event)
-{
- file_descriptor* d = (file_descriptor*) data;
-
- switch (d->command) {
- case FILE_SENDFILE:
- driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd,
- ERL_DRV_WRITE, 0);
- do_sendfile(d);
- break;
- default:
- break;
- }
-}
-
-static void file_stop_select(ErlDrvEvent event, void* _)
-{
- /* TODO: close socket? */
+static void free_sendfile(void *data) {
+ EF_FREE(data);
}
static void invoke_sendfile(void *data)
{
- ((struct t_data *)data)->again = 0;
-}
-
-static void do_sendfile(file_descriptor *d)
-{
- int fd = d->fd;
- int out_fd = d->sendfile.out_fd;
- off_t offset = d->sendfile.offset;
- size_t count = d->sendfile.count;
- size_t chunksize = count < d->sendfile.chunksize
- ? count : d->sendfile.chunksize;
- int result_ok = 0;
- Efile_error errInfo;
-
- result_ok = efile_sendfile(&errInfo, fd, out_fd, &offset, &chunksize);
-
- if (result_ok) {
- d->sendfile.offset += chunksize;
- d->sendfile.written += chunksize;
- d->sendfile.count -= chunksize;
- if (d->sendfile.count > 0) {
- driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd,
- ERL_DRV_USE|ERL_DRV_WRITE, 1);
- } else {
- printf("==> sendfile DONE eagain=%d\n", d->sendfile.eagain);
- reply_Uint(d, d->sendfile.written);
- }
- } else if (errInfo.posix_errno == EAGAIN || errInfo.posix_errno == EINTR) {
- if (chunksize > 0) {
- d->sendfile.offset += chunksize;
- d->sendfile.written += chunksize;
- d->sendfile.count -= chunksize;
- }
- d->sendfile.eagain++;
+ struct t_data *d = (struct t_data *) data;
+ int fd = (int)d->fd;
+ int out_fd = (int) d->c.sendfile.out_fd;
+ off_t offset = (off_t) d->c.sendfile.offset;
+ size_t nbytes = (size_t) d->c.sendfile.nbytes;
+
+ d->result_ok = efile_sendfile(&d->errInfo, fd, out_fd, offset, &nbytes);
+ d->c.sendfile.offset = offset;
+ d->c.sendfile.nbytes = nbytes;
+ d->again = 0;
- driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd,
- ERL_DRV_USE|ERL_DRV_WRITE, 1);
+ if (d->result_ok) {
+ printf("==> sendfile DONE nbytes=%d\n", d->c.sendfile.nbytes);
} else {
- printf("==> sendfile ERROR %s\n", erl_errno_id(errInfo.posix_errno));
- ef_send_posix_error(d, d->caller, errInfo.posix_errno);
+ printf("==> sendfile ERROR %s\n", erl_errno_id(d->errInfo.posix_errno));
}
}
@@ -2190,9 +2149,12 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data)
free_preadv(data);
break;
case FILE_SENDFILE:
- driver_select(desc->port, (ErlDrvEvent)desc->sendfile.out_fd,
- ERL_DRV_USE|ERL_DRV_WRITE, 1);
- free_data(data);
+ if (!d->result_ok) {
+ reply_error(desc, &d->errInfo);
+ } else {
+ reply_Sint64(desc, d->c.sendfile.nbytes);
+ }
+ free_sendfile(data);
break;
default:
abort();
@@ -3334,37 +3296,71 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
goto done;
} /* case FILE_OPT_DELAYED_WRITE: */
} ASSERT(0); goto done; /* case FILE_SETOPT: */
- case FILE_SENDFILE:
- {
+ case FILE_SENDFILE: {
+
struct t_data *d;
- d = EF_SAFE_ALLOC(sizeof(struct t_data));
- d->fd = desc->fd;
- d->command = command;
- d->invoke = invoke_sendfile;
- d->free = free_data;
- d->level = 2;
- desc->sendfile.out_fd = get_int32((uchar*) buf);
- /* TODO: are off_t and size_t 64bit on all platforms?
- off_t is 32bit on win32 msvc. maybe configurable in msvc.
- Maybe use '#if SIZEOF_SIZE_T == 4'? */
- desc->sendfile.offset = get_int64(((uchar*) buf)
- + sizeof(Sint32));
- desc->sendfile.count = get_int64(((uchar*) buf)
- + sizeof(Sint32)
- + sizeof(Sint64));
- desc->sendfile.chunksize = get_int64(((uchar*) buf)
- + sizeof(Sint32)
- + 2*sizeof(Sint64));
- desc->sendfile.written = 0;
- desc->sendfile.eagain = 0;
- /* TODO: shouldn't d->command be enough? */
- desc->command = command;
- desc->caller = driver_caller(desc->port);
+ Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL;
+ char flags;
+
+ /* DestFD:32, Offset:64, Bytes:64,
+ ChunkSize:64,
+ (get_bit(Nodiskio)):1,
+ (get_bit(MNowait)):1,
+ (get_bit(Sync)):1,0:5,
+ (encode_hdtl(Headers))/binary,
+ (encode_hdtl(Trailers))/binary */
+ if (ev->size < 1 + 1 + 5 * sizeof(Uint32) + sizeof(char)
+ || !EV_GET_UINT32(ev, &out_fd, &p, &q)
+ || !EV_GET_CHAR(ev, &flags, &p, &q)
+ || !EV_GET_UINT32(ev, &offsetH, &p, &q)
+ || !EV_GET_UINT32(ev, &offsetL, &p, &q)
+ || !EV_GET_UINT32(ev, &nbytesH, &p, &q)
+ || !EV_GET_UINT32(ev, &nbytesL, &p, &q)) {
+ /* Buffer has wrong length to contain all the needed values */
+ reply_posix_error(desc, EINVAL);
goto done;
}
-
+
+ d = EF_SAFE_ALLOC(sizeof(struct t_data));
+ d->fd = desc->fd;
+ d->command = command;
+ d->invoke = invoke_sendfile;
+ d->free = free_sendfile;
+ d->level = 2;
+
+ d->c.sendfile.out_fd = (int) out_fd;
+ d->c.sendfile.flags = (int) flags;
+
+#if SIZEOF_OFF_T == 4
+ if (offsetH != 0) {
+ reply_posix_error(desc, EINVAL);
+ goto done;
+ }
+ d->c.sendfile.offset = (off_t) offsetT;
+#else
+ d->c.sendfile.offset = ((off_t) offsetH << 32) | offsetL;
+#endif
+
+#if SIZEOF_SIZE_T == 4
+ if (nbytesH != 0) {
+ reply_posix_error(desc, EINVAL);
+ goto done;
+ }
+ d->c.sendfile.nbytes = (size_t) nbytesT;
+#else
+ d->c.sendfile.nbytes = ((size_t) nbytesH << 32) | nbytesL;
+#endif
+
+ printf("sendfile(nbytes => %d, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags);
+
+ /* Do HEADER TRAILER stuff by calculating pointer places, not by copying data! */
+
+ cq_enq(desc, d);
+ goto done;
+ } /* case FILE_SENDFILE: */
+
} /* switch(command) */
-
+
if (lseek_flush_read(desc, &err) < 0) {
reply_posix_error(desc, err);
goto done;
diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c
index 8b612164da..05c2f1fce9 100644
--- a/erts/emulator/drivers/unix/unix_efile.c
+++ b/erts/emulator/drivers/unix/unix_efile.c
@@ -1471,19 +1471,34 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset,
#ifdef HAVE_SENDFILE
int
efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd,
- off_t offset, size_t *ret_nbytes)
+ off_t offset, size_t *nbytes)
{
#if defined(__linux__) || (defined(__sun) && defined(__SVR4))
- ssize_t retval;
- do {
- retval = sendfile(out_fd, in_fd, &offset, *ret_nbytes);
- } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
- *ret_nbytes = retval;
+ ssize_t retval, nbytes_sent = 0;
+ if (*nbytes == 0) {
+ *nbytes = (1 << 20) - 1;
+ do {
+ retval = sendfile(out_fd, in_fd, &offset, *nbytes);
+ nbytes_sent += retval;
+ printf("retval: %d, errno: %d, offset: %d, nbytes: %d\r\n", retval, errno, offset,*nbytes);
+ } while ((retval == -1 && errno == EINTR)
+ || (retval > 0 && errno == EAGAIN));
+ } else {
+ do {
+ retval = sendfile(out_fd, in_fd, &offset, *nbytes);
+ if (retval > 0) {
+ nbytes_sent += retval;
+ *nbytes -= retval;
+ }
+ } while ((retval == -1 && errno == EINTR)
+ || (*nbytes > 0 && errno == EAGAIN));
+ }
+ *nbytes = nbytes_sent;
return check_error(retval == -1 ? -1 : 0, errInfo);
#elif defined(DARWIN)
- off_t len = *ret_nbytes;
+ off_t len = *nbytes;
int retval = sendfile(in_fd, out_fd, *offset, &len, NULL, 0);
- *ret_nbytes = len;
+ *nbytes = len;
return check_error(retval, errInfo);
#elif defined(__FreeBSD__) || defined(__DragonFly__)
off_t len = 0;
diff --git a/erts/preloaded/src/prim_file.erl b/erts/preloaded/src/prim_file.erl
index 0767067682..6bdf5f6e2e 100644
--- a/erts/preloaded/src/prim_file.erl
+++ b/erts/preloaded/src/prim_file.erl
@@ -547,12 +547,13 @@ write_file(_, _) ->
sendfile(#file_descriptor{module = ?MODULE, data = {Port, _}},
DestFD, Offset, Bytes, ChunkSize, Headers, Trailers,
Nodiskio, MNowait, Sync) ->
-
- drv_command(Port, <<?FILE_SENDFILE, DestFD:32, Offset:64, Bytes:64,
- ChunkSize:64,
+ drv_command(Port, <<?FILE_SENDFILE, DestFD:32,
(get_bit(Nodiskio)):1,
(get_bit(MNowait)):1,
(get_bit(Sync)):1,0:5,
+ Offset:64/unsigned,
+ Bytes:64/unsigned,
+ ChunkSize:64,
(encode_hdtl(Headers))/binary,
(encode_hdtl(Trailers))/binary>>).
diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl
index ea25dc3dc3..26afed4ff9 100644
--- a/lib/kernel/src/gen_tcp.erl
+++ b/lib/kernel/src/gen_tcp.erl
@@ -314,14 +314,14 @@ unrecv(S, Data) when is_port(S) ->
%% Send data using sendfile
%%
--define(MAX_CHUNK_SIZE, (1 bsl 30)*2-1).
+-define(MAX_CHUNK_SIZE, (1 bsl 20)*20). %% 20 MB, has to fit in primary memory
-spec sendfile(File, Sock, Offset, Bytes, Opts) ->
{'ok', non_neg_integer()} | {'error', inet:posix()} when
File :: file:io_device(),
Sock :: socket(),
- Offset :: non_neg_integer() | undefined,
- Bytes :: non_neg_integer() | undefined,
+ Offset :: non_neg_integer(),
+ Bytes :: non_neg_integer(),
Opts :: [sendfile_option()].
sendfile(File, Sock, Offset, Bytes, Opts) when is_pid(File) ->
Ref = erlang:monitor(process, File),
@@ -359,8 +359,7 @@ sendfile(File, Sock) ->
{error, Reason} ->
{error, Reason};
{ok, Fd} ->
- {ok, #file_info{size = Bytes}} = file:read_file_info(File),
- Res = sendfile(Fd, Sock, 0, Bytes, []),
+ Res = sendfile(Fd, Sock, 0, 0, []),
file:close(Fd),
Res
end.
@@ -461,21 +460,17 @@ sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize, Headers, Trailers) ->
end.
-sendfile_fallback(File, Sock, undefined, Bytes, ChunkSize) ->
- sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0);
sendfile_fallback(File, Sock, Offset, Bytes, ChunkSize) ->
- {ok, CurrPos} = file:position(File, 0),
+ {ok, CurrPos} = file:position(File, {cur, 0}),
{ok, _NewPos} = file:position(File, {bof, Offset}),
Res = sendfile_fallback_int(File, Sock, Bytes, ChunkSize, 0),
file:position(File, {bof, CurrPos}),
Res.
-sendfile_fallback_int(_File, _Sock, BytesSent, _ChunkSize, BytesSent) ->
- {ok, BytesSent};
sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent)
- when Bytes > BytesSent; Bytes == undefined ->
- Size = if Bytes == undefined ->
+ when Bytes > BytesSent; Bytes == 0 ->
+ Size = if Bytes == 0 ->
ChunkSize;
(Bytes - BytesSent + ChunkSize) > 0 ->
Bytes - BytesSent;
@@ -496,7 +491,9 @@ sendfile_fallback_int(File, Sock, Bytes, ChunkSize, BytesSent)
{ok, BytesSent};
Error ->
Error
- end.
+ end;
+sendfile_fallback_int(_File, _Sock, BytesSent, _ChunkSize, BytesSent) ->
+ {ok, BytesSent}.
sendfile_send(Sock, Data, Old) ->
Len = iolist_size(Data),
diff --git a/lib/kernel/test/gen_tcp_api_SUITE.erl b/lib/kernel/test/gen_tcp_api_SUITE.erl
index 492c60f521..60a3dcb2f4 100644
--- a/lib/kernel/test/gen_tcp_api_SUITE.erl
+++ b/lib/kernel/test/gen_tcp_api_SUITE.erl
@@ -34,7 +34,8 @@
t_recv_timeout/1, t_recv_eof/1,
t_shutdown_write/1, t_shutdown_both/1, t_shutdown_error/1,
t_fdopen/1, t_implicit_inet6/1,
- t_sendfile/0, t_sendfile/1, t_sendfile_hdtl/1, t_sendfile_partial/1,
+ t_sendfile_small/1, t_sendfile_big/1,
+ t_sendfile_hdtl/1, t_sendfile_partial/1,
t_sendfile_offset/1]).
-export([sendfile_server/1]).
@@ -56,7 +57,8 @@ groups() ->
{t_sendfile_ioserv, [], sendfile_all()}].
sendfile_all() ->
- [t_sendfile,t_sendfile_hdtl, t_sendfile_partial, t_sendfile_offset].
+% [t_sendfile,t_sendfile_hdtl, t_sendfile_partial, t_sendfile_offset].
+ [t_sendfile_big].
init_per_suite(Config) ->
Config.
@@ -66,19 +68,25 @@ end_per_suite(_Config) ->
init_per_group(t_sendfile, Config) ->
Priv = ?config(priv_dir, Config),
- Filename = filename:join(Priv, "sendfile_small.html"),
- {ok, D} = file:open(Filename,[write]),
- io:format(D,"yo baby yo",[]),
- file:sync(D),
- file:close(D),
- [{small_file, Filename}|Config];
+ SFilename = filename:join(Priv, "sendfile_small.html"),
+ {ok, DS} = file:open(SFilename,[write]),
+ io:format(DS,"yo baby yo",[]),
+ file:sync(DS),
+ file:close(DS),
+ BFilename = filename:join(Priv, "sendfile_big.html"),
+ {ok, DB} = file:open(BFilename,[write,raw]),
+ [file:write(DB,[<<0:(10*8*1024*1024)>>]) ||
+ _I <- lists:seq(1,51)],
+ file:sync(DB),
+ file:close(DB),
+ [{small_file, SFilename},{big_file, BFilename}|Config];
init_per_group(t_sendfile_raw, Config) ->
[{file_opts, [raw]}|Config];
init_per_group(_GroupName, Config) ->
Config.
end_per_group(_GroupName, Config) ->
- Config.
+ file:delete(proplists:get_value(big_file, Config)).
init_per_testcase(_Func, Config) ->
@@ -258,12 +266,19 @@ implicit_inet6(S, Addr) ->
?line ok = gen_tcp:close(S2),
?line ok = gen_tcp:close(S1).
+t_sendfile_small(Config) when is_list(Config) ->
+ Filename = proplists:get_value(small_file, Config),
+
+ Send = fun(Sock) ->
+ {Size, Data} = sendfile_file_info(Filename),
+ {ok, Size} = gen_tcp:sendfile(Filename, Sock),
+ Data
+ end,
-t_sendfile() ->
- [{timetrap, {seconds, 5}}].
+ ok = sendfile_send(Send).
-t_sendfile(Config) when is_list(Config) ->
- Filename = proplists:get_value(small_file, Config),
+t_sendfile_big(Config) when is_list(Config) ->
+ Filename = proplists:get_value(big_file, Config),
Send = fun(Sock) ->
{Size, Data} = sendfile_file_info(Filename),
@@ -281,7 +296,7 @@ t_sendfile_partial(Config) ->
{_Size, <<Data:5/binary,_/binary>>} =
sendfile_file_info(Filename),
{ok,D} = file:open(Filename,[read|FileOpts]),
- {ok,5} = gen_tcp:sendfile(D,Sock,undefined,5,[]),
+ {ok,5} = gen_tcp:sendfile(D,Sock,0,5,[]),
file:close(D),
Data
end,
@@ -289,20 +304,24 @@ t_sendfile_partial(Config) ->
{_Size, <<FData:5/binary,SData:3/binary,_/binary>>} =
sendfile_file_info(Filename),
- {ok,D} = file:open(Filename,[read|FileOpts]),
+ {ok,D} = file:open(Filename,[read,binary|FileOpts]),
+ {ok, <<FData/binary>>} = file:read(D,5),
FSend = fun(Sock) ->
- {ok,5} = gen_tcp:sendfile(D,Sock,undefined,5,[]),
+ {ok,5} = gen_tcp:sendfile(D,Sock,0,5,[]),
FData
end,
ok = sendfile_send(FSend),
SSend = fun(Sock) ->
- {ok,3} = gen_tcp:sendfile(D,Sock,undefined,3,[]),
+ {ok,3} = gen_tcp:sendfile(D,Sock,5,3,[]),
SData
end,
ok = sendfile_send(SSend),
+
+ {ok, <<SData/binary>>} = file:read(D,3),
+
file:close(D).
t_sendfile_offset(Config) ->
@@ -330,7 +349,7 @@ t_sendfile_hdtl(Config) ->
{ok,D} = file:open(Filename,[read|FileOpts]),
AllSize = Size+HdtlSize,
{ok, AllSize} = gen_tcp:sendfile(
- D, Sock,undefined,undefined,
+ D, Sock,0,0,
[{headers,Headers},
{trailers,Trailers}]),
file:close(D),
@@ -356,9 +375,9 @@ t_sendfile_hdtl(Config) ->
SendTl = fun(Sock) ->
Trailers = [<<"trailer1">>,"trailer2"],
- D = Send(Sock,Trailers,undefined,
+ D = Send(Sock,undefined,Trailers,
iolist_size([Trailers])),
- iolist_to_binary([Trailers,D])
+ iolist_to_binary([D,Trailers])
end,
ok = sendfile_send(SendTl).