diff options
Diffstat (limited to 'erts/emulator/drivers/common/efile_drv.c')
-rw-r--r-- | erts/emulator/drivers/common/efile_drv.c | 234 |
1 files changed, 232 insertions, 2 deletions
diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 901d98c09d..509c4fe48c 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -55,6 +55,7 @@ #define FILE_READ_LINE 29 #define FILE_FDATASYNC 30 #define FILE_FADVISE 31 +#define FILE_SENDFILE 32 /* Return codes */ @@ -217,6 +218,7 @@ typedef unsigned char uchar; static ErlDrvData file_start(ErlDrvPort port, char* command); static int file_init(void); static void file_stop(ErlDrvData); +static void file_ready_output(ErlDrvData data, ErlDrvEvent event); static void file_output(ErlDrvData, char* buf, int len); static int file_control(ErlDrvData, unsigned int command, char* buf, int len, char **rbuf, int rlen); @@ -224,6 +226,7 @@ static void file_timeout(ErlDrvData); static void file_outputv(ErlDrvData, ErlIOVec*); static void file_async_ready(ErlDrvData, ErlDrvThreadData); static void file_flush(ErlDrvData); +static void file_stop_select(ErlDrvEvent event, void* _); @@ -253,6 +256,18 @@ typedef struct { ErlDrvPDL q_mtx; /* Mutex for the driver queue, known by the emulator. Also used for mutual exclusion when accessing field(s) below. */ size_t write_buffered; + ErlDrvTermData caller; /* recipient of sync reply */ + /* sendfile call state to retry/resume on event */ + int command; /* same as d->command. for sendfile. TODO: this seems wrong */ + struct { + int eagain; + int out_fd; + /* TODO: Use Sint64 instead? What about 32-bit off_t linux */ + off_t offset; + size_t count; + size_t chunksize; + ErlDrvSInt64 written; + } sendfile; } file_descriptor; @@ -264,7 +279,7 @@ struct erl_drv_entry efile_driver_entry = { file_stop, file_output, NULL, - NULL, + file_ready_output, "efile", NULL, NULL, @@ -279,7 +294,9 @@ struct erl_drv_entry efile_driver_entry = { ERL_DRV_EXTENDED_MAJOR_VERSION, ERL_DRV_EXTENDED_MINOR_VERSION, ERL_DRV_FLAG_USE_PORT_LOCKING, - NULL + NULL, + NULL, + file_stop_select }; @@ -613,6 +630,111 @@ static struct t_data *cq_deq(file_descriptor *desc) { } +/********************************************************************* + * Command queue functions + */ + +static ErlDrvTermData am_ok; +static ErlDrvTermData am_error; +static ErlDrvTermData am_efile_reply; + +#define INIT_ATOM(NAME) am_ ## NAME = driver_mk_atom(#NAME) + +#define LOAD_ATOM_CNT 2 +#define LOAD_ATOM(vec, i, atom) \ + (((vec)[(i)] = ERL_DRV_ATOM), \ + ((vec)[(i)+1] = (atom)), \ + ((i)+LOAD_ATOM_CNT)) + +#define LOAD_INT_CNT 2 +#define LOAD_INT(vec, i, val) \ + (((vec)[(i)] = ERL_DRV_INT), \ + ((vec)[(i)+1] = (ErlDrvTermData)(val)), \ + ((i)+LOAD_INT_CNT)) + +#define LOAD_INT64_CNT 2 +#define LOAD_INT64(vec, i, val) \ + (((vec)[(i)] = ERL_DRV_INT64), \ + ((vec)[(i)+1] = (ErlDrvTermData)(val)), \ + ((i)+LOAD_INT64_CNT)) + +#define LOAD_PORT_CNT 2 +#define LOAD_PORT(vec, i, port) \ + (((vec)[(i)] = ERL_DRV_PORT), \ + ((vec)[(i)+1] = (port)), \ + ((i)+LOAD_PORT_CNT)) + +#define LOAD_PID_CNT 2 +#define LOAD_PID(vec, i, pid) \ + (((vec)[(i)] = ERL_DRV_PID), \ + ((vec)[(i)+1] = (pid)), \ + ((i)+LOAD_PID_CNT)) + +#define LOAD_TUPLE_CNT 2 +#define LOAD_TUPLE(vec, i, size) \ + (((vec)[(i)] = ERL_DRV_TUPLE), \ + ((vec)[(i)+1] = (size)), \ + ((i)+LOAD_TUPLE_CNT)) + +/* send: +** {efile_reply, Pid, Port, {ok, int64()}} +*/ + +static int ef_send_ok_int64(file_descriptor *desc, ErlDrvTermData caller, + ErlDrvSInt64 *n) +{ + ErlDrvTermData spec[2*LOAD_ATOM_CNT + LOAD_PID_CNT + LOAD_PORT_CNT + + LOAD_INT64_CNT + 2*LOAD_TUPLE_CNT]; + int i = 0; + + i = LOAD_ATOM(spec, i, am_efile_reply); + i = LOAD_PID(spec, i, caller); + i = LOAD_PORT(spec, i, driver_mk_port(desc->port)); + i = LOAD_ATOM(spec, i, am_ok); + i = LOAD_INT64(spec, i, n); + i = LOAD_TUPLE(spec, i, 2); + i = LOAD_TUPLE(spec, i, 4); + ASSERT(i == sizeof(spec)/sizeof(*spec)); + + return driver_send_term(desc->port, caller, spec, i); +} + +static ErlDrvTermData error_atom(int err) +{ + char errstr[256]; + char* s; + char* t; + + for (s = erl_errno_id(err), t = errstr; *s; s++, t++) + *t = tolower(*s); + *t = '\0'; + return driver_mk_atom(errstr); +} + +/* send: +** {efile_reply, Pid, Port, {error, posix_error()} +*/ + +static int ef_send_posix_error(file_descriptor *desc, ErlDrvTermData caller, + int e) +{ + ErlDrvTermData spec[3*LOAD_ATOM_CNT + LOAD_PID_CNT + LOAD_PORT_CNT + + 2*LOAD_TUPLE_CNT]; + int i = 0; + + i = LOAD_ATOM(spec, i, am_efile_reply); + i = LOAD_PID(spec, i, caller); + i = LOAD_PORT(spec, i, driver_mk_port(desc->port)); + i = LOAD_ATOM(spec, i, am_error); + /* TODO: safe? set of error codes should be limited and safe */ + i = LOAD_ATOM(spec, i, error_atom(e)); + i = LOAD_TUPLE(spec, i, 2); + i = LOAD_TUPLE(spec, i, 4); + ASSERT(i == sizeof(spec)/sizeof(*spec)); + + desc->caller = 0; + return driver_send_term(desc->port, caller, spec, i); +} /********************************************************************* * Driver entry point -> init @@ -628,6 +750,11 @@ file_init(void) ? atoi(buf) : 0); driver_system_info(&sys_info, sizeof(ErlDrvSysInfo)); + + INIT_ATOM(ok); + INIT_ATOM(error); + INIT_ATOM(efile_reply); + return 0; } @@ -1694,6 +1821,74 @@ static void invoke_fadvise(void *data) d->result_ok = efile_fadvise(&d->errInfo, fd, offset, length, advise); } + + +static void do_sendfile(file_descriptor *desc); +static void file_ready_output(ErlDrvData data, ErlDrvEvent event) +{ + file_descriptor* d = (file_descriptor*) data; + + switch (d->command) { + case FILE_SENDFILE: + driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, + ERL_DRV_WRITE, 0); + do_sendfile(d); + break; + default: + break; + } +} + +static void file_stop_select(ErlDrvEvent event, void* _) +{ + /* TODO: close socket? */ +} + +static void invoke_sendfile(void *data) +{ + ((struct t_data *)data)->again = 0; +} + +static void do_sendfile(file_descriptor *d) +{ + int fd = d->fd; + int out_fd = d->sendfile.out_fd; + off_t offset = d->sendfile.offset; + size_t count = d->sendfile.count; + size_t chunksize = count < d->sendfile.chunksize + ? count : d->sendfile.chunksize; + int result_ok = 0; + Efile_error errInfo; + + result_ok = efile_sendfile(&errInfo, fd, out_fd, &offset, &chunksize); + + if (result_ok) { + d->sendfile.offset += chunksize; + d->sendfile.written += chunksize; + d->sendfile.count -= chunksize; + if (d->sendfile.count > 0) { + driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, + ERL_DRV_USE|ERL_DRV_WRITE, 1); + } else { + printf("==> sendfile DONE eagain=%d\n", d->sendfile.eagain); + ef_send_ok_int64(d, d->caller, &d->sendfile.written); + } + } else if (errInfo.posix_errno == EAGAIN || errInfo.posix_errno == EINTR) { + if (chunksize > 0) { + d->sendfile.offset += chunksize; + d->sendfile.written += chunksize; + d->sendfile.count -= chunksize; + } + d->sendfile.eagain++; + + driver_select(d->port, (ErlDrvEvent)d->sendfile.out_fd, + ERL_DRV_USE|ERL_DRV_WRITE, 1); + } else { + printf("==> sendfile ERROR %s\n", erl_errno_id(errInfo.posix_errno)); + ef_send_posix_error(d, d->caller, errInfo.posix_errno); + } +} + static void free_readdir(void *data) { struct t_data *d = (struct t_data *) data; @@ -2105,6 +2300,13 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) } free_preadv(data); break; + case FILE_SENDFILE: + /* Return 'ok' and let prim_file:sendfile wait for message */ + reply_ok(desc); + driver_select(desc->port, (ErlDrvEvent)desc->sendfile.out_fd, + ERL_DRV_USE|ERL_DRV_WRITE, 1); + free_data(data); + break; default: abort(); } @@ -2452,6 +2654,34 @@ file_output(ErlDrvData e, char* buf, int count) goto done; } + case FILE_SENDFILE: + { + d = EF_SAFE_ALLOC(sizeof(struct t_data)); + d->fd = fd; + d->command = command; + d->invoke = invoke_sendfile; + d->free = free_data; + d->level = 2; + desc->sendfile.out_fd = get_int32((uchar*) buf); + /* TODO: are off_t and size_t 64bit on all platforms? + off_t is 32bit on win32 msvc. maybe configurable in msvc. + Maybe use '#if SIZEOF_SIZE_T == 4'? */ + desc->sendfile.offset = get_int64(((uchar*) buf) + + sizeof(Sint32)); + desc->sendfile.count = get_int64(((uchar*) buf) + + sizeof(Sint32) + + sizeof(Sint64)); + desc->sendfile.chunksize = get_int64(((uchar*) buf) + + sizeof(Sint32) + + 2*sizeof(Sint64)); + desc->sendfile.written = 0; + desc->sendfile.eagain = 0; + /* TODO: shouldn't d->command be enough? */ + desc->command = command; + desc->caller = driver_caller(desc->port); + goto done; + } + } /* |