aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/drivers/common/efile_drv.c102
-rw-r--r--erts/emulator/drivers/unix/unix_efile.c21
-rw-r--r--lib/kernel/test/sendfile_SUITE.erl86
3 files changed, 141 insertions, 68 deletions
diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c
index 36ed108b76..a251b064da 100644
--- a/erts/emulator/drivers/common/efile_drv.c
+++ b/erts/emulator/drivers/common/efile_drv.c
@@ -156,11 +156,11 @@ static ErlDrvSysInfo sys_info;
* DARWIN. The testcase t_sendfile_crashduring reproduces
* this error when using +A 10.
*/
-#if !defined(DARWIN)
-#define USE_THRDS_FOR_SENDFILE (sys_info.async_threads > 0)
-#else
+#if defined(__APPLE__) && defined(__MACH__)
#define USE_THRDS_FOR_SENDFILE 0
-#endif /* !DARWIN */
+#else
+#define USE_THRDS_FOR_SENDFILE (sys_info.async_threads > 0)
+#endif /* defined(__APPLE__) && defined(__MACH__) */
@@ -259,6 +259,7 @@ static void file_stop_select(ErlDrvEvent event, void* _);
enum e_timer {timer_idle, timer_again, timer_write};
#ifdef HAVE_SENDFILE
enum e_sendfile {sending, not_sending};
+static void free_sendfile(void *data);
#endif /* HAVE_SENDFILE */
struct t_data;
@@ -445,6 +446,8 @@ struct t_data
} fadvise;
#ifdef HAVE_SENDFILE
struct {
+ ErlDrvPort port;
+ ErlDrvPDL q_mtx;
int out_fd;
off_t offset;
Uint64 nbytes;
@@ -752,15 +755,6 @@ file_stop(ErlDrvData e)
TRACE_C('p');
-#ifdef HAVE_SENDFILE
- if (desc->sendfile_state == sending && !USE_THRDS_FOR_SENDFILE) {
- driver_select(desc->port,(ErlDrvEvent)(long)desc->d->c.sendfile.out_fd,
- ERL_DRV_WRITE|ERL_DRV_USE,0);
- } else if (desc->sendfile_state == sending) {
- SET_NONBLOCKING(desc->d->c.sendfile.out_fd);
- }
-#endif /* HAVE_SENDFILE */
-
if (desc->fd != FILE_FD_INVALID) {
do_close(desc->flags, desc->fd);
desc->fd = FILE_FD_INVALID;
@@ -1783,26 +1777,31 @@ static void invoke_sendfile(void *data)
d->c.sendfile.written += nbytes;
- if (result == 1) {
- if (USE_THRDS_FOR_SENDFILE) {
- d->result_ok = 0;
- } else if (d->c.sendfile.nbytes == 0 && nbytes != 0) {
- d->result_ok = 1;
- } else if ((d->c.sendfile.nbytes - nbytes) != 0) {
- d->result_ok = 1;
- d->c.sendfile.nbytes -= nbytes;
- } else {
- d->result_ok = 0;
- }
+ if (result == 1 || (result == 0 && USE_THRDS_FOR_SENDFILE)) {
+ d->result_ok = 0;
} else if (result == 0 && (d->errInfo.posix_errno == EAGAIN
|| d->errInfo.posix_errno == EINTR)) {
+ if ((d->c.sendfile.nbytes - nbytes) != 0) {
d->result_ok = 1;
+ if (d->c.sendfile.nbytes != 0)
+ d->c.sendfile.nbytes -= nbytes;
+ } else
+ d->result_ok = 0;
} else {
d->result_ok = -1;
}
}
static void free_sendfile(void *data) {
+ struct t_data *d = (struct t_data *)data;
+ if (USE_THRDS_FOR_SENDFILE) {
+ SET_NONBLOCKING(d->c.sendfile.out_fd);
+ } else {
+ MUTEX_LOCK(d->c.sendfile.q_mtx);
+ driver_deq(d->c.sendfile.port,1);
+ MUTEX_UNLOCK(d->c.sendfile.q_mtx);
+ driver_select(d->c.sendfile.port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, ERL_DRV_USE_NO_CALLBACK|ERL_DRV_WRITE, 0);
+ }
EF_FREE(data);
}
@@ -1812,7 +1811,7 @@ static void file_ready_output(ErlDrvData data, ErlDrvEvent event)
switch (fd->d->command) {
case FILE_SENDFILE:
- driver_select(fd->port, event,
+ driver_select(fd->d->c.sendfile.port, event,
(int)ERL_DRV_WRITE,(int) 0);
invoke_sendfile((void *)fd->d);
file_async_ready(data, (ErlDrvThreadData)fd->d);
@@ -1826,6 +1825,15 @@ static void file_stop_select(ErlDrvEvent event, void* _)
{
}
+
+static int flush_sendfile(file_descriptor *desc,void *_) {
+ if (desc->sendfile_state == sending) {
+ desc->d->result_ok = -1;
+ desc->d->errInfo.posix_errno = ECONNABORTED;
+ file_async_ready((ErlDrvData)desc,(ErlDrvThreadData)desc->d);
+ }
+ return 1;
+}
#endif /* HAVE_SENDFILE */
@@ -2248,36 +2256,23 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data)
#ifdef HAVE_SENDFILE
case FILE_SENDFILE:
if (d->result_ok == -1) {
- desc->sendfile_state = not_sending;
if (d->errInfo.posix_errno == ECONNRESET ||
d->errInfo.posix_errno == ENOTCONN ||
d->errInfo.posix_errno == EPIPE)
reply_string_error(desc,"closed");
else
reply_error(desc, &d->errInfo);
- if (USE_THRDS_FOR_SENDFILE) {
- SET_NONBLOCKING(d->c.sendfile.out_fd);
- free_sendfile(data);
- } else {
- driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd,
- ERL_DRV_USE, 0);
- free_sendfile(data);
- }
- } else if (d->result_ok == 0) {
desc->sendfile_state = not_sending;
+ free_sendfile(data);
+ } else if (d->result_ok == 0) {
reply_Sint64(desc, d->c.sendfile.written);
- if (USE_THRDS_FOR_SENDFILE) {
- SET_NONBLOCKING(d->c.sendfile.out_fd);
- free_sendfile(data);
- } else {
- driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, ERL_DRV_USE, 0);
- free_sendfile(data);
- }
+ desc->sendfile_state = not_sending;
+ free_sendfile(data);
} else if (d->result_ok == 1) { // If we are using select to send the rest of the data
desc->sendfile_state = sending;
desc->d = d;
driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd,
- ERL_DRV_USE|ERL_DRV_WRITE, 1);
+ ERL_DRV_USE_NO_CALLBACK|ERL_DRV_WRITE, 1);
}
break;
#endif
@@ -2655,6 +2650,10 @@ file_flush(ErlDrvData e) {
TRACE_C('f');
+#ifdef HAVE_SENDFILE
+ flush_sendfile(desc, NULL);
+#endif
+
#ifdef DEBUG
r =
#endif
@@ -3454,11 +3453,13 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
d->fd = desc->fd;
d->command = command;
d->invoke = invoke_sendfile;
- d->free = NULL;
+ d->free = free_sendfile;
d->level = 2;
d->c.sendfile.out_fd = (int) out_fd;
d->c.sendfile.written = 0;
+ d->c.sendfile.port = desc->port;
+ d->c.sendfile.q_mtx = desc->q_mtx;
#if SIZEOF_OFF_T == 4
if (offsetH != 0) {
@@ -3474,6 +3475,19 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
if (USE_THRDS_FOR_SENDFILE) {
SET_BLOCKING(d->c.sendfile.out_fd);
+ } else {
+ /**
+ * Write a place holder to queue in order to force file_flush
+ * to be called before the driver is closed.
+ */
+ char tmp[1] = "";
+ MUTEX_LOCK(d->c.sendfile.q_mtx);
+ if (driver_enq(d->c.sendfile.port, tmp, 1)) {
+ MUTEX_UNLOCK(d->c.sendfile.q_mtx);
+ reply_posix_error(desc, ENOMEM);
+ goto done;
+ }
+ MUTEX_UNLOCK(d->c.sendfile.q_mtx);
}
cq_enq(desc, d);
diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c
index 796843a735..dfb6cece14 100644
--- a/erts/emulator/drivers/unix/unix_efile.c
+++ b/erts/emulator/drivers/unix/unix_efile.c
@@ -1426,6 +1426,14 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset,
* you would have to emulate it in linux and on BSD/Darwin some complex
* calculations have to be made when using a non blocking socket to figure
* out how much of the header/file/trailer was sent in each command.
+ *
+ * The semantics of the API is this:
+ * Return value: 1 if all data was sent and the function does not need to
+ * be called again. 0 if an error occures OR if there is more data which
+ * has to be sent (EAGAIN or EINTR will be set appropriately)
+ *
+ * The amount of data written in a call is returned through nbytes.
+ *
*/
int
@@ -1446,8 +1454,11 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd,
*nbytes -= retval;
}
} while (retval == SENDFILE_CHUNK_SIZE);
- *nbytes = written;
- return check_error(retval == -1 ? -1 : 0, errInfo);
+ if (written != 0) {
+ // -1 is not returned by the linux API so we have to simulate it
+ retval = -1;
+ errno = EAGAIN;
+ }
#elif defined(__sun) && defined(__SVR4) && defined(HAVE_SENDFILEV)
ssize_t retval;
size_t len;
@@ -1469,8 +1480,6 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd,
written += len;
}
} while (len == SENDFILE_CHUNK_SIZE);
- *nbytes = written;
- return check_error(retval == -1 ? -1 : 0, errInfo);
#elif defined(DARWIN)
int retval;
off_t len;
@@ -1487,8 +1496,6 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd,
written += len;
}
} while (len == SENDFILE_CHUNK_SIZE);
- *nbytes = written;
- return check_error(retval, errInfo);
#elif defined(__FreeBSD__) || defined(__DragonFly__)
off_t len;
int retval;
@@ -1504,8 +1511,8 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd,
written += len;
}
} while(len == SENDFILE_CHUNK_SIZE);
+#endif
*nbytes = written;
return check_error(retval, errInfo);
-#endif
}
#endif /* HAVE_SENDFILE */
diff --git a/lib/kernel/test/sendfile_SUITE.erl b/lib/kernel/test/sendfile_SUITE.erl
index 6d0848ee05..03704b0c04 100644
--- a/lib/kernel/test/sendfile_SUITE.erl
+++ b/lib/kernel/test/sendfile_SUITE.erl
@@ -26,7 +26,9 @@
all() ->
[t_sendfile_small
- ,t_sendfile_big
+ ,t_sendfile_big_all
+ ,t_sendfile_big_size
+ ,t_sendfile_many_small
,t_sendfile_partial
,t_sendfile_offset
,t_sendfile_sendafter
@@ -38,20 +40,25 @@ all() ->
].
init_per_suite(Config) ->
- Priv = ?config(priv_dir, Config),
- SFilename = filename:join(Priv, "sendfile_small.html"),
- {ok, DS} = file:open(SFilename,[write,raw]),
- file:write(DS,"yo baby yo"),
- file:sync(DS),
- file:close(DS),
- BFilename = filename:join(Priv, "sendfile_big.html"),
- {ok, DB} = file:open(BFilename,[write,raw]),
- [file:write(DB,[<<0:(10*8*1024*1024)>>]) || _I <- lists:seq(1,51)],
- file:sync(DB),
- file:close(DB),
- [{small_file, SFilename},
- {file_opts,[raw,binary]},
- {big_file, BFilename}|Config].
+ case {os:type(),os:version()} of
+ {{unix,sunos}, {5,8,_}} ->
+ {skip, "Solaris 8 not supported for now"};
+ _ ->
+ Priv = ?config(priv_dir, Config),
+ SFilename = filename:join(Priv, "sendfile_small.html"),
+ {ok, DS} = file:open(SFilename,[write,raw]),
+ file:write(DS,"yo baby yo"),
+ file:sync(DS),
+ file:close(DS),
+ BFilename = filename:join(Priv, "sendfile_big.html"),
+ {ok, DB} = file:open(BFilename,[write,raw]),
+ [file:write(DB,[<<0:(10*8*1024*1024)>>]) || _I <- lists:seq(1,51)],
+ file:sync(DB),
+ file:close(DB),
+ [{small_file, SFilename},
+ {file_opts,[raw,binary]},
+ {big_file, BFilename}|Config]
+ end.
end_per_suite(Config) ->
file:delete(proplists:get_value(big_file, Config)).
@@ -91,7 +98,34 @@ t_sendfile_small(Config) when is_list(Config) ->
ok = sendfile_send(Send).
-t_sendfile_big(Config) when is_list(Config) ->
+t_sendfile_many_small(Config) when is_list(Config) ->
+ Filename = proplists:get_value(small_file, Config),
+ FileOpts = proplists:get_value(file_opts, Config, []),
+
+ error_logger:add_report_handler(?MODULE,[self()]),
+
+ Send = fun(Sock) ->
+ {Size,_} = sendfile_file_info(Filename),
+ N = 10000,
+ {ok,D} = file:open(Filename,[read|FileOpts]),
+ [begin
+ {ok,Size} = file:sendfile(D,Sock,0,0,[])
+ end || _I <- lists:seq(1,N)],
+ file:close(D),
+ Size*N
+ end,
+
+ ok = sendfile_send({127,0,0,1}, Send, 0),
+
+ receive
+ {stolen,Reason} ->
+ exit(Reason)
+ after 200 ->
+ ok
+ end.
+
+
+t_sendfile_big_all(Config) when is_list(Config) ->
Filename = proplists:get_value(big_file, Config),
Send = fun(Sock) ->
@@ -103,6 +137,20 @@ t_sendfile_big(Config) when is_list(Config) ->
ok = sendfile_send({127,0,0,1}, Send, 0).
+t_sendfile_big_size(Config) ->
+ Filename = proplists:get_value(big_file, Config),
+ FileOpts = proplists:get_value(file_opts, Config, []),
+
+ SendAll = fun(Sock) ->
+ {ok, #file_info{size = Size}} =
+ file:read_file_info(Filename),
+ {ok,D} = file:open(Filename,[read|FileOpts]),
+ {ok, Size} = file:sendfile(D, Sock,0,Size,[]),
+ Size
+ end,
+
+ ok = sendfile_send({127,0,0,1}, SendAll, 0).
+
t_sendfile_partial(Config) ->
Filename = proplists:get_value(small_file, Config),
FileOpts = proplists:get_value(file_opts, Config, []),
@@ -310,6 +358,10 @@ sendfile_server(ClientPid, Orig) ->
-define(SENDFILE_TIMEOUT, 10000).
sendfile_do_recv(Sock, Bs) ->
+ TimeoutMul = case os:type() of
+ {win32, _} -> 6;
+ _ -> 1
+ end,
receive
stop when Bs /= 0,is_integer(Bs) ->
gen_tcp:close(Sock),
@@ -333,7 +385,7 @@ sendfile_do_recv(Sock, Bs) ->
{tcp_closed, Sock} when is_integer(Bs) ->
ct:log("Stopped due to close"),
{ok, Bs}
- after ?SENDFILE_TIMEOUT ->
+ after ?SENDFILE_TIMEOUT * TimeoutMul ->
ct:log("Sendfile timeout"),
timeout
end.