From 020e988424cf0d15ebab8de50638492defb6f2b5 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 11 Nov 2011 16:19:07 +0100 Subject: Implement sendfile when there are no async threads When there are no async threads sendfile will use the ready_output select on the socket fd to know when to send data. The file_desc will also be put in the sending sendfile_state which buffers all other commands to that file until the sendfile is done. --- erts/emulator/drivers/common/efile_drv.c | 143 +++++++++++++++++++++---------- erts/emulator/drivers/common/erl_efile.h | 4 +- erts/emulator/drivers/unix/unix_efile.c | 30 +++---- 3 files changed, 112 insertions(+), 65 deletions(-) (limited to 'erts/emulator/drivers') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 4ed6aa4891..a318384479 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -76,11 +76,6 @@ #define FILE_OPT_DELAYED_WRITE 0 #define FILE_OPT_READ_AHEAD 1 -#define FILE_SENDFILE_OFFSET 0x10 -#define FILE_SENDFILE_NBYTES 0x08 -#define FILE_SENDFILE_NODISKIO 0x4 -#define FILE_SENDFILE_MNOWAIT 0x2 -#define FILE_SENDFILE_SYNC 0x1 /* IPREAD variants */ @@ -223,6 +218,7 @@ typedef unsigned char uchar; static ErlDrvData file_start(ErlDrvPort port, char* command); static int file_init(void); static void file_stop(ErlDrvData); +static void file_ready_output(ErlDrvData data, ErlDrvEvent event); static void file_output(ErlDrvData, char* buf, int len); static int file_control(ErlDrvData, unsigned int command, char* buf, int len, char **rbuf, int rlen); @@ -230,10 +226,12 @@ static void file_timeout(ErlDrvData); static void file_outputv(ErlDrvData, ErlIOVec*); static void file_async_ready(ErlDrvData, ErlDrvThreadData); static void file_flush(ErlDrvData); +static void file_stop_select(ErlDrvEvent event, void* _); enum e_timer {timer_idle, timer_again, timer_write}; +enum e_sendfile {sending, not_sending}; struct t_data; @@ -248,6 +246,7 @@ typedef struct { struct t_data *cq_head; /* Queue of incoming commands */ struct t_data *cq_tail; /* -""- */ enum e_timer timer_state; + enum e_sendfile sendfile_state; size_t read_bufsize; ErlDrvBinary *read_binp; size_t read_offset; @@ -268,9 +267,9 @@ struct erl_drv_entry efile_driver_entry = { file_init, file_start, file_stop, + file_output, NULL, - NULL, - NULL, + file_ready_output, "efile", NULL, NULL, @@ -287,7 +286,7 @@ struct erl_drv_entry efile_driver_entry = { ERL_DRV_FLAG_USE_PORT_LOCKING, NULL, NULL, - NULL + file_stop_select }; @@ -339,6 +338,13 @@ struct t_readdir_buf { char buf[READDIR_BUFSIZE]; }; +struct t_sendfile_hdtl { + int hdr_cnt; /* number of header iovecs */ + struct iovec *headers; /* pointer to header iovecs */ + int trl_cnt; /* number of trailer iovecs */ + struct iovec *trailers; /* pointer to trailer iovecs */ +}; + struct t_data { struct t_data *next; @@ -406,18 +412,14 @@ struct t_data Sint64 length; int advise; } fadvise; -#ifdef HAVE_SENDFILE struct { int out_fd; off_t offset; size_t nbytes; - int flags; - int hdr_cnt; /* number of header iovecs */ - struct iovec *headers; /* pointer to header iovecs */ - int trl_cnt; /* number of trailer iovecs */ - struct iovec *trailers; /* pointer to trailer iovecs */ + Uint64 written; + short flags; + struct t_sendfile_hdtl *hdtl; } sendfile; -#endif } c; char b[1]; }; @@ -632,6 +634,7 @@ static struct t_data *cq_deq(file_descriptor *desc) { return d; } + /********************************************************************* * Driver entry point -> init */ @@ -674,6 +677,7 @@ file_start(ErlDrvPort port, char* command) desc->cq_head = NULL; desc->cq_tail = NULL; desc->timer_state = timer_idle; + desc->sendfile_state = not_sending; desc->read_bufsize = 0; desc->read_binp = NULL; desc->read_offset = 0; @@ -1713,30 +1717,65 @@ static void invoke_fadvise(void *data) d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise); } -static void free_sendfile(void *data) { - EF_FREE(data); -} - static void invoke_sendfile(void *data) { - struct t_data *d = (struct t_data *) data; - int fd = (int)d->fd; - int out_fd = (int) d->c.sendfile.out_fd; - off_t offset = (off_t) d->c.sendfile.offset; - size_t nbytes = (size_t) d->c.sendfile.nbytes; - - d->result_ok = efile_sendfile(&d->errInfo, fd, out_fd, offset, &nbytes); - d->c.sendfile.offset = offset; - d->c.sendfile.nbytes = nbytes; + struct t_data *d = (struct t_data *)data; + int fd = d->fd; + int out_fd = d->c.sendfile.out_fd; + size_t nbytes = d->c.sendfile.nbytes; + int result = 0; d->again = 0; - if (d->result_ok) { - printf("==> sendfile DONE nbytes=%d\n", d->c.sendfile.nbytes); + result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes); + + /* printf("sendfile: result: %d, errno: %d, offset: %d, nbytes: %d\r\n", + result, errno, d->c.sendfile.offset,nbytes);*/ + d->c.sendfile.written += nbytes; + + if (result == 1) { + if (d->c.sendfile.nbytes == 0 && nbytes != 0) { + d->result_ok = 1; + } else if ((d->c.sendfile.nbytes - nbytes) != 0) { + d->result_ok = 1; + d->c.sendfile.nbytes -= nbytes; + } else { + printf("==> sendfile DONE written=%ld\r\n", d->c.sendfile.written); + d->result_ok = 0; + } + } else if (result == 0 && (d->errInfo.posix_errno == EAGAIN + || d->errInfo.posix_errno == EINTR)) { + d->result_ok = 1; } else { - printf("==> sendfile ERROR %s\n", erl_errno_id(d->errInfo.posix_errno)); + d->result_ok = -1; + printf("==> sendfile ERROR %s\r\n", erl_errno_id(d->errInfo.posix_errno)); } } +static void free_sendfile(void *data) { + EF_FREE(data); +} + +static void file_ready_output(ErlDrvData data, ErlDrvEvent event) +{ + file_descriptor* fd = (file_descriptor*) data; + + switch (fd->d->command) { + case FILE_SENDFILE: + driver_select(fd->port, (ErlDrvEvent)fd->d->c.sendfile.out_fd, + (int)ERL_DRV_WRITE,(int) 0); + invoke_sendfile((void *)fd->d); + file_async_ready((ErlDrvData)fd, (ErlDrvThreadData)fd->d); + break; + default: + break; + } +} + +static void file_stop_select(ErlDrvEvent event, void* _) +{ + /* TODO: close socket? */ +} + static void free_readdir(void *data) { struct t_data *d = (struct t_data *) data; @@ -1796,7 +1835,8 @@ static int try_again(file_descriptor *desc, struct t_data *d) { static void cq_execute(file_descriptor *desc) { struct t_data *d; register void *void_ptr; /* Soft cast variable */ - if (desc->timer_state == timer_again) + if (desc->timer_state == timer_again || + desc->sendfile_state == sending) return; if (! (d = cq_deq(desc))) return; @@ -2149,12 +2189,21 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) free_preadv(data); break; case FILE_SENDFILE: - if (!d->result_ok) { + //printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); + if (d->result_ok == -1) { + desc->sendfile_state = not_sending; reply_error(desc, &d->errInfo); - } else { - reply_Sint64(desc, d->c.sendfile.nbytes); + free_sendfile(data); + } else if (d->result_ok == 0) { + desc->sendfile_state = not_sending; + reply_Sint64(desc, d->c.sendfile.written); + free_sendfile(data); + } else if (d->result_ok == 1) { // If we are using select to send the rest of the data + desc->sendfile_state = sending; + desc->d = d; + driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, + ERL_DRV_USE|ERL_DRV_WRITE, 1); } - free_sendfile(data); break; default: abort(); @@ -3296,11 +3345,12 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { goto done; } /* case FILE_OPT_DELAYED_WRITE: */ } ASSERT(0); goto done; /* case FILE_SETOPT: */ + case FILE_SENDFILE: { - struct t_data *d; - Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; - char flags; + struct t_data *d; + Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; + char flags; /* DestFD:32, Offset:64, Bytes:64, ChunkSize:64, @@ -3330,26 +3380,27 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { d->c.sendfile.out_fd = (int) out_fd; d->c.sendfile.flags = (int) flags; + d->c.sendfile.written = 0; -#if SIZEOF_OFF_T == 4 + #if SIZEOF_OFF_T == 4 if (offsetH != 0) { reply_posix_error(desc, EINVAL); goto done; } d->c.sendfile.offset = (off_t) offsetT; -#else + #else d->c.sendfile.offset = ((off_t) offsetH << 32) | offsetL; -#endif + #endif -#if SIZEOF_SIZE_T == 4 + #if SIZEOF_SIZE_T == 4 if (nbytesH != 0) { reply_posix_error(desc, EINVAL); goto done; } d->c.sendfile.nbytes = (size_t) nbytesT; -#else + #else d->c.sendfile.nbytes = ((size_t) nbytesH << 32) | nbytesL; -#endif + #endif printf("sendfile(nbytes => %d, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags); @@ -3357,7 +3408,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { cq_enq(desc, d); goto done; - } /* case FILE_SENDFILE: */ + } /* case FILE_SENDFILE: */ } /* switch(command) */ diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index 864b867955..e0b8cfca03 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -162,5 +162,5 @@ int efile_symlink(Efile_error* errInfo, char* old, char* new); int efile_may_openfile(Efile_error* errInfo, char *name); int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); -int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t offset, - size_t *count); +int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, + off_t *offset, size_t *nbytes); diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 05c2f1fce9..3a966757d9 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1469,31 +1469,27 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, } #ifdef HAVE_SENDFILE +#define SENDFILE_CHUNK_SIZE ((1 << 30) - 1) int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t offset, size_t *nbytes) + off_t *offset, size_t *nbytes) { #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) - ssize_t retval, nbytes_sent = 0; + ssize_t retval, written = 0; + // printf("sendfile(%d,%d,%d,%d)\r\n",out_fd,in_fd,*offset,*nbytes); if (*nbytes == 0) { - *nbytes = (1 << 20) - 1; do { - retval = sendfile(out_fd, in_fd, &offset, *nbytes); - nbytes_sent += retval; - printf("retval: %d, errno: %d, offset: %d, nbytes: %d\r\n", retval, errno, offset,*nbytes); - } while ((retval == -1 && errno == EINTR) - || (retval > 0 && errno == EAGAIN)); + *nbytes = SENDFILE_CHUNK_SIZE; // chunk size + retval = sendfile(out_fd, in_fd, offset, *nbytes); + if (retval > 0) + written += retval; + } while (retval == SENDFILE_CHUNK_SIZE); } else { - do { - retval = sendfile(out_fd, in_fd, &offset, *nbytes); - if (retval > 0) { - nbytes_sent += retval; - *nbytes -= retval; - } - } while ((retval == -1 && errno == EINTR) - || (*nbytes > 0 && errno == EAGAIN)); + retval = sendfile(out_fd, in_fd, offset, *nbytes); + if (retval > 0) + written = retval; } - *nbytes = nbytes_sent; + *nbytes = written; return check_error(retval == -1 ? -1 : 0, errInfo); #elif defined(DARWIN) off_t len = *nbytes; -- cgit v1.2.3