From 020e988424cf0d15ebab8de50638492defb6f2b5 Mon Sep 17 00:00:00 2001
From: Lukas Larsson <lukas@erlang-solutions.com>
Date: Fri, 11 Nov 2011 16:19:07 +0100
Subject: Implement sendfile when there are no async threads

When there are no async threads sendfile will use the
ready_output select on the socket fd to know when to send
data.

The file_desc will also be put in the sending sendfile_state
which buffers all other commands to that file until the
sendfile is done.
---
 erts/emulator/drivers/common/efile_drv.c | 143 +++++++++++++++++++++----------
 erts/emulator/drivers/common/erl_efile.h |   4 +-
 erts/emulator/drivers/unix/unix_efile.c  |  30 +++----
 3 files changed, 112 insertions(+), 65 deletions(-)

(limited to 'erts')

diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c
index 4ed6aa4891..a318384479 100644
--- a/erts/emulator/drivers/common/efile_drv.c
+++ b/erts/emulator/drivers/common/efile_drv.c
@@ -76,11 +76,6 @@
 
 #define FILE_OPT_DELAYED_WRITE 0
 #define FILE_OPT_READ_AHEAD    1
-#define FILE_SENDFILE_OFFSET 0x10
-#define FILE_SENDFILE_NBYTES 0x08
-#define FILE_SENDFILE_NODISKIO 0x4
-#define FILE_SENDFILE_MNOWAIT 0x2
-#define FILE_SENDFILE_SYNC 0x1
 
 /* IPREAD variants */
 
@@ -223,6 +218,7 @@ typedef unsigned char uchar;
 static ErlDrvData file_start(ErlDrvPort port, char* command);
 static int file_init(void);
 static void file_stop(ErlDrvData);
+static void file_ready_output(ErlDrvData data, ErlDrvEvent event);
 static void file_output(ErlDrvData, char* buf, int len);
 static int file_control(ErlDrvData, unsigned int command, 
 			char* buf, int len, char **rbuf, int rlen);
@@ -230,10 +226,12 @@ static void file_timeout(ErlDrvData);
 static void file_outputv(ErlDrvData, ErlIOVec*);
 static void file_async_ready(ErlDrvData, ErlDrvThreadData);
 static void file_flush(ErlDrvData);
+static void file_stop_select(ErlDrvEvent event, void* _);
 
 
 
 enum e_timer {timer_idle, timer_again, timer_write};
+enum e_sendfile {sending, not_sending};
 
 struct t_data;
 
@@ -248,6 +246,7 @@ typedef struct {
     struct t_data  *cq_head;  /* Queue of incoming commands */
     struct t_data  *cq_tail;  /* -""- */
     enum e_timer    timer_state;
+    enum e_sendfile sendfile_state;
     size_t          read_bufsize;
     ErlDrvBinary   *read_binp;
     size_t          read_offset;
@@ -268,9 +267,9 @@ struct erl_drv_entry efile_driver_entry = {
     file_init,
     file_start,
     file_stop,
+    file_output,
     NULL,
-    NULL,
-    NULL,
+    file_ready_output,
     "efile",
     NULL,
     NULL,
@@ -287,7 +286,7 @@ struct erl_drv_entry efile_driver_entry = {
     ERL_DRV_FLAG_USE_PORT_LOCKING,
     NULL,
     NULL,
-    NULL
+    file_stop_select
 };
 
 
@@ -339,6 +338,13 @@ struct t_readdir_buf {
      char buf[READDIR_BUFSIZE];
 };
 
+struct t_sendfile_hdtl {
+    int hdr_cnt; 	   /* number of header iovecs */
+    struct iovec *headers;  /* pointer to header iovecs */
+    int trl_cnt; 	   /* number of trailer iovecs */
+    struct iovec *trailers; /* pointer to trailer iovecs */
+};
+
 struct t_data
 {
     struct t_data *next;
@@ -406,18 +412,14 @@ struct t_data
 	    Sint64 length;
 	    int advise;
 	} fadvise;
-#ifdef HAVE_SENDFILE
 	struct {
 	    int out_fd;
 	    off_t offset;
 	    size_t nbytes;
-	    int flags;
-	    int hdr_cnt; 	   /* number of header iovecs */
-	    struct iovec *headers;  /* pointer to header iovecs */
-	    int trl_cnt; 	   /* number of trailer iovecs */
-	    struct iovec *trailers; /* pointer to trailer iovecs */
+	    Uint64 written;
+	    short flags;
+	    struct t_sendfile_hdtl *hdtl;
 	} sendfile;
-#endif
     } c;
     char b[1];
 };
@@ -632,6 +634,7 @@ static struct t_data *cq_deq(file_descriptor *desc) {
     return d;
 }
 
+
 /*********************************************************************
  * Driver entry point -> init
  */
