diff options
-rw-r--r-- | erts/configure.in | 4 | ||||
-rw-r--r-- | erts/emulator/drivers/common/efile_drv.c | 50 | ||||
-rw-r--r-- | erts/emulator/drivers/common/erl_efile.h | 7 | ||||
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 18 | ||||
-rw-r--r-- | erts/emulator/drivers/unix/unix_efile.c | 40 | ||||
-rw-r--r-- | lib/kernel/src/file.erl | 6 | ||||
-rw-r--r-- | lib/kernel/test/sendfile_SUITE.erl | 97 |
7 files changed, 193 insertions, 29 deletions
diff --git a/erts/configure.in b/erts/configure.in index e6c412e666..50f8908f7a 100644 --- a/erts/configure.in +++ b/erts/configure.in @@ -1698,7 +1698,9 @@ case $host_os in AC_CHECK_FUNCS([sendfile]) ;; solaris*) - AC_SEARCH_LIBS(sendfile, sendfile, AC_DEFINE(HAVE_SENDFILE, 1)) + AC_SEARCH_LIBS(sendfilev, sendfile, + AC_DEFINE([HAVE_SENDFILEV],[1], + [Define to 1 if you have the `sendfilev' function.])) ;; win32) LIBS="$LIBS -lmswsock" diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index e7235c30f7..d9282dbb12 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -104,6 +104,7 @@ #ifndef WANT_NONBLOCKING #define WANT_NONBLOCKING #endif + #include "sys.h" #include "erl_driver.h" @@ -147,6 +148,22 @@ static ErlDrvSysInfo sys_info; #define MUTEX_UNLOCK(m) #endif + +/** + * On DARWIN sendfile can deadlock with close if called in + * different threads. So until Apple fixes so that sendfile + * is not buggy we disable usage of the async pool for + * DARWIN. The testcase t_sendfile_crashduring reproduces + * this error when using +A 10. + */ +#if !defined(DARWIN) +#define USE_THRDS_FOR_SENDFILE (sys_info.async_threads > 0) +#else +#define USE_THRDS_FOR_SENDFILE 0 +#endif /* !DARWIN */ + + + #if 0 /* Experimental, for forcing all file operations to use the same thread. */ static unsigned file_fixed_key = 1; @@ -734,6 +751,15 @@ file_stop(ErlDrvData e) TRACE_C('p'); +#ifdef HAVE_SENDFILE + if (desc->sendfile_state == sending && !USE_THRDS_FOR_SENDFILE) { + driver_select(desc->port,(ErlDrvEvent)(long)desc->d->c.sendfile.out_fd, + ERL_DRV_WRITE|ERL_DRV_USE,0); + } else if (desc->sendfile_state == sending) { + SET_NONBLOCKING(desc->d->c.sendfile.out_fd); + } +#endif /* HAVE_SENDFILE */ + if (desc->fd != FILE_FD_INVALID) { do_close(desc->flags, desc->fd); desc->fd = FILE_FD_INVALID; @@ -799,7 +825,16 @@ static void reply_Uint_posix_error(file_descriptor *desc, Uint num, driver_output2(desc->port, response, t-response, NULL, 0); } +static void reply_string_error(file_descriptor *desc, char* str) { + char response[256]; /* Response buffer. */ + char* s; + char* t; + response[0] = FILE_RESP_ERROR; + for (s = str, t = response+1; *s; s++, t++) + *t = tolower(*s); + driver_output2(desc->port, response, t-response, NULL, 0); +} static int reply_error(file_descriptor *desc, Efile_error *errInfo) /* The error codes. */ @@ -1744,7 +1779,7 @@ static void invoke_sendfile(void *data) d->c.sendfile.written += nbytes; if (result == 1) { - if (sys_info.async_threads != 0) { + if (USE_THRDS_FOR_SENDFILE) { d->result_ok = 0; } else if (d->c.sendfile.nbytes == 0 && nbytes != 0) { d->result_ok = 1; @@ -2209,8 +2244,13 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) case FILE_SENDFILE: if (d->result_ok == -1) { desc->sendfile_state = not_sending; - reply_error(desc, &d->errInfo); - if (sys_info.async_threads != 0) { + if (d->errInfo.posix_errno == ECONNRESET || + d->errInfo.posix_errno == ENOTCONN || + d->errInfo.posix_errno == EPIPE) + reply_string_error(desc,"closed"); + else + reply_error(desc, &d->errInfo); + if (USE_THRDS_FOR_SENDFILE) { SET_NONBLOCKING(d->c.sendfile.out_fd); free_sendfile(data); } else { @@ -2221,7 +2261,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) } else if (d->result_ok == 0) { desc->sendfile_state = not_sending; reply_Sint64(desc, d->c.sendfile.written); - if (sys_info.async_threads != 0) { + if (USE_THRDS_FOR_SENDFILE) { SET_NONBLOCKING(d->c.sendfile.out_fd); free_sendfile(data); } else { @@ -3427,7 +3467,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { d->c.sendfile.nbytes = nbytes; - if (sys_info.async_threads != 0) { + if (USE_THRDS_FOR_SENDFILE) { SET_BLOCKING(d->c.sendfile.out_fd); } diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index 5c0b89e850..be1faa13f5 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -67,6 +67,11 @@ #define FILENAMES_16BIT 1 #endif +// We use sendfilev if it exist on solaris +#if !defined(HAVE_SENDFILE) && defined(HAVE_SENDFILEV) +#define HAVE_SENDFILE +#endif + /* * An handle to an open directory. To be cast to the correct type * in the system-dependent directory functions. @@ -122,7 +127,7 @@ typedef struct _Efile_info { #ifdef HAVE_SENDFILE /* - * Described the structure of header/trailers for sendfile + * Describes the structure of headers/trailers for sendfile */ struct t_sendfile_hdtl { SysIOVec *headers; diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index b452acba32..ee5ebdf646 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -516,7 +516,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) driver_select(port, e, mode | (on?ERL_DRV_USE:0), on) #define sock_select(d, flags, onoff) do { \ - ASSERT(!onoff || !(d)->is_ignored); \ + ASSERT(!(d)->is_ignored); \ (d)->event_mask = (onoff) ? \ ((d)->event_mask | (flags)) : \ ((d)->event_mask & ~(flags)); \ @@ -950,9 +950,9 @@ typedef struct { double send_avg; /* average packet size sent */ subs_list empty_out_q_subs; /* Empty out queue subscribers */ - int is_ignored; /* if a fd is ignored by from the inet_drv, - this should be set to true when the fd is used - outside of inet_drv. */ + int is_ignored; /* if a fd is ignored by the inet_drv. + This flag should be set to true when + the fd is used outside of inet_drv. */ } inet_descriptor; @@ -3816,7 +3816,13 @@ static void desc_close(inet_descriptor* desc) desc->forced_events = 0; desc->send_would_block = 0; #endif - driver_select(desc->port, (ErlDrvEvent)(long)desc->event, ERL_DRV_USE, 0); + // We should close the fd here, but the other driver might still + // be selecting on it. + if (!desc->is_ignored) + driver_select(desc->port,(ErlDrvEvent)(long)desc->event, + ERL_DRV_USE, 0); + else + inet_stop_select((ErlDrvEvent)(long)desc->event,NULL); desc->event = INVALID_EVENT; /* closed by stop_select callback */ desc->s = INVALID_SOCKET; desc->event_mask = 0; @@ -7732,8 +7738,8 @@ static int inet_ctl(inet_descriptor* desc, int cmd, char* buf, int len, return ctl_error(EINVAL, rbuf, rsize); if (*buf == 1 && !desc->is_ignored) { - desc->is_ignored = INET_IGNORE_READ; sock_select(desc, (FD_READ|FD_WRITE|FD_CLOSE|ERL_DRV_USE_NO_CALLBACK), 0); + desc->is_ignored = INET_IGNORE_READ; } else if (*buf == 0 && desc->is_ignored) { int flags = (FD_READ|FD_CLOSE|((desc->is_ignored & INET_IGNORE_WRITE)?FD_WRITE:0)); desc->is_ignored = INET_IGNORE_NONE; diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index d2d8713c1e..9160e2aed2 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1427,6 +1427,9 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, } #ifdef HAVE_SENDFILE + +// For some reason the maximum size_t cannot be used as the max size +// 3GB seems to work on all platforms #define SENDFILE_CHUNK_SIZE ((1 << 30) -1) /* @@ -1435,7 +1438,13 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, * we have to emulate some things in linux and play with variables on * bsd/darwin. * - * It could be possible to implement header/trailer in sendfile, though + * All of the calls will split a command which tries to send more than + * SENDFILE_CHUNK_SIZE of data at once. + * + * On platforms where *nbytes of 0 does not mean the entire file, this is + * simulated. + * + * It could be possible to implement header/trailer in sendfile. Though * you would have to emulate it in linux and on BSD/Darwin some complex * calculations have to be made when using a non blocking socket to figure * out how much of the header/file/trailer was sent in each command. @@ -1446,10 +1455,10 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t *offset, Uint64 *nbytes, struct t_sendfile_hdtl* hdtl) { Uint64 written = 0; -#if defined(__linux__) || (defined(__sun) && defined(__SVR4)) +#if defined(__linux__) ssize_t retval; do { - // check if *nbytes is 0 or greater than the largest size_t + // check if *nbytes is 0 or greater than chunk size if (*nbytes == 0 || *nbytes > SENDFILE_CHUNK_SIZE) retval = sendfile(out_fd, in_fd, offset, SENDFILE_CHUNK_SIZE); else @@ -1461,11 +1470,34 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, } while (retval != -1 && retval == SENDFILE_CHUNK_SIZE); *nbytes = written; return check_error(retval == -1 ? -1 : 0, errInfo); +#elif defined(__sun) && defined(__SVR4) && defined(HAVE_SENDFILEV) + ssize_t retval; + size_t len; + sendfilevec_t fdrec; + fdrec.sfv_fd = in_fd; + fdrec.sfv_flag = 0; + do { + fdrec.sfv_off = *offset; + len = 0; + // check if *nbytes is 0 or greater than chunk size + if (*nbytes == 0 || *nbytes > SENDFILE_CHUNK_SIZE) + fdrec.sfv_len = SENDFILE_CHUNK_SIZE; + else + fdrec.sfv_len = *nbytes; + retval = sendfilev(out_fd, &fdrec, 1, &len); + if (retval != -1 || errno == EAGAIN || errno == EINTR) { + *offset += len; + *nbytes -= len; + written += len; + } + } while (len == SENDFILE_CHUNK_SIZE); + *nbytes = written; + return check_error(retval == -1 ? -1 : 0, errInfo); #elif defined(DARWIN) int retval; off_t len; do { - // check if *nbytes is 0 or greater than the largest off_t + // check if *nbytes is 0 or greater than chunk size if(*nbytes > SENDFILE_CHUNK_SIZE) len = SENDFILE_CHUNK_SIZE; else diff --git a/lib/kernel/src/file.erl b/lib/kernel/src/file.erl index 6dc2a26816..4028dd4f0b 100644 --- a/lib/kernel/src/file.erl +++ b/lib/kernel/src/file.erl @@ -1163,7 +1163,8 @@ change_time(Name, {{AY, AM, AD}, {AH, AMin, ASec}}=Atime, -define(MAX_CHUNK_SIZE, (1 bsl 20)*20). %% 20 MB, has to fit in primary memory -spec sendfile(RawFile, Socket, Offset, Bytes, Opts) -> - {'ok', non_neg_integer()} | {'error', inet:posix() | badarg | not_owner} when + {'ok', non_neg_integer()} | {'error', inet:posix() | + closed | badarg | not_owner} when RawFile :: file:fd(), Socket :: inet:socket(), Offset :: non_neg_integer(), @@ -1188,7 +1189,8 @@ sendfile(File, Sock, Offset, Bytes, Opts) -> %% sendfile/2 -spec sendfile(Filename, Socket) -> - {'ok', non_neg_integer()} | {'error', inet:posix() | badarg | not_owner} + {'ok', non_neg_integer()} | {'error', inet:posix() | + closed | badarg | not_owner} when Filename :: file:name(), Socket :: inet:socket(). sendfile(Filename, Sock) -> diff --git a/lib/kernel/test/sendfile_SUITE.erl b/lib/kernel/test/sendfile_SUITE.erl index 04af16a6b9..6d0848ee05 100644 --- a/lib/kernel/test/sendfile_SUITE.erl +++ b/lib/kernel/test/sendfile_SUITE.erl @@ -33,6 +33,8 @@ all() -> ,t_sendfile_recvafter ,t_sendfile_sendduring ,t_sendfile_recvduring + ,t_sendfile_closeduring + ,t_sendfile_crashduring ]. init_per_suite(Config) -> @@ -99,7 +101,7 @@ t_sendfile_big(Config) when is_list(Config) -> Size end, - ok = sendfile_send("localhost", Send, 0). + ok = sendfile_send({127,0,0,1}, Send, 0). t_sendfile_partial(Config) -> Filename = proplists:get_value(small_file, Config), @@ -185,14 +187,14 @@ t_sendfile_sendduring(Config) -> {ok, #file_info{size = Size}} = file:read_file_info(Filename), spawn_link(fun() -> - timer:sleep(10), + timer:sleep(50), ok = gen_tcp:send(Sock, <<2>>) end), {ok, Size} = file:sendfile(Filename, Sock), Size+1 end, - ok = sendfile_send("localhost", Send, 0). + ok = sendfile_send({127,0,0,1}, Send, 0). t_sendfile_recvduring(Config) -> Filename = proplists:get_value(big_file, Config), @@ -201,7 +203,7 @@ t_sendfile_recvduring(Config) -> {ok, #file_info{size = Size}} = file:read_file_info(Filename), spawn_link(fun() -> - timer:sleep(10), + timer:sleep(50), ok = gen_tcp:send(Sock, <<1>>), {ok,<<1>>} = gen_tcp:recv(Sock, 1) end), @@ -210,21 +212,83 @@ t_sendfile_recvduring(Config) -> Size+1 end, - ok = sendfile_send("localhost", Send, 0). + ok = sendfile_send({127,0,0,1}, Send, 0). -%% TODO: consolidate tests and reduce code +t_sendfile_closeduring(Config) -> + Filename = proplists:get_value(big_file, Config), + + Send = fun(Sock,SFServPid) -> + spawn_link(fun() -> + timer:sleep(50), + SFServPid ! stop + end), + case erlang:system_info(thread_pool_size) of + 0 -> + {error, closed} = file:sendfile(Filename, Sock); + _Else -> + %% This can return how much has been sent or + %% {error,closed} depending on OS. + %% How much is sent impossible to know as + %% the socket was closed mid sendfile + case file:sendfile(Filename, Sock) of + {error, closed} -> + ok; + {ok, Size} when is_integer(Size) -> + ok + end + end, + -1 + end, + + ok = sendfile_send({127,0,0,1}, Send, 0). + +t_sendfile_crashduring(Config) -> + Filename = proplists:get_value(big_file, Config), + + error_logger:add_report_handler(?MODULE,[self()]), + + Send = fun(Sock) -> + spawn_link(fun() -> + timer:sleep(50), + exit(die) + end), + {error, closed} = file:sendfile(Filename, Sock), + -1 + end, + process_flag(trap_exit,true), + spawn_link(fun() -> + ok = sendfile_send({127,0,0,1}, Send, 0) + end), + receive + {stolen,Reason} -> + process_flag(trap_exit,false), + ct:fail(Reason) + after 200 -> + receive + {'EXIT',_,Reason} -> + process_flag(trap_exit,false), + die = Reason + end + end. + +%% Generic sendfile server code sendfile_send(Send) -> - sendfile_send("localhost",Send). + sendfile_send({127,0,0,1},Send). sendfile_send(Host, Send) -> sendfile_send(Host, Send, []). sendfile_send(Host, Send, Orig) -> - spawn_link(?MODULE, sendfile_server, [self(), Orig]), + SFServer = spawn_link(?MODULE, sendfile_server, [self(), Orig]), receive {server, Port} -> {ok, Sock} = gen_tcp:connect(Host, Port, [binary,{packet,0}, {active,false}]), - Data = Send(Sock), + Data = case proplists:get_value(arity,erlang:fun_info(Send)) of + 1 -> + Send(Sock); + 2 -> + Send(Sock, SFServer) + end, ok = gen_tcp:close(Sock), receive {ok, Bin} -> @@ -245,9 +309,11 @@ sendfile_server(ClientPid, Orig) -> gen_tcp:send(Sock, <<1>>). -define(SENDFILE_TIMEOUT, 10000). -%% f(),{ok, S} = gen_tcp:connect("localhost",7890,[binary]),file:sendfile("/ldisk/lukas/otp/sendfiletest.dat",S). sendfile_do_recv(Sock, Bs) -> receive + stop when Bs /= 0,is_integer(Bs) -> + gen_tcp:close(Sock), + {ok, -1}; {tcp, Sock, B} -> case binary:match(B,<<1>>) of nomatch when is_list(Bs) -> @@ -276,3 +342,14 @@ sendfile_file_info(File) -> {ok, #file_info{size = Size}} = file:read_file_info(File), {ok, Data} = file:read_file(File), {Size, Data}. + + +%% Error handler + +init([Proc]) -> {ok,Proc}. + +handle_event({error,noproc,{emulator,Format,Args}}, Proc) -> + Proc ! {stolen,lists:flatten(io_lib:format(Format,Args))}, + {ok,Proc}; +handle_event(_, Proc) -> + {ok,Proc}. |