diff options
Diffstat (limited to 'erts/emulator/drivers/common/efile_drv.c')
-rw-r--r-- | erts/emulator/drivers/common/efile_drv.c | 319 |
1 files changed, 276 insertions, 43 deletions
diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 901d98c09d..b132991a3b 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -55,6 +55,7 @@ #define FILE_READ_LINE 29 #define FILE_FDATASYNC 30 #define FILE_FADVISE 31 +#define FILE_SENDFILE 32 /* Return codes */ @@ -98,7 +99,14 @@ # include "config.h" #endif #include <stdlib.h> + +// Need (NON)BLOCKING macros for sendfile +#ifndef WANT_NONBLOCKING +#define WANT_NONBLOCKING +#endif + #include "sys.h" + #include "erl_driver.h" #include "erl_efile.h" #include "erl_threads.h" @@ -140,6 +148,22 @@ static ErlDrvSysInfo sys_info; #define MUTEX_UNLOCK(m) #endif + +/** + * On DARWIN sendfile can deadlock with close if called in + * different threads. So until Apple fixes so that sendfile + * is not buggy we disable usage of the async pool for + * DARWIN. The testcase t_sendfile_crashduring reproduces + * this error when using +A 10. + */ +#if !defined(DARWIN) +#define USE_THRDS_FOR_SENDFILE (sys_info.async_threads > 0) +#else +#define USE_THRDS_FOR_SENDFILE 0 +#endif /* !DARWIN */ + + + #if 0 /* Experimental, for forcing all file operations to use the same thread. */ static unsigned file_fixed_key = 1; @@ -217,17 +241,25 @@ typedef unsigned char uchar; static ErlDrvData file_start(ErlDrvPort port, char* command); static int file_init(void); static void file_stop(ErlDrvData); -static void file_output(ErlDrvData, char* buf, int len); -static int file_control(ErlDrvData, unsigned int command, - char* buf, int len, char **rbuf, int rlen); +static void file_output(ErlDrvData, char* buf, ErlDrvSizeT len); +static ErlDrvSSizeT file_control(ErlDrvData, unsigned int command, + char* buf, ErlDrvSizeT len, + char **rbuf, ErlDrvSizeT rlen); static void file_timeout(ErlDrvData); static void file_outputv(ErlDrvData, ErlIOVec*); static void file_async_ready(ErlDrvData, ErlDrvThreadData); static void file_flush(ErlDrvData); +#ifdef HAVE_SENDFILE +static void file_ready_output(ErlDrvData data, ErlDrvEvent event); +static void file_stop_select(ErlDrvEvent event, void* _); +#endif /* HAVE_SENDFILE */ enum e_timer {timer_idle, timer_again, timer_write}; +#ifdef HAVE_SENDFILE +enum e_sendfile {sending, not_sending}; +#endif /* HAVE_SENDFILE */ struct t_data; @@ -242,6 +274,9 @@ typedef struct { struct t_data *cq_head; /* Queue of incoming commands */ struct t_data *cq_tail; /* -""- */ enum e_timer timer_state; +#ifdef HAVE_SENDFILE + enum e_sendfile sendfile_state; +#endif /* HAVE_SENDFILE */ size_t read_bufsize; ErlDrvBinary *read_binp; size_t read_offset; @@ -264,7 +299,11 @@ struct erl_drv_entry efile_driver_entry = { file_stop, file_output, NULL, +#ifdef HAVE_SENDFILE + file_ready_output, +#else NULL, +#endif /* HAVE_SENDFILE */ "efile", NULL, NULL, @@ -279,7 +318,13 @@ struct erl_drv_entry efile_driver_entry = { ERL_DRV_EXTENDED_MAJOR_VERSION, ERL_DRV_EXTENDED_MINOR_VERSION, ERL_DRV_FLAG_USE_PORT_LOCKING, + NULL, + NULL, +#ifdef HAVE_SENDFILE + file_stop_select +#else NULL +#endif /* HAVE_SENDFILE */ }; @@ -398,6 +443,14 @@ struct t_data Sint64 length; int advise; } fadvise; +#ifdef HAVE_SENDFILE + struct { + int out_fd; + off_t offset; + Uint64 nbytes; + Uint64 written; + } sendfile; +#endif /* HAVE_SENDFILE */ } c; char b[1]; }; @@ -485,7 +538,6 @@ static void *ef_safe_realloc(void *op, Uint s) : 0) - #if 0 static void ev_clear(ErlIOVec *ev) { @@ -613,7 +665,6 @@ static struct t_data *cq_deq(file_descriptor *desc) { } - /********************************************************************* * Driver entry point -> init */ @@ -628,6 +679,7 @@ file_init(void) ? atoi(buf) : 0); driver_system_info(&sys_info, sizeof(ErlDrvSysInfo)); + return 0; } @@ -655,6 +707,9 @@ file_start(ErlDrvPort port, char* command) desc->cq_head = NULL; desc->cq_tail = NULL; desc->timer_state = timer_idle; +#ifdef HAVE_SENDFILE + desc->sendfile_state = not_sending; +#endif desc->read_bufsize = 0; desc->read_binp = NULL; desc->read_offset = 0; @@ -697,6 +752,15 @@ file_stop(ErlDrvData e) TRACE_C('p'); +#ifdef HAVE_SENDFILE + if (desc->sendfile_state == sending && !USE_THRDS_FOR_SENDFILE) { + driver_select(desc->port,(ErlDrvEvent)(long)desc->d->c.sendfile.out_fd, + ERL_DRV_WRITE|ERL_DRV_USE,0); + } else if (desc->sendfile_state == sending) { + SET_NONBLOCKING(desc->d->c.sendfile.out_fd); + } +#endif /* HAVE_SENDFILE */ + if (desc->fd != FILE_FD_INVALID) { do_close(desc->flags, desc->fd); desc->fd = FILE_FD_INVALID; @@ -762,7 +826,16 @@ static void reply_Uint_posix_error(file_descriptor *desc, Uint num, driver_output2(desc->port, response, t-response, NULL, 0); } +static void reply_string_error(file_descriptor *desc, char* str) { + char response[256]; /* Response buffer. */ + char* s; + char* t; + response[0] = FILE_RESP_ERROR; + for (s = str, t = response+1; *s; s++, t++) + *t = tolower(*s); + driver_output2(desc->port, response, t-response, NULL, 0); +} static int reply_error(file_descriptor *desc, Efile_error *errInfo) /* The error codes. */ @@ -893,8 +966,6 @@ static int reply_eof(file_descriptor *desc) { driver_output2(desc->port, &c, 1, NULL, 0); return 0; } - - static void invoke_name(void *data, int (*f)(Efile_error *, char *)) { @@ -1013,7 +1084,7 @@ static void invoke_read_line(void *data) d->c.read_line.read_offset - d->c.read_line.read_size; if (size == 0) { /* Need more place */ - size_t need = (d->c.read_line.read_size >= DEFAULT_LINEBUF_SIZE) ? + ErlDrvSizeT need = (d->c.read_line.read_size >= DEFAULT_LINEBUF_SIZE) ? d->c.read_line.read_size + DEFAULT_LINEBUF_SIZE : DEFAULT_LINEBUF_SIZE; ErlDrvBinary *newbin = driver_alloc_binary(need); if (newbin == NULL) { @@ -1694,6 +1765,66 @@ static void invoke_fadvise(void *data) d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise); } +#ifdef HAVE_SENDFILE +static void invoke_sendfile(void *data) +{ + struct t_data *d = (struct t_data *)data; + int fd = d->fd; + int out_fd = (int)d->c.sendfile.out_fd; + Uint64 nbytes = d->c.sendfile.nbytes; + int result = 0; + d->again = 0; + + result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes, NULL); + + d->c.sendfile.written += nbytes; + + if (result == 1) { + if (USE_THRDS_FOR_SENDFILE) { + d->result_ok = 0; + } else if (d->c.sendfile.nbytes == 0 && nbytes != 0) { + d->result_ok = 1; + } else if ((d->c.sendfile.nbytes - nbytes) != 0) { + d->result_ok = 1; + d->c.sendfile.nbytes -= nbytes; + } else { + d->result_ok = 0; + } + } else if (result == 0 && (d->errInfo.posix_errno == EAGAIN + || d->errInfo.posix_errno == EINTR)) { + d->result_ok = 1; + } else { + d->result_ok = -1; + } +} + +static void free_sendfile(void *data) { + EF_FREE(data); +} + +static void file_ready_output(ErlDrvData data, ErlDrvEvent event) +{ + file_descriptor* fd = (file_descriptor*) data; + + switch (fd->d->command) { + case FILE_SENDFILE: + driver_select(fd->port, event, + (int)ERL_DRV_WRITE,(int) 0); + invoke_sendfile((void *)fd->d); + file_async_ready(data, (ErlDrvThreadData)fd->d); + break; + default: + break; + } +} + +static void file_stop_select(ErlDrvEvent event, void* _) +{ + +} +#endif /* HAVE_SENDFILE */ + + static void free_readdir(void *data) { struct t_data *d = (struct t_data *) data; @@ -1755,6 +1886,10 @@ static void cq_execute(file_descriptor *desc) { register void *void_ptr; /* Soft cast variable */ if (desc->timer_state == timer_again) return; +#ifdef HAVE_SENDFILE + if (desc->sendfile_state == sending) + return; +#endif if (! (d = cq_deq(desc))) return; TRACE_F(("x%i", (int) d->command)); @@ -2021,24 +2156,25 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) if (d->result_ok) { resbuf[0] = FILE_RESP_INFO; - put_int32(d->info.size_high, &resbuf[1 + (0 * 4)]); - put_int32(d->info.size_low, &resbuf[1 + (1 * 4)]); - put_int32(d->info.type, &resbuf[1 + (2 * 4)]); - - PUT_TIME(d->info.accessTime, resbuf + 1 + 3*4); - PUT_TIME(d->info.modifyTime, resbuf + 1 + 9*4); - PUT_TIME(d->info.cTime, resbuf + 1 + 15*4); - - put_int32(d->info.mode, &resbuf[1 + (21 * 4)]); - put_int32(d->info.links, &resbuf[1 + (22 * 4)]); - put_int32(d->info.major_device, &resbuf[1 + (23 * 4)]); - put_int32(d->info.minor_device, &resbuf[1 + (24 * 4)]); - put_int32(d->info.inode, &resbuf[1 + (25 * 4)]); - put_int32(d->info.uid, &resbuf[1 + (26 * 4)]); - put_int32(d->info.gid, &resbuf[1 + (27 * 4)]); - put_int32(d->info.access, &resbuf[1 + (28 * 4)]); - -#define RESULT_SIZE (1 + (29 * 4)) + put_int32(d->info.size_high, &resbuf[1 + ( 0 * 4)]); + put_int32(d->info.size_low, &resbuf[1 + ( 1 * 4)]); + put_int32(d->info.type, &resbuf[1 + ( 2 * 4)]); + + /* Note 64 bit indexing in resbuf here */ + put_int64(d->info.accessTime, &resbuf[1 + ( 3 * 4)]); + put_int64(d->info.modifyTime, &resbuf[1 + ( 5 * 4)]); + put_int64(d->info.cTime, &resbuf[1 + ( 7 * 4)]); + + put_int32(d->info.mode, &resbuf[1 + ( 9 * 4)]); + put_int32(d->info.links, &resbuf[1 + (10 * 4)]); + put_int32(d->info.major_device, &resbuf[1 + (11 * 4)]); + put_int32(d->info.minor_device, &resbuf[1 + (12 * 4)]); + put_int32(d->info.inode, &resbuf[1 + (13 * 4)]); + put_int32(d->info.uid, &resbuf[1 + (14 * 4)]); + put_int32(d->info.gid, &resbuf[1 + (15 * 4)]); + put_int32(d->info.access, &resbuf[1 + (16 * 4)]); + +#define RESULT_SIZE (1 + (17 * 4)) TRACE_C('R'); driver_output2(desc->port, resbuf, RESULT_SIZE, NULL, 0); #undef RESULT_SIZE @@ -2105,6 +2241,42 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) } free_preadv(data); break; +#ifdef HAVE_SENDFILE + case FILE_SENDFILE: + if (d->result_ok == -1) { + desc->sendfile_state = not_sending; + if (d->errInfo.posix_errno == ECONNRESET || + d->errInfo.posix_errno == ENOTCONN || + d->errInfo.posix_errno == EPIPE) + reply_string_error(desc,"closed"); + else + reply_error(desc, &d->errInfo); + if (USE_THRDS_FOR_SENDFILE) { + SET_NONBLOCKING(d->c.sendfile.out_fd); + free_sendfile(data); + } else { + driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, + ERL_DRV_USE, 0); + free_sendfile(data); + } + } else if (d->result_ok == 0) { + desc->sendfile_state = not_sending; + reply_Sint64(desc, d->c.sendfile.written); + if (USE_THRDS_FOR_SENDFILE) { + SET_NONBLOCKING(d->c.sendfile.out_fd); + free_sendfile(data); + } else { + driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, ERL_DRV_USE, 0); + free_sendfile(data); + } + } else if (d->result_ok == 1) { // If we are using select to send the rest of the data + desc->sendfile_state = sending; + desc->d = d; + driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, + ERL_DRV_USE|ERL_DRV_WRITE, 1); + } + break; +#endif default: abort(); } @@ -2120,7 +2292,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) * Driver entry point -> output */ static void -file_output(ErlDrvData e, char* buf, int count) +file_output(ErlDrvData e, char* buf, ErlDrvSizeT count) { file_descriptor* desc = (file_descriptor*)e; Efile_error errInfo; /* The error codes for the last operation. */ @@ -2355,15 +2527,16 @@ file_output(ErlDrvData e, char* buf, int count) case FILE_WRITE_INFO: { d = EF_SAFE_ALLOC(sizeof(struct t_data) - 1 - + FILENAME_BYTELEN(buf+21*4) + FILENAME_CHARSIZE); + + FILENAME_BYTELEN(buf + 9*4) + FILENAME_CHARSIZE); - d->info.mode = get_int32(buf + 0 * 4); - d->info.uid = get_int32(buf + 1 * 4); - d->info.gid = get_int32(buf + 2 * 4); - GET_TIME(d->info.accessTime, buf + 3 * 4); - GET_TIME(d->info.modifyTime, buf + 9 * 4); - GET_TIME(d->info.cTime, buf + 15 * 4); - FILENAME_COPY(d->b, buf+21*4); + d->info.mode = get_int32(buf + 0 * 4); + d->info.uid = get_int32(buf + 1 * 4); + d->info.gid = get_int32(buf + 2 * 4); + d->info.accessTime = (time_t)((Sint64)get_int64(buf + 3 * 4)); + d->info.modifyTime = (time_t)((Sint64)get_int64(buf + 5 * 4)); + d->info.cTime = (time_t)((Sint64)get_int64(buf + 7 * 4)); + + FILENAME_COPY(d->b, buf + 9*4); d->command = command; d->invoke = invoke_write_info; d->free = free_data; @@ -2496,9 +2669,9 @@ file_flush(ErlDrvData e) { /********************************************************************* * Driver entry point -> control */ -static int +static ErlDrvSSizeT file_control(ErlDrvData e, unsigned int command, - char* buf, int len, char **rbuf, int rlen) { + char* buf, ErlDrvSizeT len, char **rbuf, ErlDrvSizeT rlen) { /* * warning: variable ‘desc’ set but not used * [-Wunused-but-set-variable] @@ -2799,8 +2972,8 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { cq_enq(desc, d); } goto done; case FILE_WRITE: { - int skip = 1; - int size = ev->size - skip; + ErlDrvSizeT skip = 1; + ErlDrvSizeT size = ev->size - skip; if (lseek_flush_read(desc, &err) < 0) { reply_posix_error(desc, err); goto done; @@ -2809,7 +2982,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { reply_posix_error(desc, EBADF); goto done; } - if (size <= 0) { + if (size == 0) { reply_Uint(desc, size); goto done; } @@ -2923,7 +3096,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { EF_FREE(d); reply_Uint(desc, 0); } else { - size_t skip = 1 + 4 + 8*(2*n); + ErlDrvSizeT skip = 1 + 4 + 8*(2*n); if (skip + total != ev->size) { /* Actual amount of data does not match * total of all pos/size specs @@ -3245,9 +3418,69 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { goto done; } /* case FILE_OPT_DELAYED_WRITE: */ } ASSERT(0); goto done; /* case FILE_SETOPT: */ - + + case FILE_SENDFILE: { + +#ifdef HAVE_SENDFILE + struct t_data *d; + Uint32 out_fd, offsetH, offsetL, hd_len, tl_len; + Uint64 nbytes; + char flags; + + if (ev->size < 1 + 7 * sizeof(Uint32) + sizeof(char) + || !EV_GET_UINT32(ev, &out_fd, &p, &q) + || !EV_GET_CHAR(ev, &flags, &p, &q) + || !EV_GET_UINT32(ev, &offsetH, &p, &q) + || !EV_GET_UINT32(ev, &offsetL, &p, &q) + || !EV_GET_UINT64(ev, &nbytes, &p, &q) + || !EV_GET_UINT32(ev, &hd_len, &p, &q) + || !EV_GET_UINT32(ev, &tl_len, &p, &q)) { + /* Buffer has wrong length to contain all the needed values */ + reply_posix_error(desc, EINVAL); + goto done; + } + + if (hd_len != 0 || tl_len != 0 || flags != 0) { + // We do not allow header, trailers and/or flags right now + reply_posix_error(desc, EINVAL); + goto done; + } + + d = EF_SAFE_ALLOC(sizeof(struct t_data)); + d->fd = desc->fd; + d->command = command; + d->invoke = invoke_sendfile; + d->free = NULL; + d->level = 2; + + d->c.sendfile.out_fd = (int) out_fd; + d->c.sendfile.written = 0; + + #if SIZEOF_OFF_T == 4 + if (offsetH != 0) { + reply_posix_error(desc, EINVAL); + goto done; + } + d->c.sendfile.offset = (off_t) offsetL; + #else + d->c.sendfile.offset = ((off_t) offsetH << 32) | offsetL; + #endif + + d->c.sendfile.nbytes = nbytes; + + if (USE_THRDS_FOR_SENDFILE) { + SET_BLOCKING(d->c.sendfile.out_fd); + } + + cq_enq(desc, d); +#else + reply_posix_error(desc, ENOTSUP); +#endif + goto done; + } /* case FILE_SENDFILE: */ + } /* switch(command) */ - + if (lseek_flush_read(desc, &err) < 0) { reply_posix_error(desc, err); goto done; |