aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/drivers/common/efile_drv.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/drivers/common/efile_drv.c')
-rw-r--r--erts/emulator/drivers/common/efile_drv.c256
1 files changed, 166 insertions, 90 deletions
diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c
index 912f5d3d8b..595b0488a8 100644
--- a/erts/emulator/drivers/common/efile_drv.c
+++ b/erts/emulator/drivers/common/efile_drv.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1996-2012. All Rights Reserved.
+ * Copyright Ericsson AB 1996-2013. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
@@ -56,7 +56,8 @@
#define FILE_FDATASYNC 30
#define FILE_FADVISE 31
#define FILE_SENDFILE 32
-
+#define FILE_FALLOCATE 33
+#define FILE_CLOSE_ON_PORT_EXIT 34
/* Return codes */
#define FILE_RESP_OK 0
@@ -177,6 +178,7 @@ dt_private *get_dt_private(int);
#define MUTEX_LOCK(m) do { IF_THRDS { TRACE_DRIVER; driver_pdl_lock(m); } } while (0)
#define MUTEX_UNLOCK(m) do { IF_THRDS { TRACE_DRIVER; driver_pdl_unlock(m); } } while (0)
#else
+#define IF_THRDS if (0)
#define MUTEX_INIT(m, p)
#define MUTEX_LOCK(m)
#define MUTEX_UNLOCK(m)
@@ -395,7 +397,6 @@ struct t_pwritev {
ErlDrvPort port;
ErlDrvPDL q_mtx;
size_t size;
- size_t free_size;
unsigned cnt;
unsigned n;
struct t_pbuf_spec specs[1];
@@ -428,6 +429,7 @@ struct t_data
int level;
void (*invoke)(void *);
void (*free)(void *);
+ void *data_to_free; /* used by FILE_CLOSE_ON_PORT_EXIT only */
int again;
int reply;
#ifdef USE_VM_PROBES
@@ -439,6 +441,7 @@ struct t_data
Efile_error errInfo;
int flags;
SWord fd;
+ int is_fd_unused;
/**/
Efile_info info;
EFILE_DIR_HANDLE dir_handle; /* Handle to open directory. */
@@ -458,7 +461,6 @@ struct t_data
ErlDrvPort port;
ErlDrvPDL q_mtx;
size_t size;
- size_t free_size;
size_t reply_size;
} writev;
struct t_pwritev pwritev;
@@ -503,6 +505,10 @@ struct t_data
Uint64 written;
} sendfile;
#endif /* HAVE_SENDFILE */
+ struct {
+ Sint64 offset;
+ Sint64 length;
+ } fallocate;
} c;
char b[1];
};
@@ -781,11 +787,6 @@ file_start(ErlDrvPort port, char* command)
return (ErlDrvData) desc;
}
-static void free_data(void *data)
-{
- EF_FREE(data);
-}
-
static void do_close(int flags, SWord fd) {
if (flags & EFILE_COMPRESSED) {
erts_gzclose((gzFile)(fd));
@@ -803,25 +804,27 @@ static void invoke_close(void *data)
DTRACE_INVOKE_RETURN(FILE_CLOSE);
}
-/*********************************************************************
- * Driver entry point -> stop
- */
-static void
-file_stop(ErlDrvData e)
+static void free_data(void *data)
{
- file_descriptor* desc = (file_descriptor*)e;
-
- TRACE_C('p');
+ struct t_data *d = (struct t_data *) data;
- if (desc->fd != FILE_FD_INVALID) {
- do_close(desc->flags, desc->fd);
- desc->fd = FILE_FD_INVALID;
- desc->flags = 0;
- }
- if (desc->read_binp) {
- driver_free_binary(desc->read_binp);
+ switch (d->command) {
+ case FILE_OPEN:
+ if (d->is_fd_unused && d->fd != FILE_FD_INVALID) {
+ /* This is OK to do in scheduler thread because there can be no async op
+ ongoing for this fd here, as we exited during async open.
+ Ideally, this close should happen in an async thread too, but that would
+ require a substantial rewrite, as we are here because of a dead port and
+ cannot schedule async jobs for that port any more... */
+ do_close(d->flags, d->fd);
+ }
+ break;
+ case FILE_CLOSE_ON_PORT_EXIT:
+ EF_FREE(d->data_to_free);
+ break;
}
- EF_FREE(desc);
+
+ EF_FREE(data);
}
@@ -1144,7 +1147,7 @@ static void invoke_read_line(void *data)
{
struct t_data *d = (struct t_data *) data;
int status;
- size_t read_size;
+ size_t read_size = 0;
int local_loop = (d->again == 0);
DTRACE_INVOKE_SETUP(FILE_READ_LINE);
@@ -1155,7 +1158,14 @@ static void invoke_read_line(void *data)
/* Need more place */
ErlDrvSizeT need = (d->c.read_line.read_size >= DEFAULT_LINEBUF_SIZE) ?
d->c.read_line.read_size + DEFAULT_LINEBUF_SIZE : DEFAULT_LINEBUF_SIZE;
- ErlDrvBinary *newbin = driver_alloc_binary(need);
+ ErlDrvBinary *newbin;
+#if !ALWAYS_READ_LINE_AHEAD
+ /* Use read_ahead size if need does not exceed it */
+ if (need < (d->c.read_line.binp)->orig_size &&
+ d->c.read_line.read_ahead)
+ need = (d->c.read_line.binp)->orig_size;
+#endif
+ newbin = driver_alloc_binary(need);
if (newbin == NULL) {
d->result_ok = 0;
d->errInfo.posix_errno = ENOMEM;
@@ -1334,7 +1344,7 @@ static void invoke_preadv(void *data)
= efile_pread(&d->errInfo,
(int) d->fd,
c->offsets[c->cnt] + c->size,
- ev->iov[1 + c->cnt].iov_base + c->size,
+ ((char *)ev->iov[1 + c->cnt].iov_base) + c->size,
read_size,
&bytes_read))) {
bytes_read_so_far += bytes_read;
@@ -1520,26 +1530,24 @@ static void invoke_writev(void *data) {
}
EF_FREE(iov);
- d->c.writev.free_size = size;
- d->c.writev.size -= size;
if (! d->result_ok) {
d->again = 0;
+ MUTEX_LOCK(d->c.writev.q_mtx);
+ driver_deq(d->c.writev.port, d->c.writev.size);
+ MUTEX_UNLOCK(d->c.writev.q_mtx);
} else {
if (! segment) {
d->again = 0;
}
+ d->c.writev.size -= size;
TRACE_F(("w%lu", (unsigned long)size));
-
+ MUTEX_LOCK(d->c.writev.q_mtx);
+ driver_deq(d->c.writev.port, size);
+ MUTEX_UNLOCK(d->c.writev.q_mtx);
}
- DTRACE_INVOKE_RETURN(FILE_WRITE);
-}
-static void free_writev(void *data) {
- struct t_data *d = data;
- MUTEX_LOCK(d->c.writev.q_mtx);
- driver_deq(d->c.writev.port, d->c.writev.size + d->c.writev.free_size);
- MUTEX_UNLOCK(d->c.writev.q_mtx);
- EF_FREE(d);
+
+ DTRACE_INVOKE_RETURN(FILE_WRITE);
}
static void invoke_pwd(void *data)
@@ -1590,7 +1598,7 @@ static void invoke_pwritev(void *data) {
struct t_pwritev *c = &d->c.pwritev;
size_t p;
int segment;
- size_t size, write_size;
+ size_t size, write_size, written;
DTRACE_INVOKE_SETUP(FILE_PWRITEV);
segment = d->again && c->size >= 2*FILE_SEGMENT_WRITE;
@@ -1610,39 +1618,35 @@ static void invoke_pwritev(void *data) {
if (iovlen < 0)
goto error; /* Port terminated */
- for (iovcnt = 0, c->free_size = 0;
- c->cnt < c->n && iovcnt < iovlen && c->free_size < size;
+ for (iovcnt = 0, written = 0;
+ c->cnt < c->n && iovcnt < iovlen && written < size;
c->cnt++) {
int chop;
write_size = c->specs[c->cnt].size;
if (iov[iovcnt].iov_len - p < write_size) {
- /* Mismatch between pos/size spec and what is queued */
- d->errInfo.posix_errno = EINVAL;
- d->result_ok = 0;
- d->again = 0;
- goto done;
+ goto error;
}
- chop = segment && c->free_size + write_size >= 2*FILE_SEGMENT_WRITE;
+ chop = segment && written + write_size >= 2*FILE_SEGMENT_WRITE;
if (chop) {
- ASSERT(c->free_size < FILE_SEGMENT_WRITE);
+ ASSERT(written < FILE_SEGMENT_WRITE);
write_size = FILE_SEGMENT_WRITE + FILE_SEGMENT_WRITE/2
- - c->free_size;
+ - written;
}
d->result_ok = efile_pwrite(&d->errInfo, (int) d->fd,
- iov[iovcnt].iov_base + p,
+ (char *)(iov[iovcnt].iov_base) + p,
write_size,
c->specs[c->cnt].offset);
if (! d->result_ok) {
d->again = 0;
- goto done;
+ goto deq_error;
}
- c->free_size += write_size;
+ written += write_size;
c->size -= write_size;
if (chop) {
c->specs[c->cnt].offset += write_size;
c->specs[c->cnt].size -= write_size;
/* Schedule out (d->again != 0) */
- goto done;
+ break;
}
/* Move forward in buffer */
p += write_size;
@@ -1664,25 +1668,28 @@ static void invoke_pwritev(void *data) {
d->errInfo.posix_errno = EINVAL;
d->result_ok = 0;
d->again = 0;
+ deq_error:
+ MUTEX_LOCK(d->c.writev.q_mtx);
+ driver_deq(d->c.pwritev.port, c->size);
+ MUTEX_UNLOCK(d->c.writev.q_mtx);
+
+ goto done;
} else {
- ASSERT(c->free_size == size);
+ ASSERT(written == size);
d->again = 0;
}
- }
+ } else
+ ASSERT(written >= FILE_SEGMENT_WRITE);
+
+ MUTEX_LOCK(d->c.writev.q_mtx);
+ driver_deq(d->c.pwritev.port, written);
+ MUTEX_UNLOCK(d->c.writev.q_mtx);
done:
EF_FREE(iov); /* Free our copy of the vector, nothing to restore */
+
DTRACE_INVOKE_RETURN(FILE_PWRITEV);
}
-static void free_pwritev(void *data) {
- struct t_data *d = data;
-
- MUTEX_LOCK(d->c.writev.q_mtx);
- driver_deq(d->c.pwritev.port, d->c.pwritev.free_size + d->c.pwritev.size);
- MUTEX_UNLOCK(d->c.writev.q_mtx);
- EF_FREE(d);
-}
-
static void invoke_flstat(void *data)
{
struct t_data *d = (struct t_data *) data;
@@ -1862,6 +1869,9 @@ static void invoke_open(void *data)
}
d->result_ok = status;
+ if (!status) {
+ d->fd = FILE_FD_INVALID;
+ }
DTRACE_INVOKE_RETURN(FILE_OPEN);
}
@@ -1953,6 +1963,17 @@ static int flush_sendfile(file_descriptor *desc,void *_) {
#endif /* HAVE_SENDFILE */
+static void invoke_fallocate(void *data)
+{
+ struct t_data *d = (struct t_data *) data;
+ int fd = (int) d->fd;
+ Sint64 offset = d->c.fallocate.offset;
+ Sint64 length = d->c.fallocate.length;
+
+ d->again = 0;
+ d->result_ok = efile_fallocate(&d->errInfo, fd, offset, length);
+}
+
static void free_readdir(void *data)
{
struct t_data *d = (struct t_data *) data;
@@ -1982,21 +2003,8 @@ static void try_free_read_bin(file_descriptor *desc) {
static int try_again(file_descriptor *desc, struct t_data *d) {
- if (! d->again) {
+ if (! d->again)
return 0;
- }
- switch (d->command) {
- case FILE_WRITE:
- MUTEX_LOCK(d->c.writev.q_mtx);
- driver_deq(d->c.writev.port, d->c.writev.free_size);
- MUTEX_UNLOCK(d->c.writev.q_mtx);
- break;
- case FILE_PWRITEV:
- MUTEX_LOCK(d->c.writev.q_mtx);
- driver_deq(d->c.pwritev.port, d->c.pwritev.free_size);
- MUTEX_UNLOCK(d->c.writev.q_mtx);
- break;
- }
if (desc->timer_state != timer_idle) {
driver_cancel_timer(desc->port);
}
@@ -2052,10 +2060,9 @@ static struct t_data *async_write(file_descriptor *desc, int *errp,
}
#endif
d->reply = reply;
- d->c.writev.free_size = 0;
d->c.writev.reply_size = reply_size;
d->invoke = invoke_writev;
- d->free = free_writev;
+ d->free = free_data;
d->level = 1;
cq_enq(desc, d);
desc->write_buffered = 0;
@@ -2216,6 +2223,49 @@ static int lseek_flush_read(file_descriptor *desc, int *errp
}
+/*********************************************************************
+ * Driver entry point -> stop
+ * The close has to be scheduled on async thread, so that currently active
+ * async operation does not suddenly have the ground disappearing under their feet...
+ */
+static void
+file_stop(ErlDrvData e)
+{
+ file_descriptor* desc = (file_descriptor*)e;
+
+ TRACE_C('p');
+
+ IF_THRDS {
+ flush_read(desc);
+ if (desc->fd != FILE_FD_INVALID) {
+ struct t_data *d = EF_SAFE_ALLOC(sizeof(struct t_data));
+ d->command = FILE_CLOSE_ON_PORT_EXIT;
+ d->reply = !0;
+ d->fd = desc->fd;
+ d->flags = desc->flags;
+ d->invoke = invoke_close;
+ d->free = free_data;
+ d->level = 2;
+ d->data_to_free = (void *) desc;
+ cq_enq(desc, d);
+ desc->fd = FILE_FD_INVALID;
+ desc->flags = 0;
+ cq_execute(desc);
+ } else {
+ EF_FREE(desc);
+ }
+ } else {
+ if (desc->fd != FILE_FD_INVALID) {
+ do_close(desc->flags, desc->fd);
+ desc->fd = FILE_FD_INVALID;
+ desc->flags = 0;
+ }
+ if (desc->read_binp) {
+ driver_free_binary(desc->read_binp);
+ }
+ EF_FREE(desc);
+ }
+}
/*********************************************************************
* Driver entry point -> ready_async
@@ -2325,7 +2375,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data)
desc->write_errInfo = d->errInfo;
}
}
- free_writev(data);
+ free_data(data);
break;
case FILE_LSEEK:
if (d->reply) {
@@ -2348,6 +2398,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data)
case FILE_RENAME:
case FILE_WRITE_INFO:
case FILE_FADVISE:
+ case FILE_FALLOCATE:
reply(desc, d->result_ok, &d->errInfo);
free_data(data);
break;
@@ -2373,8 +2424,10 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data)
if (!d->result_ok) {
reply_error(desc, &d->errInfo);
} else {
+ ASSERT(d->is_fd_unused);
desc->fd = d->fd;
desc->flags = d->flags;
+ d->is_fd_unused = 0;
reply_Uint(desc, d->fd);
}
free_data(data);
@@ -2436,7 +2489,6 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data)
}
free_readdir(data);
break;
- /* See file_stop */
case FILE_CLOSE:
if (d->reply) {
TRACE_C('K');
@@ -2453,7 +2505,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data)
} else {
reply_Uint(desc, d->c.pwritev.n);
}
- free_pwritev(data);
+ free_data(data);
break;
case FILE_PREADV:
if (!d->result_ok) {
@@ -2488,7 +2540,7 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data)
reply_Sint64(desc, d->c.sendfile.written);
desc->sendfile_state = not_sending;
free_sendfile(data);
- } else if (d->result_ok == 1) { // If we are using select to send the rest of the data
+ } else if (d->result_ok == 1) { /* If we are using select to send the rest of the data */
desc->sendfile_state = sending;
desc->d = d;
driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd,
@@ -2496,16 +2548,26 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data)
}
break;
#endif
+ case FILE_CLOSE_ON_PORT_EXIT:
+ /* See file_stop. However this is never invoked after the port is killed. */
+ free_data(data);
+ EF_FREE(desc);
+ desc = NULL;
+ /* This is it for this port, so just send dtrace and return, avoid doing anything to the freed data */
+ DTRACE6(efile_drv_return, sched_i1, sched_i2, sched_utag,
+ command, result_ok, posix_errno);
+ return;
default:
abort();
}
DTRACE6(efile_drv_return, sched_i1, sched_i2, sched_utag,
command, result_ok, posix_errno);
- if (desc->write_buffered != 0 && desc->timer_state == timer_idle) {
+ if (desc->write_buffered != 0 && desc->timer_state == timer_idle ) {
desc->timer_state = timer_write;
driver_set_timer(desc->port, desc->write_delay);
}
cq_execute(desc);
+
}
@@ -2745,6 +2807,7 @@ file_output(ErlDrvData e, char* buf, ErlDrvSizeT count)
d->invoke = invoke_open;
d->free = free_data;
d->level = 2;
+ d->is_fd_unused = 1;
goto done;
}
@@ -2958,6 +3021,20 @@ file_output(ErlDrvData e, char* buf, ErlDrvSizeT count)
goto done;
}
+ case FILE_FALLOCATE:
+ {
+ d = EF_SAFE_ALLOC(sizeof(struct t_data));
+
+ d->fd = fd;
+ d->command = command;
+ d->invoke = invoke_fallocate;
+ d->free = free_data;
+ d->level = 2;
+ d->c.fallocate.offset = get_int64((uchar*) buf);
+ d->c.fallocate.length = get_int64(((uchar*) buf) + sizeof(Sint64));
+ goto done;
+ }
+
}
/*
@@ -3555,7 +3632,6 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
#ifdef USE_VM_PROBES
dt_i3 = d->c.pwritev.size;
#endif
- d->c.pwritev.free_size = 0;
if (j == 0) {
/* Trivial case - nothing to write */
EF_FREE(d);
@@ -3579,7 +3655,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
MUTEX_UNLOCK(desc->q_mtx);
/* Execute the command */
d->invoke = invoke_pwritev;
- d->free = free_pwritev;
+ d->free = free_data;
d->level = 1;
cq_enq(desc, d);
}
@@ -3717,7 +3793,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
res_ev->iov[0].iov_base = res_ev->binv[0]->orig_bytes;
/* Fill in the number of buffers in the header */
put_int32(0, res_ev->iov[0].iov_base);
- put_int32(n, res_ev->iov[0].iov_base+4);
+ put_int32(n, (char *)(res_ev->iov[0].iov_base) + 4);
/**/
res_ev->size = res_ev->iov[0].iov_len;
if (n == 0) {
@@ -4018,7 +4094,7 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) {
}
if (hd_len != 0 || tl_len != 0 || flags != 0) {
- // We do not allow header, trailers and/or flags right now
+ /* We do not allow header, trailers and/or flags right now */
reply_posix_error(desc, EINVAL);
goto done;
}