aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator
diff options
context:
space:
mode:
authorLukas Larsson <lukas@erlang-solutions.com>2011-11-11 16:19:07 +0100
committerLukas Larsson <lukas@erlang-solutions.com>2011-12-01 14:10:03 +0100
commit020e988424cf0d15ebab8de50638492defb6f2b5 (patch)
treea953bbef07196e93619fd7b57c2d6f2783a993b9 /erts/emulator
parent54bdd9a15d2e130c76f76ca322af56b306d02078 (diff)
downloadotp-020e988424cf0d15ebab8de50638492defb6f2b5.tar.gz
otp-020e988424cf0d15ebab8de50638492defb6f2b5.tar.bz2
otp-020e988424cf0d15ebab8de50638492defb6f2b5.zip
Implement sendfile when there are no async threads
When there are no async threads sendfile will use the ready_output select on the socket fd to know when to send data. The file_desc will also be put in the sending sendfile_state which buffers all other commands to that file until the sendfile is done.
Diffstat (limited to 'erts/emulator')
-rw-r--r--erts/emulator/drivers/common/efile_drv.c143
-rw-r--r--erts/emulator/drivers/common/erl_efile.h4
-rw-r--r--erts/emulator/drivers/unix/unix_efile.c30
3 files changed, 112 insertions, 65 deletions
diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c
index 4ed6aa4891..a318384479 100644
--- a/erts/emulator/drivers/common/efile_drv.c
+++ b/erts/emulator/drivers/common/efile_drv.c
@@ -76,11 +76,6 @@
#define FILE_OPT_DELAYED_WRITE 0
#define FILE_OPT_READ_AHEAD 1
-#define FILE_SENDFILE_OFFSET 0x10
-#define FILE_SENDFILE_NBYTES 0x08
-#define FILE_SENDFILE_NODISKIO 0x4
-#define FILE_SENDFILE_MNOWAIT 0x2
-#define FILE_SENDFILE_SYNC 0x1
/* IPREAD variants */
@@ -223,6 +218,7 @@ typedef unsigned char uchar;
static ErlDrvData file_start(ErlDrvPort port, char* command);
static int file_init(void);
static void file_stop(ErlDrvData);
+static void file_ready_output(ErlDrvData data, ErlDrvEvent event);
static void file_output(ErlDrvData, char* buf, int len);
static int file_control(ErlDrvData, unsigned int command,
char* buf, int len, char **rbuf, int rlen);
@@ -230,10 +226,12 @@ static void file_timeout(ErlDrvData);
static void file_outputv(ErlDrvData, ErlIOVec*);
static void file_async_ready(ErlDrvData, ErlDrvThreadData);
static void file_flush(ErlDrvData);
+static void file_stop_select(ErlDrvEvent event, void* _);
enum e_timer {timer_idle, timer_again, timer_write};
+enum e_sendfile {sending, not_sending};
struct t_data;
@@ -248,6 +246,7 @@ typedef struct {
struct t_data *cq_head; /* Queue of incoming commands */
struct t_data *cq_tail; /* -""- */
enum e_timer timer_state;
+ enum e_sendfile sendfile_state;
size_t read_bufsize;
ErlDrvBinary *read_binp;
size_t read_offset;
@@ -268,9 +267,9 @@ struct erl_drv_entry efile_driver_entry = {
file_init,
file_start,
file_stop,
+ file_output,
NULL,
- NULL,
- NULL,
+ file_ready_output,
"efile",
NULL,
NULL,
@@ -287,7 +286,7 @@ struct erl_drv_entry efile_driver_entry = {
ERL_DRV_FLAG_USE_PORT_LOCKING,
NULL,
NULL,
- NULL
+ file_stop_select
};
@@ -339,6 +338,13 @@ struct t_readdir_buf {
char buf[READDIR_BUFSIZE];
};
+struct t_sendfile_hdtl {
+ int hdr_cnt; /* number of header iovecs */
+ struct iovec *headers; /* pointer to header iovecs */
+ int trl_cnt; /* number of trailer iovecs */
+ struct iovec *trailers; /* pointer to trailer iovecs */
+};
+
struct t_data
{
struct t_data *next;
@@ -406,18 +412,14 @@ struct t_data
Sint64 length;
int advise;
} fadvise;
-#ifdef HAVE_SENDFILE
struct {
int out_fd;
off_t offset;
size_t nbytes;
- int flags;
- int hdr_cnt; /* number of header iovecs */
- struct iovec *headers; /* pointer to header iovecs */
- int trl_cnt; /* number of trailer iovecs */
- struct iovec *trailers; /* pointer to trailer iovecs */
+ Uint64 written;
+ short flags;
+ struct t_sendfile_hdtl *hdtl;
} sendfile;
-#endif
} c;
char b[1];
};
@@ -632,6 +634,7 @@ static struct t_data *cq_deq(file_descriptor *desc) {
return d;
}
+
/*********************************************************************
* Driver entry point -> init
*/
@@ -674,6 +677,7 @@ file_start(ErlDrvPort port, char* command)
desc->cq_head = NULL;
desc->cq_tail = NULL;
desc->timer_state = timer_idle;
+ desc->sendfile_state = not_sending;
desc->read_bufsize = 0;
desc->read_binp = NULL;
desc->read_offset = 0;
@@ -1713,30 +1717,65 @@ static void invoke_fadvise(void *data)
d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise);
}
-static void free_sendfile(void *data) {
- EF_FREE(data);
-}
-
static void invoke_sendfile(void *data)
{
- struct t_data *d = (struct t_data *) data;
- int fd = (int)d->fd;
- int out_fd = (int) d->c.sendfile.out_fd;
- off_t offset = (off_t) d->c.sendfile.offset;
- size_t nbytes = (size_t) d->c.sendfile.nbytes;
-
- d->result_ok = efile_sendfile(&d->errInfo, fd, out_fd, offset, &nbytes);
- d->c.sendfile.offset = offset;
- d->c.sendfile.nbytes = nbytes;
+ struct t_data *d = (struct t_data *)data;
+ int fd = d->fd;
+ int out_fd = d->c.sendfile.out_fd;
+ size_t nbytes = d->c.sendfile.nbytes;
+ int result = 0;
d->again = 0;
- if (d->result_ok) {
- printf("==> sendfile DONE nbytes=%d\n", d->c.sendfile.nbytes);
+ result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes);
+
+ /* printf("sendfile: result: %d, errno: %d, offset: %d, nbytes: %d\r\n",
+ result, errno, d->c.sendfile.offset,nbytes);*/
+ d->c.sendfile.written += nbytes;
+
+ if (result == 1) {
+ if (d->c.sendfile.nbytes == 0 && nbytes != 0) {
+ d->result_ok = 1;
+ } else if ((d->c.sendfile.nbytes - nbytes) != 0) {
+ d->result_ok = 1;
+ d->c.sendfile.nbytes -= nbytes;
+ } else {
+ printf("==> sendfile DONE written=%ld\r\n", d->c.sendfile.written);
+ d->result_ok = 0;
+ }
+ } else if (result == 0 && (d->errInfo.posix_errno == EAGAIN
+ || d->errInfo.posix_errno == EINTR)) {
+ d->result_ok = 1;
} else {
- printf("==> sendfile ERROR %s\n", erl_errno_id(d->errInfo.posix_errno));
+ d->result_ok = -1;
+ printf("==> sendfile ERROR %s\r\n", erl_errno_id(d->errInfo.posix_errno));
}
}
+static void free_sendfile(void *data) {
+ EF_FREE(data);
+}
+
+static void file_ready_output(ErlDrvData data, ErlDrvEvent event)
+{
+ file_descriptor* fd = (file_descriptor*) data;
+
+ switch (fd->d->command) {
+ case FILE_SENDFILE:
+ driver_select(fd->port, (ErlDrvEvent)fd->d->c.sendfile.out_fd,
+ (int)ERL_DRV_WRITE,(int) 0);
+ invoke_sendfile((void *)fd->d);
+ file_async_ready((ErlDrvData)fd, (ErlDrvThreadData)fd->d);
+ break;
+ default:
+ break;
+ }
+}
+
+static void file_stop_select(ErlDrvEvent event, void* _)
+{
+ /* TODO: close socket? */
+}
+
static void free_readdir(void *data)
{
struct t_data *d = (struct t_data *) data;
@@ -1796,7 +1835,8 @@ static int try_again(file_descriptor *desc, struct t_data *d) {
static void cq_execute(file_descriptor *desc) {
struct t_data *d;
register void *void_ptr; /* Soft cast variable */
- if (desc->timer_state == timer_again)
+ if (desc->timer_state == timer_again ||
+ desc->sendfile_state == sending)
return;
if (! (d = cq_deq(desc)))
return;
@@ -2149,12 +2189,21 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data)
free_preadv(data);
break;
case FILE_SENDFILE:
- if (!d->result_ok) {
+ //printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok);
+ if (d->result_ok == -1) {
+ desc->sendfile_state = not_sending;
reply_error(desc, &d->errInfo);
- } else {
- reply_Sint64(desc, d->c.sendfile.nbytes);
+ free_sendfile(data);
+ } else if (d->result_ok == 0) {
+ desc->sendfile_state = not_sending;
+ reply_Sint64(desc, d->c.sendfile.written);
+ free_sendfile(data);
+ } else if (d->result_ok == 1) { // If we are using select to send the rest of the data
+ desc->sendfile_state = sending;
+ desc->d = d;
+ driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd,
+ ERL_DRV_USE|ERL_DRV_WRITE, 1);
}
- free_sendfile(data);
break;
default:
abort();
@@ -3296,11 +3345,12 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
goto done;
} /* case FILE_OPT_DELAYED_WRITE: */
} ASSERT(0); goto done; /* case FILE_SETOPT: */
+
case FILE_SENDFILE: {
- struct t_data *d;
- Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL;
- char flags;
+ struct t_data *d;
+ Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL;
+ char flags;
/* DestFD:32, Offset:64, Bytes:64,
ChunkSize:64,
@@ -3330,26 +3380,27 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
d->c.sendfile.out_fd = (int) out_fd;
d->c.sendfile.flags = (int) flags;
+ d->c.sendfile.written = 0;
-#if SIZEOF_OFF_T == 4
+ #if SIZEOF_OFF_T == 4
if (offsetH != 0) {
reply_posix_error(desc, EINVAL);
goto done;
}
d->c.sendfile.offset = (off_t) offsetT;
-#else
+ #else
d->c.sendfile.offset = ((off_t) offsetH << 32) | offsetL;
-#endif
+ #endif
-#if SIZEOF_SIZE_T == 4
+ #if SIZEOF_SIZE_T == 4
if (nbytesH != 0) {
reply_posix_error(desc, EINVAL);
goto done;
}
d->c.sendfile.nbytes = (size_t) nbytesT;
-#else
+ #else
d->c.sendfile.nbytes = ((size_t) nbytesH << 32) | nbytesL;
-#endif
+ #endif
printf("sendfile(nbytes => %d, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags);
@@ -3357,7 +3408,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
cq_enq(desc, d);
goto done;
- } /* case FILE_SENDFILE: */
+ } /* case FILE_SENDFILE: */
} /* switch(command) */
diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h
index 864b867955..e0b8cfca03 100644
--- a/erts/emulator/drivers/common/erl_efile.h
+++ b/erts/emulator/drivers/common/erl_efile.h
@@ -162,5 +162,5 @@ int efile_symlink(Efile_error* errInfo, char* old, char* new);
int efile_may_openfile(Efile_error* errInfo, char *name);
int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length,
int advise);
-int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t offset,
- size_t *count);
+int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd,
+ off_t *offset, size_t *nbytes);
diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c
index 05c2f1fce9..3a966757d9 100644
--- a/erts/emulator/drivers/unix/unix_efile.c
+++ b/erts/emulator/drivers/unix/unix_efile.c
@@ -1469,31 +1469,27 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset,
}
#ifdef HAVE_SENDFILE
+#define SENDFILE_CHUNK_SIZE ((1 << 30) - 1)
int
efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd,
- off_t offset, size_t *nbytes)
+ off_t *offset, size_t *nbytes)
{
#if defined(__linux__) || (defined(__sun) && defined(__SVR4))
- ssize_t retval, nbytes_sent = 0;
+ ssize_t retval, written = 0;
+ // printf("sendfile(%d,%d,%d,%d)\r\n",out_fd,in_fd,*offset,*nbytes);
if (*nbytes == 0) {
- *nbytes = (1 << 20) - 1;
do {
- retval = sendfile(out_fd, in_fd, &offset, *nbytes);
- nbytes_sent += retval;
- printf("retval: %d, errno: %d, offset: %d, nbytes: %d\r\n", retval, errno, offset,*nbytes);
- } while ((retval == -1 && errno == EINTR)
- || (retval > 0 && errno == EAGAIN));
+ *nbytes = SENDFILE_CHUNK_SIZE; // chunk size
+ retval = sendfile(out_fd, in_fd, offset, *nbytes);
+ if (retval > 0)
+ written += retval;
+ } while (retval == SENDFILE_CHUNK_SIZE);
} else {
- do {
- retval = sendfile(out_fd, in_fd, &offset, *nbytes);
- if (retval > 0) {
- nbytes_sent += retval;
- *nbytes -= retval;
- }
- } while ((retval == -1 && errno == EINTR)
- || (*nbytes > 0 && errno == EAGAIN));
+ retval = sendfile(out_fd, in_fd, offset, *nbytes);
+ if (retval > 0)
+ written = retval;
}
- *nbytes = nbytes_sent;
+ *nbytes = written;
return check_error(retval == -1 ? -1 : 0, errInfo);
#elif defined(DARWIN)
off_t len = *nbytes;