@@ -674,6 +677,7 @@ file_start(ErlDrvPort port, char* command)
     desc->cq_head = NULL;
     desc->cq_tail = NULL;
     desc->timer_state = timer_idle;
+    desc->sendfile_state = not_sending;
     desc->read_bufsize = 0;
     desc->read_binp = NULL;
     desc->read_offset = 0;
@@ -1713,30 +1717,65 @@ static void invoke_fadvise(void *data)
     d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise);
 }
 
-static void free_sendfile(void *data) {
-    EF_FREE(data);
-}
-
 static void invoke_sendfile(void *data)
 {
-    struct t_data *d = (struct t_data *) data;
-    int fd = (int)d->fd;
-    int out_fd = (int) d->c.sendfile.out_fd;
-    off_t offset = (off_t) d->c.sendfile.offset;
-    size_t nbytes = (size_t) d->c.sendfile.nbytes;
-
-    d->result_ok = efile_sendfile(&d->errInfo, fd, out_fd, offset, &nbytes);
-    d->c.sendfile.offset = offset;
-    d->c.sendfile.nbytes = nbytes;
+    struct t_data *d = (struct t_data *)data;
+    int fd = d->fd;
+    int out_fd = d->c.sendfile.out_fd;
+    size_t nbytes = d->c.sendfile.nbytes;
+    int result = 0;
     d->again = 0;
 
-    if (d->result_ok) {
-	printf("==> sendfile DONE nbytes=%d\n", d->c.sendfile.nbytes);
+    result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes);
+
+    /* printf("sendfile: result: %d, errno: %d, offset: %d, nbytes: %d\r\n",
+	    result, errno, d->c.sendfile.offset,nbytes);*/
+    d->c.sendfile.written += nbytes;
+
+    if (result == 1) {
+	if (d->c.sendfile.nbytes == 0 && nbytes != 0) {
+	    d->result_ok = 1;
+	} else if ((d->c.sendfile.nbytes - nbytes) != 0) {
+	    d->result_ok = 1;
+	    d->c.sendfile.nbytes -= nbytes;
+	} else {
+	    printf("==> sendfile DONE written=%ld\r\n", d->c.sendfile.written);
+	    d->result_ok = 0;
+	}
+    } else if (result == 0 && (d->errInfo.posix_errno == EAGAIN
+				 || d->errInfo.posix_errno == EINTR)) {
+	d->result_ok = 1;
     } else {
-	printf("==> sendfile ERROR %s\n", erl_errno_id(d->errInfo.posix_errno));
+	d->result_ok = -1;
+	printf("==> sendfile ERROR %s\r\n", erl_errno_id(d->errInfo.posix_errno));
     }
 }
 
+static void free_sendfile(void *data) {
+    EF_FREE(data);
+}
+
+static void file_ready_output(ErlDrvData data, ErlDrvEvent event)
+{
+    file_descriptor* fd = (file_descriptor*) data;
+
+    switch (fd->d->command) {
+    case FILE_SENDFILE:
+	driver_select(fd->port, (ErlDrvEvent)fd->d->c.sendfile.out_fd,
+		      (int)ERL_DRV_WRITE,(int) 0);
+	invoke_sendfile((void *)fd->d);
+	file_async_ready((ErlDrvData)fd, (ErlDrvThreadData)fd->d);
+	break;
+    default:
+	break;
+    }
+}
+
+static void file_stop_select(ErlDrvEvent event, void* _)
+{
+    /* TODO: close socket? */
+}
+
 static void free_readdir(void *data)
 {
     struct t_data *d = (struct t_data *) data;
@@ -1796,7 +1835,8 @@ static int try_again(file_descriptor *desc, struct t_data *d) {
 static void cq_execute(file_descriptor *desc) {
     struct t_data *d;
     register void *void_ptr; /* Soft cast variable */
-    if (desc->timer_state == timer_again)
+    if (desc->timer_state == timer_again ||
+	    desc->sendfile_state == sending)
 	return;
     if (! (d = cq_deq(desc)))
 	return;
@@ -2149,12 +2189,21 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data)
 	  free_preadv(data);
 	  break;
       case FILE_SENDFILE:
-	  if (!d->result_ok) {
+	  //printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok);
+	  if (d->result_ok == -1) {
+	      desc->sendfile_state = not_sending;
 	      reply_error(desc, &d->errInfo);
-	  } else {
-	      reply_Sint64(desc, d->c.sendfile.nbytes);
+	      free_sendfile(data);
+	  } else if (d->result_ok == 0) {
+	      desc->sendfile_state = not_sending;
+	      reply_Sint64(desc, d->c.sendfile.written);
+	      free_sendfile(data);
+	  } else if (d->result_ok == 1) { // If we are using select to send the rest of the data
+	      desc->sendfile_state = sending;
+	      desc->d = d;
+	      driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd,
+			    ERL_DRV_USE|ERL_DRV_WRITE, 1);
 	  }
-	  free_sendfile(data);
 	  break;
       default:
 	abort();
@@ -3296,11 +3345,12 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
 	    goto done;
 	} /* case FILE_OPT_DELAYED_WRITE: */
     } ASSERT(0); goto done; /* case FILE_SETOPT: */
