diff options
Diffstat (limited to 'erts/emulator/drivers/common/efile_drv.c')
-rw-r--r-- | erts/emulator/drivers/common/efile_drv.c | 256 |
1 files changed, 166 insertions, 90 deletions
diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 912f5d3d8b..595b0488a8 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1996-2012. All Rights Reserved. + * Copyright Ericsson AB 1996-2013. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in @@ -56,7 +56,8 @@ #define FILE_FDATASYNC 30 #define FILE_FADVISE 31 #define FILE_SENDFILE 32 - +#define FILE_FALLOCATE 33 +#define FILE_CLOSE_ON_PORT_EXIT 34 /* Return codes */ #define FILE_RESP_OK 0 @@ -177,6 +178,7 @@ dt_private *get_dt_private(int); #define MUTEX_LOCK(m) do { IF_THRDS { TRACE_DRIVER; driver_pdl_lock(m); } } while (0) #define MUTEX_UNLOCK(m) do { IF_THRDS { TRACE_DRIVER; driver_pdl_unlock(m); } } while (0) #else +#define IF_THRDS if (0) #define MUTEX_INIT(m, p) #define MUTEX_LOCK(m) #define MUTEX_UNLOCK(m) @@ -395,7 +397,6 @@ struct t_pwritev { ErlDrvPort port; ErlDrvPDL q_mtx; size_t size; - size_t free_size; unsigned cnt; unsigned n; struct t_pbuf_spec specs[1]; @@ -428,6 +429,7 @@ struct t_data int level; void (*invoke)(void *); void (*free)(void *); + void *data_to_free; /* used by FILE_CLOSE_ON_PORT_EXIT only */ int again; int reply; #ifdef USE_VM_PROBES @@ -439,6 +441,7 @@ struct t_data Efile_error errInfo; int flags; SWord fd; + int is_fd_unused; /**/ Efile_info info; EFILE_DIR_HANDLE dir_handle; /* Handle to open directory. */ @@ -458,7 +461,6 @@ struct t_data ErlDrvPort port; ErlDrvPDL q_mtx; size_t size; - size_t free_size; size_t reply_size; } writev; struct t_pwritev pwritev; @@ -503,6 +505,10 @@ struct t_data Uint64 written; } sendfile; #endif /* HAVE_SENDFILE */ + struct { + Sint64 offset; + Sint64 length; + } fallocate; } c; char b[1]; }; @@ -781,11 +787,6 @@ file_start(ErlDrvPort port, char* command) return (ErlDrvData) desc; } -static void free_data(void *data) -{ - EF_FREE(data); -} - static void do_close(int flags, SWord fd) { if (flags & EFILE_COMPRESSED) { erts_gzclose((gzFile)(fd)); @@ -803,25 +804,27 @@ static void invoke_close(void *data) DTRACE_INVOKE_RETURN(FILE_CLOSE); } -/********************************************************************* - * Driver entry point -> stop - */ -static void -file_stop(ErlDrvData e) +static void free_data(void *data) { - file_descriptor* desc = (file_descriptor*)e; - - TRACE_C('p'); + struct t_data *d = (struct t_data *) data; - if (desc->fd != FILE_FD_INVALID) { - do_close(desc->flags, desc->fd); - desc->fd = FILE_FD_INVALID; - desc->flags = 0; - } - if (desc->read_binp) { - driver_free_binary(desc->read_binp); + switch (d->command) { + case FILE_OPEN: + if (d->is_fd_unused && d->fd != FILE_FD_INVALID) { + /* This is OK to do in scheduler thread because there can be no async op + ongoing for this fd here, as we exited during async open. + Ideally, this close should happen in an async thread too, but that would + require a substantial rewrite, as we are here because of a dead port and + cannot schedule async jobs for that port any more... */ + do_close(d->flags, d->fd); + } + break; + case FILE_CLOSE_ON_PORT_EXIT: + EF_FREE(d->data_to_free); + break; } - EF_FREE(desc); + + EF_FREE(data); } @@ -1144,7 +1147,7 @@ static void invoke_read_line(void *data) { struct t_data *d = (struct t_data *) data; int status; - size_t read_size; + size_t read_size = 0; int local_loop = (d->again == 0); DTRACE_INVOKE_SETUP(FILE_READ_LINE); @@ -1155,7 +1158,14 @@ static void invoke_read_line(void *data) /* Need more place */ ErlDrvSizeT need = (d->c.read_line.read_size >= DEFAULT_LINEBUF_SIZE) ? d->c.read_line.read_size + DEFAULT_LINEBUF_SIZE : DEFAULT_LINEBUF_SIZE; - ErlDrvBinary *newbin = driver_alloc_binary(need); + ErlDrvBinary *newbin; +#if !ALWAYS_READ_LINE_AHEAD + /* Use read_ahead size if need does not exceed it */ + if (need < (d->c.read_line.binp)->orig_size && + d->c.read_line.read_ahead) + need = (d->c.read_line.binp)->orig_size; +#endif + newbin = driver_alloc_binary(need); if (newbin == NULL) { d->result_ok = 0; d->errInfo.posix_errno = ENOMEM; @@ -1334,7 +1344,7 @@ static void invoke_preadv(void *data) = efile_pread(&d->errInfo, (int) d->fd, c->offsets[c->cnt] + c->size, - ev->iov[1 + c->cnt].iov_base + c->size, + ((char *)ev->iov[1 + c->cnt].iov_base) + c->size, read_size, &bytes_read))) { bytes_read_so_far += bytes_read; @@ -1520,26 +1530,24 @@ static void invoke_writev(void *data) { } EF_FREE(iov); - d->c.writev.free_size = size; - d->c.writev.size -= size; if (! d->result_ok) { d->again = 0; + MUTEX_LOCK(d->c.writev.q_mtx); + driver_deq(d->c.writev.port, d->c.writev.size); + MUTEX_UNLOCK(d->c.writev.q_mtx); } else { if (! segment) { d->again = 0; } + d->c.writev.size -= size; TRACE_F(("w%lu", (unsigned long)size)); - + MUTEX_LOCK(d->c.writev.q_mtx); + driver_deq(d->c.writev.port, size); + MUTEX_UNLOCK(d->c.writev.q_mtx); } - DTRACE_INVOKE_RETURN(FILE_WRITE); -} -static void free_writev(void *data) { - struct t_data *d = data; - MUTEX_LOCK(d->c.writev.q_mtx); - driver_deq(d->c.writev.port, d->c.writev.size + d->c.writev.free_size); - MUTEX_UNLOCK(d->c.writev.q_mtx); - EF_FREE(d); + + DTRACE_INVOKE_RETURN(FILE_WRITE); } static void invoke_pwd(void *data) @@ -1590,7 +1598,7 @@ static void invoke_pwritev(void *data) { struct t_pwritev *c = &d->c.pwritev; size_t p; int segment; - size_t size, write_size; + size_t size, write_size, written; DTRACE_INVOKE_SETUP(FILE_PWRITEV); segment = d->again && c->size >= 2*FILE_SEGMENT_WRITE; @@ -1610,39 +1618,35 @@ static void invoke_pwritev(void *data) { if (iovlen < 0) goto error; /* Port terminated */ - for (iovcnt = 0, c->free_size = 0; - c->cnt < c->n && iovcnt < iovlen && c->free_size < size; + for (iovcnt = 0, written = 0; + c->cnt < c->n && iovcnt < iovlen && written < size; c->cnt++) { int chop; write_size = c->specs[c->cnt].size; if (iov[iovcnt].iov_len - p < write_size) { - /* Mismatch between pos/size spec and what is queued */ - d->errInfo.posix_errno = EINVAL; - d->result_ok = 0; - d->again = 0; - goto done; + goto error; } - chop = segment && c->free_size + write_size >= 2*FILE_SEGMENT_WRITE; + chop = segment && written + write_size >= 2*FILE_SEGMENT_WRITE; if (chop) { - ASSERT(c->free_size < FILE_SEGMENT_WRITE); + ASSERT(written < FILE_SEGMENT_WRITE); write_size = FILE_SEGMENT_WRITE + FILE_SEGMENT_WRITE/2 - - c->free_size; + - written; } d->result_ok = efile_pwrite(&d->errInfo, (int) d->fd, - iov[iovcnt].iov_base + p, + (char *)(iov[iovcnt].iov_base) + p, write_size, c->specs[c->cnt].offset); if (! d->result_ok) { d->again = 0; - goto done; + goto deq_error; } - c->free_size += write_size; + written += write_size; c->size -= write_size; if (chop) { c->specs[c->cnt].offset += write_size; c->specs[c->cnt].size -= write_size; /* Schedule out (d->again != 0) */ - goto done; + break; } /* Move forward in buffer */ p += write_size; @@ -1664,25 +1668,28 @@ static void invoke_pwritev(void *data) { d->errInfo.posix_errno = EINVAL; d->result_ok = 0; d->again = 0; + deq_error: + MUTEX_LOCK(d->c.writev.q_mtx); + driver_deq(d->c.pwritev.port, c->size); + MUTEX_UNLOCK(d->c.writev.q_mtx); + + goto done; } else { - ASSERT(c->free_size == size); + ASSERT(written == size); d->again = 0; } - } + } else + ASSERT(written >= FILE_SEGMENT_WRITE); + + MUTEX_LOCK(d->c.writev.q_mtx); + driver_deq(d->c.pwritev.port, written); + MUTEX_UNLOCK(d->c.writev.q_mtx); done: EF_FREE(iov); /* Free our copy of the vector, nothing to restore */ + DTRACE_INVOKE_RETURN(FILE_PWRITEV); } -static void free_pwritev(void *data) { - struct t_data *d = data; - - MUTEX_LOCK(d->c.writev.q_mtx); - driver_deq(d->c.pwritev.port, d->c.pwritev.free_size + d->c.pwritev.size); - MUTEX_UNLOCK(d->c.writev.q_mtx); - EF_FREE(d); -} - static void invoke_flstat(void *data) { struct t_data *d = (struct t_data *) data; @@ -1862,6 +1869,9 @@ static void invoke_open(void *data) } d->result_ok = status; + if (!status) { + d->fd = FILE_FD_INVALID; + } DTRACE_INVOKE_RETURN(FILE_OPEN); } @@ -1953,6 +1963,17 @@ static int flush_sendfile(file_descriptor *desc,void *_) { #endif /* HAVE_SENDFILE */ +static void invoke_fallocate(void *data) +{ + struct t_data *d = (struct t_data *) data; + int fd = (int) d->fd; + Sint64 offset = d->c.fallocate.offset; + Sint64 length = d->c.fallocate.length; + + d->again = 0; + d->result_ok = efile_fallocate(&d->errInfo, fd, offset, length); +} + static void free_readdir(void *data) { struct t_data *d = (struct t_data *) data; @@ -1982,21 +2003,8 @@ static void try_free_read_bin(file_descriptor *desc) { static int try_again(file_descriptor *desc, struct t_data *d) { - if (! d->again) { + if (! d->again) return 0; - } - switch (d->command) { - case FILE_WRITE: - MUTEX_LOCK(d->c.writev.q_mtx); - driver_deq(d->c.writev.port, d->c.writev.free_size); - MUTEX_UNLOCK(d->c.writev.q_mtx); - break; - case FILE_PWRITEV: - MUTEX_LOCK(d->c.writev.q_mtx); - driver_deq(d->c.pwritev.port, d->c.pwritev.free_size); - MUTEX_UNLOCK(d->c.writev.q_mtx); - break; - } if (desc->timer_state != timer_idle) { driver_cancel_timer(desc->port); } @@ -2052,10 +2060,9 @@ static struct t_data *async_write(file_descriptor *desc, int *errp, } #endif d->reply = reply; - d->c.writev.free_size = 0; d->c.writev.reply_size = reply_size; d->invoke = invoke_writev; - d->free = free_writev; + d->free = free_data; d->level = 1; cq_enq(desc, d); desc->write_buffered = 0; @@ -2216,6 +2223,49 @@ static int lseek_flush_read(file_descriptor *desc, int *errp } +/********************************************************************* + * Driver entry point -> stop + * The close has to be scheduled on async thread, so that currently active + * async operation does not suddenly have the ground disappearing under their feet... + */ +static void +file_stop(ErlDrvData e) +{ + file_descriptor* desc = (file_descriptor*)e; + + TRACE_C('p'); + + IF_THRDS { + flush_read(desc); + if (desc->fd != FILE_FD_INVALID) { + struct t_data *d = EF_SAFE_ALLOC(sizeof(struct t_data)); + d->command = FILE_CLOSE_ON_PORT_EXIT; + d->reply = !0; + d->fd = desc->fd; + d->flags = desc->flags; + d->invoke = invoke_close; + d->free = free_data; + d->level = 2; + d->data_to_free = (void *) desc; + cq_enq(desc, d); + desc->fd = FILE_FD_INVALID; + desc->flags = 0; + cq_execute(desc); + } else { + EF_FREE(desc); + } + } else { + if (desc->fd != FILE_FD_INVALID) { + do_close(desc->flags, desc->fd); + desc->fd = FILE_FD_INVALID; + desc->flags = 0; + } + if (desc->read_binp) { + driver_free_binary(desc->read_binp); + } + EF_FREE(desc); + } +} /********************************************************************* * Driver entry point -> ready_async @@ -2325,7 +2375,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) desc->write_errInfo = d->errInfo; } } - free_writev(data); + free_data(data); break; case FILE_LSEEK: if (d->reply) { @@ -2348,6 +2398,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) case FILE_RENAME: case FILE_WRITE_INFO: case FILE_FADVISE: + case FILE_FALLOCATE: reply(desc, d->result_ok, &d->errInfo); free_data(data); break; @@ -2373,8 +2424,10 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) if (!d->result_ok) { reply_error(desc, &d->errInfo); } else { + ASSERT(d->is_fd_unused); desc->fd = d->fd; desc->flags = d->flags; + d->is_fd_unused = 0; reply_Uint(desc, d->fd); } free_data(data); @@ -2436,7 +2489,6 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) } free_readdir(data); break; - /* See file_stop */ case FILE_CLOSE: if (d->reply) { TRACE_C('K'); @@ -2453,7 +2505,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) } else { reply_Uint(desc, d->c.pwritev.n); } - free_pwritev(data); + free_data(data); break; case FILE_PREADV: if (!d->result_ok) { @@ -2488,7 +2540,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) reply_Sint64(desc, d->c.sendfile.written); desc->sendfile_state = not_sending; free_sendfile(data); - } else if (d->result_ok == 1) { // If we are using select to send the rest of the data + } else if (d->result_ok == 1) { /* If we are using select to send the rest of the data */ desc->sendfile_state = sending; desc->d = d; driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, @@ -2496,16 +2548,26 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) } break; #endif + case FILE_CLOSE_ON_PORT_EXIT: + /* See file_stop. However this is never invoked after the port is killed. */ + free_data(data); + EF_FREE(desc); + desc = NULL; + /* This is it for this port, so just send dtrace and return, avoid doing anything to the freed data */ + DTRACE6(efile_drv_return, sched_i1, sched_i2, sched_utag, + command, result_ok, posix_errno); + return; default: abort(); } DTRACE6(efile_drv_return, sched_i1, sched_i2, sched_utag, command, result_ok, posix_errno); - if (desc->write_buffered != 0 && desc->timer_state == timer_idle) { + if (desc->write_buffered != 0 && desc->timer_state == timer_idle ) { desc->timer_state = timer_write; driver_set_timer(desc->port, desc->write_delay); } cq_execute(desc); + } @@ -2745,6 +2807,7 @@ file_output(ErlDrvData e, char* buf, ErlDrvSizeT count) d->invoke = invoke_open; d->free = free_data; d->level = 2; + d->is_fd_unused = 1; goto done; } @@ -2958,6 +3021,20 @@ file_output(ErlDrvData e, char* buf, ErlDrvSizeT count) goto done; } + case FILE_FALLOCATE: + { + d = EF_SAFE_ALLOC(sizeof(struct t_data)); + + d->fd = fd; + d->command = command; + d->invoke = invoke_fallocate; + d->free = free_data; + d->level = 2; + d->c.fallocate.offset = get_int64((uchar*) buf); + d->c.fallocate.length = get_int64(((uchar*) buf) + sizeof(Sint64)); + goto done; + } + } /* @@ -3555,7 +3632,6 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { #ifdef USE_VM_PROBES dt_i3 = d->c.pwritev.size; #endif - d->c.pwritev.free_size = 0; if (j == 0) { /* Trivial case - nothing to write */ EF_FREE(d); @@ -3579,7 +3655,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { MUTEX_UNLOCK(desc->q_mtx); /* Execute the command */ d->invoke = invoke_pwritev; - d->free = free_pwritev; + d->free = free_data; d->level = 1; cq_enq(desc, d); } @@ -3717,7 +3793,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { res_ev->iov[0].iov_base = res_ev->binv[0]->orig_bytes; /* Fill in the number of buffers in the header */ put_int32(0, res_ev->iov[0].iov_base); - put_int32(n, res_ev->iov[0].iov_base+4); + put_int32(n, (char *)(res_ev->iov[0].iov_base) + 4); /**/ res_ev->size = res_ev->iov[0].iov_len; if (n == 0) { @@ -4018,7 +4094,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { } if (hd_len != 0 || tl_len != 0 || flags != 0) { - // We do not allow header, trailers and/or flags right now + /* We do not allow header, trailers and/or flags right now */ reply_posix_error(desc, EINVAL); goto done; } |