From 195e1f19b06095f39a4fb0da46dfab2ec5b10e9a Mon Sep 17 00:00:00 2001 From: Tuncer Ayaz Date: Thu, 13 Jan 2011 12:36:14 +0100 Subject: Implement file:sendfile Allow Erlang code to use sendfile() where available by wrapping it as file:sendfile/4 and file:sendfile/2. sendfile(2) - Linux man page: "sendfile() copies data between one file descriptor and another. Because this copying is done within the kernel, sendfile() is more efficient than the combination of read(2) and write(2), which would require transferring data to and from user space." --- erts/emulator/drivers/common/efile_drv.c | 234 ++++++++++++++++++++++++++++++- erts/emulator/drivers/common/erl_efile.h | 2 + erts/emulator/drivers/unix/unix_efile.c | 44 ++++++ erts/emulator/drivers/win32/win_efile.c | 24 ++++ 4 files changed, 302 insertions(+), 2 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 901d98c09d..509c4fe48c 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -55,6 +55,7 @@ #define FILE_READ_LINE 29 #define FILE_FDATASYNC 30 #define FILE_FADVISE 31 +#define FILE_SENDFILE 32 /* Return codes */ @@ -217,6 +218,7 @@ typedef unsigned char uchar; static ErlDrvData file_start(ErlDrvPort port, char* command); static int file_init(void); static void file_stop(ErlDrvData); +static void file_ready_output(ErlDrvData data, ErlDrvEvent event); static void file_output(ErlDrvData, char* buf, int len); static int file_control(ErlDrvData, unsigned int command, char* buf, int len, char **rbuf, int rlen); @@ -224,6 +226,7 @@ static void file_timeout(ErlDrvData); static void file_outputv(ErlDrvData, ErlIOVec*); static void file_async_ready(ErlDrvData, ErlDrvThreadData); static void file_flush(ErlDrvData); +static void file_stop_select(ErlDrvEvent event, void* _); @@ -253,6 +256,18 @@ typedef struct { ErlDrvPDL q_mtx; /* Mutex for the driver queue, known by the emulator. Also used for mutual exclusion when accessing field(s) below. */ size_t write_buffered; + ErlDrvTermData caller; /* recipient of sync reply */ + /* sendfile call state to retry/resume on event */ + int command; /* same as d->command. for sendfile. TODO: this seems wrong */ + struct { + int eagain; + int out_fd; + /* TODO: Use Sint64 instead? What about 32-bit off_t linux */ + off_t offset; + size_t count; + size_t chunksize; + ErlDrvSInt64 written; + } sendfile; } file_descriptor; @@ -264,7 +279,7 @@ struct erl_drv_entry efile_driver_entry = { file_stop, file_output, NULL, - NULL, + file_ready_output, "efile", NULL, NULL, @@ -279,7 +294,9 @@ struct erl_drv_entry efile_driver_entry = { ERL_DRV_EXTENDED_MAJOR_VERSION, ERL_DRV_EXTENDED_MINOR_VERSION, ERL_DRV_FLAG_USE_PORT_LOCKING, - NULL + NULL, + NULL, + file_stop_select }; @@ -613,6 +630,111 @@ static struct t_data *cq_deq(file_descriptor *desc) { } +/********************************************************************* + * Command queue functions + */ + +static ErlDrvTermData am_ok; +static ErlDrvTermData am_error; +static ErlDrvTermData am_efile_reply; + +#define INIT_ATOM(NAME) am_ ## NAME = driver_mk_atom(#NAME) + +#define LOAD_ATOM_CNT 2 +#define LOAD_ATOM(vec, i, atom) \ + (((vec)[(i)] = ERL_DRV_ATOM), \ + ((vec)[(i)+1] = (atom)), \ + ((i)+LOAD_ATOM_CNT)) + +#define LOAD_INT_CNT 2 +#define LOAD_INT(vec, i, val) \ + (((vec)[(i)] = ERL_DRV_INT), \ + ((vec)[(i)+1] = (ErlDrvTermData)(val)), \ + ((i)+LOAD_INT_CNT)) + +#define LOAD_INT64_CNT 2 +#define LOAD_INT64(vec, i, val) \ + (((vec)[(i)] = ERL_DRV_INT64), \ + ((vec)[(i)+1] = (ErlDrvTermData)(val)), \ + ((i)+LOAD_INT64_CNT)) + +#define LOAD_PORT_CNT 2 +#define LOAD_PORT(vec, i, port) \ + (((vec)[(i)] = ERL_DRV_PORT), \ + ((vec)[(i)+1] = (port)), \ + ((i)+LOAD_PORT_CNT)) + +#define LOAD_PID_CNT 2 +#define LOAD_PID(vec, i, pid) \ + (((vec)[(i)] = ERL_DRV_PID), \ + ((vec)[(i)+1] = (pid)), \ + ((i)+LOAD_PID_CNT)) + +#define LOAD_TUPLE_CNT 2 +#define LOAD_TUPLE(vec, i, size) \ + (((vec)[(i)] = ERL_DRV_TUPLE), \ + ((vec)[(i)+1] = (size)), \ + ((i)+LOAD_TUPLE_CNT)) + +/* send: +** {efile_reply, Pid, Port, {ok, int64()}} +*/ + +static int ef_send_ok_int64(file_descriptor *desc, ErlDrvTermData caller, + ErlDrvSInt64 *n) +{ + ErlDrvTermData spec[2*LOAD_ATOM_CNT + LOAD_PID_CNT + LOAD_PORT_CNT + + LOAD_INT64_CNT + 2*LOAD_TUPLE_CNT]; + int i = 0; + + i = LOAD_ATOM(spec, i, am_efile_reply); + i = LOAD_PID(spec, i, caller); + i = LOAD_PORT(spec, i, driver_mk_port(desc->port)); + i = LOAD_ATOM(spec, i, am_ok); + i = LOAD_INT64(spec, i, n); + i = LOAD_TUPLE(spec, i, 2); + i = LOAD_TUPLE(spec, i, 4); + ASSERT(i == sizeof(spec)/sizeof(*spec)); + + return driver_send_term(desc->port, caller, spec, i); +} + +static ErlDrvTermData error_atom(int err) +{ + char errstr[256]; + char* s; + char* t; + + for (s = erl_errno_id(err), t = errstr; *s; s++, t++) + *t = tolower(*s); + *t = '\0'; + return driver_mk_atom(errstr); +} + +/* send: +** {efile_reply, Pid, Port, {error, posix_error()} +*/ + +static int ef_send_posix_error(file_descriptor *desc, ErlDrvTermData caller, + int e) +{ + ErlDrvTermData spec[3*LOAD_ATOM_CNT + LOAD_PID_CNT + LOAD_PORT_CNT + + 2*LOAD_TUPLE_CNT]; + int i = 0; + + i = LOAD_ATOM(spec, i, am_efile_reply); + i = LOAD_PID(spec, i, caller); + i = LOAD_PORT(spec, i, driver_mk_port(desc->port)); + i = LOAD_ATOM(spec, i, am_error); + /* TODO: safe? set of error codes should be limited and safe */ + i = LOAD_ATOM(spec, i, error_atom(e)); + i = LOAD_TUPLE(spec, i, 2); + i = LOAD_TUPLE(spec, i, 4); + ASSERT(i == sizeof(spec)/sizeof(*spec)); + + desc->caller = 0; + return driver_send_term(desc->port, caller, spec, i); +} /********************************************************************* * Driver entry point -> init @@ -628,6 +750,11 @@ file_init(void) ? atoi(buf) : 0); driver_system_info(&sys_info, sizeof(ErlDrvSysInfo)); + + INIT_ATOM(ok); + INIT_ATOM(error); + INIT_ATOM(efile_reply); + return 0; } @@ -1694,6 +1821,74 @@ static void invoke_fadvise(void *data) d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise); } + + +static void do_sendfile(file_descriptor *desc); +static void file_ready_output(ErlDrvData data, ErlDrvEvent event) +{ + file_descriptor* d = (file_descriptor*) data; + + switch (d->command) { + case FILE_SENDFILE: + driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, + ERL_DRV_WRITE, 0); + do_sendfile(d); + break; + default: + break; + } +} + +static void file_stop_select(ErlDrvEvent event, void* _) +{ + /* TODO: close socket? */ +} + +static void invoke_sendfile(void *data) +{ + ((struct t_data *)data)->again = 0; +} + +static void do_sendfile(file_descriptor *d) +{ + int fd = d->fd; + int out_fd = d->sendfile.out_fd; + off_t offset = d->sendfile.offset; + size_t count = d->sendfile.count; + size_t chunksize = count < d->sendfile.chunksize + ? count : d->sendfile.chunksize; + int result_ok = 0; + Efile_error errInfo; + + result_ok = efile_sendfile(&errInfo, fd, out_fd, &offset, &chunksize); + + if (result_ok) { + d->sendfile.offset += chunksize; + d->sendfile.written += chunksize; + d->sendfile.count -= chunksize; + if (d->sendfile.count > 0) { + driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, + ERL_DRV_USE|ERL_DRV_WRITE, 1); + } else { + printf("==> sendfile DONE eagain=%d\n", d->sendfile.eagain); + ef_send_ok_int64(d, d->caller, &d->sendfile.written); + } + } else if (errInfo.posix_errno == EAGAIN || errInfo.posix_errno == EINTR) { + if (chunksize > 0) { + d->sendfile.offset += chunksize; + d->sendfile.written += chunksize; + d->sendfile.count -= chunksize; + } + d->sendfile.eagain++; + + driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, + ERL_DRV_USE|ERL_DRV_WRITE, 1); + } else { + printf("==> sendfile ERROR %s\n", erl_errno_id(errInfo.posix_errno)); + ef_send_posix_error(d, d->caller, errInfo.posix_errno); + } +} + static void free_readdir(void *data) { struct t_data *d = (struct t_data *) data; @@ -2105,6 +2300,13 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) } free_preadv(data); break; + case FILE_SENDFILE: + /* Return 'ok' and let prim_file:sendfile wait for message */ + reply_ok(desc); + driver_select(desc->port, (ErlDrvEvent)desc->sendfile.out_fd, + ERL_DRV_USE|ERL_DRV_WRITE, 1); + free_data(data); + break; default: abort(); } @@ -2452,6 +2654,34 @@ file_output(ErlDrvData e, char* buf, int count) goto done; } + case FILE_SENDFILE: + { + d = EF_SAFE_ALLOC(sizeof(struct t_data)); + d->fd = fd; + d->command = command; + d->invoke = invoke_sendfile; + d->free = free_data; + d->level = 2; + desc->sendfile.out_fd = get_int32((uchar*) buf); + /* TODO: are off_t and size_t 64bit on all platforms? + off_t is 32bit on win32 msvc. maybe configurable in msvc. + Maybe use '#if SIZEOF_SIZE_T == 4'? */ + desc->sendfile.offset = get_int64(((uchar*) buf) + + sizeof(Sint32)); + desc->sendfile.count = get_int64(((uchar*) buf) + + sizeof(Sint32) + + sizeof(Sint64)); + desc->sendfile.chunksize = get_int64(((uchar*) buf) + + sizeof(Sint32) + + 2*sizeof(Sint64)); + desc->sendfile.written = 0; + desc->sendfile.eagain = 0; + /* TODO: shouldn't d->command be enough? */ + desc->command = command; + desc->caller = driver_caller(desc->port); + goto done; + } + } /* diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index 3097ded3f1..3c6c2ec2db 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -162,3 +162,5 @@ int efile_symlink(Efile_error* errInfo, char* old, char* new); int efile_may_openfile(Efile_error* errInfo, char *name); int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); +int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t *offset, + size_t *count); diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 4b3934657c..5b001b3819 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -33,6 +33,9 @@ #include #include #endif +#if defined(__linux__) || (defined(__sun) && defined(__SVR4)) +#include +#endif #if defined(__APPLE__) && defined(__MACH__) && !defined(__DARWIN__) #define DARWIN 1 @@ -1464,3 +1467,44 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, return check_error(0, errInfo); #endif } + +#ifdef HAVE_SENDFILE +int +efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, + off_t *offset, size_t *count) +{ +#if defined(__linux__) || (defined(__sun) && defined(__SVR4)) + ssize_t retval = sendfile(out_fd, in_fd, offset, *count); + if (retval >= 0) { + if (retval != *count) { + *count = retval; + retval = -1; + errno = EAGAIN; + } else { + *count = retval; + } + } else if (retval == -1 && (errno == EINTR || errno == EAGAIN)) { + *count = 0; + } + return check_error(retval == -1 ? -1 : 0, errInfo); +#elif defined(DARWIN) + off_t len = *count; + int retval = sendfile(in_fd, out_fd, *offset, &len, NULL, 0); + *count = len; + return check_error(retval, errInfo); +#elif defined(__FreeBSD__) || defined(__DragonFly__) + off_t len = 0; + int retval = sendfile(in_fd, out_fd, *offset, *count, NULL, &len, 0); + *count = len; + return check_error(retval, errInfo); +#endif +} +#else /* no sendfile() */ +int +efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, + off_t *offset, size_t *count) +{ + errno = ENOTSUP; + return check_error(-1, errInfo); +} +#endif diff --git a/erts/emulator/drivers/win32/win_efile.c b/erts/emulator/drivers/win32/win_efile.c index 931bb196f1..0f41a09bf6 100644 --- a/erts/emulator/drivers/win32/win_efile.c +++ b/erts/emulator/drivers/win32/win_efile.c @@ -1581,3 +1581,27 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, errno = ERROR_SUCCESS; return check_error(0, errInfo); } + +int +efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, + off_t *offset, size_t *count) +{ + /* TODO: write proper Windows TransmitFile based implementation */ + /* use overlapped I/O and driver_select on the structure? */ + /* int res = efile_seek(errInfo, in_fd, *offset, EFILE_SEEK_SET, NULL); */ + /* if (res) { */ + /* /\* TODO: could in_fd be shared and require protecting/locking */ + /* efile_seek/SetFilePointerEx? *\/ */ + /* if (TransmitFile((SOCKET) out_fd, (HANDLE) in_fd, *count, */ + /* 0, NULL, NULL, 0)) { */ + /* return check_error(0, errInfo); */ + /* } else { */ + /* /\* TODO: correct error handling? *\/ */ + /* return set_error(errInfo); */ + /* } */ + /* } else { */ + /* return res; */ + /* } */ + errno = ENOTSUP; + return check_error(-1, errInfo); +} -- cgit v1.2.3 From 0348a9c9c0114ddf83d776adc3d01ac60dfcccfc Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 10:48:46 +0100 Subject: Implement sendfile using blocking io in asynch threads Move the command handling to outputv in preparation for header and trailer inclusion in the sendfile api. Use the standard efile communication functions for sendfile. --- erts/emulator/drivers/common/efile_drv.c | 171 ++++++------------------------- erts/emulator/drivers/common/erl_efile.h | 2 +- erts/emulator/drivers/unix/unix_efile.c | 27 ++--- 3 files changed, 40 insertions(+), 160 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 509c4fe48c..7e194a3787 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -629,113 +629,6 @@ static struct t_data *cq_deq(file_descriptor *desc) { return d; } - -/********************************************************************* - * Command queue functions - */ - -static ErlDrvTermData am_ok; -static ErlDrvTermData am_error; -static ErlDrvTermData am_efile_reply; - -#define INIT_ATOM(NAME) am_ ## NAME = driver_mk_atom(#NAME) - -#define LOAD_ATOM_CNT 2 -#define LOAD_ATOM(vec, i, atom) \ - (((vec)[(i)] = ERL_DRV_ATOM), \ - ((vec)[(i)+1] = (atom)), \ - ((i)+LOAD_ATOM_CNT)) - -#define LOAD_INT_CNT 2 -#define LOAD_INT(vec, i, val) \ - (((vec)[(i)] = ERL_DRV_INT), \ - ((vec)[(i)+1] = (ErlDrvTermData)(val)), \ - ((i)+LOAD_INT_CNT)) - -#define LOAD_INT64_CNT 2 -#define LOAD_INT64(vec, i, val) \ - (((vec)[(i)] = ERL_DRV_INT64), \ - ((vec)[(i)+1] = (ErlDrvTermData)(val)), \ - ((i)+LOAD_INT64_CNT)) - -#define LOAD_PORT_CNT 2 -#define LOAD_PORT(vec, i, port) \ - (((vec)[(i)] = ERL_DRV_PORT), \ - ((vec)[(i)+1] = (port)), \ - ((i)+LOAD_PORT_CNT)) - -#define LOAD_PID_CNT 2 -#define LOAD_PID(vec, i, pid) \ - (((vec)[(i)] = ERL_DRV_PID), \ - ((vec)[(i)+1] = (pid)), \ - ((i)+LOAD_PID_CNT)) - -#define LOAD_TUPLE_CNT 2 -#define LOAD_TUPLE(vec, i, size) \ - (((vec)[(i)] = ERL_DRV_TUPLE), \ - ((vec)[(i)+1] = (size)), \ - ((i)+LOAD_TUPLE_CNT)) - -/* send: -** {efile_reply, Pid, Port, {ok, int64()}} -*/ - -static int ef_send_ok_int64(file_descriptor *desc, ErlDrvTermData caller, - ErlDrvSInt64 *n) -{ - ErlDrvTermData spec[2*LOAD_ATOM_CNT + LOAD_PID_CNT + LOAD_PORT_CNT - + LOAD_INT64_CNT + 2*LOAD_TUPLE_CNT]; - int i = 0; - - i = LOAD_ATOM(spec, i, am_efile_reply); - i = LOAD_PID(spec, i, caller); - i = LOAD_PORT(spec, i, driver_mk_port(desc->port)); - i = LOAD_ATOM(spec, i, am_ok); - i = LOAD_INT64(spec, i, n); - i = LOAD_TUPLE(spec, i, 2); - i = LOAD_TUPLE(spec, i, 4); - ASSERT(i == sizeof(spec)/sizeof(*spec)); - - return driver_send_term(desc->port, caller, spec, i); -} - -static ErlDrvTermData error_atom(int err) -{ - char errstr[256]; - char* s; - char* t; - - for (s = erl_errno_id(err), t = errstr; *s; s++, t++) - *t = tolower(*s); - *t = '\0'; - return driver_mk_atom(errstr); -} - -/* send: -** {efile_reply, Pid, Port, {error, posix_error()} -*/ - -static int ef_send_posix_error(file_descriptor *desc, ErlDrvTermData caller, - int e) -{ - ErlDrvTermData spec[3*LOAD_ATOM_CNT + LOAD_PID_CNT + LOAD_PORT_CNT - + 2*LOAD_TUPLE_CNT]; - int i = 0; - - i = LOAD_ATOM(spec, i, am_efile_reply); - i = LOAD_PID(spec, i, caller); - i = LOAD_PORT(spec, i, driver_mk_port(desc->port)); - i = LOAD_ATOM(spec, i, am_error); - /* TODO: safe? set of error codes should be limited and safe */ - i = LOAD_ATOM(spec, i, error_atom(e)); - i = LOAD_TUPLE(spec, i, 2); - i = LOAD_TUPLE(spec, i, 4); - ASSERT(i == sizeof(spec)/sizeof(*spec)); - - desc->caller = 0; - return driver_send_term(desc->port, caller, spec, i); -} - /********************************************************************* * Driver entry point -> init */ @@ -751,10 +644,6 @@ file_init(void) : 0); driver_system_info(&sys_info, sizeof(ErlDrvSysInfo)); - INIT_ATOM(ok); - INIT_ATOM(error); - INIT_ATOM(efile_reply); - return 0; } @@ -1871,7 +1760,7 @@ static void do_sendfile(file_descriptor *d) ERL_DRV_USE|ERL_DRV_WRITE, 1); } else { printf("==> sendfile DONE eagain=%d\n", d->sendfile.eagain); - ef_send_ok_int64(d, d->caller, &d->sendfile.written); + reply_Uint(d, d->sendfile.written); } } else if (errInfo.posix_errno == EAGAIN || errInfo.posix_errno == EINTR) { if (chunksize > 0) { @@ -2301,8 +2190,6 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) free_preadv(data); break; case FILE_SENDFILE: - /* Return 'ok' and let prim_file:sendfile wait for message */ - reply_ok(desc); driver_select(desc->port, (ErlDrvEvent)desc->sendfile.out_fd, ERL_DRV_USE|ERL_DRV_WRITE, 1); free_data(data); @@ -2654,34 +2541,6 @@ file_output(ErlDrvData e, char* buf, int count) goto done; } - case FILE_SENDFILE: - { - d = EF_SAFE_ALLOC(sizeof(struct t_data)); - d->fd = fd; - d->command = command; - d->invoke = invoke_sendfile; - d->free = free_data; - d->level = 2; - desc->sendfile.out_fd = get_int32((uchar*) buf); - /* TODO: are off_t and size_t 64bit on all platforms? - off_t is 32bit on win32 msvc. maybe configurable in msvc. - Maybe use '#if SIZEOF_SIZE_T == 4'? */ - desc->sendfile.offset = get_int64(((uchar*) buf) - + sizeof(Sint32)); - desc->sendfile.count = get_int64(((uchar*) buf) - + sizeof(Sint32) - + sizeof(Sint64)); - desc->sendfile.chunksize = get_int64(((uchar*) buf) - + sizeof(Sint32) - + 2*sizeof(Sint64)); - desc->sendfile.written = 0; - desc->sendfile.eagain = 0; - /* TODO: shouldn't d->command be enough? */ - desc->command = command; - desc->caller = driver_caller(desc->port); - goto done; - } - } /* @@ -3475,6 +3334,34 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { goto done; } /* case FILE_OPT_DELAYED_WRITE: */ } ASSERT(0); goto done; /* case FILE_SETOPT: */ + case FILE_SENDFILE: + { + struct t_data *d; + d = EF_SAFE_ALLOC(sizeof(struct t_data)); + d->fd = desc->fd; + d->command = command; + d->invoke = invoke_sendfile; + d->free = free_data; + d->level = 2; + desc->sendfile.out_fd = get_int32((uchar*) buf); + /* TODO: are off_t and size_t 64bit on all platforms? + off_t is 32bit on win32 msvc. maybe configurable in msvc. + Maybe use '#if SIZEOF_SIZE_T == 4'? */ + desc->sendfile.offset = get_int64(((uchar*) buf) + + sizeof(Sint32)); + desc->sendfile.count = get_int64(((uchar*) buf) + + sizeof(Sint32) + + sizeof(Sint64)); + desc->sendfile.chunksize = get_int64(((uchar*) buf) + + sizeof(Sint32) + + 2*sizeof(Sint64)); + desc->sendfile.written = 0; + desc->sendfile.eagain = 0; + /* TODO: shouldn't d->command be enough? */ + desc->command = command; + desc->caller = driver_caller(desc->port); + goto done; + } } /* switch(command) */ diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index 3c6c2ec2db..864b867955 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -162,5 +162,5 @@ int efile_symlink(Efile_error* errInfo, char* old, char* new); int efile_may_openfile(Efile_error* errInfo, char *name); int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); -int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t *offset, +int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t offset, size_t *count); diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 5b001b3819..8b612164da 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1471,31 +1471,24 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, #ifdef HAVE_SENDFILE int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, size_t *count) + off_t offset, size_t *ret_nbytes) { #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) - ssize_t retval = sendfile(out_fd, in_fd, offset, *count); - if (retval >= 0) { - if (retval != *count) { - *count = retval; - retval = -1; - errno = EAGAIN; - } else { - *count = retval; - } - } else if (retval == -1 && (errno == EINTR || errno == EAGAIN)) { - *count = 0; - } + ssize_t retval; + do { + retval = sendfile(out_fd, in_fd, &offset, *ret_nbytes); + } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); + *ret_nbytes = retval; return check_error(retval == -1 ? -1 : 0, errInfo); #elif defined(DARWIN) - off_t len = *count; + off_t len = *ret_nbytes; int retval = sendfile(in_fd, out_fd, *offset, &len, NULL, 0); - *count = len; + *ret_nbytes = len; return check_error(retval, errInfo); #elif defined(__FreeBSD__) || defined(__DragonFly__) off_t len = 0; - int retval = sendfile(in_fd, out_fd, *offset, *count, NULL, &len, 0); - *count = len; + int retval = sendfile(in_fd, out_fd, *offset, *nbytes, NULL, &len, 0); + *nbytes = len; return check_error(retval, errInfo); #endif } -- cgit v1.2.3 From bfa81856150b59ea4578e0eef79b97ab0decb8f7 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 10:52:16 +0100 Subject: Remove output from driver entry outputv will always be used so removed output to decrease confusion. --- erts/emulator/drivers/common/efile_drv.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 7e194a3787..b14f5844b2 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -277,7 +277,7 @@ struct erl_drv_entry efile_driver_entry = { file_init, file_start, file_stop, - file_output, + NULL, NULL, file_ready_output, "efile", -- cgit v1.2.3 From 54bdd9a15d2e130c76f76ca322af56b306d02078 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Wed, 9 Nov 2011 11:52:14 +0100 Subject: Implement blocking calls for sendfile Move sendfile data to invoke data instead of file_descr. Remove usage of ready_output when doing a send. If told to send 0 bytes, file_sendfile now sends the entire file for linux. --- erts/emulator/drivers/common/efile_drv.c | 206 +++++++++++++++---------------- erts/emulator/drivers/unix/unix_efile.c | 31 +++-- 2 files changed, 124 insertions(+), 113 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index b14f5844b2..4ed6aa4891 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -76,6 +76,11 @@ #define FILE_OPT_DELAYED_WRITE 0 #define FILE_OPT_READ_AHEAD 1 +#define FILE_SENDFILE_OFFSET 0x10 +#define FILE_SENDFILE_NBYTES 0x08 +#define FILE_SENDFILE_NODISKIO 0x4 +#define FILE_SENDFILE_MNOWAIT 0x2 +#define FILE_SENDFILE_SYNC 0x1 /* IPREAD variants */ @@ -218,7 +223,6 @@ typedef unsigned char uchar; static ErlDrvData file_start(ErlDrvPort port, char* command); static int file_init(void); static void file_stop(ErlDrvData); -static void file_ready_output(ErlDrvData data, ErlDrvEvent event); static void file_output(ErlDrvData, char* buf, int len); static int file_control(ErlDrvData, unsigned int command, char* buf, int len, char **rbuf, int rlen); @@ -226,7 +230,6 @@ static void file_timeout(ErlDrvData); static void file_outputv(ErlDrvData, ErlIOVec*); static void file_async_ready(ErlDrvData, ErlDrvThreadData); static void file_flush(ErlDrvData); -static void file_stop_select(ErlDrvEvent event, void* _); @@ -256,18 +259,6 @@ typedef struct { ErlDrvPDL q_mtx; /* Mutex for the driver queue, known by the emulator. Also used for mutual exclusion when accessing field(s) below. */ size_t write_buffered; - ErlDrvTermData caller; /* recipient of sync reply */ - /* sendfile call state to retry/resume on event */ - int command; /* same as d->command. for sendfile. TODO: this seems wrong */ - struct { - int eagain; - int out_fd; - /* TODO: Use Sint64 instead? What about 32-bit off_t linux */ - off_t offset; - size_t count; - size_t chunksize; - ErlDrvSInt64 written; - } sendfile; } file_descriptor; @@ -279,7 +270,7 @@ struct erl_drv_entry efile_driver_entry = { file_stop, NULL, NULL, - file_ready_output, + NULL, "efile", NULL, NULL, @@ -296,7 +287,7 @@ struct erl_drv_entry efile_driver_entry = { ERL_DRV_FLAG_USE_PORT_LOCKING, NULL, NULL, - file_stop_select + NULL }; @@ -415,6 +406,18 @@ struct t_data Sint64 length; int advise; } fadvise; +#ifdef HAVE_SENDFILE + struct { + int out_fd; + off_t offset; + size_t nbytes; + int flags; + int hdr_cnt; /* number of header iovecs */ + struct iovec *headers; /* pointer to header iovecs */ + int trl_cnt; /* number of trailer iovecs */ + struct iovec *trailers; /* pointer to trailer iovecs */ + } sendfile; +#endif } c; char b[1]; }; @@ -1710,71 +1713,27 @@ static void invoke_fadvise(void *data) d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise); } - - -static void do_sendfile(file_descriptor *desc); -static void file_ready_output(ErlDrvData data, ErlDrvEvent event) -{ - file_descriptor* d = (file_descriptor*) data; - - switch (d->command) { - case FILE_SENDFILE: - driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, - ERL_DRV_WRITE, 0); - do_sendfile(d); - break; - default: - break; - } -} - -static void file_stop_select(ErlDrvEvent event, void* _) -{ - /* TODO: close socket? */ +static void free_sendfile(void *data) { + EF_FREE(data); } static void invoke_sendfile(void *data) { - ((struct t_data *)data)->again = 0; -} - -static void do_sendfile(file_descriptor *d) -{ - int fd = d->fd; - int out_fd = d->sendfile.out_fd; - off_t offset = d->sendfile.offset; - size_t count = d->sendfile.count; - size_t chunksize = count < d->sendfile.chunksize - ? count : d->sendfile.chunksize; - int result_ok = 0; - Efile_error errInfo; - - result_ok = efile_sendfile(&errInfo, fd, out_fd, &offset, &chunksize); - - if (result_ok) { - d->sendfile.offset += chunksize; - d->sendfile.written += chunksize; - d->sendfile.count -= chunksize; - if (d->sendfile.count > 0) { - driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, - ERL_DRV_USE|ERL_DRV_WRITE, 1); - } else { - printf("==> sendfile DONE eagain=%d\n", d->sendfile.eagain); - reply_Uint(d, d->sendfile.written); - } - } else if (errInfo.posix_errno == EAGAIN || errInfo.posix_errno == EINTR) { - if (chunksize > 0) { - d->sendfile.offset += chunksize; - d->sendfile.written += chunksize; - d->sendfile.count -= chunksize; - } - d->sendfile.eagain++; + struct t_data *d = (struct t_data *) data; + int fd = (int)d->fd; + int out_fd = (int) d->c.sendfile.out_fd; + off_t offset = (off_t) d->c.sendfile.offset; + size_t nbytes = (size_t) d->c.sendfile.nbytes; + + d->result_ok = efile_sendfile(&d->errInfo, fd, out_fd, offset, &nbytes); + d->c.sendfile.offset = offset; + d->c.sendfile.nbytes = nbytes; + d->again = 0; - driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, - ERL_DRV_USE|ERL_DRV_WRITE, 1); + if (d->result_ok) { + printf("==> sendfile DONE nbytes=%d\n", d->c.sendfile.nbytes); } else { - printf("==> sendfile ERROR %s\n", erl_errno_id(errInfo.posix_errno)); - ef_send_posix_error(d, d->caller, errInfo.posix_errno); + printf("==> sendfile ERROR %s\n", erl_errno_id(d->errInfo.posix_errno)); } } @@ -2190,9 +2149,12 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) free_preadv(data); break; case FILE_SENDFILE: - driver_select(desc->port, (ErlDrvEvent)desc->sendfile.out_fd, - ERL_DRV_USE|ERL_DRV_WRITE, 1); - free_data(data); + if (!d->result_ok) { + reply_error(desc, &d->errInfo); + } else { + reply_Sint64(desc, d->c.sendfile.nbytes); + } + free_sendfile(data); break; default: abort(); @@ -3334,37 +3296,71 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { goto done; } /* case FILE_OPT_DELAYED_WRITE: */ } ASSERT(0); goto done; /* case FILE_SETOPT: */ - case FILE_SENDFILE: - { + case FILE_SENDFILE: { + struct t_data *d; - d = EF_SAFE_ALLOC(sizeof(struct t_data)); - d->fd = desc->fd; - d->command = command; - d->invoke = invoke_sendfile; - d->free = free_data; - d->level = 2; - desc->sendfile.out_fd = get_int32((uchar*) buf); - /* TODO: are off_t and size_t 64bit on all platforms? - off_t is 32bit on win32 msvc. maybe configurable in msvc. - Maybe use '#if SIZEOF_SIZE_T == 4'? */ - desc->sendfile.offset = get_int64(((uchar*) buf) - + sizeof(Sint32)); - desc->sendfile.count = get_int64(((uchar*) buf) - + sizeof(Sint32) - + sizeof(Sint64)); - desc->sendfile.chunksize = get_int64(((uchar*) buf) - + sizeof(Sint32) - + 2*sizeof(Sint64)); - desc->sendfile.written = 0; - desc->sendfile.eagain = 0; - /* TODO: shouldn't d->command be enough? */ - desc->command = command; - desc->caller = driver_caller(desc->port); + Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; + char flags; + + /* DestFD:32, Offset:64, Bytes:64, + ChunkSize:64, + (get_bit(Nodiskio)):1, + (get_bit(MNowait)):1, + (get_bit(Sync)):1,0:5, + (encode_hdtl(Headers))/binary, + (encode_hdtl(Trailers))/binary */ + if (ev->size < 1 + 1 + 5 * sizeof(Uint32) + sizeof(char) + || !EV_GET_UINT32(ev, &out_fd, &p, &q) + || !EV_GET_CHAR(ev, &flags, &p, &q) + || !EV_GET_UINT32(ev, &offsetH, &p, &q) + || !EV_GET_UINT32(ev, &offsetL, &p, &q) + || !EV_GET_UINT32(ev, &nbytesH, &p, &q) + || !EV_GET_UINT32(ev, &nbytesL, &p, &q)) { + /* Buffer has wrong length to contain all the needed values */ + reply_posix_error(desc, EINVAL); goto done; } - + + d = EF_SAFE_ALLOC(sizeof(struct t_data)); + d->fd = desc->fd; + d->command = command; + d->invoke = invoke_sendfile; + d->free = free_sendfile; + d->level = 2; + + d->c.sendfile.out_fd = (int) out_fd; + d->c.sendfile.flags = (int) flags; + +#if SIZEOF_OFF_T == 4 + if (offsetH != 0) { + reply_posix_error(desc, EINVAL); + goto done; + } + d->c.sendfile.offset = (off_t) offsetT; +#else + d->c.sendfile.offset = ((off_t) offsetH << 32) | offsetL; +#endif + +#if SIZEOF_SIZE_T == 4 + if (nbytesH != 0) { + reply_posix_error(desc, EINVAL); + goto done; + } + d->c.sendfile.nbytes = (size_t) nbytesT; +#else + d->c.sendfile.nbytes = ((size_t) nbytesH << 32) | nbytesL; +#endif + + printf("sendfile(nbytes => %d, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags); + + /* Do HEADER TRAILER stuff by calculating pointer places, not by copying data! */ + + cq_enq(desc, d); + goto done; + } /* case FILE_SENDFILE: */ + } /* switch(command) */ - + if (lseek_flush_read(desc, &err) < 0) { reply_posix_error(desc, err); goto done; diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 8b612164da..05c2f1fce9 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1471,19 +1471,34 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, #ifdef HAVE_SENDFILE int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t offset, size_t *ret_nbytes) + off_t offset, size_t *nbytes) { #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) - ssize_t retval; - do { - retval = sendfile(out_fd, in_fd, &offset, *ret_nbytes); - } while (retval == -1 && (errno == EINTR || errno == EAGAIN)); - *ret_nbytes = retval; + ssize_t retval, nbytes_sent = 0; + if (*nbytes == 0) { + *nbytes = (1 << 20) - 1; + do { + retval = sendfile(out_fd, in_fd, &offset, *nbytes); + nbytes_sent += retval; + printf("retval: %d, errno: %d, offset: %d, nbytes: %d\r\n", retval, errno, offset,*nbytes); + } while ((retval == -1 && errno == EINTR) + || (retval > 0 && errno == EAGAIN)); + } else { + do { + retval = sendfile(out_fd, in_fd, &offset, *nbytes); + if (retval > 0) { + nbytes_sent += retval; + *nbytes -= retval; + } + } while ((retval == -1 && errno == EINTR) + || (*nbytes > 0 && errno == EAGAIN)); + } + *nbytes = nbytes_sent; return check_error(retval == -1 ? -1 : 0, errInfo); #elif defined(DARWIN) - off_t len = *ret_nbytes; + off_t len = *nbytes; int retval = sendfile(in_fd, out_fd, *offset, &len, NULL, 0); - *ret_nbytes = len; + *nbytes = len; return check_error(retval, errInfo); #elif defined(__FreeBSD__) || defined(__DragonFly__) off_t len = 0; -- cgit v1.2.3 From 020e988424cf0d15ebab8de50638492defb6f2b5 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 11 Nov 2011 16:19:07 +0100 Subject: Implement sendfile when there are no async threads When there are no async threads sendfile will use the ready_output select on the socket fd to know when to send data. The file_desc will also be put in the sending sendfile_state which buffers all other commands to that file until the sendfile is done. --- erts/emulator/drivers/common/efile_drv.c | 143 +++++++++++++++++++++---------- erts/emulator/drivers/common/erl_efile.h | 4 +- erts/emulator/drivers/unix/unix_efile.c | 30 +++---- 3 files changed, 112 insertions(+), 65 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 4ed6aa4891..a318384479 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -76,11 +76,6 @@ #define FILE_OPT_DELAYED_WRITE 0 #define FILE_OPT_READ_AHEAD 1 -#define FILE_SENDFILE_OFFSET 0x10 -#define FILE_SENDFILE_NBYTES 0x08 -#define FILE_SENDFILE_NODISKIO 0x4 -#define FILE_SENDFILE_MNOWAIT 0x2 -#define FILE_SENDFILE_SYNC 0x1 /* IPREAD variants */ @@ -223,6 +218,7 @@ typedef unsigned char uchar; static ErlDrvData file_start(ErlDrvPort port, char* command); static int file_init(void); static void file_stop(ErlDrvData); +static void file_ready_output(ErlDrvData data, ErlDrvEvent event); static void file_output(ErlDrvData, char* buf, int len); static int file_control(ErlDrvData, unsigned int command, char* buf, int len, char **rbuf, int rlen); @@ -230,10 +226,12 @@ static void file_timeout(ErlDrvData); static void file_outputv(ErlDrvData, ErlIOVec*); static void file_async_ready(ErlDrvData, ErlDrvThreadData); static void file_flush(ErlDrvData); +static void file_stop_select(ErlDrvEvent event, void* _); enum e_timer {timer_idle, timer_again, timer_write}; +enum e_sendfile {sending, not_sending}; struct t_data; @@ -248,6 +246,7 @@ typedef struct { struct t_data *cq_head; /* Queue of incoming commands */ struct t_data *cq_tail; /* -""- */ enum e_timer timer_state; + enum e_sendfile sendfile_state; size_t read_bufsize; ErlDrvBinary *read_binp; size_t read_offset; @@ -268,9 +267,9 @@ struct erl_drv_entry efile_driver_entry = { file_init, file_start, file_stop, + file_output, NULL, - NULL, - NULL, + file_ready_output, "efile", NULL, NULL, @@ -287,7 +286,7 @@ struct erl_drv_entry efile_driver_entry = { ERL_DRV_FLAG_USE_PORT_LOCKING, NULL, NULL, - NULL + file_stop_select }; @@ -339,6 +338,13 @@ struct t_readdir_buf { char buf[READDIR_BUFSIZE]; }; +struct t_sendfile_hdtl { + int hdr_cnt; /* number of header iovecs */ + struct iovec *headers; /* pointer to header iovecs */ + int trl_cnt; /* number of trailer iovecs */ + struct iovec *trailers; /* pointer to trailer iovecs */ +}; + struct t_data { struct t_data *next; @@ -406,18 +412,14 @@ struct t_data Sint64 length; int advise; } fadvise; -#ifdef HAVE_SENDFILE struct { int out_fd; off_t offset; size_t nbytes; - int flags; - int hdr_cnt; /* number of header iovecs */ - struct iovec *headers; /* pointer to header iovecs */ - int trl_cnt; /* number of trailer iovecs */ - struct iovec *trailers; /* pointer to trailer iovecs */ + Uint64 written; + short flags; + struct t_sendfile_hdtl *hdtl; } sendfile; -#endif } c; char b[1]; }; @@ -632,6 +634,7 @@ static struct t_data *cq_deq(file_descriptor *desc) { return d; } + /********************************************************************* * Driver entry point -> init */ @@ -674,6 +677,7 @@ file_start(ErlDrvPort port, char* command) desc->cq_head = NULL; desc->cq_tail = NULL; desc->timer_state = timer_idle; + desc->sendfile_state = not_sending; desc->read_bufsize = 0; desc->read_binp = NULL; desc->read_offset = 0; @@ -1713,30 +1717,65 @@ static void invoke_fadvise(void *data) d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise); } -static void free_sendfile(void *data) { - EF_FREE(data); -} - static void invoke_sendfile(void *data) { - struct t_data *d = (struct t_data *) data; - int fd = (int)d->fd; - int out_fd = (int) d->c.sendfile.out_fd; - off_t offset = (off_t) d->c.sendfile.offset; - size_t nbytes = (size_t) d->c.sendfile.nbytes; - - d->result_ok = efile_sendfile(&d->errInfo, fd, out_fd, offset, &nbytes); - d->c.sendfile.offset = offset; - d->c.sendfile.nbytes = nbytes; + struct t_data *d = (struct t_data *)data; + int fd = d->fd; + int out_fd = d->c.sendfile.out_fd; + size_t nbytes = d->c.sendfile.nbytes; + int result = 0; d->again = 0; - if (d->result_ok) { - printf("==> sendfile DONE nbytes=%d\n", d->c.sendfile.nbytes); + result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes); + + /* printf("sendfile: result: %d, errno: %d, offset: %d, nbytes: %d\r\n", + result, errno, d->c.sendfile.offset,nbytes);*/ + d->c.sendfile.written += nbytes; + + if (result == 1) { + if (d->c.sendfile.nbytes == 0 && nbytes != 0) { + d->result_ok = 1; + } else if ((d->c.sendfile.nbytes - nbytes) != 0) { + d->result_ok = 1; + d->c.sendfile.nbytes -= nbytes; + } else { + printf("==> sendfile DONE written=%ld\r\n", d->c.sendfile.written); + d->result_ok = 0; + } + } else if (result == 0 && (d->errInfo.posix_errno == EAGAIN + || d->errInfo.posix_errno == EINTR)) { + d->result_ok = 1; } else { - printf("==> sendfile ERROR %s\n", erl_errno_id(d->errInfo.posix_errno)); + d->result_ok = -1; + printf("==> sendfile ERROR %s\r\n", erl_errno_id(d->errInfo.posix_errno)); } } +static void free_sendfile(void *data) { + EF_FREE(data); +} + +static void file_ready_output(ErlDrvData data, ErlDrvEvent event) +{ + file_descriptor* fd = (file_descriptor*) data; + + switch (fd->d->command) { + case FILE_SENDFILE: + driver_select(fd->port, (ErlDrvEvent)fd->d->c.sendfile.out_fd, + (int)ERL_DRV_WRITE,(int) 0); + invoke_sendfile((void *)fd->d); + file_async_ready((ErlDrvData)fd, (ErlDrvThreadData)fd->d); + break; + default: + break; + } +} + +static void file_stop_select(ErlDrvEvent event, void* _) +{ + /* TODO: close socket? */ +} + static void free_readdir(void *data) { struct t_data *d = (struct t_data *) data; @@ -1796,7 +1835,8 @@ static int try_again(file_descriptor *desc, struct t_data *d) { static void cq_execute(file_descriptor *desc) { struct t_data *d; register void *void_ptr; /* Soft cast variable */ - if (desc->timer_state == timer_again) + if (desc->timer_state == timer_again || + desc->sendfile_state == sending) return; if (! (d = cq_deq(desc))) return; @@ -2149,12 +2189,21 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) free_preadv(data); break; case FILE_SENDFILE: - if (!d->result_ok) { + //printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); + if (d->result_ok == -1) { + desc->sendfile_state = not_sending; reply_error(desc, &d->errInfo); - } else { - reply_Sint64(desc, d->c.sendfile.nbytes); + free_sendfile(data); + } else if (d->result_ok == 0) { + desc->sendfile_state = not_sending; + reply_Sint64(desc, d->c.sendfile.written); + free_sendfile(data); + } else if (d->result_ok == 1) { // If we are using select to send the rest of the data + desc->sendfile_state = sending; + desc->d = d; + driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, + ERL_DRV_USE|ERL_DRV_WRITE, 1); } - free_sendfile(data); break; default: abort(); @@ -3296,11 +3345,12 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { goto done; } /* case FILE_OPT_DELAYED_WRITE: */ } ASSERT(0); goto done; /* case FILE_SETOPT: */ + case FILE_SENDFILE: { - struct t_data *d; - Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; - char flags; + struct t_data *d; + Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; + char flags; /* DestFD:32, Offset:64, Bytes:64, ChunkSize:64, @@ -3330,26 +3380,27 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { d->c.sendfile.out_fd = (int) out_fd; d->c.sendfile.flags = (int) flags; + d->c.sendfile.written = 0; -#if SIZEOF_OFF_T == 4 + #if SIZEOF_OFF_T == 4 if (offsetH != 0) { reply_posix_error(desc, EINVAL); goto done; } d->c.sendfile.offset = (off_t) offsetT; -#else + #else d->c.sendfile.offset = ((off_t) offsetH << 32) | offsetL; -#endif + #endif -#if SIZEOF_SIZE_T == 4 + #if SIZEOF_SIZE_T == 4 if (nbytesH != 0) { reply_posix_error(desc, EINVAL); goto done; } d->c.sendfile.nbytes = (size_t) nbytesT; -#else + #else d->c.sendfile.nbytes = ((size_t) nbytesH << 32) | nbytesL; -#endif + #endif printf("sendfile(nbytes => %d, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags); @@ -3357,7 +3408,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { cq_enq(desc, d); goto done; - } /* case FILE_SENDFILE: */ + } /* case FILE_SENDFILE: */ } /* switch(command) */ diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index 864b867955..e0b8cfca03 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -162,5 +162,5 @@ int efile_symlink(Efile_error* errInfo, char* old, char* new); int efile_may_openfile(Efile_error* errInfo, char *name); int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); -int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t offset, - size_t *count); +int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, + off_t *offset, size_t *nbytes); diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 05c2f1fce9..3a966757d9 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1469,31 +1469,27 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, } #ifdef HAVE_SENDFILE +#define SENDFILE_CHUNK_SIZE ((1 << 30) - 1) int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t offset, size_t *nbytes) + off_t *offset, size_t *nbytes) { #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) - ssize_t retval, nbytes_sent = 0; + ssize_t retval, written = 0; + // printf("sendfile(%d,%d,%d,%d)\r\n",out_fd,in_fd,*offset,*nbytes); if (*nbytes == 0) { - *nbytes = (1 << 20) - 1; do { - retval = sendfile(out_fd, in_fd, &offset, *nbytes); - nbytes_sent += retval; - printf("retval: %d, errno: %d, offset: %d, nbytes: %d\r\n", retval, errno, offset,*nbytes); - } while ((retval == -1 && errno == EINTR) - || (retval > 0 && errno == EAGAIN)); + *nbytes = SENDFILE_CHUNK_SIZE; // chunk size + retval = sendfile(out_fd, in_fd, offset, *nbytes); + if (retval > 0) + written += retval; + } while (retval == SENDFILE_CHUNK_SIZE); } else { - do { - retval = sendfile(out_fd, in_fd, &offset, *nbytes); - if (retval > 0) { - nbytes_sent += retval; - *nbytes -= retval; - } - } while ((retval == -1 && errno == EINTR) - || (*nbytes > 0 && errno == EAGAIN)); + retval = sendfile(out_fd, in_fd, offset, *nbytes); + if (retval > 0) + written = retval; } - *nbytes = nbytes_sent; + *nbytes = written; return check_error(retval == -1 ? -1 : 0, errInfo); #elif defined(DARWIN) off_t len = *nbytes; -- cgit v1.2.3 From 035279e92f9bb3f9601098f6a70ba6d398d6727f Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Mon, 14 Nov 2011 12:24:37 +0100 Subject: Make socket blocking if there are async threads --- erts/emulator/drivers/common/efile_drv.c | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index a318384479..70917119e2 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -99,7 +99,13 @@ # include "config.h" #endif #include + +// Need (NON)BLOCKING macros for sendfile +#ifndef WANT_NONBLOCKING +#define WANT_NONBLOCKING +#endif #include "sys.h" + #include "erl_driver.h" #include "erl_efile.h" #include "erl_threads.h" @@ -1728,12 +1734,14 @@ static void invoke_sendfile(void *data) result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes); - /* printf("sendfile: result: %d, errno: %d, offset: %d, nbytes: %d\r\n", - result, errno, d->c.sendfile.offset,nbytes);*/ + //printf("sendfile: result: %d, errno: %d, offset: %d, nbytes: %d\r\n",result, errno, d->c.sendfile.offset,nbytes); d->c.sendfile.written += nbytes; if (result == 1) { - if (d->c.sendfile.nbytes == 0 && nbytes != 0) { + if (sys_info.async_threads != 0) { + printf("==> sendfile DONE written=%ld\r\n", d->c.sendfile.written); + d->result_ok = 0; + } else if (d->c.sendfile.nbytes == 0 && nbytes != 0) { d->result_ok = 1; } else if ((d->c.sendfile.nbytes - nbytes) != 0) { d->result_ok = 1; @@ -2189,14 +2197,20 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) free_preadv(data); break; case FILE_SENDFILE: - //printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); + printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); if (d->result_ok == -1) { desc->sendfile_state = not_sending; reply_error(desc, &d->errInfo); + if (sys_info.async_threads != 0) { + SET_NONBLOCKING(d->c.sendfile.out_fd); + } free_sendfile(data); } else if (d->result_ok == 0) { desc->sendfile_state = not_sending; reply_Sint64(desc, d->c.sendfile.written); + if (sys_info.async_threads != 0) { + SET_NONBLOCKING(d->c.sendfile.out_fd); + } free_sendfile(data); } else if (d->result_ok == 1) { // If we are using select to send the rest of the data desc->sendfile_state = sending; @@ -3348,9 +3362,9 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { case FILE_SENDFILE: { - struct t_data *d; - Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; - char flags; + struct t_data *d; + Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; + char flags; /* DestFD:32, Offset:64, Bytes:64, ChunkSize:64, @@ -3406,6 +3420,10 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { /* Do HEADER TRAILER stuff by calculating pointer places, not by copying data! */ + if (sys_info.async_threads != 0) { + SET_BLOCKING(d->c.sendfile.out_fd); + } + cq_enq(desc, d); goto done; } /* case FILE_SENDFILE: */ -- cgit v1.2.3 From 8e653ead2f361ce37b6e00b85844a48bd0cab394 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 11:10:56 +0100 Subject: Fix 32 bit parameters --- erts/emulator/drivers/common/efile_drv.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 70917119e2..0364d0722a 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -3401,7 +3401,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { reply_posix_error(desc, EINVAL); goto done; } - d->c.sendfile.offset = (off_t) offsetT; + d->c.sendfile.offset = (off_t) offsetL; #else d->c.sendfile.offset = ((off_t) offsetH << 32) | offsetL; #endif @@ -3411,7 +3411,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { reply_posix_error(desc, EINVAL); goto done; } - d->c.sendfile.nbytes = (size_t) nbytesT; + d->c.sendfile.nbytes = (size_t) nbytesL; #else d->c.sendfile.nbytes = ((size_t) nbytesH << 32) | nbytesL; #endif -- cgit v1.2.3 From 1087d64af63d1fd5044d56e864557365ed40aab3 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 11:12:36 +0100 Subject: Fix cleanup for sendfile --- erts/emulator/drivers/common/efile_drv.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 0364d0722a..98da63f4e3 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -1772,7 +1772,7 @@ static void file_ready_output(ErlDrvData data, ErlDrvEvent event) driver_select(fd->port, (ErlDrvEvent)fd->d->c.sendfile.out_fd, (int)ERL_DRV_WRITE,(int) 0); invoke_sendfile((void *)fd->d); - file_async_ready((ErlDrvData)fd, (ErlDrvThreadData)fd->d); + file_async_ready(data, (ErlDrvThreadData)fd->d); break; default: break; @@ -2203,15 +2203,19 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) reply_error(desc, &d->errInfo); if (sys_info.async_threads != 0) { SET_NONBLOCKING(d->c.sendfile.out_fd); + } else { + driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, + ERL_DRV_USE, 0); } - free_sendfile(data); } else if (d->result_ok == 0) { desc->sendfile_state = not_sending; reply_Sint64(desc, d->c.sendfile.written); if (sys_info.async_threads != 0) { SET_NONBLOCKING(d->c.sendfile.out_fd); + } else { + driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, + ERL_DRV_USE, 0); } - free_sendfile(data); } else if (d->result_ok == 1) { // If we are using select to send the rest of the data desc->sendfile_state = sending; desc->d = d; @@ -3389,7 +3393,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { d->fd = desc->fd; d->command = command; d->invoke = invoke_sendfile; - d->free = free_sendfile; + d->free = NULL; d->level = 2; d->c.sendfile.out_fd = (int) out_fd; @@ -3422,6 +3426,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { if (sys_info.async_threads != 0) { SET_BLOCKING(d->c.sendfile.out_fd); + d->free = free_sendfile; } cq_enq(desc, d); -- cgit v1.2.3 From eccba49e87ad32248a678d90cfdf355ffd97015d Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 11:14:10 +0100 Subject: Fix offset calculation for darwin --- erts/emulator/drivers/unix/unix_efile.c | 1 + 1 file changed, 1 insertion(+) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 3a966757d9..61df572a91 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1494,6 +1494,7 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, #elif defined(DARWIN) off_t len = *nbytes; int retval = sendfile(in_fd, out_fd, *offset, &len, NULL, 0); + *offset += len; *nbytes = len; return check_error(retval, errInfo); #elif defined(__FreeBSD__) || defined(__DragonFly__) -- cgit v1.2.3 From 59e7e345ba51b7c2d6c9e479ce4cbb7c745c7893 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 11:16:54 +0100 Subject: Implement ignorefd for TCP Ignore fd is a feature used by sendfile to temporarily remove all driver_select calls on that fd so that another driver can select on it. It also delays all actions which sends or receives data in that fd until in the fd is no longer ignored. Only the controlling_process should use the feature as it is otherwise possible that the ignore will never be cleaned up and hence create a memory leak in the driver. An ignored driver will not detect that an fd has been closed until it is unignored. --- erts/emulator/drivers/common/inet_drv.c | 67 +++++++++++++++++++++++++++++---- 1 file changed, 60 insertions(+), 7 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 1fe9e04341..56799555a9 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -445,6 +445,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) driver_select(port, e, mode | (on?ERL_DRV_USE:0), on) #define sock_select(d, flags, onoff) do { \ + ASSERT(!onoff || !(d)->is_ignored); \ (d)->event_mask = (onoff) ? \ ((d)->event_mask | (flags)) : \ ((d)->event_mask & ~(flags)); \ @@ -538,6 +539,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_REQ_GETIFADDRS 25 #define INET_REQ_ACCEPT 26 #define INET_REQ_LISTEN 27 +#define INET_REQ_IGNOREFD 28 + /* TCP requests */ /* #define TCP_REQ_ACCEPT 40 MOVED */ /* #define TCP_REQ_LISTEN 41 MERGED */ @@ -725,6 +728,11 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) /* Max interface name */ #define INET_IFNAMSIZ 16 +/* INET Ignore states */ +#define INET_IGNORE_NONE 0 +#define INET_IGNORE_READ 1 +#define INET_IGNORE_WRITE 1 << 1 + /* Max length of Erlang Term Buffer (for outputting structured terms): */ #ifdef HAVE_SCTP #define PACKET_ERL_DRV_TERM_DATA_LEN 512 @@ -864,6 +872,9 @@ typedef struct { double send_avg; /* average packet size sent */ subs_list empty_out_q_subs; /* Empty out queue subscribers */ + int is_ignored; /* if a fd is ignored by from the inet_drv, + this should be set to true when the fd is used + outside of inet_drv. */ } inet_descriptor; @@ -7302,6 +7313,8 @@ static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol) sys_memzero((char *)&desc->remote,sizeof(desc->remote)); + desc->is_ignored = 0; + return (ErlDrvData)desc; } @@ -7584,6 +7597,33 @@ static int inet_ctl(inet_descriptor* desc, int cmd, char* buf, int len, return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize); } + case INET_REQ_IGNOREFD: { + DEBUGF(("inet_ctl(%ld): IGNOREFD, IGNORED = %d\r\n", + (long)desc->port,(int)*buf)); + + /* + * FD can only be ignored for connected TCP connections for now, + * possible to add UDP and SCTP support if needed. + */ + if (!IS_CONNECTED(desc)) + return ctl_error(ENOTCONN, rbuf, rsize); + + if (!desc->stype == SOCK_STREAM) + return ctl_error(EINVAL, rbuf, rsize); + + if (*buf == 1 && !desc->is_ignored) { + desc->is_ignored = INET_IGNORE_READ; + sock_select(desc, (FD_READ|FD_WRITE|FD_CLOSE|ERL_DRV_USE_NO_CALLBACK), 0); + } else if (*buf == 0 && desc->is_ignored) { + int flags = (FD_READ|FD_CLOSE|((desc->is_ignored & INET_IGNORE_WRITE)?FD_WRITE:0)); + desc->is_ignored = INET_IGNORE_NONE; + sock_select(desc, flags, 1); + } else + return ctl_error(EINVAL, rbuf, rsize); + + return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); + } + #ifndef VXWORKS case INET_REQ_GETSERVBYNAME: { /* L1 Name-String L2 Proto-String */ @@ -7959,6 +7999,7 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, char** rbuf, int rsize) { tcp_descriptor* desc = (tcp_descriptor*)e; + switch(cmd) { case INET_REQ_OPEN: { /* open socket and return internal index */ int domain; @@ -8224,13 +8265,14 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, if (enq_async(INETP(desc), tbuf, TCP_REQ_RECV) < 0) return ctl_error(EALREADY, rbuf, rsize); - if (tcp_recv(desc, n) == 0) { + if (INETP(desc)->is_ignored || tcp_recv(desc, n) == 0) { if (timeout == 0) async_error_am(INETP(desc), am_timeout); else { if (timeout != INET_INFINITY) - driver_set_timer(desc->inet.port, timeout); - sock_select(INETP(desc),(FD_READ|FD_CLOSE),1); + driver_set_timer(desc->inet.port, timeout); + if (!INETP(desc)->is_ignored) + sock_select(INETP(desc),(FD_READ|FD_CLOSE),1); } } return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize); @@ -8970,6 +9012,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) #ifdef DEBUG long port = (long) desc->inet.port; /* Used after driver_exit() */ #endif + ASSERT(!INETP(desc)->is_ignored); DEBUGF(("tcp_inet_input(%ld) {s=%d\r\n", port, desc->inet.s)); if (desc->inet.state == INET_STATE_ACCEPTING) { SOCKET s; @@ -9231,7 +9274,11 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) DEBUGF(("tcp_sendv(%ld): s=%d, about to send %d,%d bytes\r\n", (long)desc->inet.port, desc->inet.s, h_len, len)); - if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { + + if (INETP(desc)->is_ignored) { + INETP(desc)->is_ignored |= INET_IGNORE_WRITE; + n = 0; + } else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { n = 0; } else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, ev->iov, vsize, &n, 0))) { @@ -9259,7 +9306,8 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) DEBUGF(("tcp_sendv(%ld): s=%d, Send failed, queuing\r\n", (long)desc->inet.port, desc->inet.s)); driver_enqv(ix, ev, n); - sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); + if (!INETP(desc)->is_ignored) + sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); } return 0; } @@ -9324,7 +9372,10 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, int len) DEBUGF(("tcp_send(%ld): s=%d, about to send %d,%d bytes\r\n", (long)desc->inet.port, desc->inet.s, h_len, len)); - if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { + if (INETP(desc)->is_ignored) { + INETP(desc)->is_ignored |= INET_IGNORE_WRITE; + n = 0; + } else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { sock_send(desc->inet.s, buf, 0, 0); n = 0; } else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s,iov,2,&n,0))) { @@ -9355,7 +9406,8 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, int len) n -= h_len; driver_enq(ix, ptr+n, len-n); } - sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); + if (!INETP(desc)->is_ignored) + sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); } return 0; } @@ -9379,6 +9431,7 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) int ret = 0; ErlDrvPort ix = desc->inet.port; + ASSERT(!INETP(desc)->is_ignored); DEBUGF(("tcp_inet_output(%ld) {s=%d\r\n", (long)desc->inet.port, desc->inet.s)); if (desc->inet.state == INET_STATE_CONNECTING) { -- cgit v1.2.3 From 23d62043ebf4bfad900935c650e8fcb3f2e6f88c Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 11:19:54 +0100 Subject: Change nbytes to 64 bit --- erts/emulator/drivers/common/efile_drv.c | 26 ++++++---------- erts/emulator/drivers/common/erl_efile.h | 2 +- erts/emulator/drivers/unix/unix_efile.c | 53 ++++++++++++++++++++------------ 3 files changed, 44 insertions(+), 37 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 98da63f4e3..5d785a75c4 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -421,7 +421,7 @@ struct t_data struct { int out_fd; off_t offset; - size_t nbytes; + Uint64 nbytes; Uint64 written; short flags; struct t_sendfile_hdtl *hdtl; @@ -1728,7 +1728,7 @@ static void invoke_sendfile(void *data) struct t_data *d = (struct t_data *)data; int fd = d->fd; int out_fd = d->c.sendfile.out_fd; - size_t nbytes = d->c.sendfile.nbytes; + Uint64 nbytes = d->c.sendfile.nbytes; int result = 0; d->again = 0; @@ -2197,12 +2197,13 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) free_preadv(data); break; case FILE_SENDFILE: - printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); + //printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); if (d->result_ok == -1) { desc->sendfile_state = not_sending; reply_error(desc, &d->errInfo); if (sys_info.async_threads != 0) { SET_NONBLOCKING(d->c.sendfile.out_fd); + free_sendfile(data); } else { driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, ERL_DRV_USE, 0); @@ -2212,6 +2213,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) reply_Sint64(desc, d->c.sendfile.written); if (sys_info.async_threads != 0) { SET_NONBLOCKING(d->c.sendfile.out_fd); + free_sendfile(data); } else { driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, ERL_DRV_USE, 0); @@ -3367,7 +3369,8 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { case FILE_SENDFILE: { struct t_data *d; - Uint32 out_fd, offsetH, offsetL, nbytesH, nbytesL; + Uint32 out_fd, offsetH, offsetL; + Uint64 nbytes; char flags; /* DestFD:32, Offset:64, Bytes:64, @@ -3382,8 +3385,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { || !EV_GET_CHAR(ev, &flags, &p, &q) || !EV_GET_UINT32(ev, &offsetH, &p, &q) || !EV_GET_UINT32(ev, &offsetL, &p, &q) - || !EV_GET_UINT32(ev, &nbytesH, &p, &q) - || !EV_GET_UINT32(ev, &nbytesL, &p, &q)) { + || !EV_GET_UINT64(ev, &nbytes, &p, &q)) { /* Buffer has wrong length to contain all the needed values */ reply_posix_error(desc, EINVAL); goto done; @@ -3410,17 +3412,9 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { d->c.sendfile.offset = ((off_t) offsetH << 32) | offsetL; #endif - #if SIZEOF_SIZE_T == 4 - if (nbytesH != 0) { - reply_posix_error(desc, EINVAL); - goto done; - } - d->c.sendfile.nbytes = (size_t) nbytesL; - #else - d->c.sendfile.nbytes = ((size_t) nbytesH << 32) | nbytesL; - #endif + d->c.sendfile.nbytes = nbytes; - printf("sendfile(nbytes => %d, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags); + printf("sendfile(nbytes => %ld, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags); /* Do HEADER TRAILER stuff by calculating pointer places, not by copying data! */ diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index e0b8cfca03..8e79b3923a 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -163,4 +163,4 @@ int efile_may_openfile(Efile_error* errInfo, char *name); int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, size_t *nbytes); + off_t *offset, Uint64 *nbytes); diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 61df572a91..911ec63588 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1469,33 +1469,46 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, } #ifdef HAVE_SENDFILE -#define SENDFILE_CHUNK_SIZE ((1 << 30) - 1) int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, size_t *nbytes) + off_t *offset, Uint64 *nbytes) { + // printf("sendfile(%d,%d,%d,%d)\r\n",out_fd,in_fd,*offset,*nbytes); + Uint64 written = 0; #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) - ssize_t retval, written = 0; - // printf("sendfile(%d,%d,%d,%d)\r\n",out_fd,in_fd,*offset,*nbytes); - if (*nbytes == 0) { - do { - *nbytes = SENDFILE_CHUNK_SIZE; // chunk size - retval = sendfile(out_fd, in_fd, offset, *nbytes); - if (retval > 0) - written += retval; - } while (retval == SENDFILE_CHUNK_SIZE); - } else { - retval = sendfile(out_fd, in_fd, offset, *nbytes); - if (retval > 0) - written = retval; - } +#define SENDFILE_CHUNK_SIZE ((1 << (8*SIZEOF_SIZE_T)) - 1) + ssize_t retval; + do { + // check if *nbytes is 0 or greater than the largest size_t + if (*nbytes == 0 || *nbytes > SENDFILE_CHUNK_SIZE) + retval = sendfile(out_fd, in_fd, offset, SENDFILE_CHUNK_SIZE); + else + retval = sendfile(out_fd, in_fd, offset, *nbytes); + if (retval > 0) { + written += retval; + *nbytes -= retval; + } + } while (retval == SENDFILE_CHUNK_SIZE); *nbytes = written; return check_error(retval == -1 ? -1 : 0, errInfo); #elif defined(DARWIN) - off_t len = *nbytes; - int retval = sendfile(in_fd, out_fd, *offset, &len, NULL, 0); - *offset += len; - *nbytes = len; +#define SENDFILE_CHUNK_SIZE ((1 << (8*SIZEOF_OFF_T)) - 1) + int retval; + off_t len; + do { + // check if *nbytes is 0 or greater than the largest off_t + if(*nbytes > SENDFILE_CHUNK_SIZE) + len = SENDFILE_CHUNK_SIZE; + else + len = *nbytes; + retval = sendfile(in_fd, out_fd, *offset, &len, NULL, 0); + if (retval != -1 || errno == EAGAIN || errno == EINTR) { + *offset += len; + *nbytes -= len; + written += len; + } + } while (len == SENDFILE_CHUNK_SIZE); + *nbytes = written; return check_error(retval, errInfo); #elif defined(__FreeBSD__) || defined(__DragonFly__) off_t len = 0; -- cgit v1.2.3 From a508d712553761d3cdc69d5e14c09ba3a6530d7a Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Tue, 15 Nov 2011 10:32:03 +0100 Subject: Fix freebsd support for sendfile --- erts/emulator/drivers/unix/unix_efile.c | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 911ec63588..14b7a5cffa 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1511,9 +1511,22 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, *nbytes = written; return check_error(retval, errInfo); #elif defined(__FreeBSD__) || defined(__DragonFly__) - off_t len = 0; - int retval = sendfile(in_fd, out_fd, *offset, *nbytes, NULL, &len, 0); - *nbytes = len; +#define SENDFILE_CHUNK_SIZE ((1 << (8*SIZEOF_SIZE_T)) - 1) + off_t len; + int retval; + do { + if (*nbytes > SENDFILE_CHUNK_SIZE) + retval = sendfile(in_fd, out_fd, *offset, SENDFILE_CHUNK_SIZE, + NULL, &len, 0); + else + retval = sendfile(in_fd, out_fd, *offset, *nbytes, NULL, &len, 0); + if (retval != -1 || errno == EAGAIN || errno == EINTR) { + *offset += len; + *nbytes -= len; + written += len; + } + } while(len == SENDFILE_CHUNK_SIZE); + *nbytes = written; return check_error(retval, errInfo); #endif } -- cgit v1.2.3 From a0d3a833cbd70971aa0c79da9853502e6631524d Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Wed, 16 Nov 2011 16:02:57 +0100 Subject: Add ifdef's for HAVE_SENDFILE --- erts/emulator/drivers/common/efile_drv.c | 42 ++++++++++++++++++++++++++++---- erts/emulator/drivers/common/erl_efile.h | 2 ++ erts/emulator/drivers/unix/unix_efile.c | 12 ++------- 3 files changed, 41 insertions(+), 15 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 5d785a75c4..b26e244312 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -224,7 +224,6 @@ typedef unsigned char uchar; static ErlDrvData file_start(ErlDrvPort port, char* command); static int file_init(void); static void file_stop(ErlDrvData); -static void file_ready_output(ErlDrvData data, ErlDrvEvent event); static void file_output(ErlDrvData, char* buf, int len); static int file_control(ErlDrvData, unsigned int command, char* buf, int len, char **rbuf, int rlen); @@ -232,12 +231,17 @@ static void file_timeout(ErlDrvData); static void file_outputv(ErlDrvData, ErlIOVec*); static void file_async_ready(ErlDrvData, ErlDrvThreadData); static void file_flush(ErlDrvData); -static void file_stop_select(ErlDrvEvent event, void* _); +#ifdef HAVE_SENDFILE +static void file_ready_output(ErlDrvData data, ErlDrvEvent event); +static void file_stop_select(ErlDrvEvent event, void* _); +#endif /* HAVE_SENDFILE */ enum e_timer {timer_idle, timer_again, timer_write}; +#ifdef HAVE_SENDFILE enum e_sendfile {sending, not_sending}; +#endif /* HAVE_SENDFILE */ struct t_data; @@ -252,7 +256,9 @@ typedef struct { struct t_data *cq_head; /* Queue of incoming commands */ struct t_data *cq_tail; /* -""- */ enum e_timer timer_state; +#ifdef HAVE_SENDFILE enum e_sendfile sendfile_state; +#endif /* HAVE_SENDFILE */ size_t read_bufsize; ErlDrvBinary *read_binp; size_t read_offset; @@ -275,7 +281,11 @@ struct erl_drv_entry efile_driver_entry = { file_stop, file_output, NULL, +#ifdef HAVE_SENDFILE file_ready_output, +#else + NULL, +#endif /* HAVE_SENDFILE */ "efile", NULL, NULL, @@ -292,7 +302,11 @@ struct erl_drv_entry efile_driver_entry = { ERL_DRV_FLAG_USE_PORT_LOCKING, NULL, NULL, +#ifdef HAVE_SENDFILE file_stop_select +#else + NULL +#endif /* HAVE_SENDFILE */ }; @@ -344,12 +358,14 @@ struct t_readdir_buf { char buf[READDIR_BUFSIZE]; }; +#ifdef HAVE_SENDFILE struct t_sendfile_hdtl { int hdr_cnt; /* number of header iovecs */ struct iovec *headers; /* pointer to header iovecs */ int trl_cnt; /* number of trailer iovecs */ struct iovec *trailers; /* pointer to trailer iovecs */ }; +#endif /* HAVE_SENDFILE */ struct t_data { @@ -418,6 +434,7 @@ struct t_data Sint64 length; int advise; } fadvise; +#ifdef HAVE_SENDFILE struct { int out_fd; off_t offset; @@ -426,6 +443,7 @@ struct t_data short flags; struct t_sendfile_hdtl *hdtl; } sendfile; +#endif /* HAVE_SENDFILE */ } c; char b[1]; }; @@ -683,7 +701,9 @@ file_start(ErlDrvPort port, char* command) desc->cq_head = NULL; desc->cq_tail = NULL; desc->timer_state = timer_idle; +#ifdef HAVE_SENDFILE desc->sendfile_state = not_sending; +#endif desc->read_bufsize = 0; desc->read_binp = NULL; desc->read_offset = 0; @@ -1723,6 +1743,7 @@ static void invoke_fadvise(void *data) d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise); } +#ifdef HAVE_SENDFILE static void invoke_sendfile(void *data) { struct t_data *d = (struct t_data *)data; @@ -1781,8 +1802,10 @@ static void file_ready_output(ErlDrvData data, ErlDrvEvent event) static void file_stop_select(ErlDrvEvent event, void* _) { - /* TODO: close socket? */ + } +#endif /* HAVE_SENDFILE */ + static void free_readdir(void *data) { @@ -1843,9 +1866,12 @@ static int try_again(file_descriptor *desc, struct t_data *d) { static void cq_execute(file_descriptor *desc) { struct t_data *d; register void *void_ptr; /* Soft cast variable */ - if (desc->timer_state == timer_again || - desc->sendfile_state == sending) + if (desc->timer_state == timer_again) return; +#ifdef HAVE_SENDFILE + if (desc->sendfile_state == sending) + return; +#endif if (! (d = cq_deq(desc))) return; TRACE_F(("x%i", (int) d->command)); @@ -2196,6 +2222,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) } free_preadv(data); break; +#ifdef HAVE_SENDFILE case FILE_SENDFILE: //printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); if (d->result_ok == -1) { @@ -2225,6 +2252,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) ERL_DRV_USE|ERL_DRV_WRITE, 1); } break; +#endif default: abort(); } @@ -3368,6 +3396,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { case FILE_SENDFILE: { +#ifdef HAVE_SENDFILE struct t_data *d; Uint32 out_fd, offsetH, offsetL; Uint64 nbytes; @@ -3424,6 +3453,9 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { } cq_enq(desc, d); +#else + reply_posix_error(desc, ENOTSUP); +#endif goto done; } /* case FILE_SENDFILE: */ diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index 8e79b3923a..fd6dc94755 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -162,5 +162,7 @@ int efile_symlink(Efile_error* errInfo, char* old, char* new); int efile_may_openfile(Efile_error* errInfo, char *name); int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); +#ifdef HAVE_SENDFILE int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t *offset, Uint64 *nbytes); +#endif /* HAVE_SENDFILE */ diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 14b7a5cffa..dc118c9b9f 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -33,7 +33,7 @@ #include #include #endif -#if defined(__linux__) || (defined(__sun) && defined(__SVR4)) +#if defined(HAVE_SENDFILE) && (defined(__linux__) || (defined(__sun) && defined(__SVR4))) #include #endif @@ -1530,12 +1530,4 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, return check_error(retval, errInfo); #endif } -#else /* no sendfile() */ -int -efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, size_t *count) -{ - errno = ENOTSUP; - return check_error(-1, errInfo); -} -#endif +#endif /* HAVE_SENDFILE */ -- cgit v1.2.3 From ce8fb42d7e92a95666e40614684232d476509cbe Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Wed, 16 Nov 2011 16:04:59 +0100 Subject: Change type of fd to be ErlDrvEvent --- erts/emulator/drivers/common/efile_drv.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index b26e244312..c4f92cc318 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -1748,7 +1748,7 @@ static void invoke_sendfile(void *data) { struct t_data *d = (struct t_data *)data; int fd = d->fd; - int out_fd = d->c.sendfile.out_fd; + int out_fd = (int)d->c.sendfile.out_fd; Uint64 nbytes = d->c.sendfile.nbytes; int result = 0; d->again = 0; @@ -1790,7 +1790,7 @@ static void file_ready_output(ErlDrvData data, ErlDrvEvent event) switch (fd->d->command) { case FILE_SENDFILE: - driver_select(fd->port, (ErlDrvEvent)fd->d->c.sendfile.out_fd, + driver_select(fd->port, event, (int)ERL_DRV_WRITE,(int) 0); invoke_sendfile((void *)fd->d); file_async_ready(data, (ErlDrvThreadData)fd->d); @@ -2232,8 +2232,8 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) SET_NONBLOCKING(d->c.sendfile.out_fd); free_sendfile(data); } else { - driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, - ERL_DRV_USE, 0); + driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, + ERL_DRV_USE, 0); } } else if (d->result_ok == 0) { desc->sendfile_state = not_sending; @@ -2242,13 +2242,12 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) SET_NONBLOCKING(d->c.sendfile.out_fd); free_sendfile(data); } else { - driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, - ERL_DRV_USE, 0); + driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, ERL_DRV_USE, 0); } } else if (d->result_ok == 1) { // If we are using select to send the rest of the data desc->sendfile_state = sending; desc->d = d; - driver_select(desc->port, (ErlDrvEvent)d->c.sendfile.out_fd, + driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, ERL_DRV_USE|ERL_DRV_WRITE, 1); } break; -- cgit v1.2.3 From 16b395a11ddc45ee8a36324ed0fb543f4065fc76 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Wed, 16 Nov 2011 16:05:34 +0100 Subject: Set chunk size to 3 GB It is not possible to use the maximum size_t/off_t for the chunks as that causes sendfile to return einval. 3GB seems to work on all *nix platforms. --- erts/emulator/drivers/unix/unix_efile.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index dc118c9b9f..8db7f2336e 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1469,6 +1469,8 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, } #ifdef HAVE_SENDFILE +#define SENDFILE_CHUNK_SIZE ((1 << 30) -1) + int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t *offset, Uint64 *nbytes) @@ -1476,7 +1478,6 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, // printf("sendfile(%d,%d,%d,%d)\r\n",out_fd,in_fd,*offset,*nbytes); Uint64 written = 0; #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) -#define SENDFILE_CHUNK_SIZE ((1 << (8*SIZEOF_SIZE_T)) - 1) ssize_t retval; do { // check if *nbytes is 0 or greater than the largest size_t @@ -1488,11 +1489,10 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, written += retval; *nbytes -= retval; } - } while (retval == SENDFILE_CHUNK_SIZE); + } while (retval != -1 && retval == SENDFILE_CHUNK_SIZE); *nbytes = written; return check_error(retval == -1 ? -1 : 0, errInfo); #elif defined(DARWIN) -#define SENDFILE_CHUNK_SIZE ((1 << (8*SIZEOF_OFF_T)) - 1) int retval; off_t len; do { @@ -1511,7 +1511,6 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, *nbytes = written; return check_error(retval, errInfo); #elif defined(__FreeBSD__) || defined(__DragonFly__) -#define SENDFILE_CHUNK_SIZE ((1 << (8*SIZEOF_SIZE_T)) - 1) off_t len; int retval; do { -- cgit v1.2.3 From 82ed2e3d5d959b0dce61056c648bead385c77d65 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 12:20:51 +0100 Subject: Remove debug printouts --- erts/emulator/drivers/common/efile_drv.c | 7 ------- erts/emulator/drivers/unix/unix_efile.c | 1 - 2 files changed, 8 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index c4f92cc318..b807f110eb 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -1755,12 +1755,10 @@ static void invoke_sendfile(void *data) result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes); - //printf("sendfile: result: %d, errno: %d, offset: %d, nbytes: %d\r\n",result, errno, d->c.sendfile.offset,nbytes); d->c.sendfile.written += nbytes; if (result == 1) { if (sys_info.async_threads != 0) { - printf("==> sendfile DONE written=%ld\r\n", d->c.sendfile.written); d->result_ok = 0; } else if (d->c.sendfile.nbytes == 0 && nbytes != 0) { d->result_ok = 1; @@ -1768,7 +1766,6 @@ static void invoke_sendfile(void *data) d->result_ok = 1; d->c.sendfile.nbytes -= nbytes; } else { - printf("==> sendfile DONE written=%ld\r\n", d->c.sendfile.written); d->result_ok = 0; } } else if (result == 0 && (d->errInfo.posix_errno == EAGAIN @@ -1776,7 +1773,6 @@ static void invoke_sendfile(void *data) d->result_ok = 1; } else { d->result_ok = -1; - printf("==> sendfile ERROR %s\r\n", erl_errno_id(d->errInfo.posix_errno)); } } @@ -2224,7 +2220,6 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) break; #ifdef HAVE_SENDFILE case FILE_SENDFILE: - //printf("efile_ready_async: sendfile (d->result_ok == %d)\r\n",d->result_ok); if (d->result_ok == -1) { desc->sendfile_state = not_sending; reply_error(desc, &d->errInfo); @@ -3442,8 +3437,6 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { d->c.sendfile.nbytes = nbytes; - printf("sendfile(nbytes => %ld, offset => %d, flags => %x)\r\n",d->c.sendfile.nbytes,d->c.sendfile.offset, d->c.sendfile.flags); - /* Do HEADER TRAILER stuff by calculating pointer places, not by copying data! */ if (sys_info.async_threads != 0) { diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 8db7f2336e..01de088b0f 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1475,7 +1475,6 @@ int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, off_t *offset, Uint64 *nbytes) { - // printf("sendfile(%d,%d,%d,%d)\r\n",out_fd,in_fd,*offset,*nbytes); Uint64 written = 0; #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) ssize_t retval; -- cgit v1.2.3 From 9322161952ed25a96578f163cc383be605e7f75c Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 12:21:08 +0100 Subject: Use free_sendfile explicitly for non-async This is needed because aync job will not call free_sendfile if there is an async_ready callback. --- erts/emulator/drivers/common/efile_drv.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index b807f110eb..1f13a65350 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -2229,6 +2229,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) } else { driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, ERL_DRV_USE, 0); + free_sendfile(data); } } else if (d->result_ok == 0) { desc->sendfile_state = not_sending; @@ -2238,6 +2239,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) free_sendfile(data); } else { driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, ERL_DRV_USE, 0); + free_sendfile(data); } } else if (d->result_ok == 1) { // If we are using select to send the rest of the data desc->sendfile_state = sending; @@ -3441,7 +3443,6 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { if (sys_info.async_threads != 0) { SET_BLOCKING(d->c.sendfile.out_fd); - d->free = free_sendfile; } cq_enq(desc, d); -- cgit v1.2.3 From a5b3d81936ab85edb8713f29baf85307ae0b25b8 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 20:29:59 +0100 Subject: Preliminary work on header/trailer Have to figure out how to represent progress in header writing when using non-blocking, not sure how to do this. --- erts/emulator/drivers/common/efile_drv.c | 68 +++++++++++++++++++++++++------- erts/emulator/drivers/common/erl_efile.h | 15 ++++++- erts/emulator/drivers/unix/unix_efile.c | 2 +- 3 files changed, 69 insertions(+), 16 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 1f13a65350..7eaafd5af1 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -358,15 +358,6 @@ struct t_readdir_buf { char buf[READDIR_BUFSIZE]; }; -#ifdef HAVE_SENDFILE -struct t_sendfile_hdtl { - int hdr_cnt; /* number of header iovecs */ - struct iovec *headers; /* pointer to header iovecs */ - int trl_cnt; /* number of trailer iovecs */ - struct iovec *trailers; /* pointer to trailer iovecs */ -}; -#endif /* HAVE_SENDFILE */ - struct t_data { struct t_data *next; @@ -530,6 +521,8 @@ static void *ef_safe_realloc(void *op, Uint s) !0) \ : 0) +/* int EV_GET_SYSIOVEC(ErlIoVec *ev, Uint32, int *cnt, SysIOVec **target, int *pp, int *qp) */ +#define EV_GET_SYSIOVEC ev_get_sysiovec #if 0 @@ -943,6 +936,41 @@ static int reply_eof(file_descriptor *desc) { return 0; } +static int ev_get_sysiovec(ErlIOVec *ev, Uint32 len, int *cnt, SysIOVec **target, int *pp, int *qp) { + int tmp_p = *pp, tmp_q = *qp, tmp_len = len, i; + SysIOVec *tmp_target; + while (tmp_len != 0) { + if (tmp_len + tmp_p > ev->iov[tmp_q].iov_len) { + + tmp_len -= ev->iov[tmp_q].iov_len - tmp_p; + tmp_q++; + tmp_p = 0; + if (tmp_q == ev->vsize) + return 0; + } else break; + } + *cnt = tmp_q - *qp + 1; + tmp_target = EF_SAFE_ALLOC(sizeof(SysIOVec)* (*cnt)); + *target = tmp_target; + for (i = 0; i < *cnt; i++) { + tmp_target[i].iov_base = ev->iov[*qp].iov_base+*pp; + if (len + *pp <= ev->iov[*qp].iov_len) { + tmp_target[i].iov_len = len; + if (len + *pp == ev->iov[*qp].iov_len) { + *pp = 0; + (*qp)++; + } else + *pp += len; + } else { + tmp_target[i].iov_len = ev->iov[*qp].iov_len - *pp; + len -= ev->iov[*qp].iov_len - *pp; + *pp = 0; + (*qp)++; + } + } + return 1; +} + static void invoke_name(void *data, int (*f)(Efile_error *, char *)) @@ -1753,7 +1781,7 @@ static void invoke_sendfile(void *data) int result = 0; d->again = 0; - result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes); + result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes, &d->c.sendfile.hdtl); d->c.sendfile.written += nbytes; @@ -3394,7 +3422,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { #ifdef HAVE_SENDFILE struct t_data *d; - Uint32 out_fd, offsetH, offsetL; + Uint32 out_fd, offsetH, offsetL, hd_len, tl_len; Uint64 nbytes; char flags; @@ -3410,7 +3438,9 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { || !EV_GET_CHAR(ev, &flags, &p, &q) || !EV_GET_UINT32(ev, &offsetH, &p, &q) || !EV_GET_UINT32(ev, &offsetL, &p, &q) - || !EV_GET_UINT64(ev, &nbytes, &p, &q)) { + || !EV_GET_UINT64(ev, &nbytes, &p, &q) + || !EV_GET_UINT32(ev, &hd_len, &p, &q) + || !EV_GET_UINT32(ev, &tl_len, &p, &q)) { /* Buffer has wrong length to contain all the needed values */ reply_posix_error(desc, EINVAL); goto done; @@ -3438,8 +3468,18 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { #endif d->c.sendfile.nbytes = nbytes; - - /* Do HEADER TRAILER stuff by calculating pointer places, not by copying data! */ + if (hd_len == 0 && tl_len == 0) + d->c.sendfile.hdtl = NULL; + else { + d->c.sendfile.hdtl = EF_SAFE_ALLOC(sizeof(struct t_sendfile_hdtl)); + if (!EV_GET_SYSIOVEC(ev, hd_len, &d->c.sendfile.hdtl->hdr_cnt, &d->c.sendfile.hdtl->headers, &p, &q) + || !EV_GET_SYSIOVEC(ev, tl_len, &d->c.sendfile.hdtl->trl_cnt, &d->c.sendfile.hdtl->trailers, &p, &q)) { + EF_FREE(d->c.sendfile.hdtl); + EF_FREE(d); + reply_posix_error(desc, EINVAL); + goto done; + } + } if (sys_info.async_threads != 0) { SET_BLOCKING(d->c.sendfile.out_fd); diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index fd6dc94755..b73fb35120 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -118,6 +118,19 @@ typedef struct _Efile_info { */ } Efile_info; + +#ifdef HAVE_SENDFILE +/* + * Described the structure of header/trailers for sendfile + */ +struct t_sendfile_hdtl { + SysIOVec *headers; + int hdr_cnt; + SysIOVec *trailers; + int trl_cnt; +}; +#endif /* HAVE_SENDFILE */ + /* * Functions. */ @@ -164,5 +177,5 @@ int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); #ifdef HAVE_SENDFILE int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, Uint64 *nbytes); + off_t *offset, Uint64 *nbytes, struct t_sendfile_hdtl **hdtl); #endif /* HAVE_SENDFILE */ diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 01de088b0f..138c550fdd 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1473,7 +1473,7 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, Uint64 *nbytes) + off_t *offset, Uint64 *nbytes, struct t_sendfile_hdtl** hdtl) { Uint64 written = 0; #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) -- cgit v1.2.3 From 27faa34693f35b6aa41fa67cbfe365bd082a5757 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Sun, 27 Nov 2011 17:33:19 +0100 Subject: Remove windows implementation --- erts/emulator/drivers/win32/win_efile.c | 24 ------------------------ 1 file changed, 24 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/win32/win_efile.c b/erts/emulator/drivers/win32/win_efile.c index 0f41a09bf6..931bb196f1 100644 --- a/erts/emulator/drivers/win32/win_efile.c +++ b/erts/emulator/drivers/win32/win_efile.c @@ -1581,27 +1581,3 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, errno = ERROR_SUCCESS; return check_error(0, errInfo); } - -int -efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, size_t *count) -{ - /* TODO: write proper Windows TransmitFile based implementation */ - /* use overlapped I/O and driver_select on the structure? */ - /* int res = efile_seek(errInfo, in_fd, *offset, EFILE_SEEK_SET, NULL); */ - /* if (res) { */ - /* /\* TODO: could in_fd be shared and require protecting/locking */ - /* efile_seek/SetFilePointerEx? *\/ */ - /* if (TransmitFile((SOCKET) out_fd, (HANDLE) in_fd, *count, */ - /* 0, NULL, NULL, 0)) { */ - /* return check_error(0, errInfo); */ - /* } else { */ - /* /\* TODO: correct error handling? *\/ */ - /* return set_error(errInfo); */ - /* } */ - /* } else { */ - /* return res; */ - /* } */ - errno = ENOTSUP; - return check_error(-1, errInfo); -} -- cgit v1.2.3 From 1bbf8cee44b8836d66d289cc0b5b314ed83de821 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Tue, 29 Nov 2011 11:05:37 +0100 Subject: Remove header/trailer support Since the API for headers/trailers seem to be very awkward to work with when using non-blocking io the feature is dropped for now. See unix_efile.c for more details. --- erts/emulator/drivers/common/efile_drv.c | 72 ++++---------------------------- erts/emulator/drivers/common/erl_efile.h | 2 +- erts/emulator/drivers/unix/unix_efile.c | 14 ++++++- 3 files changed, 22 insertions(+), 66 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 7eaafd5af1..5c52b99348 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -431,8 +431,6 @@ struct t_data off_t offset; Uint64 nbytes; Uint64 written; - short flags; - struct t_sendfile_hdtl *hdtl; } sendfile; #endif /* HAVE_SENDFILE */ } c; @@ -521,9 +519,6 @@ static void *ef_safe_realloc(void *op, Uint s) !0) \ : 0) -/* int EV_GET_SYSIOVEC(ErlIoVec *ev, Uint32, int *cnt, SysIOVec **target, int *pp, int *qp) */ -#define EV_GET_SYSIOVEC ev_get_sysiovec - #if 0 @@ -935,43 +930,6 @@ static int reply_eof(file_descriptor *desc) { driver_output2(desc->port, &c, 1, NULL, 0); return 0; } - -static int ev_get_sysiovec(ErlIOVec *ev, Uint32 len, int *cnt, SysIOVec **target, int *pp, int *qp) { - int tmp_p = *pp, tmp_q = *qp, tmp_len = len, i; - SysIOVec *tmp_target; - while (tmp_len != 0) { - if (tmp_len + tmp_p > ev->iov[tmp_q].iov_len) { - - tmp_len -= ev->iov[tmp_q].iov_len - tmp_p; - tmp_q++; - tmp_p = 0; - if (tmp_q == ev->vsize) - return 0; - } else break; - } - *cnt = tmp_q - *qp + 1; - tmp_target = EF_SAFE_ALLOC(sizeof(SysIOVec)* (*cnt)); - *target = tmp_target; - for (i = 0; i < *cnt; i++) { - tmp_target[i].iov_base = ev->iov[*qp].iov_base+*pp; - if (len + *pp <= ev->iov[*qp].iov_len) { - tmp_target[i].iov_len = len; - if (len + *pp == ev->iov[*qp].iov_len) { - *pp = 0; - (*qp)++; - } else - *pp += len; - } else { - tmp_target[i].iov_len = ev->iov[*qp].iov_len - *pp; - len -= ev->iov[*qp].iov_len - *pp; - *pp = 0; - (*qp)++; - } - } - return 1; -} - - static void invoke_name(void *data, int (*f)(Efile_error *, char *)) { @@ -1781,7 +1739,7 @@ static void invoke_sendfile(void *data) int result = 0; d->again = 0; - result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes, &d->c.sendfile.hdtl); + result = efile_sendfile(&d->errInfo, fd, out_fd, &d->c.sendfile.offset, &nbytes, NULL); d->c.sendfile.written += nbytes; @@ -3426,14 +3384,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { Uint64 nbytes; char flags; - /* DestFD:32, Offset:64, Bytes:64, - ChunkSize:64, - (get_bit(Nodiskio)):1, - (get_bit(MNowait)):1, - (get_bit(Sync)):1,0:5, - (encode_hdtl(Headers))/binary, - (encode_hdtl(Trailers))/binary */ - if (ev->size < 1 + 1 + 5 * sizeof(Uint32) + sizeof(char) + if (ev->size < 1 + 7 * sizeof(Uint32) + sizeof(char) || !EV_GET_UINT32(ev, &out_fd, &p, &q) || !EV_GET_CHAR(ev, &flags, &p, &q) || !EV_GET_UINT32(ev, &offsetH, &p, &q) @@ -3446,6 +3397,12 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { goto done; } + if (hd_len != 0 || tl_len != 0 || flags != 0) { + // We do not allow header, trailers and/or flags right now + reply_posix_error(desc, EINVAL); + goto done; + } + d = EF_SAFE_ALLOC(sizeof(struct t_data)); d->fd = desc->fd; d->command = command; @@ -3454,7 +3411,6 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { d->level = 2; d->c.sendfile.out_fd = (int) out_fd; - d->c.sendfile.flags = (int) flags; d->c.sendfile.written = 0; #if SIZEOF_OFF_T == 4 @@ -3468,18 +3424,6 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { #endif d->c.sendfile.nbytes = nbytes; - if (hd_len == 0 && tl_len == 0) - d->c.sendfile.hdtl = NULL; - else { - d->c.sendfile.hdtl = EF_SAFE_ALLOC(sizeof(struct t_sendfile_hdtl)); - if (!EV_GET_SYSIOVEC(ev, hd_len, &d->c.sendfile.hdtl->hdr_cnt, &d->c.sendfile.hdtl->headers, &p, &q) - || !EV_GET_SYSIOVEC(ev, tl_len, &d->c.sendfile.hdtl->trl_cnt, &d->c.sendfile.hdtl->trailers, &p, &q)) { - EF_FREE(d->c.sendfile.hdtl); - EF_FREE(d); - reply_posix_error(desc, EINVAL); - goto done; - } - } if (sys_info.async_threads != 0) { SET_BLOCKING(d->c.sendfile.out_fd); diff --git a/erts/emulator/drivers/common/erl_efile.h b/erts/emulator/drivers/common/erl_efile.h index b73fb35120..349ab0e17b 100644 --- a/erts/emulator/drivers/common/erl_efile.h +++ b/erts/emulator/drivers/common/erl_efile.h @@ -177,5 +177,5 @@ int efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, Sint64 length, int advise); #ifdef HAVE_SENDFILE int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, Uint64 *nbytes, struct t_sendfile_hdtl **hdtl); + off_t *offset, Uint64 *nbytes, struct t_sendfile_hdtl *hdtl); #endif /* HAVE_SENDFILE */ diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 138c550fdd..72911641d3 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1471,9 +1471,21 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, #ifdef HAVE_SENDFILE #define SENDFILE_CHUNK_SIZE ((1 << 30) -1) +/* + * sendfile: The implementation of the sendfile system call varies + * a lot on different *nix platforms so to make the api similar in all + * we have to emulate some things in linux and play with variables on + * bsd/darwin. + * + * It could be possible to implement header/trailer in sendfile, though + * you would have to emulate it in linux and on BSD/Darwin some complex + * calculations have to be made when using a non blocking socket to figure + * out how much of the header/file/trailer was sent in each command. + */ + int efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, - off_t *offset, Uint64 *nbytes, struct t_sendfile_hdtl** hdtl) + off_t *offset, Uint64 *nbytes, struct t_sendfile_hdtl* hdtl) { Uint64 written = 0; #if defined(__linux__) || (defined(__sun) && defined(__SVR4)) -- cgit v1.2.3