+
     case FILE_SENDFILE: {
 
-	struct t_data *d;
-	Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL;
-	char flags;
+		struct t_data *d;
+		Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL;
+		char flags;
 
 	/* DestFD:32, Offset:64, Bytes:64,
 	 ChunkSize:64,
@@ -3330,26 +3380,27 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
 
 	d->c.sendfile.out_fd = (int) out_fd;
 	d->c.sendfile.flags = (int) flags;
+	d->c.sendfile.written = 0;
 
-#if SIZEOF_OFF_T == 4
+    #if SIZEOF_OFF_T == 4
 	if (offsetH != 0) {
 	    reply_posix_error(desc, EINVAL);
 	    goto done;
 	}
 	d->c.sendfile.offset = (off_t) offsetT;
-#else
+    #else
 	d->c.sendfile.offset = ((off_t) offsetH << 32) | offsetL;
-#endif
+    #endif
 
-#if SIZEOF_SIZE_T == 4
+    #if SIZEOF_SIZE_T == 4
 	if (nbytesH != 0) {
 	    reply_posix_error(desc, EINVAL);
 	    goto done;
 	}
 	d->c.sendfile.nbytes = (size_t) nbytesT;
-#else
+    #else
 	d->c.sendfile.nbytes = ((size_t) nbytesH << 32) | nbytesL;
-#endif
+    #endif
 
 	printf("sendfile(nbytes => %d, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags);
 
@@ -3357,7 +3408,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
 
 	cq_enq(desc, d);
 	goto done;
-    } /* case FILE_SENDFILE: */
+        } /* case FILE_SENDFILE: */
 
     } /* switch(command) */
 
diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h
index 864b867955..e0b8cfca03 100644
--- a/erts/emulator/drivers/common/erl_efile.h
+++ b/erts/emulator/drivers/common/erl_efile.h
@@ -162,5 +162,5 @@ int efile_symlink(Efile_error* errInfo, char* old, char* new);
 int efile_may_openfile(Efile_error* errInfo, char *name);
 int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length,
 		  int advise);
-int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t offset,
-		   size_t *count);
+int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd,
+		      off_t *offset, size_t *nbytes);
diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c
index 05c2f1fce9..3a966757d9 100644
--- a/erts/emulator/drivers/unix/unix_efile.c
+++ b/erts/emulator/drivers/unix/unix_efile.c
@@ -1469,31 +1469,27 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset,
 }
 
 #ifdef HAVE_SENDFILE
+#define SENDFILE_CHUNK_SIZE ((1 << 30) - 1)
 int
 efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd,
-	       off_t offset, size_t *nbytes)
+	       off_t *offset, size_t *nbytes)
 {
 #if defined(__linux__) || (defined(__sun) && defined(__SVR4))
-    ssize_t retval, nbytes_sent = 0;
+    ssize_t retval, written = 0;
+    // printf("sendfile(%d,%d,%d,%d)\r\n",out_fd,in_fd,*offset,*nbytes);
     if (*nbytes == 0) {
-	*nbytes = (1 << 20) - 1;
 	do {
-	    retval = sendfile(out_fd, in_fd, &offset, *nbytes);
-	    nbytes_sent += retval;
-	    printf("retval: %d, errno: %d, offset: %d, nbytes: %d\r\n", retval, errno, offset,*nbytes);
-	} while ((retval == -1 && errno == EINTR)
-		|| (retval > 0 && errno == EAGAIN));
+	    *nbytes = SENDFILE_CHUNK_SIZE; // chunk size
+	    retval = sendfile(out_fd, in_fd, offset, *nbytes);
+	    if (retval > 0)
+		written += retval;
+	} while (retval == SENDFILE_CHUNK_SIZE);
     } else {
-	do {
-	    retval = sendfile(out_fd, in_fd, &offset, *nbytes);
-	    if (retval > 0) {
-		nbytes_sent += retval;
-		*nbytes -= retval;
-	    }
-	} while ((retval == -1 && errno == EINTR)
-		|| (*nbytes > 0 && errno == EAGAIN));
+	retval =  sendfile(out_fd, in_fd, offset, *nbytes);
+	if (retval > 0)
+	    written = retval;
     }
-    *nbytes = nbytes_sent;
+    *nbytes = written;
     return check_error(retval == -1 ? -1 : 0, errInfo);
 #elif defined(DARWIN)
     off_t len = *nbytes;
-- 
cgit v1.2.3