aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c4732
1 files changed, 4732 insertions, 0 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
new file mode 100644
index 0000000000..61985271e6
--- /dev/null
+++ b/erts/emulator/beam/io.c
@@ -0,0 +1,4732 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 1996-2009. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+/*
+ * I/O routines for manipulating ports.
+ */
+
+#define ERL_IO_C__
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "sys.h"
+
+/* must be included BEFORE global.h (since it includes erl_driver.h) */
+#include "erl_sys_driver.h"
+
+#include "erl_vm.h"
+#include "global.h"
+#include "erl_process.h"
+#include "dist.h"
+#include "big.h"
+#include "erl_binary.h"
+#include "erl_bits.h"
+#include "erl_version.h"
+#include "error.h"
+
+extern ErlDrvEntry fd_driver_entry;
+extern ErlDrvEntry vanilla_driver_entry;
+extern ErlDrvEntry spawn_driver_entry;
+extern ErlDrvEntry *driver_tab[]; /* table of static drivers, only used during initialization */
+
+erts_driver_t *driver_list; /* List of all drivers, static and dynamic. */
+erts_smp_mtx_t erts_driver_list_lock; /* Mutex for driver list */
+static erts_smp_tsd_key_t driver_list_lock_status_key; /*stop recursive locks when calling
+ driver init */
+static erts_smp_tsd_key_t driver_list_last_error_key; /* Save last DDLL error on a
+ per thread basis (for BC interfaces) */
+
+Port* erts_port; /* The port table */
+erts_smp_atomic_t erts_ports_alive;
+erts_smp_atomic_t erts_bytes_out; /* No bytes sent out of the system */
+erts_smp_atomic_t erts_bytes_in; /* No bytes gotten into the system */
+
+Uint erts_max_ports;
+Uint erts_port_tab_index_mask;
+
+const ErlDrvTermData driver_term_nil = (ErlDrvTermData)NIL;
+
+erts_driver_t vanilla_driver;
+erts_driver_t spawn_driver;
+erts_driver_t fd_driver;
+
+static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *);
+static void terminate_port(Port *p);
+static void pdl_init(void);
+
+static ERTS_INLINE ErlIOQueue*
+drvport2ioq(ErlDrvPort drvport)
+{
+ int ix = (int) drvport;
+ if (ix < 0 || erts_max_ports <= ix)
+ return NULL;
+ if (erts_port[ix].status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ return NULL;
+ ERTS_LC_ASSERT(!erts_port[ix].port_data_lock
+ || erts_lc_mtx_is_locked(&erts_port[ix].port_data_lock->mtx));
+ ERTS_SMP_LC_ASSERT(erts_port[ix].port_data_lock
+ || erts_lc_is_port_locked(&erts_port[ix]));
+ return &erts_port[ix].ioq;
+}
+
+static ERTS_INLINE int
+is_port_ioq_empty(Port *pp)
+{
+ int res;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ if (!pp->port_data_lock)
+ res = (pp->ioq.size == 0);
+ else {
+ ErlDrvPDL pdl = pp->port_data_lock;
+ erts_mtx_lock(&pdl->mtx);
+ res = (pp->ioq.size == 0);
+ erts_mtx_unlock(&pdl->mtx);
+ }
+ return res;
+}
+
+int
+erts_is_port_ioq_empty(Port *pp)
+{
+ return is_port_ioq_empty(pp);
+}
+
+Uint
+erts_port_ioq_size(Port *pp)
+{
+ int res;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ if (!pp->port_data_lock)
+ res = pp->ioq.size;
+ else {
+ ErlDrvPDL pdl = pp->port_data_lock;
+ erts_mtx_lock(&pdl->mtx);
+ res = pp->ioq.size;
+ erts_mtx_unlock(&pdl->mtx);
+ }
+ return (Uint) res;
+}
+
+/*
+ * Line buffered I/O.
+ */
+typedef struct line_buf_context {
+ LineBuf **b;
+ char *buf;
+ int left;
+ int retlen;
+} LineBufContext;
+
+#define LINEBUF_EMPTY 0
+#define LINEBUF_EOL 1
+#define LINEBUF_NOEOL 2
+#define LINEBUF_ERROR -1
+
+#define LINEBUF_STATE(LBC) ((*(LBC).b)->data[0])
+
+#define LINEBUF_DATA(LBC) (((*(LBC).b)->data) + 1)
+#define LINEBUF_DATALEN(LBC) ((LBC).retlen)
+
+#define LINEBUF_INITIAL 100
+
+
+/* The 'number' field in a port now has two parts: the lowest bits
+ contain the index in the port table, and the higher bits are a counter
+ which is incremented each time we look for a free port and start from
+ the beginning of the table. erts_max_ports is the number of file descriptors,
+ rounded up to a power of 2.
+ To get the index from a port, use the macro 'internal_port_index';
+ 'port_number' returns the whole number field.
+*/
+
+static erts_smp_spinlock_t get_free_port_lck;
+static Uint last_port_num;
+static Uint port_num_mask;
+erts_smp_atomic_t erts_ports_snapshot; /* Identifies the _next_ snapshot (not the ongoing) */
+
+
+static ERTS_INLINE void
+kill_port(Port *pp)
+{
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ erts_port_task_free_port(pp);
+ ASSERT(pp->status & ERTS_PORT_SFLGS_DEAD);
+}
+
+#ifdef ERTS_SMP
+
+#ifdef ERTS_ENABLE_LOCK_CHECK
+int
+erts_lc_is_port_locked(Port *prt)
+{
+ if (!prt)
+ return 0;
+ return erts_smp_lc_mtx_is_locked(prt->lock);
+}
+#endif
+
+#endif /* #ifdef ERTS_SMP */
+
+static int
+get_free_port(void)
+{
+ Uint num;
+ Uint tries = erts_max_ports;
+ Port* port;
+
+ erts_smp_spin_lock(&get_free_port_lck);
+ num = last_port_num + 1;
+ for (;; ++num) {
+ port = &erts_port[num & erts_port_tab_index_mask];
+
+ erts_smp_port_state_lock(port);
+ if (port->status & ERTS_PORT_SFLG_FREE) {
+ last_port_num = num;
+ erts_smp_spin_unlock(&get_free_port_lck);
+ break;
+ }
+ erts_smp_port_state_unlock(port);
+
+ if (--tries == 0) {
+ erts_smp_spin_unlock(&get_free_port_lck);
+ return -1;
+ }
+ }
+ port->status = ERTS_PORT_SFLG_INITIALIZING;
+#ifdef ERTS_SMP
+ ERTS_SMP_LC_ASSERT(erts_smp_atomic_read(&port->refc) == 0);
+ erts_smp_atomic_set(&port->refc, 2); /* Port alive + lock */
+#endif
+ erts_smp_port_state_unlock(port);
+ return num & port_num_mask;
+}
+
+/*
+ * erts_test_next_port() is only used for testing.
+ */
+Sint
+erts_test_next_port(int set, Uint next)
+{
+ Uint i, num;
+ Sint res = -1;
+
+ erts_smp_spin_lock(&get_free_port_lck);
+ if (set) {
+ last_port_num = (next - 1) & port_num_mask;
+ }
+ num = last_port_num + 1;
+
+ for (i=0; i < erts_max_ports && res<0; ++i, ++num) {
+
+ Port* port = &erts_port[num & erts_port_tab_index_mask];
+
+ erts_smp_port_state_lock(port);
+
+ if (port->status & ERTS_PORT_SFLG_FREE) {
+ last_port_num = num - 1;
+ res = num & port_num_mask;
+ }
+ erts_smp_port_state_unlock(port);
+ }
+ erts_smp_spin_unlock(&get_free_port_lck);
+ return res;
+}
+
+void
+erts_port_cleanup(Port *prt)
+{
+#ifdef ERTS_SMP
+ Uint32 port_specific;
+ erts_smp_mtx_t *mtx;
+#endif
+ erts_driver_t *driver;
+
+ erts_smp_port_state_lock(prt);
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ driver = prt->drv_ptr;
+ prt->drv_ptr = NULL;
+ ASSERT(driver);
+
+#ifdef ERTS_SMP
+
+ ASSERT(prt->status & ERTS_PORT_SFLG_FREE_SCHEDULED);
+ ERTS_SMP_LC_ASSERT(erts_smp_atomic_read(&prt->refc) == 0);
+
+ port_specific = (prt->status & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK);
+
+ mtx = prt->lock;
+ ASSERT(mtx);
+
+ prt->lock = NULL;
+
+ ASSERT(prt->status & ERTS_PORT_SFLG_PORT_DEBUG);
+ ASSERT(!(prt->status & ERTS_PORT_SFLG_FREE));
+ prt->status = ERTS_PORT_SFLG_FREE;
+
+ erts_smp_port_state_unlock(prt);
+ erts_smp_mtx_unlock(mtx);
+
+ if (port_specific) {
+ erts_smp_mtx_destroy(mtx);
+ erts_free(ERTS_ALC_T_PORT_LOCK, mtx);
+ }
+#endif
+
+ if (driver->handle)
+ erts_ddll_dereference_driver(driver->handle);
+}
+
+
+/*
+** Initialize v_start to point to the small fixed vector.
+** Once (reallocated) we never reset the pointer to the small vector
+** This is a possible optimisation.
+*/
+static void initq(Port* prt)
+{
+ ErlIOQueue* q = &prt->ioq;
+
+ ERTS_LC_ASSERT(!prt->port_data_lock);
+
+ q->size = 0;
+ q->v_head = q->v_tail = q->v_start = q->v_small;
+ q->v_end = q->v_small + SMALL_IO_QUEUE;
+ q->b_head = q->b_tail = q->b_start = q->b_small;
+ q->b_end = q->b_small + SMALL_IO_QUEUE;
+}
+
+static void stopq(Port* prt)
+{
+ ErlIOQueue* q;
+ ErlDrvBinary** binp;
+
+ if (prt->port_data_lock)
+ driver_pdl_lock(prt->port_data_lock);
+
+ q = &prt->ioq;
+ binp = q->b_head;
+
+ if (q->v_start != q->v_small)
+ erts_free(ERTS_ALC_T_IOQ, (void *) q->v_start);
+
+ while(binp < q->b_tail) {
+ if (*binp != NULL)
+ driver_free_binary(*binp);
+ binp++;
+ }
+ if (q->b_start != q->b_small)
+ erts_free(ERTS_ALC_T_IOQ, (void *) q->b_start);
+ q->v_start = q->v_end = q->v_head = q->v_tail = NULL;
+ q->b_start = q->b_end = q->b_head = q->b_tail = NULL;
+ q->size = 0;
+
+ if (prt->port_data_lock) {
+ driver_pdl_unlock(prt->port_data_lock);
+ driver_pdl_dec_refc(prt->port_data_lock);
+ prt->port_data_lock = NULL;
+ }
+}
+
+
+
+static void
+setup_port(Port* prt, Eterm pid, erts_driver_t *driver,
+ ErlDrvData drv_data, char *name, Uint32 xstatus)
+{
+ ErtsRunQueue *runq = erts_get_runq_current(NULL);
+ char *new_name, *old_name;
+#ifdef DEBUG
+ /* Make sure the debug flags survives until port is freed */
+ xstatus |= ERTS_PORT_SFLG_PORT_DEBUG;
+#endif
+ ASSERT(runq);
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+
+ new_name = (char*) erts_alloc(ERTS_ALC_T_PORT_NAME, sys_strlen(name)+1);
+ sys_strcpy(new_name, name);
+ erts_smp_runq_lock(runq);
+ erts_smp_atomic_inc(&erts_ports_alive);
+ erts_smp_port_state_lock(prt);
+ prt->status = ERTS_PORT_SFLG_CONNECTED | xstatus;
+ prt->snapshot = (Uint32) erts_smp_atomic_read(&erts_ports_snapshot);
+ old_name = prt->name;
+ prt->name = new_name;
+#ifdef ERTS_SMP
+ erts_smp_atomic_set(&prt->run_queue, (long) runq);
+#endif
+ ASSERT(!prt->drv_ptr);
+ prt->drv_ptr = driver;
+ erts_smp_port_state_unlock(prt);
+ erts_smp_runq_unlock(runq);
+#ifdef ERTS_SMP
+ ASSERT(!prt->xports);
+#endif
+ if (old_name) {
+ erts_free(ERTS_ALC_T_PORT_NAME, (void *) old_name);
+ }
+
+ prt->control_flags = 0;
+ prt->connected = pid;
+ prt->drv_data = (long) drv_data;
+ prt->bytes_in = 0;
+ prt->bytes_out = 0;
+ prt->dist_entry = NULL;
+ prt->reg = NULL;
+#ifdef ERTS_SMP
+ prt->ptimer = NULL;
+#else
+ sys_memset(&prt->tm, 0, sizeof(ErlTimer));
+#endif
+ erts_port_task_handle_init(&prt->timeout_task);
+ prt->suspended = NULL;
+ sys_strcpy(prt->name, name);
+ prt->nlinks = NULL;
+ prt->monitors = NULL;
+ prt->linebuf = NULL;
+ prt->bp = NULL;
+ prt->data = am_undefined;
+ /* Set default tracing */
+ erts_get_default_tracing(&(prt->trace_flags), &(prt->tracer_proc));
+
+ prt->psd = NULL;
+
+ initq(prt);
+}
+
+void
+erts_wake_process_later(Port *prt, Process *process)
+{
+ ErtsProcList** p;
+ ErtsProcList* new_p;
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+ if (prt->status & ERTS_PORT_SFLGS_DEAD)
+ return;
+
+ for (p = &(prt->suspended); *p != NULL; p = &((*p)->next))
+ /* Empty loop body */;
+
+ new_p = erts_proclist_create(process);
+ new_p->next = NULL;
+ *p = new_p;
+}
+
+/*
+ Opens a driver.
+ Returns the non-negative port number, if successful.
+ If there is an error, -1 or -2 or -3 is returned. -2 means that
+ there is valid error information in *error_number_ptr.
+ Returning -3 means that an error in the given options was detected
+ (*error_number_ptr must contain either BADARG or SYSTEM_LIMIT).
+ The driver start function must obey the same conventions.
+*/
+int
+erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
+ Eterm pid, /* Current process. */
+ char* name, /* Driver name. */
+ SysDriverOpts* opts, /* Options. */
+ int *error_number_ptr) /* errno in case -2 is returned */
+{
+ int port_num;
+ int port_ix;
+ ErlDrvData drv_data = 0;
+ Uint32 xstatus = 0;
+ Port *port;
+ int fpe_was_unmasked;
+
+ if (error_number_ptr)
+ *error_number_ptr = 0;
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ if ((port_num = get_free_port()) < 0) {
+ if (error_number_ptr) {
+ *error_number_ptr = SYSTEM_LIMIT;
+ }
+ return -3;
+ }
+
+ port_ix = port_num & erts_port_tab_index_mask;
+ port = &erts_port[port_ix];
+ port->id = make_internal_port(port_num);
+
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+ if (!driver) {
+ for (driver = driver_list; driver; driver = driver->next) {
+ if (sys_strcmp(driver->name, name) == 0)
+ break;
+ }
+ if (!driver) {
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ if (error_number_ptr)
+ *error_number_ptr = BADARG;
+ return -3;
+ }
+ }
+ if (driver == &spawn_driver) {
+ char *p;
+ erts_driver_t *d;
+
+ /*
+ * Dig out the name of the driver or port program.
+ */
+
+ if (!(opts->spawn_type & ERTS_SPAWN_EXECUTABLE)) {
+ /* No spawn driver default */
+ driver = NULL;
+ }
+
+
+ if (opts->spawn_type != ERTS_SPAWN_EXECUTABLE) {
+ p = name;
+ while(*p != '\0' && *p != ' ')
+ p++;
+ if (*p == '\0')
+ p = NULL;
+ else
+ *p = '\0';
+
+ /*
+ * Search for a driver having this name. Defaults to spawn_driver
+ * if not found.
+ */
+
+ for (d = driver_list; d; d = d->next) {
+ if (strcmp(d->name, name) == 0 &&
+ erts_ddll_driver_ok(d->handle)) {
+ driver = d;
+ break;
+ }
+ }
+ if (p != NULL)
+ *p = ' ';
+ }
+ }
+
+ if (driver == NULL || (driver != &spawn_driver && opts->exit_status)) {
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ if (error_number_ptr) {
+ *error_number_ptr = BADARG;
+ }
+ /* Need to mark the port as free again */
+ erts_smp_port_state_lock(port);
+ port->status = ERTS_PORT_SFLG_FREE;
+#ifdef ERTS_SMP
+ ERTS_SMP_LC_ASSERT(erts_smp_atomic_read(&port->refc) == 2);
+ erts_smp_atomic_set(&port->refc, 0);
+#endif
+ erts_smp_port_state_unlock(port);
+ return -3;
+ }
+
+ /*
+ * We'll set up the port before calling the start function,
+ * to allow message sending and setting timers in the start function.
+ */
+
+#ifdef ERTS_SMP
+ ASSERT(!port->lock);
+ port->lock = driver->lock;
+ if (!port->lock) {
+ port->lock = erts_alloc(ERTS_ALC_T_PORT_LOCK,
+ sizeof(erts_smp_mtx_t));
+ erts_smp_mtx_init_x(port->lock,
+ "port_lock",
+ port->id);
+ xstatus |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
+ }
+#endif
+
+ if (driver->handle != NULL) {
+ erts_ddll_increment_port_count(driver->handle);
+ erts_ddll_reference_driver(driver->handle);
+ }
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+
+#ifdef ERTS_SMP
+ erts_smp_mtx_lock(port->lock);
+#endif
+
+ setup_port(port, pid, driver, drv_data, name, xstatus);
+
+ if (IS_TRACED_FL(port, F_TRACE_PORTS)) {
+ trace_port_open(port,
+ pid,
+ am_atom_put(port->name, strlen(port->name)));
+ }
+
+ if (driver->start) {
+ if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
+ trace_sched_ports_where(port, am_in, am_start);
+ }
+ port->caller = pid;
+ fpe_was_unmasked = erts_block_fpe();
+ drv_data = (*driver->start)((ErlDrvPort)(port_ix),
+ name, opts);
+ erts_unblock_fpe(fpe_was_unmasked);
+ port->caller = NIL;
+ erts_unblock_fpe(fpe_was_unmasked);
+ if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
+ trace_sched_ports_where(port, am_out, am_start);
+ }
+ if (error_number_ptr && ((long) drv_data) == (long) -2)
+ *error_number_ptr = errno;
+#ifdef ERTS_SMP
+ if (port->xports)
+ erts_smp_xports_unlock(port);
+ ASSERT(!port->xports);
+#endif
+ }
+
+ if (((long)drv_data) == -1 ||
+ ((long)drv_data) == -2 ||
+ ((long)drv_data) == -3) {
+ int res = (int) ((long) drv_data);
+
+ if (res == -3 && error_number_ptr) {
+ *error_number_ptr = BADARG;
+ }
+
+ /*
+ * Must clean up the port.
+ */
+#ifdef ERTS_SMP
+ erts_cancel_smp_ptimer(port->ptimer);
+#else
+ erl_cancel_timer(&(port->tm));
+#endif
+ stopq(port);
+ kill_port(port);
+ if (port->linebuf != NULL) {
+ erts_free(ERTS_ALC_T_LINEBUF,
+ (void *) port->linebuf);
+ port->linebuf = NULL;
+ }
+ if (driver->handle != NULL) {
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+ erts_ddll_decrement_port_count(driver->handle);
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ }
+ erts_port_release(port);
+ return res;
+ }
+ port->drv_data = (long) drv_data;
+ return port_ix;
+}
+
+#ifdef ERTS_SMP
+
+struct ErtsXPortsList_ {
+ ErtsXPortsList *next;
+ Port *port;
+};
+
+ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(xports_list, ErtsXPortsList, 50, ERTS_ALC_T_XPORTS_LIST)
+
+#endif
+
+/*
+ * Driver function to create new instances of a driver
+ * Historical reason: to be used with inet_drv for creating
+ * accept sockets inorder to avoid a global table.
+ */
+ErlDrvPort
+driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */
+ ErlDrvTermData pid, /* Owner/Caller */
+ char* name, /* Driver name */
+ ErlDrvData drv_data) /* Driver data */
+{
+ Port *creator_port;
+ Port* port;
+ erts_driver_t *driver;
+ Process *rp;
+ int port_num;
+ Eterm port_id;
+ Uint32 xstatus = 0;
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ creator_port = erts_drvport2port(creator_port_ix);
+ if (!creator_port)
+ return (ErlDrvTermData) -1;
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(creator_port));
+
+ driver = creator_port->drv_ptr;
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+ if (!erts_ddll_driver_ok(driver->handle)) {
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ return (ErlDrvTermData) -1;
+ }
+
+ rp = erts_pid2proc(NULL, 0, pid, ERTS_PROC_LOCK_LINK);
+ if (!rp) {
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ return (ErlDrvTermData) -1; /* pid does not exist */
+ }
+ if ((port_num = get_free_port()) < 0) {
+ errno = ENFILE;
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ return (ErlDrvTermData) -1;
+ }
+
+ port_id = make_internal_port(port_num);
+ port = &erts_port[port_num & erts_port_tab_index_mask];
+
+#ifdef ERTS_SMP
+ ASSERT(!port->lock);
+ port->lock = driver->lock;
+ if (!port->lock) {
+ ErtsXPortsList *xplp = xports_list_alloc();
+ xplp->port = port;
+ xplp->next = creator_port->xports;
+ creator_port->xports = xplp;
+ port->lock = erts_alloc(ERTS_ALC_T_PORT_LOCK,
+ sizeof(erts_smp_mtx_t));
+ erts_smp_mtx_init_locked_x(port->lock, "port_lock", port_id);
+ xstatus |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
+ }
+
+#endif
+
+ if (driver->handle != NULL) {
+ erts_ddll_increment_port_count(driver->handle);
+ erts_ddll_reference_referenced_driver(driver->handle);
+ }
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(port));
+
+ setup_port(port, pid, driver, drv_data, name, xstatus);
+ port->id = port_id;
+
+ erts_add_link(&(port->nlinks), LINK_PID, pid);
+ erts_add_link(&(rp->nlinks), LINK_PID, port_id);
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ return port_num & erts_port_tab_index_mask;
+}
+
+#ifdef ERTS_SMP
+void
+erts_smp_xports_unlock(Port *prt)
+{
+ ErtsXPortsList *xplp;
+
+ ASSERT(prt);
+ xplp = prt->xports;
+ ASSERT(xplp);
+ while (xplp) {
+ ErtsXPortsList *free_xplp;
+ if (xplp->port->xports)
+ erts_smp_xports_unlock(xplp->port);
+ erts_port_release(xplp->port);
+ free_xplp = xplp;
+ xplp = xplp->next;
+ xports_list_free(free_xplp);
+ }
+ prt->xports = NULL;
+}
+#endif
+
+/* Fills a possibly deep list of chars and binaries into vec
+** Small characters are first stored in the buffer buf of length ln
+** binaries found are copied and linked into msoh
+** Return vector length on succsess,
+** -1 on overflow
+** -2 on type error
+*/
+
+#define SET_VEC(iov, bv, bin, ptr, len, vlen) do { \
+ (iov)->iov_base = (ptr); \
+ (iov)->iov_len = (len); \
+ *(bv)++ = (bin); \
+ (iov)++; \
+ (vlen)++; \
+} while(0)
+
+static int
+io_list_to_vec(Eterm obj, /* io-list */
+ SysIOVec* iov, /* io vector */
+ ErlDrvBinary** binv, /* binary reference vector */
+ ErlDrvBinary* cbin, /* binary to store characters */
+ int bin_limit) /* small binaries limit */
+{
+ DECLARE_ESTACK(s);
+ Eterm* objp;
+ char *buf = cbin->orig_bytes;
+ int len = cbin->orig_size;
+ int csize = 0;
+ int vlen = 0;
+ char* cptr = buf;
+
+ goto L_jump_start; /* avoid push */
+
+ while (!ESTACK_ISEMPTY(s)) {
+ obj = ESTACK_POP(s);
+ L_jump_start:
+ if (is_list(obj)) {
+ L_iter_list:
+ objp = list_val(obj);
+ obj = CAR(objp);
+ if (is_byte(obj)) {
+ if (len == 0)
+ goto L_overflow;
+ *buf++ = unsigned_val(obj);
+ csize++;
+ len--;
+ } else if (is_binary(obj)) {
+ ESTACK_PUSH(s, CDR(objp));
+ goto handle_binary;
+ } else if (is_list(obj)) {
+ ESTACK_PUSH(s, CDR(objp));
+ goto L_iter_list; /* on head */
+ } else if (!is_nil(obj)) {
+ goto L_type_error;
+ }
+ obj = CDR(objp);
+ if (is_list(obj))
+ goto L_iter_list; /* on tail */
+ else if (is_binary(obj)) {
+ goto handle_binary;
+ } else if (!is_nil(obj)) {
+ goto L_type_error;
+ }
+ } else if (is_binary(obj)) {
+ Eterm real_bin;
+ Uint offset;
+ Eterm* bptr;
+ int size;
+ int bitoffs;
+ int bitsize;
+
+ handle_binary:
+ size = binary_size(obj);
+ ERTS_GET_REAL_BIN(obj, real_bin, offset, bitoffs, bitsize);
+ ASSERT(bitsize == 0);
+ bptr = binary_val(real_bin);
+ if (*bptr == HEADER_PROC_BIN) {
+ ProcBin* pb = (ProcBin *) bptr;
+ if (bitoffs != 0) {
+ if (len < size) {
+ goto L_overflow;
+ }
+ erts_copy_bits(pb->bytes+offset, bitoffs, 1,
+ (byte *) buf, 0, 1, size*8);
+ csize += size;
+ buf += size;
+ len -= size;
+ } else if (bin_limit && size < bin_limit) {
+ if (len < size) {
+ goto L_overflow;
+ }
+ sys_memcpy(buf, pb->bytes+offset, size);
+ csize += size;
+ buf += size;
+ len -= size;
+ } else {
+ if (csize != 0) {
+ SET_VEC(iov, binv, cbin, cptr, csize, vlen);
+ cptr = buf;
+ csize = 0;
+ }
+ if (pb->flags) {
+ erts_emasculate_writable_binary(pb);
+ }
+ SET_VEC(iov, binv, Binary2ErlDrvBinary(pb->val),
+ pb->bytes+offset, size, vlen);
+ }
+ } else {
+ ErlHeapBin* hb = (ErlHeapBin *) bptr;
+ if (len < size) {
+ goto L_overflow;
+ }
+ copy_binary_to_buffer(buf, 0,
+ ((byte *) hb->data)+offset, bitoffs,
+ 8*size);
+ csize += size;
+ buf += size;
+ len -= size;
+ }
+ } else if (!is_nil(obj)) {
+ goto L_type_error;
+ }
+ }
+
+ if (csize != 0) {
+ SET_VEC(iov, binv, cbin, cptr, csize, vlen);
+ }
+
+ DESTROY_ESTACK(s);
+ return vlen;
+
+ L_type_error:
+ DESTROY_ESTACK(s);
+ return -2;
+
+ L_overflow:
+ DESTROY_ESTACK(s);
+ return -1;
+}
+
+#define IO_LIST_VEC_COUNT(obj) \
+do { \
+ int _size = binary_size(obj); \
+ Eterm _real; \
+ Uint _offset; \
+ int _bitoffs; \
+ int _bitsize; \
+ ERTS_GET_REAL_BIN(obj, _real, _offset, _bitoffs, _bitsize); \
+ ASSERT(_bitsize == 0); \
+ if (thing_subtag(*binary_val(_real)) == REFC_BINARY_SUBTAG && \
+ _bitoffs == 0) { \
+ b_size += _size; \
+ in_clist = 0; \
+ v_size++; \
+ if (_size >= bin_limit) { \
+ p_in_clist = 0; \
+ p_v_size++; \
+ } else { \
+ p_c_size += _size; \
+ if (!p_in_clist) { \
+ p_in_clist = 1; \
+ p_v_size++; \
+ } \
+ } \
+ } else { \
+ c_size += _size; \
+ if (!in_clist) { \
+ in_clist = 1; \
+ v_size++; \
+ } \
+ p_c_size += _size; \
+ if (!p_in_clist) { \
+ p_in_clist = 1; \
+ p_v_size++; \
+ } \
+ } \
+} while (0)
+
+
+/*
+** Size of a io list in bytes
+** return -1 if error
+** returns: - Total size of io list
+** vsize - SysIOVec size needed for a writev
+** csize - Number of bytes not in binary (in the common binary)
+** pvsize - SysIOVec size needed if packing small binaries
+** pcsize - Number of bytes in the common binary if packing
+*/
+
+static int
+io_list_vec_len(Eterm obj, int* vsize, int* csize,
+ int bin_limit, /* small binaries limit */
+ int * pvsize, int * pcsize)
+{
+ DECLARE_ESTACK(s);
+ Eterm* objp;
+ int v_size = 0;
+ int c_size = 0;
+ int b_size = 0;
+ int in_clist = 0;
+ int p_v_size = 0;
+ int p_c_size = 0;
+ int p_in_clist = 0;
+
+ goto L_jump_start; /* avoid a push */
+
+ while (!ESTACK_ISEMPTY(s)) {
+ obj = ESTACK_POP(s);
+ L_jump_start:
+ if (is_list(obj)) {
+ L_iter_list:
+ objp = list_val(obj);
+ obj = CAR(objp);
+
+ if (is_byte(obj)) {
+ c_size++;
+ if (!in_clist) {
+ in_clist = 1;
+ v_size++;
+ }
+ p_c_size++;
+ if (!p_in_clist) {
+ p_in_clist = 1;
+ p_v_size++;
+ }
+ }
+ else if (is_binary(obj)) {
+ IO_LIST_VEC_COUNT(obj);
+ }
+ else if (is_list(obj)) {
+ ESTACK_PUSH(s, CDR(objp));
+ goto L_iter_list; /* on head */
+ }
+ else if (!is_nil(obj)) {
+ goto L_type_error;
+ }
+
+ obj = CDR(objp);
+ if (is_list(obj))
+ goto L_iter_list; /* on tail */
+ else if (is_binary(obj)) { /* binary tail is OK */
+ IO_LIST_VEC_COUNT(obj);
+ }
+ else if (!is_nil(obj)) {
+ goto L_type_error;
+ }
+ }
+ else if (is_binary(obj)) {
+ IO_LIST_VEC_COUNT(obj);
+ }
+ else if (!is_nil(obj)) {
+ goto L_type_error;
+ }
+ }
+
+ DESTROY_ESTACK(s);
+ if (vsize != NULL)
+ *vsize = v_size;
+ if (csize != NULL)
+ *csize = c_size;
+ if (pvsize != NULL)
+ *pvsize = p_v_size;
+ if (pcsize != NULL)
+ *pcsize = p_c_size;
+ return c_size + b_size;
+
+ L_type_error:
+ DESTROY_ESTACK(s);
+ return -1;
+}
+
+#define ERL_SMALL_IO_BIN_LIMIT (4*ERL_ONHEAP_BIN_LIMIT)
+#define SMALL_WRITE_VEC 16
+
+
+/* write data to a port */
+int erts_write_to_port(Eterm caller_id, Port *p, Eterm list)
+{
+ char *buf;
+ erts_driver_t *drv = p->drv_ptr;
+ int size;
+ int fpe_was_unmasked;
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ p->caller = caller_id;
+ if (drv->outputv != NULL) {
+ int vsize;
+ int csize;
+ int pvsize;
+ int pcsize;
+ int blimit;
+ SysIOVec iv[SMALL_WRITE_VEC];
+ ErlDrvBinary* bv[SMALL_WRITE_VEC];
+ SysIOVec* ivp;
+ ErlDrvBinary** bvp;
+ ErlDrvBinary* cbin;
+ ErlIOVec ev;
+
+ if ((size = io_list_vec_len(list, &vsize, &csize,
+ ERL_SMALL_IO_BIN_LIMIT,
+ &pvsize, &pcsize)) < 0) {
+ goto bad_value;
+ }
+ /* To pack or not to pack (small binaries) ...? */
+ vsize++;
+ if (vsize <= SMALL_WRITE_VEC) {
+ /* Do NOT pack */
+ blimit = 0;
+ } else {
+ /* Do pack */
+ vsize = pvsize + 1;
+ csize = pcsize;
+ blimit = ERL_SMALL_IO_BIN_LIMIT;
+ }
+ /* Use vsize and csize from now on */
+ if (vsize <= SMALL_WRITE_VEC) {
+ ivp = iv;
+ bvp = bv;
+ } else {
+ ivp = (SysIOVec *) erts_alloc(ERTS_ALC_T_TMP,
+ vsize * sizeof(SysIOVec));
+ bvp = (ErlDrvBinary**) erts_alloc(ERTS_ALC_T_TMP,
+ vsize * sizeof(ErlDrvBinary*));
+ }
+ cbin = driver_alloc_binary(csize);
+ if (!cbin)
+ erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, sizeof(Binary) + csize);
+
+ /* Element 0 is for driver usage to add header block */
+ ivp[0].iov_base = NULL;
+ ivp[0].iov_len = 0;
+ bvp[0] = NULL;
+ ev.vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
+ ev.vsize++;
+#if 0
+ /* This assertion may say something useful, but it can
+ be falsified during the emulator test suites. */
+ ASSERT((ev.vsize >= 0) && (ev.vsize == vsize));
+#endif
+ ev.size = size; /* total size */
+ ev.iov = ivp;
+ ev.binv = bvp;
+ fpe_was_unmasked = erts_block_fpe();
+ (*drv->outputv)((ErlDrvData)p->drv_data, &ev);
+ erts_unblock_fpe(fpe_was_unmasked);
+ if (ivp != iv) {
+ erts_free(ERTS_ALC_T_TMP, (void *) ivp);
+ }
+ if (bvp != bv) {
+ erts_free(ERTS_ALC_T_TMP, (void *) bvp);
+ }
+ driver_free_binary(cbin);
+ } else {
+ int r;
+
+ /* Try with an 8KB buffer first (will often be enough I guess). */
+ size = 8*1024;
+ /* See below why the extra byte is added. */
+ buf = erts_alloc(ERTS_ALC_T_TMP, size+1);
+ r = io_list_to_buf(list, buf, size);
+
+ if (r >= 0) {
+ size -= r;
+ fpe_was_unmasked = erts_block_fpe();
+ (*drv->output)((ErlDrvData)p->drv_data, buf, size);
+ erts_unblock_fpe(fpe_was_unmasked);
+ erts_free(ERTS_ALC_T_TMP, buf);
+ }
+ else if (r == -2) {
+ erts_free(ERTS_ALC_T_TMP, buf);
+ goto bad_value;
+ }
+ else {
+ ASSERT(r == -1); /* Overflow */
+ erts_free(ERTS_ALC_T_TMP, buf);
+ if ((size = io_list_len(list)) < 0) {
+ goto bad_value;
+ }
+
+ /*
+ * I know drivers that pad space with '\0' this is clearly
+ * incorrect but I don't feel like fixing them now, insted
+ * add ONE extra byte.
+ */
+ buf = erts_alloc(ERTS_ALC_T_TMP, size+1);
+ r = io_list_to_buf(list, buf, size);
+ fpe_was_unmasked = erts_block_fpe();
+ (*drv->output)((ErlDrvData)p->drv_data, buf, size);
+ erts_unblock_fpe(fpe_was_unmasked);
+ erts_free(ERTS_ALC_T_TMP, buf);
+ }
+ }
+ p->bytes_out += size;
+ erts_smp_atomic_add(&erts_bytes_out, size);
+
+#ifdef ERTS_SMP
+ if (p->xports)
+ erts_smp_xports_unlock(p);
+ ASSERT(!p->xports);
+#endif
+ p->caller = NIL;
+ return 0;
+
+ bad_value:
+ p->caller = NIL;
+ {
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ erts_dsprintf(dsbufp, "Bad value on output port '%s'\n", p->name);
+ erts_send_error_to_logger_nogl(dsbufp);
+ return 1;
+ }
+}
+
+/* initialize the port array */
+void init_io(void)
+{
+ int i;
+ ErlDrvEntry** dp;
+ ErlDrvEntry* drv;
+ char maxports[21]; /* enough for any 64-bit integer */
+ size_t maxportssize = sizeof(maxports);
+ Uint ports_bits = ERTS_PORTS_BITS;
+ Sint port_extra_shift;
+
+#ifdef ERTS_SMP
+ init_xports_list_alloc();
+#endif
+
+ pdl_init();
+
+ if (erts_sys_getenv("ERL_MAX_PORTS", maxports, &maxportssize) == 0)
+ erts_max_ports = atoi(maxports);
+ else
+ erts_max_ports = sys_max_files();
+
+ if (erts_max_ports > ERTS_MAX_PORTS)
+ erts_max_ports = ERTS_MAX_PORTS;
+ if (erts_max_ports < 1024)
+ erts_max_ports = 1024;
+
+ if (erts_use_r9_pids_ports) {
+ ports_bits = ERTS_R9_PORTS_BITS;
+ if (erts_max_ports > ERTS_MAX_R9_PORTS)
+ erts_max_ports = ERTS_MAX_R9_PORTS;
+ }
+
+ port_extra_shift = erts_fit_in_bits(erts_max_ports - 1);
+ port_num_mask = (1 << ports_bits) - 1;
+
+ erts_port_tab_index_mask = ~(~((Uint) 0) << port_extra_shift);
+ erts_max_ports = 1 << port_extra_shift;
+
+ erts_smp_mtx_init(&erts_driver_list_lock,"driver_list");
+ driver_list = NULL;
+ erts_smp_tsd_key_create(&driver_list_lock_status_key);
+ erts_smp_tsd_key_create(&driver_list_last_error_key);
+
+ if (erts_max_ports * sizeof(Port) <= erts_max_ports) {
+ /* More memory needed than the whole address space. */
+ erts_alloc_enomem(ERTS_ALC_T_PORT_TABLE, ~((Uint) 0));
+ }
+
+ erts_port = (Port *) erts_alloc(ERTS_ALC_T_PORT_TABLE,
+ erts_max_ports * sizeof(Port));
+
+ erts_smp_atomic_init(&erts_bytes_out, 0);
+ erts_smp_atomic_init(&erts_bytes_in, 0);
+ erts_smp_atomic_init(&erts_ports_alive, 0);
+
+ for (i = 0; i < erts_max_ports; i++) {
+ erts_port_task_init_sched(&erts_port[i].sched);
+#ifdef ERTS_SMP
+ erts_smp_atomic_init(&erts_port[i].refc, 0);
+ erts_port[i].lock = NULL;
+ erts_port[i].xports = NULL;
+ erts_smp_spinlock_init(&erts_port[i].state_lck, "port_state");
+#endif
+ erts_port[i].tracer_proc = NIL;
+ erts_port[i].trace_flags = 0;
+
+ erts_port[i].drv_ptr = NULL;
+ erts_port[i].status = ERTS_PORT_SFLG_FREE;
+ erts_port[i].name = NULL;
+ erts_port[i].nlinks = NULL;
+ erts_port[i].monitors = NULL;
+ erts_port[i].linebuf = NULL;
+ erts_port[i].port_data_lock = NULL;
+ }
+
+ erts_smp_atomic_init(&erts_ports_snapshot, (long) 0);
+ last_port_num = 0;
+ erts_smp_spinlock_init(&get_free_port_lck, "get_free_port");
+
+ sys_init_io();
+
+ erts_smp_tsd_set(driver_list_lock_status_key, (void *) 1);
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+
+ init_driver(&fd_driver, &fd_driver_entry, NULL);
+ init_driver(&vanilla_driver, &vanilla_driver_entry, NULL);
+ init_driver(&spawn_driver, &spawn_driver_entry, NULL);
+ for (dp = driver_tab; *dp != NULL; dp++) {
+ drv = *dp;
+ erts_add_driver_entry(*dp, NULL, 1);
+ }
+
+ erts_smp_tsd_set(driver_list_lock_status_key, NULL);
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+}
+
+/*
+ * Buffering of data when using line oriented I/O on ports
+ */
+
+/*
+ * Buffer states
+ */
+#define LINEBUF_MAIN 0
+#define LINEBUF_FULL 1
+#define LINEBUF_CR_INSIDE 2
+#define LINEBUF_CR_AFTER 3
+
+/*
+ * Creates a LineBuf to be added to the port structure,
+ * Returns: Pointer to a newly allocated and initialized LineBuf.
+ * Parameters:
+ * bufsiz - The (maximum) size of the line buffer.
+ */
+LineBuf *allocate_linebuf(bufsiz)
+int bufsiz;
+{
+ int ovsiz = (bufsiz < LINEBUF_INITIAL) ? bufsiz : LINEBUF_INITIAL;
+ LineBuf *lb = (LineBuf *) erts_alloc(ERTS_ALC_T_LINEBUF,
+ sizeof(LineBuf)+ovsiz);
+ lb->ovsiz = ovsiz;
+ lb->bufsiz = bufsiz;
+ lb->ovlen = 0;
+ lb->data[0] = LINEBUF_MAIN; /* state */
+ return lb;
+}
+
+/*
+ * Initializes a LineBufContext to be used in calls to read_linebuf
+ * or flush_linebuf.
+ * Returns: 0 if ok, <0 on error.
+ * Parameters:
+ * lc - Pointer to an allocated LineBufContext.
+ * lb - Pointer to a LineBuf structure (probably from the Port structure).
+ * buf - A buffer containing the data to be read and split to lines.
+ * len - The number of bytes in buf.
+ */
+static int init_linebuf_context(LineBufContext *lc, LineBuf **lb, char *buf, int len)
+{
+ if(lc == NULL || lb == NULL)
+ return -1;
+ lc->b = lb;
+ lc->buf = buf;
+ lc->left = len;
+ return 0;
+}
+
+static void resize_linebuf(LineBuf **b)
+{
+ int newsiz = (((*b)->ovsiz * 2) > (*b)->bufsiz) ? (*b)->bufsiz :
+ (*b)->ovsiz * 2;
+ *b = (LineBuf *) erts_realloc(ERTS_ALC_T_LINEBUF,
+ (void *) *b,
+ sizeof(LineBuf)+newsiz);
+ (*b)->ovsiz = newsiz;
+}
+
+/*
+ * Delivers all data in the buffer regardless of newlines (always
+ * an LINEBUF_NOEOL. Has to be called until it return LINEBUF_EMPTY.
+ * Return values and barameters as read_linebuf (see below).
+ */
+static int flush_linebuf(LineBufContext *bp)
+{
+ bp->retlen = (*bp->b)->ovlen;
+ switch(LINEBUF_STATE(*bp)){
+ case LINEBUF_CR_INSIDE:
+ if((*bp->b)->ovlen >= (*bp->b)->ovsiz)
+ resize_linebuf(bp->b);
+ LINEBUF_DATA(*bp)[((*bp->b)->ovlen)++] = '\r';
+ ++bp->retlen; /* fall through instead of switching state... */
+ case LINEBUF_MAIN:
+ case LINEBUF_FULL:
+ (*bp->b)->ovlen = 0;
+ LINEBUF_STATE(*bp) = LINEBUF_MAIN;
+ if(!bp->retlen)
+ return LINEBUF_EMPTY;
+ return LINEBUF_NOEOL;
+ case LINEBUF_CR_AFTER:
+ LINEBUF_STATE(*bp) = LINEBUF_CR_INSIDE;
+ (*bp->b)->ovlen = 0;
+ if(!bp->retlen)
+ return LINEBUF_EMPTY;
+ return LINEBUF_NOEOL;
+ default:
+ return LINEBUF_ERROR;
+ }
+}
+
+/*
+ * Reads input from a buffer and "chops" it up in lines.
+ * Has to be called repeatedly until it returns LINEBUF_EMPTY
+ * to get all lines in buffer.
+ * Handles both <LF> and <CR><LF> style newlines.
+ * On Unix, this is slightly incorrect, as <CR><LF> is NOT to be regarded
+ * as a newline together, but i treat newlines equally in all systems
+ * to avoid putting this in sys.c or clutter it with #ifdef's.
+ * Returns: LINEBUF_EMPTY if there is no more data that can be
+ * determined as a line (only part of a line left), LINEBUF_EOL if a whole
+ * line could be delivered and LINEBUF_NOEOL if the buffer size has been
+ * exceeded. The data and the data length can be accesed through the
+ * LINEBUF_DATA and the LINEBUF_DATALEN macros applied to the LineBufContext.
+ * Parameters:
+ * bp - A LineBufContext that is initialized with
+ * the init_linebuf_context call. The context has to be retained during
+ * all calls that returns other than LINEBUF_EMPTY. When LINEBUF_EMPTY
+ * is returned the context can be discarded and a new can be created when new
+ * data arrives (the state is saved in the Port structure).
+ */
+static int read_linebuf(LineBufContext *bp)
+{
+ for(;;){
+ if(bp->left == 0)
+ return LINEBUF_EMPTY;
+ if(*bp->buf == '\n'){
+ LINEBUF_STATE(*bp) = LINEBUF_MAIN;
+ ++(bp->buf);
+ --(bp->left);
+ bp->retlen = (*bp->b)->ovlen;
+ (*bp->b)->ovlen = 0;
+ return LINEBUF_EOL;
+ }
+ switch(LINEBUF_STATE(*bp)){
+ case LINEBUF_MAIN:
+ if((*bp->b)->ovlen == (*bp->b)->bufsiz)
+ LINEBUF_STATE(*bp) = LINEBUF_FULL;
+ else if(*bp->buf == '\r'){
+ ++(bp->buf);
+ --(bp->left);
+ LINEBUF_STATE(*bp) = LINEBUF_CR_INSIDE;
+ } else {
+ if((*bp->b)->ovlen >= (*bp->b)->ovsiz)
+ resize_linebuf(bp->b);
+ LINEBUF_DATA(*bp)[((*bp->b)->ovlen)++] = *((bp->buf)++);
+ --(bp->left);
+ }
+ continue;
+ case LINEBUF_FULL:
+ if(*bp->buf == '\r'){
+ ++(bp->buf);
+ --(bp->left);
+ LINEBUF_STATE(*bp) = LINEBUF_CR_AFTER;
+ } else {
+ bp->retlen = (*bp->b)->ovlen;
+ (*bp->b)->ovlen = 0;
+ LINEBUF_STATE(*bp) = LINEBUF_MAIN;
+ return LINEBUF_NOEOL;
+ }
+ continue;
+ case LINEBUF_CR_INSIDE:
+ if((*bp->b)->ovlen >= (*bp->b)->ovsiz)
+ resize_linebuf(bp->b);
+ LINEBUF_DATA(*bp)[((*bp->b)->ovlen)++] = '\r';
+ LINEBUF_STATE(*bp) = LINEBUF_MAIN;
+ continue;
+ case LINEBUF_CR_AFTER:
+ bp->retlen = (*bp->b)->ovlen;
+ (*bp->b)->ovlen = 0;
+ LINEBUF_STATE(*bp) = LINEBUF_CR_INSIDE;
+ return LINEBUF_NOEOL;
+ default:
+ return LINEBUF_ERROR;
+ }
+ }
+}
+
+static void
+deliver_result(Eterm sender, Eterm pid, Eterm res)
+{
+ Process *rp;
+ ErtsProcLocks rp_locks = 0;
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ ASSERT(is_internal_port(sender)
+ && is_internal_pid(pid)
+ && internal_pid_index(pid) < erts_max_processes);
+
+ rp = erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_SMP_INC_REFC);
+
+ if (rp) {
+ Eterm tuple;
+ ErlHeapFragment *bp;
+ ErlOffHeap *ohp;
+ Eterm* hp;
+ Uint sz_res;
+ sz_res = size_object(res);
+ hp = erts_alloc_message_heap(sz_res + 3, &bp, &ohp, rp, &rp_locks);
+ res = copy_struct(res, sz_res, &hp, ohp);
+ tuple = TUPLE2(hp, sender, res);
+ erts_queue_message(rp, &rp_locks, bp, tuple, NIL);
+ erts_smp_proc_unlock(rp, rp_locks);
+ erts_smp_proc_dec_refc(rp);
+ }
+}
+
+
+/*
+ * Deliver a "read" message.
+ * hbuf -- byte that are always formated as a list
+ * hlen -- number of byte in header
+ * buf -- data
+ * len -- length of data
+ */
+
+static void deliver_read_message(Port* prt, Eterm to,
+ char *hbuf, int hlen,
+ char *buf, int len, int eol)
+{
+ int need;
+ Eterm listp;
+ Eterm tuple;
+ Process* rp;
+ Eterm* hp;
+ ErlHeapFragment *bp;
+ ErlOffHeap *ohp;
+ ErtsProcLocks rp_locks = 0;
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ need = 3 + 3 + 2*hlen;
+ if (prt->status & ERTS_PORT_SFLG_LINEBUF_IO) {
+ need += 3;
+ }
+ if (prt->status & ERTS_PORT_SFLG_BINARY_IO && buf != NULL) {
+ need += PROC_BIN_SIZE;
+ } else {
+ need += 2*len;
+ }
+
+ rp = erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC);
+ if (!rp)
+ return;
+
+ hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
+
+ listp = NIL;
+ if ((prt->status & ERTS_PORT_SFLG_BINARY_IO) == 0) {
+ listp = buf_to_intlist(&hp, buf, len, listp);
+ } else if (buf != NULL) {
+ ProcBin* pb;
+ Binary* bptr;
+
+ bptr = erts_bin_nrml_alloc(len);
+ bptr->flags = 0;
+ bptr->orig_size = len;
+ erts_refc_init(&bptr->refc, 1);
+ sys_memcpy(bptr->orig_bytes, buf, len);
+
+ pb = (ProcBin *) hp;
+ pb->thing_word = HEADER_PROC_BIN;
+ pb->size = len;
+ pb->next = ohp->mso;
+ ohp->mso = pb;
+ pb->val = bptr;
+ pb->bytes = (byte*) bptr->orig_bytes;
+ pb->flags = 0;
+ hp += PROC_BIN_SIZE;
+
+ ohp->overhead += pb->size / sizeof(Eterm);
+ listp = make_binary(pb);
+ }
+
+ /* Prepend the header */
+ if (hlen > 0) {
+ listp = buf_to_intlist(&hp, hbuf, hlen, listp);
+ }
+
+ if (prt->status & ERTS_PORT_SFLG_LINEBUF_IO){
+ listp = TUPLE2(hp, (eol) ? am_eol : am_noeol, listp);
+ hp += 3;
+ }
+ tuple = TUPLE2(hp, am_data, listp);
+ hp += 3;
+
+ tuple = TUPLE2(hp, prt->id, tuple);
+ hp += 3;
+
+ erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ erts_smp_proc_unlock(rp, rp_locks);
+ erts_smp_proc_dec_refc(rp);
+}
+
+/*
+ * Deliver all lines in a line buffer, repeats calls to
+ * deliver_read_message, and takes the same parameters.
+ */
+static void deliver_linebuf_message(Port* prt, Eterm to,
+ char* hbuf, int hlen,
+ char *buf, int len)
+{
+ LineBufContext lc;
+ int ret;
+ if(init_linebuf_context(&lc,&(prt->linebuf), buf, len) < 0)
+ return;
+ while((ret = read_linebuf(&lc)) > LINEBUF_EMPTY)
+ deliver_read_message(prt, to, hbuf, hlen, LINEBUF_DATA(lc),
+ LINEBUF_DATALEN(lc), (ret == LINEBUF_EOL));
+}
+
+/*
+ * Deliver any nonterminated lines in the line buffer before the
+ * port gets closed.
+ * Has to be called before terminate_port.
+ * Parameters:
+ * prt - Pointer to a Port structure for this port.
+ */
+static void flush_linebuf_messages(Port *prt)
+{
+ LineBufContext lc;
+ int ret;
+
+ ERTS_SMP_LC_ASSERT(!prt || erts_lc_is_port_locked(prt));
+ if(prt == NULL || !(prt->status & ERTS_PORT_SFLG_LINEBUF_IO))
+ return;
+
+ if(init_linebuf_context(&lc,&(prt->linebuf), NULL, 0) < 0)
+ return;
+ while((ret = flush_linebuf(&lc)) > LINEBUF_EMPTY)
+ deliver_read_message(prt,
+ prt->connected,
+ NULL,
+ 0,
+ LINEBUF_DATA(lc),
+ LINEBUF_DATALEN(lc),
+ (ret == LINEBUF_EOL));
+}
+
+static void
+deliver_vec_message(Port* prt, /* Port */
+ Eterm to, /* Receiving pid */
+ char* hbuf, /* "Header" buffer... */
+ int hlen, /* ... and its length */
+ ErlDrvBinary** binv, /* Vector of binaries */
+ SysIOVec* iov, /* I/O vector */
+ int vsize, /* Size of binv & iov */
+ int csize) /* Size of characters in
+ iov (not hlen) */
+{
+ int need;
+ Eterm listp;
+ Eterm tuple;
+ Process* rp;
+ Eterm* hp;
+ ErlHeapFragment *bp;
+ ErlOffHeap *ohp;
+ ErtsProcLocks rp_locks = 0;
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ /*
+ * Check arguments for validity.
+ */
+
+ rp = erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC);
+ if (!rp)
+ return;
+
+ /*
+ * Calculate the exact number of heap words needed.
+ */
+
+ need = 3 + 3; /* Heap space for two tuples */
+ if (prt->status & ERTS_PORT_SFLG_BINARY_IO) {
+ need += (2+PROC_BIN_SIZE)*vsize - 2 + hlen*2;
+ } else {
+ need += (hlen+csize)*2;
+ }
+
+ hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
+
+ listp = NIL;
+ iov += vsize;
+
+ if ((prt->status & ERTS_PORT_SFLG_BINARY_IO) == 0) {
+ Eterm* thp = hp;
+ while (vsize--) {
+ iov--;
+ listp = buf_to_intlist(&thp, iov->iov_base, iov->iov_len, listp);
+ }
+ hp = thp;
+ } else {
+ binv += vsize;
+ while (vsize--) {
+ ErlDrvBinary* b;
+ ProcBin* pb = (ProcBin*) hp;
+ byte* base;
+
+ iov--;
+ binv--;
+ if ((b = *binv) == NULL) {
+ b = driver_alloc_binary(iov->iov_len);
+ sys_memcpy(b->orig_bytes, iov->iov_base, iov->iov_len);
+ base = (byte*) b->orig_bytes;
+ } else {
+ /* Must increment reference count, caller calls free */
+ driver_binary_inc_refc(b);
+ base = iov->iov_base;
+ }
+ pb->thing_word = HEADER_PROC_BIN;
+ pb->size = iov->iov_len;
+ pb->next = ohp->mso;
+ ohp->mso = pb;
+ pb->val = ErlDrvBinary2Binary(b);
+ pb->bytes = base;
+ pb->flags = 0;
+ hp += PROC_BIN_SIZE;
+
+ ohp->overhead += iov->iov_len / sizeof(Eterm);
+
+ if (listp == NIL) { /* compatible with deliver_bin_message */
+ listp = make_binary(pb);
+ } else {
+ listp = CONS(hp, make_binary(pb), listp);
+ hp += 2;
+ }
+ }
+ }
+
+ if (hlen > 0) { /* Prepend the header */
+ Eterm* thp = hp;
+ listp = buf_to_intlist(&thp, hbuf, hlen, listp);
+ hp = thp;
+ }
+
+ tuple = TUPLE2(hp, am_data, listp);
+ hp += 3;
+ tuple = TUPLE2(hp, prt->id, tuple);
+ hp += 3;
+
+ erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ erts_smp_proc_unlock(rp, rp_locks);
+ erts_smp_proc_dec_refc(rp);
+}
+
+
+static void deliver_bin_message(Port* prt, /* port */
+ Eterm to, /* receiving pid */
+ char* hbuf, /* "header" buffer */
+ int hlen, /* and it's length */
+ ErlDrvBinary* bin, /* binary data */
+ int offs, /* offset into binary */
+ int len) /* length of binary */
+{
+ SysIOVec vec;
+
+ vec.iov_base = bin->orig_bytes+offs;
+ vec.iov_len = len;
+ deliver_vec_message(prt, to, hbuf, hlen, &bin, &vec, 1, len);
+}
+
+/* flush the port I/O queue and terminate if empty */
+/*
+ * Note.
+ *
+ * The test for (p->status & ERTS_PORT_SFLGS_DEAD) == 0 is important since the
+ * driver's flush function might call driver_async, which when using no
+ * threads and being short circuited will notice that the io queue is empty
+ * (after calling the driver's async_ready) and recursively call
+ * terminate_port. So when we get back here, the port is already terminated.
+ */
+static void flush_port(Port *p)
+{
+ int fpe_was_unmasked;
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
+
+ if (p->drv_ptr->flush != NULL) {
+ if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
+ trace_sched_ports_where(p, am_in, am_flush);
+ }
+ fpe_was_unmasked = erts_block_fpe();
+ (*p->drv_ptr->flush)((ErlDrvData)p->drv_data);
+ erts_unblock_fpe(fpe_was_unmasked);
+ if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
+ trace_sched_ports_where(p, am_out, am_flush);
+ }
+#ifdef ERTS_SMP
+ if (p->xports)
+ erts_smp_xports_unlock(p);
+ ASSERT(!p->xports);
+#endif
+ }
+ if ((p->status & ERTS_PORT_SFLGS_DEAD) == 0 && is_port_ioq_empty(p)) {
+ terminate_port(p);
+ }
+}
+
+/* stop and delete a port that is ERTS_PORT_SFLG_CLOSING */
+static void
+terminate_port(Port *prt)
+{
+ Eterm send_closed_port_id;
+ Eterm connected_id = NIL /* Initialize to silence compiler */;
+ erts_driver_t *drv;
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+ ASSERT(!prt->nlinks);
+ ASSERT(!prt->monitors);
+
+ if (prt->status & ERTS_PORT_SFLG_SEND_CLOSED) {
+ erts_port_status_band_set(prt, ~ERTS_PORT_SFLG_SEND_CLOSED);
+ send_closed_port_id = prt->id;
+ connected_id = prt->connected;
+ }
+ else {
+ send_closed_port_id = NIL;
+ }
+
+#ifdef ERTS_SMP
+ erts_cancel_smp_ptimer(prt->ptimer);
+#else
+ erl_cancel_timer(&prt->tm);
+#endif
+
+ drv = prt->drv_ptr;
+ if ((drv != NULL) && (drv->stop != NULL)) {
+ int fpe_was_unmasked = erts_block_fpe();
+ (*drv->stop)((ErlDrvData)prt->drv_data);
+ erts_unblock_fpe(fpe_was_unmasked);
+#ifdef ERTS_SMP
+ if (prt->xports)
+ erts_smp_xports_unlock(prt);
+ ASSERT(!prt->xports);
+#endif
+ }
+ if(drv->handle != NULL) {
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+ erts_ddll_decrement_port_count(drv->handle);
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ }
+ stopq(prt); /* clear queue memory */
+ if(prt->linebuf != NULL){
+ erts_free(ERTS_ALC_T_LINEBUF, (void *) prt->linebuf);
+ prt->linebuf = NULL;
+ }
+ if (prt->bp != NULL) {
+ free_message_buffer(prt->bp);
+ prt->bp = NULL;
+ prt->data = am_undefined;
+ }
+
+ if (prt->psd)
+ erts_free(ERTS_ALC_T_PRTSD, prt->psd);
+
+ kill_port(prt);
+
+ /*
+ * We don't want to send the closed message until after the
+ * port has been removed from the port table (in kill_port()).
+ */
+ if (is_internal_port(send_closed_port_id))
+ deliver_result(send_closed_port_id, connected_id, am_closed);
+
+ ASSERT(prt->dist_entry == NULL);
+}
+
+void
+erts_terminate_port(Port *pp)
+{
+ terminate_port(pp);
+}
+
+static void sweep_one_monitor(ErtsMonitor *mon, void *vpsc)
+{
+ ErtsMonitor *rmon;
+ Process *rp;
+
+ ASSERT(mon->type == MON_ORIGIN);
+ ASSERT(is_internal_pid(mon->pid));
+ rp = erts_pid2proc(NULL, 0, mon->pid, ERTS_PROC_LOCK_LINK);
+ if (!rp) {
+ goto done;
+ }
+ rmon = erts_remove_monitor(&(rp->monitors),mon->ref);
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ if (rmon == NULL) {
+ goto done;
+ }
+ erts_destroy_monitor(rmon);
+ done:
+ erts_destroy_monitor(mon);
+}
+
+
+
+typedef struct {
+ Eterm port;
+ Eterm reason;
+} SweepContext;
+
+static void sweep_one_link(ErtsLink *lnk, void *vpsc)
+{
+ SweepContext *psc = vpsc;
+ DistEntry *dep;
+ Process *rp;
+
+
+ ASSERT(lnk->type == LINK_PID);
+
+ if (is_external_pid(lnk->pid)) {
+ dep = external_pid_dist_entry(lnk->pid);
+ if(dep != erts_this_dist_entry) {
+ ErtsDistLinkData dld;
+ ErtsDSigData dsd;
+ int code;
+ code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0);
+ switch (code) {
+ case ERTS_DSIG_PREP_NOT_ALIVE:
+ case ERTS_DSIG_PREP_NOT_CONNECTED:
+ break;
+ case ERTS_DSIG_PREP_CONNECTED:
+ erts_remove_dist_link(&dld, psc->port, lnk->pid, dep);
+ erts_destroy_dist_link(&dld);
+ code = erts_dsig_send_exit(&dsd, psc->port, lnk->pid,
+ psc->reason);
+ ASSERT(code == ERTS_DSIG_SEND_OK);
+ break;
+ default:
+ ASSERT(! "Invalid dsig prepare result");
+ break;
+ }
+ }
+ } else {
+ ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND;
+ ASSERT(is_internal_pid(lnk->pid));
+ rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks);
+ if (rp) {
+ ErtsLink *rlnk = erts_remove_link(&(rp->nlinks), psc->port);
+
+ if (rlnk) {
+ int xres = erts_send_exit_signal(NULL,
+ psc->port,
+ rp,
+ &rp_locks,
+ psc->reason,
+ NIL,
+ NULL,
+ 0);
+ if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) {
+ /* We didn't exit the process and it is traced */
+ if (IS_TRACED_FL(rp, F_TRACE_PROCS)) {
+ trace_proc(NULL, rp, am_getting_unlinked,
+ psc->port);
+ }
+ }
+ erts_destroy_link(rlnk);
+ }
+
+ erts_smp_proc_unlock(rp, rp_locks);
+ }
+ }
+ erts_destroy_link(lnk);
+}
+
+/* 'from' is sending 'this_port' an exit signal, (this_port must be internal).
+ * If reason is normal we don't do anything, *unless* from is our connected
+ * process in which case we close the port. Any other reason kills the port.
+ * If 'from' is ourself we always die.
+ * When a driver has data in ioq then driver will be set to closing
+ * and become inaccessible to the processes. One exception exists and
+ * that is to kill a port till reason kill. Then the port is stopped.
+ *
+ */
+void
+erts_do_exit_port(Port *p, Eterm from, Eterm reason)
+{
+ ErtsLink *lnk;
+ Eterm rreason;
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
+
+ rreason = (reason == am_kill) ? am_killed : reason;
+
+ if ((p->status & (ERTS_PORT_SFLGS_DEAD
+ | ERTS_PORT_SFLG_EXITING
+ | ERTS_PORT_SFLG_IMMORTAL))
+ || ((reason == am_normal) &&
+ ((from != p->connected) && (from != p->id)))) {
+ return;
+ }
+
+ if (IS_TRACED_FL(p, F_TRACE_PORTS)) {
+ trace_port(p, am_closed, reason);
+ }
+
+ erts_trace_check_exiting(p->id);
+
+ /*
+ * Setting the port to not busy here, frees the list of pending
+ * processes and makes them runnable.
+ */
+ set_busy_port((ErlDrvPort)internal_port_index(p->id), 0);
+
+ if (p->reg != NULL)
+ (void) erts_unregister_name(NULL, 0, p, p->reg->name);
+
+ erts_port_status_bor_set(p, ERTS_PORT_SFLG_EXITING);
+
+ {
+ SweepContext sc = {p->id, rreason};
+ lnk = p->nlinks;
+ p->nlinks = NULL;
+ erts_sweep_links(lnk, &sweep_one_link, &sc);
+ }
+ {
+ ErtsMonitor *moni = p->monitors;
+ p->monitors = NULL;
+ erts_sweep_monitors(moni, &sweep_one_monitor, NULL);
+ }
+
+
+ if ((p->status & ERTS_PORT_SFLG_DISTRIBUTION) && p->dist_entry) {
+ erts_do_net_exits(p->dist_entry, rreason);
+ erts_deref_dist_entry(p->dist_entry);
+ p->dist_entry = NULL;
+ erts_port_status_band_set(p, ~ERTS_PORT_SFLG_DISTRIBUTION);
+ }
+
+ if ((reason != am_kill) && !is_port_ioq_empty(p)) {
+ erts_port_status_bandor_set(p,
+ ~ERTS_PORT_SFLG_EXITING, /* must turn it off */
+ ERTS_PORT_SFLG_CLOSING);
+ flush_port(p);
+ }
+ else {
+ terminate_port(p);
+ }
+}
+
+/* About the states ERTS_PORT_SFLG_EXITING and ERTS_PORT_SFLG_CLOSING used above.
+**
+** ERTS_PORT_SFLG_EXITING is a recursion protection for erts_do_exit_port().
+** It is unclear whether this state is necessary or not, it might be possible
+** to merge it with ERTS_PORT_SFLG_CLOSING. ERTS_PORT_SFLG_EXITING only persists
+** over a section of sequential (but highly recursive) code.
+**
+** ERTS_PORT_SFLG_CLOSING is a state where the port is in Limbo, waiting to
+** pass on. All links are removed, and the port receives in/out-put events so
+** as soon as the port queue gets empty terminate_port() is called.
+*/
+
+
+
+/* Command should be of the form
+** {PID, close}
+** {PID, {command, io-list}}
+** {PID, {connect, New_PID}}
+**
+**
+*/
+void erts_port_command(Process *proc,
+ Eterm caller_id,
+ Port *port,
+ Eterm command)
+{
+ Eterm *tp;
+ Eterm pid;
+
+ if (!port)
+ return;
+
+ erts_smp_proc_unlock(proc, ERTS_PROC_LOCK_MAIN);
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ASSERT(!INVALID_PORT(port, port->id));
+
+ if (is_tuple_arity(command, 2)) {
+ tp = tuple_val(command);
+ if ((pid = port->connected) == tp[1]) {
+ /* PID must be connected */
+ if (tp[2] == am_close) {
+ erts_port_status_bor_set(port, ERTS_PORT_SFLG_SEND_CLOSED);
+ erts_do_exit_port(port, pid, am_normal);
+ goto done;
+ } else if (is_tuple_arity(tp[2], 2)) {
+ tp = tuple_val(tp[2]);
+ if (tp[1] == am_command) {
+ if (erts_write_to_port(caller_id, port, tp[2]) == 0)
+ goto done;
+ } else if ((tp[1] == am_connect) && is_internal_pid(tp[2])) {
+ port->connected = tp[2];
+ deliver_result(port->id, pid, am_connected);
+ goto done;
+ }
+ }
+ }
+ }
+
+ {
+ ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
+ Process* rp = erts_pid2proc_opt(NULL, 0,
+ port->connected, rp_locks,
+ ERTS_P2P_FLG_SMP_INC_REFC);
+ if (rp) {
+ (void) erts_send_exit_signal(NULL,
+ port->id,
+ rp,
+ &rp_locks,
+ am_badsig,
+ NIL,
+ NULL,
+ 0);
+ erts_smp_proc_unlock(rp, rp_locks);
+ erts_smp_proc_dec_refc(rp);
+ }
+
+ }
+ done:
+ erts_smp_proc_lock(proc, ERTS_PROC_LOCK_MAIN);
+}
+
+/*
+ * Control a port synchronously.
+ * Returns either a list or a binary.
+ */
+Eterm
+erts_port_control(Process* p, Port* prt, Uint command, Eterm iolist)
+{
+ byte* to_port = NULL; /* Buffer to write to port. */
+ /* Initialization is for shutting up
+ warning about use before set. */
+ int to_len = 0; /* Length of buffer. */
+ int must_free = 0; /* True if the buffer should be freed. */
+ char port_result[ERL_ONHEAP_BIN_LIMIT]; /* Default buffer for result from port. */
+ char* port_resp; /* Pointer to result buffer. */
+ int n;
+ int (*control)(ErlDrvData, unsigned, char*, int, char**, int);
+ int fpe_was_unmasked;
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+ if ((control = prt->drv_ptr->control) == NULL) {
+ return THE_NON_VALUE;
+ }
+
+ /*
+ * Convert the iolist to a buffer, pointed to by to_port,
+ * and with its length in to_len.
+ */
+ if (is_binary(iolist) && binary_bitoffset(iolist) == 0) {
+ Uint bitoffs;
+ Uint bitsize;
+ ERTS_GET_BINARY_BYTES(iolist, to_port, bitoffs, bitsize);
+ to_len = binary_size(iolist);
+ } else {
+ int r;
+
+ /* Try with an 8KB buffer first (will often be enough I guess). */
+ to_len = 8*1024;
+ to_port = erts_alloc(ERTS_ALC_T_TMP, to_len);
+ must_free = 1;
+
+ /*
+ * In versions before R10B, we used to reserve random
+ * amounts of extra memory. From R10B, we allocate the
+ * exact amount.
+ */
+ r = io_list_to_buf(iolist, (char*) to_port, to_len);
+ if (r >= 0) {
+ to_len -= r;
+ } else if (r == -2) { /* Type error */
+ erts_free(ERTS_ALC_T_TMP, (void *) to_port);
+ return THE_NON_VALUE;
+ } else {
+ ASSERT(r == -1); /* Overflow */
+ erts_free(ERTS_ALC_T_TMP, (void *) to_port);
+ if ((to_len = io_list_len(iolist)) < 0) { /* Type error */
+ return THE_NON_VALUE;
+ }
+ must_free = 1;
+ to_port = erts_alloc(ERTS_ALC_T_TMP, to_len);
+ r = io_list_to_buf(iolist, (char*) to_port, to_len);
+ ASSERT(r == 0);
+ }
+ }
+
+ prt->caller = p->id; /* Internal pid */
+
+ erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN);
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ /*
+ * Call the port's control routine.
+ */
+
+ port_resp = port_result;
+ fpe_was_unmasked = erts_block_fpe();
+ n = control((ErlDrvData)prt->drv_data, command, (char*)to_port, to_len,
+ &port_resp, sizeof(port_result));
+ erts_unblock_fpe(fpe_was_unmasked);
+ if (must_free) {
+ erts_free(ERTS_ALC_T_TMP, (void *) to_port);
+ }
+ prt->caller = NIL;
+#ifdef ERTS_SMP
+ if (prt->xports)
+ erts_smp_xports_unlock(prt);
+ ASSERT(!prt->xports);
+#endif
+
+ erts_smp_proc_lock(p, ERTS_PROC_LOCK_MAIN);
+ /*
+ * Handle the result.
+ */
+
+ if (n < 0) {
+ return THE_NON_VALUE;
+ }
+
+ if ((prt->control_flags & PORT_CONTROL_FLAG_BINARY) == 0) { /* List result */
+ Eterm ret;
+ Eterm* hp = HAlloc(p, 2*n);
+ ret = buf_to_intlist(&hp, port_resp, n, NIL);
+ if (port_resp != port_result) {
+ driver_free(port_resp);
+ }
+ return ret;
+ }
+ else if (port_resp == NULL) {
+ return NIL;
+ }
+ else { /* Binary result */
+ ErlDrvBinary *dbin;
+ ErlHeapBin *hbin;
+ if (port_resp != port_result) {
+ dbin = (ErlDrvBinary *) port_resp;
+ if (dbin->orig_size > ERL_ONHEAP_BIN_LIMIT) {
+ ProcBin* pb = (ProcBin *) HAlloc(p, PROC_BIN_SIZE);
+ pb->thing_word = HEADER_PROC_BIN;
+ pb->size = dbin->orig_size;
+ pb->next = MSO(p).mso;
+ MSO(p).mso = pb;
+ pb->val = ErlDrvBinary2Binary(dbin);
+ pb->bytes = (byte*) dbin->orig_bytes;
+ pb->flags = 0;
+ MSO(p).overhead += dbin->orig_size / sizeof(Eterm);
+ return make_binary(pb);
+ }
+ port_resp = dbin->orig_bytes;
+ n = dbin->orig_size;
+ } else {
+ dbin = NULL;
+ }
+ hbin = (ErlHeapBin*) HAlloc(p, heap_bin_size(n));
+ ASSERT(n <= ERL_ONHEAP_BIN_LIMIT);
+ hbin->thing_word = header_heap_bin(n);
+ hbin->size = n;
+ sys_memcpy(hbin->data, port_resp, n);
+ if (dbin != NULL) {
+ driver_free_binary(dbin);
+ }
+ return make_binary(hbin);
+ }
+}
+
+typedef struct {
+ int to;
+ void *arg;
+} prt_one_lnk_data;
+
+static void prt_one_monitor(ErtsMonitor *mon, void *vprtd)
+{
+ prt_one_lnk_data *prtd = (prt_one_lnk_data *) vprtd;
+ erts_print(prtd->to, prtd->arg, "(%T,%T)", mon->pid,mon->ref);
+}
+
+static void prt_one_lnk(ErtsLink *lnk, void *vprtd)
+{
+ prt_one_lnk_data *prtd = (prt_one_lnk_data *) vprtd;
+ erts_print(prtd->to, prtd->arg, "%T", lnk->pid);
+}
+
+void
+print_port_info(int to, void *arg, int i)
+{
+ Port* p = &erts_port[i];
+
+ if (p->status & ERTS_PORT_SFLGS_DEAD)
+ return;
+
+ erts_print(to, arg, "=port:%T\n", p->id);
+ erts_print(to, arg, "Slot: %d\n", i);
+ if (p->status & ERTS_PORT_SFLG_CONNECTED) {
+ erts_print(to, arg, "Connected: %T", p->connected);
+ erts_print(to, arg, "\n");
+ }
+
+ if (p->nlinks != NULL) {
+ prt_one_lnk_data prtd;
+ prtd.to = to;
+ prtd.arg = arg;
+ erts_print(to, arg, "Links: ");
+ erts_doforall_links(p->nlinks, &prt_one_lnk, &prtd);
+ erts_print(to, arg, "\n");
+ }
+ if (p->monitors != NULL) {
+ prt_one_lnk_data prtd;
+ prtd.to = to;
+ prtd.arg = arg;
+ erts_print(to, arg, "Monitors: ");
+ erts_doforall_monitors(p->monitors, &prt_one_monitor, &prtd);
+ erts_print(to, arg, "\n");
+ }
+
+ if (p->reg != NULL)
+ erts_print(to, arg, "Registered as: %T\n", p->reg->name);
+
+ if (p->drv_ptr == &fd_driver) {
+ erts_print(to, arg, "Port is UNIX fd not opened by emulator: %s\n", p->name);
+ } else if (p->drv_ptr == &vanilla_driver) {
+ erts_print(to, arg, "Port is a file: %s\n",p->name);
+ } else if (p->drv_ptr == &spawn_driver) {
+ erts_print(to, arg, "Port controls external process: %s\n",p->name);
+ } else {
+ erts_print(to, arg, "Port controls linked-in driver: %s\n",p->name);
+ }
+}
+
+void
+set_busy_port(ErlDrvPort port_num, int on)
+{
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num]));
+
+ if (on) {
+ erts_port_status_bor_set(&erts_port[port_num],
+ ERTS_PORT_SFLG_PORT_BUSY);
+ } else {
+ ErtsProcList* plp = erts_port[port_num].suspended;
+ erts_port_status_band_set(&erts_port[port_num],
+ ~ERTS_PORT_SFLG_PORT_BUSY);
+ erts_port[port_num].suspended = NULL;
+
+ if (erts_port[port_num].dist_entry) {
+ /*
+ * Processes suspended on distribution ports are
+ * normally queued on the dist entry.
+ */
+ erts_dist_port_not_busy(&erts_port[port_num]);
+ }
+
+ /*
+ * Resume, in a round-robin fashion, all processes waiting on the port.
+ *
+ * This version submitted by Tony Rogvall. The earlier version used
+ * to resume the processes in order, which caused starvation of all but
+ * the first process.
+ */
+
+ if (plp) {
+ /* First proc should be resumed last */
+ if (plp->next) {
+ erts_resume_processes(plp->next);
+ plp->next = NULL;
+ }
+ erts_resume_processes(plp);
+ }
+ }
+}
+
+void set_port_control_flags(ErlDrvPort port_num, int flags)
+{
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num]));
+
+ erts_port[port_num].control_flags = flags;
+}
+
+int get_port_flags(ErlDrvPort ix) {
+ Port* prt = erts_drvport2port(ix);
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+ if (prt == NULL)
+ return 0;
+
+ return (prt->status & ERTS_PORT_SFLG_BINARY_IO ? PORT_FLAG_BINARY : 0)
+ | (prt->status & ERTS_PORT_SFLG_LINEBUF_IO ? PORT_FLAG_LINE : 0);
+}
+
+
+void erts_raw_port_command(Port* p, byte* buf, Uint len)
+{
+ int fpe_was_unmasked;
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
+
+ if (len > (Uint) INT_MAX)
+ erl_exit(ERTS_ABORT_EXIT,
+ "Absurdly large data buffer (%bpu bytes) passed to"
+ "output callback of %s driver.\n",
+ len,
+ p->drv_ptr->name ? p->drv_ptr->name : "unknown");
+
+ p->caller = NIL;
+ fpe_was_unmasked = erts_block_fpe();
+ (*p->drv_ptr->output)((ErlDrvData)p->drv_data, (char*) buf, (int) len);
+ erts_unblock_fpe(fpe_was_unmasked);
+}
+
+int async_ready(Port *p, void* data)
+{
+ int need_free = 1;
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ if (p) {
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
+ ASSERT(!(p->status & ERTS_PORT_SFLGS_DEAD));
+ if (p->drv_ptr->ready_async != NULL) {
+ (*p->drv_ptr->ready_async)((ErlDrvData)p->drv_data, data);
+ need_free = 0;
+#ifdef ERTS_SMP
+ if (p->xports)
+ erts_smp_xports_unlock(p);
+ ASSERT(!p->xports);
+#endif
+ }
+ if ((p->status & ERTS_PORT_SFLG_CLOSING) && is_port_ioq_empty(p)) {
+ terminate_port(p);
+ }
+ }
+ return need_free;
+}
+
+static void
+report_missing_drv_callback(Port *p, char *drv_type, char *callback)
+{
+ ErtsPortNames *pnp = erts_get_port_names(p->id);
+ char *unknown = "<unknown>";
+ char *drv_name = pnp->driver_name ? pnp->driver_name : unknown;
+ char *prt_name = pnp->name ? pnp->name : unknown;
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ erts_dsprintf(dsbufp, "%T: %s driver '%s' ", p->id, drv_type, drv_name);
+ if (sys_strcmp(drv_name, prt_name) != 0)
+ erts_dsprintf(dsbufp, "(%s) ", prt_name);
+ erts_dsprintf(dsbufp, "does not implement the %s callback!\n", callback);
+ erts_free_port_names(pnp);
+ erts_send_error_to_logger_nogl(dsbufp);
+}
+
+void
+erts_stale_drv_select(Eterm port,
+ ErlDrvEvent hndl,
+ int mode,
+ int deselect)
+{
+ char *type;
+ ErlDrvPort drv_port = internal_port_index(port);
+ ErtsPortNames *pnp = erts_get_port_names(port);
+ erts_dsprintf_buf_t *dsbufp;
+
+ switch (mode) {
+ case ERL_DRV_READ | ERL_DRV_WRITE:
+ type = "Input/Output";
+ goto deselect;
+ case ERL_DRV_WRITE:
+ type = "Output";
+ goto deselect;
+ case ERL_DRV_READ:
+ type = "Input";
+ deselect:
+ if (deselect) {
+ driver_select(drv_port, hndl,
+ mode | ERL_DRV_USE_NO_CALLBACK,
+ 0);
+ }
+ break;
+ default:
+ type = "Event";
+ if (deselect)
+ driver_event(drv_port, hndl, NULL);
+ break;
+ }
+
+ dsbufp = erts_create_logger_dsbuf();
+ erts_dsprintf(dsbufp,
+ "%T: %s: %s driver gone away without deselecting!\n",
+ port,
+ pnp->name ? pnp->name : "<unknown>",
+ type);
+ erts_free_port_names(pnp);
+ erts_send_error_to_logger_nogl(dsbufp);
+}
+
+ErtsPortNames *
+erts_get_port_names(Eterm id)
+{
+ ErtsPortNames *pnp;
+ ASSERT(is_nil(id) || is_internal_port(id));
+
+ if (is_not_internal_port(id)) {
+ pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, sizeof(ErtsPortNames));
+ pnp->name = NULL;
+ pnp->driver_name = NULL;
+ }
+ else {
+ Port* prt = &erts_port[internal_port_index(id)];
+ int do_realloc = 1;
+ int len = -1;
+ size_t pnp_len = sizeof(ErtsPortNames);
+#ifndef DEBUG
+ pnp_len += 100; /* In most cases 100 characters will be enough... */
+#endif
+ pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, pnp_len);
+ do {
+ int nlen;
+ char *name, *driver_name;
+ if (len > 0) {
+ erts_free(ERTS_ALC_T_PORT_NAMES, pnp);
+ pnp_len = sizeof(ErtsPortNames) + len;
+ pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, pnp_len);
+ }
+ erts_smp_port_state_lock(prt);
+ if (id != prt->id) {
+ len = nlen = 0;
+ name = driver_name = NULL;
+ }
+ else {
+ name = prt->name;
+ len = nlen = name ? sys_strlen(name) + 1 : 0;
+ driver_name = (prt->drv_ptr ? prt->drv_ptr->name : NULL);
+ len += driver_name ? sys_strlen(driver_name) + 1 : 0;
+ }
+ if (len <= pnp_len - sizeof(ErtsPortNames)) {
+ if (!name)
+ pnp->name = NULL;
+ else {
+ pnp->name = ((char *) pnp) + sizeof(ErtsPortNames);
+ sys_strcpy(pnp->name, name);
+ }
+ if (!driver_name)
+ pnp->driver_name = NULL;
+ else {
+ pnp->driver_name = (((char *) pnp)
+ + sizeof(ErtsPortNames)
+ + nlen);
+ sys_strcpy(pnp->driver_name, driver_name);
+ }
+ do_realloc = 0;
+ }
+ erts_smp_port_state_unlock(prt);
+ } while (do_realloc);
+ }
+ return pnp;
+}
+
+void
+erts_free_port_names(ErtsPortNames *pnp)
+{
+ erts_free(ERTS_ALC_T_PORT_NAMES, pnp);
+}
+
+static void schedule_port_timeout(Port *p)
+{
+ /*
+ * Scheduling of port timeouts can be done without port locking, but
+ * since the task handle is stored in the port structure and the ptimer
+ * structure is protected by the port lock we require the port to be
+ * locked for now...
+ *
+ * TODO: Implement scheduling of port timeouts without locking
+ * the port.
+ * /Rickard
+ */
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
+ (void) erts_port_task_schedule(p->id,
+ &p->timeout_task,
+ ERTS_PORT_TASK_TIMEOUT,
+ (ErlDrvEvent) -1,
+ NULL);
+}
+
+ErlDrvTermData driver_mk_term_nil(void)
+{
+ return driver_term_nil;
+}
+
+void driver_report_exit(int ix, int status)
+{
+ Port* prt = erts_drvport2port(ix);
+ Eterm* hp;
+ Eterm tuple;
+ Process *rp;
+ Eterm pid;
+ ErlHeapFragment *bp = NULL;
+ ErlOffHeap *ohp;
+ ErtsProcLocks rp_locks = 0;
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+ pid = prt->connected;
+ ASSERT(is_internal_pid(pid));
+ rp = erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_SMP_INC_REFC);
+ if (!rp)
+ return;
+
+ hp = erts_alloc_message_heap(3+3, &bp, &ohp, rp, &rp_locks);
+
+ tuple = TUPLE2(hp, am_exit_status, make_small(status));
+ hp += 3;
+ tuple = TUPLE2(hp, prt->id, tuple);
+
+ erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+
+ erts_smp_proc_unlock(rp, rp_locks);
+ erts_smp_proc_dec_refc(rp);
+}
+
+
+static ERTS_INLINE int
+deliver_term_check_port(ErlDrvPort drvport)
+{
+ int res;
+ int ix = (int) drvport;
+ if (ix < 0 || erts_max_ports <= ix)
+ res = -1; /* invalid */
+ else {
+ Port* prt = &erts_port[ix];
+ erts_smp_port_state_lock(prt);
+ if (!(prt->status & ERTS_PORT_SFLGS_INVALID_LOOKUP))
+ res = 1; /* ok */
+ else if (prt->status & ERTS_PORT_SFLG_CLOSING)
+ res = 0; /* closing */
+ else
+ res = -1; /* invalid (dead) */
+ erts_smp_port_state_unlock(prt);
+ }
+ return res;
+}
+
+#define ERTS_B2T_STATES_DEF_STATES_SZ 5
+#define ERTS_B2T_STATES_DEF_STATES_INC 100
+
+struct b2t_states__ {
+ int len;
+ int ix;
+ int used;
+ ErtsBinary2TermState *state;
+ ErtsBinary2TermState def_states[ERTS_B2T_STATES_DEF_STATES_SZ];
+#ifdef DEBUG
+ byte **org_ext;
+ byte *def_org_ext[ERTS_B2T_STATES_DEF_STATES_SZ];
+#endif
+};
+
+static ERTS_INLINE void
+init_b2t_states(struct b2t_states__ *b2tsp)
+{
+ b2tsp->len = ERTS_B2T_STATES_DEF_STATES_SZ;
+ b2tsp->ix = 0;
+ b2tsp->used = 0;
+ b2tsp->state = &b2tsp->def_states[0];
+#ifdef DEBUG
+ b2tsp->org_ext = &b2tsp->def_org_ext[0];
+#endif
+}
+
+static ERTS_INLINE void
+grow_b2t_states(struct b2t_states__ *b2tsp)
+{
+ if (b2tsp->state != &b2tsp->def_states[0]) {
+ b2tsp->len += ERTS_B2T_STATES_DEF_STATES_INC;
+ b2tsp->state = erts_realloc(ERTS_ALC_T_TMP,
+ b2tsp->state,
+ sizeof(ErtsBinary2TermState)*b2tsp->len);
+#ifdef DEBUG
+ b2tsp->org_ext = erts_realloc(ERTS_ALC_T_TMP,
+ b2tsp->org_ext,
+ sizeof(char *)*b2tsp->len);
+#endif
+ }
+ else {
+ ErtsBinary2TermState *new_states;
+ new_states = erts_alloc(ERTS_ALC_T_TMP,
+ (sizeof(ErtsBinary2TermState)
+ *ERTS_B2T_STATES_DEF_STATES_INC));
+ sys_memcpy((void *) new_states,
+ (void *) b2tsp->state,
+ sizeof(ErtsBinary2TermState)*ERTS_B2T_STATES_DEF_STATES_SZ);
+ b2tsp->state = new_states;
+ b2tsp->len = ERTS_B2T_STATES_DEF_STATES_INC;
+#ifdef DEBUG
+ {
+ byte **new_org_ext = erts_alloc(ERTS_ALC_T_TMP,
+ (sizeof(char *)
+ *ERTS_B2T_STATES_DEF_STATES_INC));
+ sys_memcpy((void *) new_org_ext,
+ (void *) b2tsp->org_ext,
+ sizeof(char *)*ERTS_B2T_STATES_DEF_STATES_SZ);
+ b2tsp->org_ext = new_org_ext;
+ }
+#endif
+ }
+}
+
+static ERTS_INLINE void
+cleanup_b2t_states(struct b2t_states__ *b2tsp)
+{
+ if (b2tsp->state != &b2tsp->def_states[0]) {
+ erts_free(ERTS_ALC_T_TMP, b2tsp->state);
+#ifdef DEBUG
+ erts_free(ERTS_ALC_T_TMP, b2tsp->org_ext);
+#endif
+ }
+}
+
+
+/*
+ * Generate an Erlang term from data in an array (representing a simple stack
+ * machine to build terms).
+ * Returns:
+ * -1 on error in input data
+ * 0 if the message was not delivered (bad to pid or closed port)
+ * 1 if the message was delivered successfully
+ */
+
+static int
+driver_deliver_term(ErlDrvPort port,
+ Eterm to,
+ ErlDrvTermData* data,
+ int len)
+{
+#define ERTS_DDT_FAIL do { res = -1; goto done; } while (0)
+ Uint need = 0;
+ int depth = 0;
+ int res;
+ Eterm *hp = NULL, *hp_start = NULL, *hp_end = NULL;
+ ErlDrvTermData* ptr;
+ ErlDrvTermData* ptr_end;
+ DECLARE_ESTACK(stack);
+ Eterm mess = NIL; /* keeps compiler happy */
+ Process* rp = NULL;
+ ErlHeapFragment *bp = NULL;
+ ErlOffHeap *ohp;
+ ErtsProcLocks rp_locks = 0;
+ struct b2t_states__ b2t;
+
+ init_b2t_states(&b2t);
+
+ /*
+ * We used to check port and process here. In the SMP enabled emulator,
+ * however, we don't want to that until we have verified the term.
+ */
+
+ /*
+ * Check ErlDrvTermData for consistency and calculate needed heap size
+ * and stack depth.
+ */
+ ptr = data;
+ ptr_end = ptr + len;
+
+ while (ptr < ptr_end) {
+ ErlDrvTermData tag = *ptr++;
+
+#define ERTS_DDT_CHK_ENOUGH_ARGS(NEED) \
+ if (ptr+((NEED)-1) >= ptr_end) ERTS_DDT_FAIL;
+
+ switch(tag) {
+ case ERL_DRV_NIL: /* no arguments */
+ depth++;
+ break;
+ case ERL_DRV_ATOM: /* atom argument */
+ ERTS_DDT_CHK_ENOUGH_ARGS(1);
+ if (is_not_atom(ptr[0])) ERTS_DDT_FAIL;
+ ptr++;
+ depth++;
+ break;
+ case ERL_DRV_INT: /* signed int argument */
+ ERTS_DDT_CHK_ENOUGH_ARGS(1);
+ /* check for bignum */
+ if (!IS_SSMALL((Sint)ptr[0]))
+ need += BIG_UINT_HEAP_SIZE; /* use small_to_big */
+ ptr++;
+ depth++;
+ break;
+ case ERL_DRV_UINT: /* unsigned int argument */
+ ERTS_DDT_CHK_ENOUGH_ARGS(1);
+ /* check for bignum */
+ if (!IS_USMALL(0, (Uint)ptr[0]))
+ need += BIG_UINT_HEAP_SIZE; /* use small_to_big */
+ ptr++;
+ depth++;
+ break;
+ case ERL_DRV_INT64: /* pointer to signed 64-bit int argument */
+ ERTS_DDT_CHK_ENOUGH_ARGS(1);
+ erts_bld_sint64(NULL, &need, *((Sint64 *) ptr[0]));
+ ptr++;
+ depth++;
+ break;
+ case ERL_DRV_UINT64: /* pointer to unsigned 64-bit int argument */
+ ERTS_DDT_CHK_ENOUGH_ARGS(1);
+ erts_bld_uint64(NULL, &need, *((Uint64 *) ptr[0]));
+ ptr++;
+ depth++;
+ break;
+ case ERL_DRV_PORT: /* port argument */
+ ERTS_DDT_CHK_ENOUGH_ARGS(1);
+ if (is_not_internal_port(ptr[0])) ERTS_DDT_FAIL;
+ ptr++;
+ depth++;
+ break;
+ case ERL_DRV_BINARY: { /* ErlDrvBinary*, size, offs */
+ ErlDrvBinary* b;
+ Uint size;
+ Uint offset;
+ ERTS_DDT_CHK_ENOUGH_ARGS(3);
+ b = (ErlDrvBinary*) ptr[0];
+ size = ptr[1];
+ offset = ptr[2];
+ if (!b || size + offset > b->orig_size)
+ ERTS_DDT_FAIL; /* No binary or outside the binary */
+ need += (size <= ERL_ONHEAP_BIN_LIMIT
+ ? heap_bin_size(size)
+ : PROC_BIN_SIZE);
+ ptr += 3;
+ depth++;
+ break;
+ }
+ case ERL_DRV_BUF2BINARY: { /* char*, size */
+ byte *bufp;
+ Uint size;
+ ERTS_DDT_CHK_ENOUGH_ARGS(2);
+ bufp = (byte *) ptr[0];
+ size = (Uint) ptr[1];
+ if (!bufp && size > 0) ERTS_DDT_FAIL;
+ need += (size <= ERL_ONHEAP_BIN_LIMIT
+ ? heap_bin_size(size)
+ : PROC_BIN_SIZE);
+ ptr += 2;
+ depth++;
+ break;
+ }
+ case ERL_DRV_STRING: /* char*, length */
+ ERTS_DDT_CHK_ENOUGH_ARGS(2);
+ if ((char *) ptr[0] == NULL || (int) ptr[1] < 0) ERTS_DDT_FAIL;
+ need += ptr[1] * 2;
+ ptr += 2;
+ depth++;
+ break;
+ case ERL_DRV_STRING_CONS: /* char*, length */
+ ERTS_DDT_CHK_ENOUGH_ARGS(2);
+ if ((char *) ptr[0] == NULL || (int) ptr[1] < 0) ERTS_DDT_FAIL;
+ need += ptr[1] * 2;
+ if (depth < 1) ERTS_DDT_FAIL;
+ ptr += 2;
+ break;
+ case ERL_DRV_LIST: /* int */
+ ERTS_DDT_CHK_ENOUGH_ARGS(1);
+ if ((int) ptr[0] <= 0) ERTS_DDT_FAIL;
+ need += (ptr[0]-1)*2; /* list cells */
+ depth -= ptr[0];
+ if (depth < 0) ERTS_DDT_FAIL;
+ ptr++;
+ depth++;
+ break;
+ case ERL_DRV_TUPLE: { /* int */
+ ERTS_DDT_CHK_ENOUGH_ARGS(1);
+ if ((int) ptr[0] < 0) ERTS_DDT_FAIL;
+ need += ptr[0]+1; /* vector positions + arityval */
+ depth -= ptr[0];
+ if (depth < 0) ERTS_DDT_FAIL;
+ ptr++;
+ depth++;
+ break;
+ }
+ case ERL_DRV_PID: /* pid argument */
+ ERTS_DDT_CHK_ENOUGH_ARGS(1);
+ if (is_not_internal_pid(ptr[0])) ERTS_DDT_FAIL;
+ ptr++;
+ depth++;
+ break;
+ case ERL_DRV_FLOAT: /* double * */
+ ERTS_DDT_CHK_ENOUGH_ARGS(1);
+ need += FLOAT_SIZE_OBJECT;
+ ptr++;
+ depth++;
+ break;
+ case ERL_DRV_EXT2TERM: { /* char *ext, int size */
+ byte* ext;
+ Sint size;
+ Sint hsz;
+
+ ERTS_DDT_CHK_ENOUGH_ARGS(2);
+ ext = (byte *) ptr[0];
+ size = (Sint) ptr[1];
+ if (!ext || size <= 0)
+ ERTS_DDT_FAIL;
+ if (b2t.len <= b2t.ix)
+ grow_b2t_states(&b2t);
+#ifdef DEBUG
+ b2t.org_ext[b2t.ix] = ext;
+#endif
+ hsz = erts_binary2term_prepare(&b2t.state[b2t.ix++], ext, size);
+ if (hsz < 0)
+ ERTS_DDT_FAIL; /* Invalid data */
+ need += hsz;
+ ptr += 2;
+ depth++;
+ break;
+ }
+ default:
+ ERTS_DDT_FAIL;
+ }
+#undef ERTS_DDT_CHK_ENOUGH_ARGS
+ }
+
+ if ((depth != 1) || (ptr != ptr_end))
+ ERTS_DDT_FAIL;
+
+ b2t.used = b2t.ix;
+ b2t.ix = 0;
+
+ /*
+ * The term is OK. Go ahead and validate the port and process.
+ */
+ res = deliver_term_check_port(port);
+ if (res <= 0)
+ goto done;
+
+ rp = erts_pid2proc_opt(NULL, 0, to, rp_locks, ERTS_P2P_FLG_SMP_INC_REFC);
+ if (!rp) {
+ res = 0;
+ goto done;
+ }
+
+ hp_start = hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
+ hp_end = hp + need;
+
+ /*
+ * Interpret the instructions and build the term.
+ */
+ ptr = data;
+ while (ptr < ptr_end) {
+ ErlDrvTermData tag = *ptr++;
+
+ switch(tag) {
+ case ERL_DRV_NIL: /* no arguments */
+ mess = NIL;
+ break;
+
+ case ERL_DRV_ATOM: /* atom argument */
+ mess = ptr[0];
+ ptr++;
+ break;
+
+ case ERL_DRV_INT: /* signed int argument */
+ if (IS_SSMALL((Sint)ptr[0]))
+ mess = make_small((Sint)ptr[0]);
+ else {
+ mess = small_to_big((Sint)ptr[0], hp);
+ hp += BIG_UINT_HEAP_SIZE;
+ }
+ ptr++;
+ break;
+
+ case ERL_DRV_UINT: /* unsigned int argument */
+ if (IS_USMALL(0, (Uint)ptr[0]))
+ mess = make_small((Uint)ptr[0]);
+ else {
+ mess = uint_to_big((Uint)ptr[0], hp);
+ hp += BIG_UINT_HEAP_SIZE;
+ }
+ ptr++;
+ break;
+
+ case ERL_DRV_INT64: /* pointer to unsigned 64-bit int argument */
+ mess = erts_bld_sint64(&hp, NULL, *((Sint64 *) ptr[0]));
+ ptr++;
+ break;
+
+ case ERL_DRV_UINT64: /* pointer to unsigned 64-bit int argument */
+ mess = erts_bld_uint64(&hp, NULL, *((Uint64 *) ptr[0]));
+ ptr++;
+ break;
+
+ case ERL_DRV_PORT: /* port argument */
+ mess = ptr[0];
+ ptr++;
+ break;
+
+ case ERL_DRV_BINARY: { /* ErlDrvBinary*, size, offs */
+ ErlDrvBinary* b = (ErlDrvBinary*) ptr[0];
+ Uint size = ptr[1];
+ Uint offset = ptr[2];
+
+ if (size <= ERL_ONHEAP_BIN_LIMIT) {
+ ErlHeapBin* hbp = (ErlHeapBin *) hp;
+ hp += heap_bin_size(size);
+ hbp->thing_word = header_heap_bin(size);
+ hbp->size = size;
+ if (size > 0) {
+ sys_memcpy((void *) hbp->data, (void *) (((byte*) b->orig_bytes) + offset), size);
+ }
+ mess = make_binary(hbp);
+ }
+ else {
+ ProcBin* pb = (ProcBin *) hp;
+ driver_binary_inc_refc(b); /* caller will free binary */
+ pb->thing_word = HEADER_PROC_BIN;
+ pb->size = size;
+ pb->next = ohp->mso;
+ ohp->mso = pb;
+ pb->val = ErlDrvBinary2Binary(b);
+ pb->bytes = ((byte*) b->orig_bytes) + offset;
+ pb->flags = 0;
+ mess = make_binary(pb);
+ hp += PROC_BIN_SIZE;
+ ohp->overhead += pb->size / sizeof(Eterm);
+ }
+ ptr += 3;
+ break;
+ }
+
+ case ERL_DRV_BUF2BINARY: { /* char*, size */
+ byte *bufp = (byte *) ptr[0];
+ Uint size = (Uint) ptr[1];
+ if (size <= ERL_ONHEAP_BIN_LIMIT) {
+ ErlHeapBin* hbp = (ErlHeapBin *) hp;
+ hp += heap_bin_size(size);
+ hbp->thing_word = header_heap_bin(size);
+ hbp->size = size;
+ if (size > 0) {
+ ASSERT(bufp);
+ sys_memcpy((void *) hbp->data, (void *) bufp, size);
+ }
+ mess = make_binary(hbp);
+ }
+ else {
+ ProcBin* pbp;
+ Binary* bp = erts_bin_nrml_alloc(size);
+ ASSERT(bufp);
+ bp->flags = 0;
+ bp->orig_size = (long) size;
+ erts_refc_init(&bp->refc, 1);
+ sys_memcpy((void *) bp->orig_bytes, (void *) bufp, size);
+ pbp = (ProcBin *) hp;
+ hp += PROC_BIN_SIZE;
+ pbp->thing_word = HEADER_PROC_BIN;
+ pbp->size = size;
+ pbp->next = ohp->mso;
+ ohp->mso = pbp;
+ pbp->val = bp;
+ pbp->bytes = (byte*) bp->orig_bytes;
+ pbp->flags = 0;
+ ohp->overhead += (pbp->size / sizeof(Eterm));
+ mess = make_binary(pbp);
+ }
+ ptr += 2;
+ break;
+ }
+
+ case ERL_DRV_STRING: /* char*, length */
+ mess = buf_to_intlist(&hp, (char*)ptr[0], ptr[1], NIL);
+ ptr += 2;
+ break;
+
+ case ERL_DRV_STRING_CONS: /* char*, length */
+ mess = ESTACK_POP(stack);
+ mess = buf_to_intlist(&hp, (char*)ptr[0], ptr[1], mess);
+ ptr += 2;
+ break;
+
+ case ERL_DRV_LIST: { /* unsigned */
+ Uint i = (int) ptr[0]; /* i > 0 */
+
+ mess = ESTACK_POP(stack);
+ i--;
+ while(i > 0) {
+ Eterm hd = ESTACK_POP(stack);
+
+ mess = CONS(hp, hd, mess);
+ hp += 2;
+ i--;
+ }
+ ptr++;
+ break;
+ }
+
+ case ERL_DRV_TUPLE: { /* int */
+ int size = (int)ptr[0];
+ Eterm* tp = hp;
+
+ *tp = make_arityval(size);
+ mess = make_tuple(tp);
+
+ tp += size; /* point at last element */
+ hp = tp+1; /* advance "heap" pointer */
+
+ while(size--) {
+ *tp-- = ESTACK_POP(stack);
+ }
+ ptr++;
+ break;
+ }
+
+ case ERL_DRV_PID: /* pid argument */
+ mess = ptr[0];
+ ptr++;
+ break;
+
+ case ERL_DRV_FLOAT: { /* double * */
+ FloatDef f;
+
+ mess = make_float(hp);
+ f.fd = *((double *) ptr[0]);
+ PUT_DOUBLE(f, hp);
+ hp += FLOAT_SIZE_OBJECT;
+ ptr++;
+ break;
+ }
+
+ case ERL_DRV_EXT2TERM: /* char *ext, int size */
+ ASSERT(b2t.org_ext[b2t.ix] == (byte *) ptr[0]);
+ mess = erts_binary2term_create(&b2t.state[b2t.ix++], &hp, ohp);
+ if (mess == THE_NON_VALUE)
+ ERTS_DDT_FAIL;
+ ptr += 2;
+ break;
+
+ }
+ ESTACK_PUSH(stack, mess);
+ }
+
+ res = 1;
+
+ done:
+
+ if (res > 0) {
+ mess = ESTACK_POP(stack); /* get resulting value */
+ if (bp)
+ bp = erts_resize_message_buffer(bp, hp - hp_start, &mess, 1);
+ else {
+ ASSERT(hp);
+ HRelease(rp, hp_end, hp);
+ }
+ /* send message */
+ erts_queue_message(rp, &rp_locks, bp, mess, am_undefined);
+ }
+ else {
+ if (b2t.ix > b2t.used)
+ b2t.used = b2t.ix;
+ for (b2t.ix = 0; b2t.ix < b2t.used; b2t.ix++)
+ erts_binary2term_abort(&b2t.state[b2t.ix]);
+ if (bp)
+ free_message_buffer(bp);
+ else if (hp) {
+ HRelease(rp, hp_end, hp);
+ }
+ }
+#ifdef ERTS_SMP
+ if (rp) {
+ if (rp_locks)
+ erts_smp_proc_unlock(rp, rp_locks);
+ erts_smp_proc_dec_refc(rp);
+ }
+#endif
+ cleanup_b2t_states(&b2t);
+ DESTROY_ESTACK(stack);
+ return res;
+#undef ERTS_DDT_FAIL
+}
+
+
+int
+driver_output_term(ErlDrvPort ix, ErlDrvTermData* data, int len)
+{
+ Port* prt = erts_drvport2port(ix);
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+ if (prt == NULL)
+ return -1;
+ return driver_deliver_term(ix, prt->connected, data, len);
+}
+
+
+int
+driver_send_term(ErlDrvPort ix, ErlDrvTermData to, ErlDrvTermData* data, int len)
+{
+ return driver_deliver_term(ix, to, data, len);
+}
+
+
+/*
+ * Output a binary with hlen bytes from hbuf as list header
+ * and data is len length of bin starting from offset offs.
+ */
+
+int driver_output_binary(ErlDrvPort ix, char* hbuf, int hlen,
+ ErlDrvBinary* bin, int offs, int len)
+{
+ Port* prt = erts_drvport2port(ix);
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ if (prt == NULL)
+ return -1;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ if (prt->status & ERTS_PORT_SFLG_CLOSING)
+ return 0;
+
+ prt->bytes_in += (hlen + len);
+ erts_smp_atomic_add(&erts_bytes_in, (long) (hlen + len));
+ if (prt->status & ERTS_PORT_SFLG_DISTRIBUTION) {
+ return erts_net_message(prt,
+ prt->dist_entry,
+ (byte*) hbuf, hlen,
+ (byte*) (bin->orig_bytes+offs), len);
+ }
+ else
+ deliver_bin_message(prt, prt->connected,
+ hbuf, hlen, bin, offs, len);
+ return 0;
+}
+
+/* driver_output2:
+** Delivers hlen bytes from hbuf to the port owner as a list;
+** after that, the port settings apply, buf is sent as binary or list.
+**
+** Example: if hlen = 3 then the port owner will receive the data
+** [H1,H2,H3 | T]
+*/
+int driver_output2(ErlDrvPort ix, char* hbuf, int hlen, char* buf, int len)
+{
+ Port* prt = erts_drvport2port(ix);
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ if (prt == NULL)
+ return -1;
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+ if (prt->status & ERTS_PORT_SFLG_CLOSING)
+ return 0;
+
+ prt->bytes_in += (hlen + len);
+ erts_smp_atomic_add(&erts_bytes_in, (long) (hlen + len));
+ if (prt->status & ERTS_PORT_SFLG_DISTRIBUTION) {
+ if (len == 0)
+ return erts_net_message(prt,
+ prt->dist_entry,
+ NULL, 0,
+ (byte*) hbuf, hlen);
+ else
+ return erts_net_message(prt,
+ prt->dist_entry,
+ (byte*) hbuf, hlen,
+ (byte*) buf, len);
+ }
+ else if(prt->status & ERTS_PORT_SFLG_LINEBUF_IO)
+ deliver_linebuf_message(prt, prt->connected, hbuf, hlen, buf, len);
+ else
+ deliver_read_message(prt, prt->connected, hbuf, hlen, buf, len, 0);
+ return 0;
+}
+
+/* Interface functions available to driver writers */
+
+int driver_output(ErlDrvPort ix, char* buf, int len)
+{
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+ return driver_output2(ix, NULL, 0, buf, len);
+}
+
+int driver_outputv(ErlDrvPort ix, char* hbuf, int hlen, ErlIOVec* vec, int skip)
+{
+ int n;
+ int len;
+ int size;
+ SysIOVec* iov;
+ ErlDrvBinary** binv;
+ Port* prt;
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ size = vec->size - skip; /* Size of remaining bytes in vector */
+ ASSERT(size >= 0);
+ if (size <= 0)
+ return driver_output2(ix, hbuf, hlen, NULL, 0);
+ ASSERT(hlen >= 0); /* debug only */
+ if (hlen < 0)
+ hlen = 0;
+
+ prt = erts_drvport2port(ix);
+ if (prt == NULL)
+ return -1;
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+ if (prt->status & ERTS_PORT_SFLG_CLOSING)
+ return 0;
+
+ /* size > 0 ! */
+ iov = vec->iov;
+ binv = vec->binv;
+ n = vec->vsize;
+ /* we use do here to strip iov_len=0 from beginning */
+ do {
+ len = iov->iov_len;
+ if (len <= skip) {
+ skip -= len;
+ iov++;
+ binv++;
+ n--;
+ } else {
+ iov->iov_base += skip;
+ iov->iov_len -= skip;
+ skip = 0;
+ }
+ } while (skip > 0);
+
+ /* XXX handle distribution !!! */
+ prt->bytes_in += (hlen + size);
+ erts_smp_atomic_add(&erts_bytes_in, (long) (hlen + size));
+ deliver_vec_message(prt, prt->connected, hbuf, hlen, binv, iov, n, size);
+ return 0;
+}
+
+/* Copy bytes from a vector into a buffer
+** input is a vector a buffer and a max length
+** return bytes copied
+*/
+int driver_vec_to_buf(vec, buf, len)
+ErlIOVec* vec;
+char* buf;
+int len;
+{
+ SysIOVec* iov = vec->iov;
+ int n = vec->vsize;
+ int orig_len = len;
+
+ while(n--) {
+ int ilen = iov->iov_len;
+ if (ilen < len) {
+ sys_memcpy(buf, iov->iov_base, ilen);
+ len -= ilen;
+ buf += ilen;
+ iov++;
+ }
+ else {
+ sys_memcpy(buf, iov->iov_base, len);
+ return orig_len;
+ }
+ }
+ return (orig_len - len);
+}
+
+
+/*
+ * - driver_alloc_binary() is thread safe (efile driver depend on it).
+ * - driver_realloc_binary(), and driver_free_binary() are *not* thread safe.
+ */
+
+/*
+ * reference count on driver binaries...
+ */
+
+long
+driver_binary_get_refc(ErlDrvBinary *dbp)
+{
+ Binary* bp = ErlDrvBinary2Binary(dbp);
+ return erts_refc_read(&bp->refc, 1);
+}
+
+long
+driver_binary_inc_refc(ErlDrvBinary *dbp)
+{
+ Binary* bp = ErlDrvBinary2Binary(dbp);
+ return erts_refc_inctest(&bp->refc, 2);
+}
+
+long
+driver_binary_dec_refc(ErlDrvBinary *dbp)
+{
+ Binary* bp = ErlDrvBinary2Binary(dbp);
+ return erts_refc_dectest(&bp->refc, 1);
+}
+
+
+/*
+** Allocation/Deallocation of binary objects
+*/
+
+ErlDrvBinary*
+driver_alloc_binary(int size)
+{
+ Binary* bin;
+
+ if (size < 0)
+ return NULL;
+
+ bin = erts_bin_drv_alloc_fnf((Uint) size);
+ if (!bin)
+ return NULL; /* The driver write must take action */
+ bin->flags = BIN_FLAG_DRV;
+ erts_refc_init(&bin->refc, 1);
+ bin->orig_size = (long) size;
+ return Binary2ErlDrvBinary(bin);
+}
+
+/* Reallocate space hold by binary */
+
+ErlDrvBinary* driver_realloc_binary(ErlDrvBinary* bin, int size)
+{
+ Binary* oldbin;
+ Binary* newbin;
+
+ if (!bin || size < 0) {
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ erts_dsprintf(dsbufp,
+ "Bad use of driver_realloc_binary(%p, %d): "
+ "called with ",
+ bin, size);
+ if (!bin) {
+ erts_dsprintf(dsbufp, "NULL pointer as first argument");
+ if (size < 0)
+ erts_dsprintf(dsbufp, ", and ");
+ }
+ if (size < 0) {
+ erts_dsprintf(dsbufp, "negative size as second argument");
+ size = 0;
+ }
+ erts_send_warning_to_logger_nogl(dsbufp);
+ if (!bin)
+ return driver_alloc_binary(size);
+ }
+
+ oldbin = ErlDrvBinary2Binary(bin);
+ newbin = (Binary *) erts_bin_realloc_fnf(oldbin, size);
+ if (!newbin)
+ return NULL;
+
+ newbin->orig_size = size;
+ return Binary2ErlDrvBinary(newbin);
+}
+
+
+void driver_free_binary(dbin)
+ErlDrvBinary* dbin;
+{
+ Binary *bin;
+ if (!dbin) {
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ erts_dsprintf(dsbufp,
+ "Bad use of driver_free_binary(%p): called with "
+ "NULL pointer as argument", dbin);
+ erts_send_warning_to_logger_nogl(dsbufp);
+ return;
+ }
+
+ bin = ErlDrvBinary2Binary(dbin);
+ if (erts_refc_dectest(&bin->refc, 0) == 0)
+ erts_bin_free(bin);
+}
+
+
+/*
+ * Allocation/deallocation of memory for drivers
+ */
+
+void *driver_alloc(size_t size)
+{
+ return erts_alloc_fnf(ERTS_ALC_T_DRV, (Uint) size);
+}
+
+void *driver_realloc(void *ptr, size_t size)
+{
+ return erts_realloc_fnf(ERTS_ALC_T_DRV, ptr, (Uint) size);
+}
+
+void driver_free(void *ptr)
+{
+ erts_free(ERTS_ALC_T_DRV, ptr);
+}
+
+/*
+ * Port Data Lock
+ */
+
+static void
+pdl_init(void)
+{
+}
+
+static ERTS_INLINE void
+pdl_init_refc(ErlDrvPDL pdl)
+{
+ erts_atomic_init(&pdl->refc, 1);
+}
+
+static ERTS_INLINE long
+pdl_read_refc(ErlDrvPDL pdl)
+{
+ long refc = erts_atomic_read(&pdl->refc);
+ ERTS_LC_ASSERT(refc >= 0);
+ return refc;
+}
+
+static ERTS_INLINE void
+pdl_inc_refc(ErlDrvPDL pdl)
+{
+ erts_atomic_inc(&pdl->refc);
+ ERTS_LC_ASSERT(driver_pdl_get_refc(pdl) > 1);
+}
+
+static ERTS_INLINE long
+pdl_inctest_refc(ErlDrvPDL pdl)
+{
+ long refc = erts_atomic_inctest(&pdl->refc);
+ ERTS_LC_ASSERT(refc > 1);
+ return refc;
+}
+
+#if 0 /* unused */
+static ERTS_INLINE void
+pdl_dec_refc(ErlDrvPDL pdl)
+{
+ erts_atomic_dec(&pdl->refc);
+ ERTS_LC_ASSERT(driver_pdl_get_refc(pdl) > 0);
+}
+#endif
+
+static ERTS_INLINE long
+pdl_dectest_refc(ErlDrvPDL pdl)
+{
+ long refc = erts_atomic_dectest(&pdl->refc);
+ ERTS_LC_ASSERT(refc >= 0);
+ return refc;
+}
+
+static ERTS_INLINE void pdl_destroy(ErlDrvPDL pdl)
+{
+ ERTS_LC_ASSERT(driver_pdl_get_refc(pdl) == 0);
+ erts_mtx_destroy(&pdl->mtx);
+ erts_free(ERTS_ALC_T_PORT_DATA_LOCK, pdl);
+}
+
+/*
+ * exported driver_pdl_* functions ...
+ */
+
+ErlDrvPDL
+driver_pdl_create(ErlDrvPort dp)
+{
+ ErlDrvPDL pdl;
+ Port *pp = erts_drvport2port(dp);
+ if (!pp || pp->port_data_lock)
+ return NULL;
+ pdl = erts_alloc(ERTS_ALC_T_PORT_DATA_LOCK,
+ sizeof(struct erl_drv_port_data_lock));
+ erts_mtx_init(&pdl->mtx, "port_data_lock");
+ pdl_init_refc(pdl);
+ pp->port_data_lock = pdl;
+#ifdef HARDDEBUG
+ erts_fprintf(stderr, "driver_pdl_create(%T) -> 0x%08X\r\n",pp->id,(unsigned) pdl);
+#endif
+ return pdl;
+}
+
+void
+driver_pdl_lock(ErlDrvPDL pdl)
+{
+#ifdef HARDDEBUG
+ erts_fprintf(stderr, "driver_pdl_lock(0x%08X)\r\n",(unsigned) pdl);
+#endif
+ pdl_inc_refc(pdl);
+ erts_mtx_lock(&pdl->mtx);
+}
+
+void
+driver_pdl_unlock(ErlDrvPDL pdl)
+{
+ long refc;
+#ifdef HARDDEBUG
+ erts_fprintf(stderr, "driver_pdl_unlock(0x%08X)\r\n",(unsigned) pdl);
+#endif
+ erts_mtx_unlock(&pdl->mtx);
+ refc = pdl_dectest_refc(pdl);
+ if (!refc)
+ pdl_destroy(pdl);
+}
+
+long
+driver_pdl_get_refc(ErlDrvPDL pdl)
+{
+ return pdl_read_refc(pdl);
+}
+
+long
+driver_pdl_inc_refc(ErlDrvPDL pdl)
+{
+ long refc = pdl_inctest_refc(pdl);
+#ifdef HARDDEBUG
+ erts_fprintf(stderr, "driver_pdl_inc_refc(0x%08X) -> %ld\r\n",(unsigned) pdl, refc);
+#endif
+ return refc;
+}
+
+long
+driver_pdl_dec_refc(ErlDrvPDL pdl)
+{
+ long refc = pdl_dectest_refc(pdl);
+#ifdef HARDDEBUG
+ erts_fprintf(stderr, "driver_pdl_dec_refc(0x%08X) -> %ld\r\n",(unsigned) pdl, refc);
+#endif
+ if (!refc)
+ pdl_destroy(pdl);
+ return refc;
+}
+
+/* expand queue to hold n elements in tail or head */
+static int expandq(ErlIOQueue* q, int n, int tail)
+/* tail: 0 if make room in head, make room in tail otherwise */
+{
+ int h_sz; /* room before header */
+ int t_sz; /* room after tail */
+ int q_sz; /* occupied */
+ int nvsz;
+ SysIOVec* niov;
+ ErlDrvBinary** nbinv;
+
+ h_sz = q->v_head - q->v_start;
+ t_sz = q->v_end - q->v_tail;
+ q_sz = q->v_tail - q->v_head;
+
+ if (tail && (n <= t_sz)) /* do we need to expand tail? */
+ return 0;
+ else if (!tail && (n <= h_sz)) /* do we need to expand head? */
+ return 0;
+ else if (n > (h_sz + t_sz)) { /* need to allocate */
+ /* we may get little extra but it ok */
+ nvsz = (q->v_end - q->v_start) + n;
+
+ niov = erts_alloc_fnf(ERTS_ALC_T_IOQ, nvsz * sizeof(SysIOVec));
+ if (!niov)
+ return -1;
+ nbinv = erts_alloc_fnf(ERTS_ALC_T_IOQ, nvsz * sizeof(ErlDrvBinary**));
+ if (!nbinv) {
+ erts_free(ERTS_ALC_T_IOQ, (void *) niov);
+ return -1;
+ }
+ if (tail) {
+ sys_memcpy(niov, q->v_head, q_sz*sizeof(SysIOVec));
+ if (q->v_start != q->v_small)
+ erts_free(ERTS_ALC_T_IOQ, (void *) q->v_start);
+ q->v_start = niov;
+ q->v_end = niov + nvsz;
+ q->v_head = q->v_start;
+ q->v_tail = q->v_head + q_sz;
+
+ sys_memcpy(nbinv, q->b_head, q_sz*sizeof(ErlDrvBinary*));
+ if (q->b_start != q->b_small)
+ erts_free(ERTS_ALC_T_IOQ, (void *) q->b_start);
+ q->b_start = nbinv;
+ q->b_end = nbinv + nvsz;
+ q->b_head = q->b_start;
+ q->b_tail = q->b_head + q_sz;
+ }
+ else {
+ sys_memcpy(niov+nvsz-q_sz, q->v_head, q_sz*sizeof(SysIOVec));
+ if (q->v_start != q->v_small)
+ erts_free(ERTS_ALC_T_IOQ, (void *) q->v_start);
+ q->v_start = niov;
+ q->v_end = niov + nvsz;
+ q->v_tail = q->v_end;
+ q->v_head = q->v_tail - q_sz;
+
+ sys_memcpy(nbinv+nvsz-q_sz, q->b_head, q_sz*sizeof(ErlDrvBinary*));
+ if (q->b_start != q->b_small)
+ erts_free(ERTS_ALC_T_IOQ, (void *) q->b_start);
+ q->b_start = nbinv;
+ q->b_end = nbinv + nvsz;
+ q->b_tail = q->b_end;
+ q->b_head = q->b_tail - q_sz;
+ }
+ }
+ else if (tail) { /* move to beginning to make room in tail */
+ sys_memmove(q->v_start, q->v_head, q_sz*sizeof(SysIOVec));
+ q->v_head = q->v_start;
+ q->v_tail = q->v_head + q_sz;
+ sys_memmove(q->b_start, q->b_head, q_sz*sizeof(ErlDrvBinary*));
+ q->b_head = q->b_start;
+ q->b_tail = q->b_head + q_sz;
+ }
+ else { /* move to end to make room */
+ sys_memmove(q->v_end-q_sz, q->v_head, q_sz*sizeof(SysIOVec));
+ q->v_tail = q->v_end;
+ q->v_head = q->v_tail-q_sz;
+ sys_memmove(q->b_end-q_sz, q->b_head, q_sz*sizeof(ErlDrvBinary*));
+ q->b_tail = q->b_end;
+ q->b_head = q->b_tail-q_sz;
+ }
+
+ return 0;
+}
+
+
+
+/* Put elements from vec at q tail */
+int driver_enqv(ErlDrvPort ix, ErlIOVec* vec, int skip)
+{
+ int n;
+ int len;
+ int size;
+ SysIOVec* iov;
+ ErlDrvBinary** binv;
+ ErlDrvBinary* b;
+ ErlIOQueue* q = drvport2ioq(ix);
+
+ if (q == NULL)
+ return -1;
+
+ size = vec->size - skip;
+ ASSERT(size >= 0); /* debug only */
+ if (size <= 0)
+ return 0;
+
+ iov = vec->iov;
+ binv = vec->binv;
+ n = vec->vsize;
+
+ /* we use do here to strip iov_len=0 from beginning */
+ do {
+ len = iov->iov_len;
+ if (len <= skip) {
+ skip -= len;
+ iov++;
+ binv++;
+ n--;
+ }
+ else {
+ iov->iov_base += skip;
+ iov->iov_len -= skip;
+ skip = 0;
+ }
+ } while(skip > 0);
+
+ if (q->v_tail + n >= q->v_end)
+ expandq(q, n, 1);
+
+ /* Queue and reference all binaries (remove zero length items) */
+ while(n--) {
+ if ((len = iov->iov_len) > 0) {
+ if ((b = *binv) == NULL) { /* speical case create binary ! */
+ b = driver_alloc_binary(len);
+ sys_memcpy(b->orig_bytes, iov->iov_base, len);
+ *q->b_tail++ = b;
+ q->v_tail->iov_len = len;
+ q->v_tail->iov_base = b->orig_bytes;
+ q->v_tail++;
+ }
+ else {
+ driver_binary_inc_refc(b);
+ *q->b_tail++ = b;
+ *q->v_tail++ = *iov;
+ }
+ }
+ iov++;
+ binv++;
+ }
+ q->size += size; /* update total size in queue */
+ return 0;
+}
+
+/* Put elements from vec at q head */
+int driver_pushqv(ErlDrvPort ix, ErlIOVec* vec, int skip)
+{
+ int n;
+ int len;
+ int size;
+ SysIOVec* iov;
+ ErlDrvBinary** binv;
+ ErlDrvBinary* b;
+ ErlIOQueue* q = drvport2ioq(ix);
+
+ if (q == NULL)
+ return -1;
+
+ if ((size = vec->size - skip) <= 0)
+ return 0;
+ iov = vec->iov;
+ binv = vec->binv;
+ n = vec->vsize;
+
+ /* we use do here to strip iov_len=0 from beginning */
+ do {
+ len = iov->iov_len;
+ if (len <= skip) {
+ skip -= len;
+ iov++;
+ binv++;
+ n--;
+ }
+ else {
+ iov->iov_base += skip;
+ iov->iov_len -= skip;
+ skip = 0;
+ }
+ } while(skip > 0);
+
+ if (q->v_head - n < q->v_start)
+ expandq(q, n, 0);
+
+ /* Queue and reference all binaries (remove zero length items) */
+ iov += (n-1); /* move to end */
+ binv += (n-1); /* move to end */
+ while(n--) {
+ if ((len = iov->iov_len) > 0) {
+ if ((b = *binv) == NULL) { /* speical case create binary ! */
+ b = driver_alloc_binary(len);
+ sys_memcpy(b->orig_bytes, iov->iov_base, len);
+ *--q->b_head = b;
+ q->v_head--;
+ q->v_head->iov_len = len;
+ q->v_head->iov_base = b->orig_bytes;
+ }
+ else {
+ driver_binary_inc_refc(b);
+ *--q->b_head = b;
+ *--q->v_head = *iov;
+ }
+ }
+ iov--;
+ binv--;
+ }
+ q->size += size; /* update total size in queue */
+ return 0;
+}
+
+
+/*
+** Remove size bytes from queue head
+** Return number of bytes that remain in queue
+*/
+int driver_deq(ErlDrvPort ix, int size)
+{
+ ErlIOQueue* q = drvport2ioq(ix);
+ int len;
+ int sz;
+
+ if ((q == NULL) || (sz = (q->size - size)) < 0)
+ return -1;
+ q->size = sz;
+ while (size > 0) {
+ ASSERT(q->v_head != q->v_tail);
+
+ len = q->v_head->iov_len;
+ if (len <= size) {
+ size -= len;
+ driver_free_binary(*q->b_head);
+ *q->b_head++ = NULL;
+ q->v_head++;
+ }
+ else {
+ q->v_head->iov_base += size;
+ q->v_head->iov_len -= size;
+ size = 0;
+ }
+ }
+
+ /* restart pointers (optimised for enq) */
+ if (q->v_head == q->v_tail) {
+ q->v_head = q->v_tail = q->v_start;
+ q->b_head = q->b_tail = q->b_start;
+ }
+ return sz;
+}
+
+
+int driver_peekqv(ErlDrvPort ix, ErlIOVec *ev) {
+ ErlIOQueue *q = drvport2ioq(ix);
+ ASSERT(ev);
+
+ if (! q) {
+ return -1;
+ } else {
+ if ((ev->vsize = q->v_tail - q->v_head) == 0) {
+ ev->size = 0;
+ ev->iov = NULL;
+ ev->binv = NULL;
+ } else {
+ ev->size = q->size;
+ ev->iov = q->v_head;
+ ev->binv = q->b_head;
+ }
+ return q->size;
+ }
+}
+
+SysIOVec* driver_peekq(ErlDrvPort ix, int* vlenp) /* length of io-vector */
+{
+ ErlIOQueue* q = drvport2ioq(ix);
+
+ if (q == NULL) {
+ *vlenp = -1;
+ return NULL;
+ }
+ if ((*vlenp = (q->v_tail - q->v_head)) == 0)
+ return NULL;
+ return q->v_head;
+}
+
+
+int driver_sizeq(ErlDrvPort ix)
+{
+ ErlIOQueue* q = drvport2ioq(ix);
+
+ if (q == NULL)
+ return -1;
+ return q->size;
+}
+
+
+/* Utils */
+
+/* Enqueue a binary */
+int driver_enq_bin(ErlDrvPort ix, ErlDrvBinary* bin, int offs, int len)
+{
+ SysIOVec iov;
+ ErlIOVec ev;
+
+ ASSERT(len >= 0);
+ if (len == 0)
+ return 0;
+ iov.iov_base = bin->orig_bytes + offs;
+ iov.iov_len = len;
+ ev.vsize = 1;
+ ev.size = len;
+ ev.iov = &iov;
+ ev.binv = &bin;
+ return driver_enqv(ix, &ev, 0);
+}
+
+int driver_enq(ErlDrvPort ix, char* buffer, int len)
+{
+ int code;
+ ErlDrvBinary* bin;
+
+ ASSERT(len >= 0);
+ if (len == 0)
+ return 0;
+ if ((bin = driver_alloc_binary(len)) == NULL)
+ return -1;
+ sys_memcpy(bin->orig_bytes, buffer, len);
+ code = driver_enq_bin(ix, bin, 0, len);
+ driver_free_binary(bin); /* dereference */
+ return code;
+}
+
+int driver_pushq_bin(ErlDrvPort ix, ErlDrvBinary* bin, int offs, int len)
+{
+ SysIOVec iov;
+ ErlIOVec ev;
+
+ ASSERT(len >= 0);
+ if (len == 0)
+ return 0;
+ iov.iov_base = bin->orig_bytes + offs;
+ iov.iov_len = len;
+ ev.vsize = 1;
+ ev.size = len;
+ ev.iov = &iov;
+ ev.binv = &bin;
+ return driver_pushqv(ix, &ev, 0);
+}
+
+int driver_pushq(ErlDrvPort ix, char* buffer, int len)
+{
+ int code;
+ ErlDrvBinary* bin;
+
+ ASSERT(len >= 0);
+ if (len == 0)
+ return 0;
+
+ if ((bin = driver_alloc_binary(len)) == NULL)
+ return -1;
+ sys_memcpy(bin->orig_bytes, buffer, len);
+ code = driver_pushq_bin(ix, bin, 0, len);
+ driver_free_binary(bin); /* dereference */
+ return code;
+}
+
+static ERTS_INLINE void
+drv_cancel_timer(Port *prt)
+{
+#ifdef ERTS_SMP
+ erts_cancel_smp_ptimer(prt->ptimer);
+#else
+ erl_cancel_timer(&prt->tm);
+#endif
+ if (erts_port_task_is_scheduled(&prt->timeout_task))
+ erts_port_task_abort(prt->id, &prt->timeout_task);
+}
+
+int driver_set_timer(ErlDrvPort ix, Uint t)
+{
+ Port* prt = erts_drvport2port(ix);
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ if (prt == NULL)
+ return -1;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ if (prt->drv_ptr->timeout == NULL)
+ return -1;
+ drv_cancel_timer(prt);
+#ifdef ERTS_SMP
+ erts_create_smp_ptimer(&prt->ptimer,
+ prt->id,
+ (ErlTimeoutProc) schedule_port_timeout,
+ t);
+#else
+ erl_set_timer(&prt->tm,
+ (ErlTimeoutProc) schedule_port_timeout,
+ NULL,
+ prt,
+ t);
+#endif
+ return 0;
+}
+
+int driver_cancel_timer(ErlDrvPort ix)
+{
+ Port* prt = erts_drvport2port(ix);
+ if (prt == NULL)
+ return -1;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ drv_cancel_timer(prt);
+ return 0;
+}
+
+
+int
+driver_read_timer(ErlDrvPort ix, unsigned long* t)
+{
+ Port* prt = erts_drvport2port(ix);
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ if (prt == NULL)
+ return -1;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+#ifdef ERTS_SMP
+ *t = prt->ptimer ? time_left(&prt->ptimer->timer.tm) : 0;
+#else
+ *t = time_left(&prt->tm);
+#endif
+ return 0;
+}
+
+int
+driver_get_now(ErlDrvNowData *now_data)
+{
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ if (now_data == NULL) {
+ return -1;
+ }
+ get_now(&(now_data->megasecs),&(now_data->secs),&(now_data->microsecs));
+ return 0;
+}
+
+static void ref_to_driver_monitor(Eterm ref, ErlDrvMonitor *mon)
+{
+ RefThing *refp;
+ ASSERT(is_internal_ref(ref));
+ ASSERT(sizeof(RefThing) <= sizeof(ErlDrvMonitor));
+ refp = ref_thing_ptr(ref);
+ memset(mon,0,sizeof(ErlDrvMonitor));
+ memcpy(mon,refp,sizeof(RefThing));
+}
+
+int driver_monitor_process(ErlDrvPort port,
+ ErlDrvTermData process,
+ ErlDrvMonitor *monitor)
+{
+ Port *prt = erts_drvport2port(port);
+ Process *rp;
+ Eterm ref;
+ Eterm buf[REF_THING_SIZE];
+ if (prt->drv_ptr->process_exit == NULL) {
+ return -1;
+ }
+ rp = erts_pid2proc_opt(NULL, 0,
+ (Eterm) process, ERTS_PROC_LOCK_LINK,
+ ERTS_P2P_FLG_ALLOW_OTHER_X);
+ if (!rp) {
+ return 1;
+ }
+ ref = erts_make_ref_in_buffer(buf);
+ erts_add_monitor(&(prt->monitors), MON_ORIGIN, ref, rp->id, NIL);
+ erts_add_monitor(&(rp->monitors), MON_TARGET, ref, prt->id, NIL);
+
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ ref_to_driver_monitor(ref,monitor);
+ return 0;
+}
+
+int driver_demonitor_process(ErlDrvPort port,
+ const ErlDrvMonitor *monitor)
+{
+ Port *prt = erts_drvport2port(port);
+ Process *rp;
+ Eterm ref;
+ Eterm buf[REF_THING_SIZE];
+ ErtsMonitor *mon;
+ Eterm to;
+
+ memcpy(buf,monitor,sizeof(Eterm)*REF_THING_SIZE);
+ ref = make_internal_ref(buf);
+ mon = erts_lookup_monitor(prt->monitors, ref);
+ if (mon == NULL) {
+ return 1;
+ }
+ ASSERT(mon->type == MON_ORIGIN);
+ to = mon->pid;
+ ASSERT(is_internal_pid(to));
+ rp = erts_pid2proc_opt(NULL,
+ 0,
+ to,
+ ERTS_PROC_LOCK_LINK,
+ ERTS_P2P_FLG_ALLOW_OTHER_X);
+ mon = erts_remove_monitor(&(prt->monitors), ref);
+ if (mon) {
+ erts_destroy_monitor(mon);
+ }
+ if (rp) {
+ ErtsMonitor *rmon;
+ rmon = erts_remove_monitor(&(rp->monitors), ref);
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ if (rmon != NULL) {
+ erts_destroy_monitor(rmon);
+ }
+ }
+ return 0;
+}
+
+ErlDrvTermData driver_get_monitored_process(ErlDrvPort port,
+ const ErlDrvMonitor *monitor)
+{
+ Port *prt = erts_drvport2port(port);
+ Eterm ref;
+ Eterm buf[REF_THING_SIZE];
+ ErtsMonitor *mon;
+ Eterm to;
+
+ memcpy(buf,monitor,sizeof(Eterm)*REF_THING_SIZE);
+ ref = make_internal_ref(buf);
+ mon = erts_lookup_monitor(prt->monitors, ref);
+ if (mon == NULL) {
+ return driver_term_nil;
+ }
+ ASSERT(mon->type == MON_ORIGIN);
+ to = mon->pid;
+ ASSERT(is_internal_pid(to));
+ return (ErlDrvTermData) to;
+}
+
+int driver_compare_monitors(const ErlDrvMonitor *monitor1,
+ const ErlDrvMonitor *monitor2)
+{
+ return memcmp(monitor1,monitor2,sizeof(ErlDrvMonitor));
+}
+
+void erts_fire_port_monitor(Port *prt, Eterm ref)
+{
+ ErtsMonitor *rmon;
+ void (*callback)(ErlDrvData drv_data, ErlDrvMonitor *monitor);
+ ErlDrvMonitor drv_monitor;
+ int fpe_was_unmasked;
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ASSERT(prt->drv_ptr != NULL);
+
+ if (erts_lookup_monitor(prt->monitors,ref) == NULL) {
+ return;
+ }
+ callback = prt->drv_ptr->process_exit;
+ ASSERT(callback != NULL);
+ ref_to_driver_monitor(ref,&drv_monitor);
+ fpe_was_unmasked = erts_block_fpe();
+ (*callback)((ErlDrvData) (prt->drv_data), &drv_monitor);
+ erts_unblock_fpe(fpe_was_unmasked);
+ /* remove monitor *after* callback */
+ rmon = erts_remove_monitor(&(prt->monitors),ref);
+ if (rmon) {
+ erts_destroy_monitor(rmon);
+ }
+}
+
+
+static int
+driver_failure_term(ErlDrvPort ix, Eterm term, int eof)
+{
+ Port* prt = erts_drvport2port(ix);
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ if (prt == NULL)
+ return -1;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ if (eof)
+ flush_linebuf_messages(prt);
+ if (prt->status & ERTS_PORT_SFLG_CLOSING) {
+ terminate_port(prt);
+ } else if (eof && (prt->status & ERTS_PORT_SFLG_SOFT_EOF)) {
+ deliver_result(prt->id, prt->connected, am_eof);
+ } else {
+ /* XXX UGLY WORK AROUND, Let do_exit_port terminate the port */
+ if (prt->port_data_lock)
+ driver_pdl_lock(prt->port_data_lock);
+ prt->ioq.size = 0;
+ if (prt->port_data_lock)
+ driver_pdl_unlock(prt->port_data_lock);
+ erts_do_exit_port(prt, prt->id, eof ? am_normal : term);
+ }
+ return 0;
+}
+
+
+
+/*
+** Do a (soft) exit. unlink the connected process before doing
+** driver posix error or (normal)
+*/
+int driver_exit(ErlDrvPort ix, int err)
+{
+ Port* prt = erts_drvport2port(ix);
+ Process* rp;
+ ErtsLink *lnk, *rlnk = NULL;
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ if (prt == NULL)
+ return -1;
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+ rp = erts_pid2proc(NULL, 0, prt->connected, ERTS_PROC_LOCK_LINK);
+ if (rp) {
+ rlnk = erts_remove_link(&(rp->nlinks),prt->id);
+ }
+
+ lnk = erts_remove_link(&(prt->nlinks),prt->connected);
+
+#ifdef ERTS_SMP
+ if (rp)
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+#endif
+
+ if (rlnk != NULL) {
+ erts_destroy_link(rlnk);
+ }
+
+ if (lnk != NULL) {
+ erts_destroy_link(lnk);
+ }
+
+ if (err == 0)
+ return driver_failure_term(ix, am_normal, 0);
+ else {
+ char* err_str = erl_errno_id(err);
+ Eterm am_err = am_atom_put(err_str, sys_strlen(err_str));
+ return driver_failure_term(ix, am_err, 0);
+ }
+}
+
+
+int driver_failure(ErlDrvPort ix, int code)
+{
+ return driver_failure_term(ix, make_small(code), code == 0);
+}
+
+int driver_failure_atom(ErlDrvPort ix, char* string)
+{
+ Eterm am = am_atom_put(string, strlen(string));
+ return driver_failure_term(ix, am, 0);
+}
+
+int driver_failure_posix(ErlDrvPort ix, int err)
+{
+ return driver_failure_atom(ix, erl_errno_id(err));
+}
+
+int driver_failure_eof(ErlDrvPort ix)
+{
+ return driver_failure_term(ix, NIL, 1);
+}
+
+
+
+ErlDrvTermData driver_mk_atom(char* string)
+{
+ Eterm am = am_atom_put(string, sys_strlen(string));
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+ return (ErlDrvTermData) am;
+}
+
+ErlDrvTermData driver_mk_port(ErlDrvPort ix)
+{
+ Port* prt = erts_drvport2port(ix);
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ return (ErlDrvTermData) prt->id;
+}
+
+ErlDrvTermData driver_connected(ErlDrvPort ix)
+{
+ Port* prt = erts_drvport2port(ix);
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+ if (prt == NULL)
+ return NIL;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ return prt->connected;
+}
+
+ErlDrvTermData driver_caller(ErlDrvPort ix)
+{
+ Port* prt = erts_drvport2port(ix);
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+ if (prt == NULL)
+ return NIL;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ return prt->caller;
+}
+
+int driver_lock_driver(ErlDrvPort ix)
+{
+ Port* prt = erts_drvport2port(ix);
+ DE_Handle* dh;
+
+ ERTS_SMP_CHK_NO_PROC_LOCKS;
+
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+
+ if (prt == NULL) return -1;
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ if ((dh = (DE_Handle*)prt->drv_ptr->handle ) == NULL) {
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ return -1;
+ }
+ erts_ddll_lock_driver(dh, prt->drv_ptr->name);
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ return 0;
+}
+
+
+static int maybe_lock_driver_list(void)
+{
+ void *rec_lock;
+ rec_lock = erts_smp_tsd_get(driver_list_lock_status_key);
+ if (rec_lock == 0) {
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+ return 1;
+ }
+ return 0;
+}
+static void maybe_unlock_driver_list(int doit)
+{
+ if (doit) {
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ }
+}
+/*
+ These old interfaces are certainly not MT friendly. Hopefully they are only used internally,
+ but you never know, so they are kept for BC. As The sys ddll code has no notion
+ of locking, I use the driver list lock to mutex this from the code in erl_bif_ddll.c.
+ To allow dynamic code loading in the init functions of a driver, recursive locking is
+ handled as in add_driver_entry etc.
+ A TSD variable holds the last error for a thread, so that code like
+ ...
+ x = driver_dl_open(...);
+ if (x == NULL)
+ y = driver_dl_error();
+ ...
+ works as long as execution happens in one driver callback even in an SMP emulator.
+ Writing code using these interfaces spanning several driver callbacks between loading/lookup
+ and error handling may give undesired results...
+*/
+void *driver_dl_open(char * path)
+{
+ void *ptr;
+ int res;
+ int *last_error_p = erts_smp_tsd_get(driver_list_last_error_key);
+ int locked = maybe_lock_driver_list();
+ if ((res = erts_sys_ddll_open(path, &ptr)) == 0) {
+ maybe_unlock_driver_list(locked);
+ return ptr;
+ } else {
+ if (!last_error_p) {
+ last_error_p = erts_alloc(ERTS_ALC_T_DDLL_ERRCODES, sizeof(int));
+ erts_smp_tsd_set(driver_list_last_error_key,last_error_p);
+ }
+ *last_error_p = res;
+ maybe_unlock_driver_list(locked);
+ return NULL;
+ }
+}
+
+void *driver_dl_sym(void * handle, char *func_name)
+{
+ void *ptr;
+ int res;
+ int *last_error_p = erts_smp_tsd_get(driver_list_lock_status_key);
+ int locked = maybe_lock_driver_list();
+ if ((res = erts_sys_ddll_sym(handle, func_name, &ptr)) == 0) {
+ maybe_unlock_driver_list(locked);
+ return ptr;
+ } else {
+ if (!last_error_p) {
+ last_error_p = erts_alloc(ERTS_ALC_T_DDLL_ERRCODES, sizeof(int));
+ erts_smp_tsd_set(driver_list_lock_status_key,last_error_p);
+ }
+ *last_error_p = res;
+ maybe_unlock_driver_list(locked);
+ return NULL;
+ }
+}
+
+int driver_dl_close(void *handle)
+{
+ int res;
+ int locked = maybe_lock_driver_list();
+ res = erts_sys_ddll_close(handle);
+ maybe_unlock_driver_list(locked);
+ return res;
+}
+
+char *driver_dl_error(void)
+{
+ char *res;
+ int *last_error_p = erts_smp_tsd_get(driver_list_lock_status_key);
+ int locked = maybe_lock_driver_list();
+ res = erts_ddll_error((last_error_p != NULL) ? (*last_error_p) : ERL_DE_ERROR_UNSPECIFIED);
+ maybe_unlock_driver_list(locked);
+ return res;
+}
+
+
+#define ERL_DRV_SYS_INFO_SIZE(LAST_FIELD) \
+ (((size_t) &((ErlDrvSysInfo *) 0)->LAST_FIELD) \
+ + sizeof(((ErlDrvSysInfo *) 0)->LAST_FIELD))
+
+void
+driver_system_info(ErlDrvSysInfo *sip, size_t si_size)
+{
+ /*
+ * When adding fields in the ErlDrvSysInfo struct
+ * remember to increment ERL_DRV_EXTENDED_MINOR_VERSION
+ */
+
+ /*
+ * 'smp_support' is the last field in the first version
+ * of ErlDrvSysInfo (introduced in driver version 1.0).
+ */
+ if (!sip || si_size < ERL_DRV_SYS_INFO_SIZE(smp_support))
+ erl_exit(1,
+ "driver_system_info(%p, %ld) called with invalid arguments\n",
+ sip, si_size);
+
+ /*
+ * 'smp_support' is the last field in the first version
+ * of ErlDrvSysInfo (introduced in driver version 1.0).
+ */
+ if (si_size >= ERL_DRV_SYS_INFO_SIZE(smp_support)) {
+ sip->driver_major_version = ERL_DRV_EXTENDED_MAJOR_VERSION;
+ sip->driver_minor_version = ERL_DRV_EXTENDED_MINOR_VERSION;
+ sip->erts_version = ERLANG_VERSION;
+ sip->otp_release = ERLANG_OTP_RELEASE;
+ sip->thread_support =
+#ifdef USE_THREADS
+ 1
+#else
+ 0
+#endif
+ ;
+ sip->smp_support =
+#ifdef ERTS_SMP
+ 1
+#else
+ 0
+#endif
+ ;
+
+ }
+
+ /*
+ * 'scheduler_threads' is the last field in the second version
+ * of ErlDrvSysInfo (introduced in driver version 1.1).
+ */
+ if (si_size >= ERL_DRV_SYS_INFO_SIZE(scheduler_threads)) {
+ sip->async_threads = erts_async_max_threads;
+ sip->scheduler_threads = erts_no_schedulers;
+ }
+
+}
+
+
+static ERTS_INLINE Port *
+get_current_port(void)
+{
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ ASSERT(esdp);
+ ASSERT(esdp->current_port);
+ return esdp->current_port;
+}
+
+/*
+ * Default callbacks used if not supplied by driver.
+ */
+
+static void
+no_output_callback(ErlDrvData drv_data, char *buf, int len)
+{
+
+}
+
+static void
+no_event_callback(ErlDrvData drv_data, ErlDrvEvent event, ErlDrvEventData event_data)
+{
+ Port *prt = get_current_port();
+ report_missing_drv_callback(prt, "Event", "event()");
+ driver_event((ErlDrvPort) internal_port_index(prt->id), event, NULL);
+}
+
+static void
+no_ready_input_callback(ErlDrvData drv_data, ErlDrvEvent event)
+{
+ Port *prt = get_current_port();
+ report_missing_drv_callback(prt, "Input", "ready_input()");
+ driver_select((ErlDrvPort) internal_port_index(prt->id), event,
+ (ERL_DRV_READ | ERL_DRV_USE_NO_CALLBACK), 0);
+}
+
+static void
+no_ready_output_callback(ErlDrvData drv_data, ErlDrvEvent event)
+{
+ Port *prt = get_current_port();
+ report_missing_drv_callback(prt, "Output", "ready_output()");
+ driver_select((ErlDrvPort) internal_port_index(prt->id), event,
+ (ERL_DRV_WRITE | ERL_DRV_USE_NO_CALLBACK), 0);
+}
+
+static void
+no_timeout_callback(ErlDrvData drv_data)
+{
+
+}
+
+static void
+no_stop_select_callback(ErlDrvEvent event, void* private)
+{
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ erts_dsprintf(dsbufp, "Driver does not implement stop_select callback "
+ "(event=%ld, private=%p)!\n", (long)event, private);
+ erts_send_error_to_logger_nogl(dsbufp);
+}
+
+
+static int
+init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
+{
+ drv->name = de->driver_name;
+ if (de->extended_marker == ERL_DRV_EXTENDED_MARKER) {
+ drv->version.major = de->major_version;
+ drv->version.minor = de->minor_version;
+ drv->flags = de->driver_flags;
+ }
+ else {
+ drv->version.major = 0;
+ drv->version.minor = 0;
+ drv->flags = 0;
+ }
+ drv->handle = handle;
+#ifdef ERTS_SMP
+ if (drv->flags & ERL_DRV_FLAG_USE_PORT_LOCKING)
+ drv->lock = NULL;
+ else {
+ drv->lock = erts_alloc(ERTS_ALC_T_DRIVER_LOCK,
+ sizeof(erts_smp_mtx_t));
+ erts_smp_mtx_init_x(drv->lock,
+ "driver_lock",
+#if defined(ERTS_ENABLE_LOCK_CHECK) || defined(ERTS_ENABLE_LOCK_COUNT)
+ am_atom_put(drv->name, sys_strlen(drv->name))
+#else
+ NIL
+#endif
+ );
+ }
+#endif
+ drv->entry = de;
+
+ drv->start = de->start;
+ drv->stop = de->stop;
+ drv->finish = de->finish;
+ drv->flush = de->flush;
+ drv->output = de->output ? de->output : no_output_callback;
+ drv->outputv = de->outputv;
+ drv->control = de->control;
+ drv->call = de->call;
+ drv->event = de->event ? de->event : no_event_callback;
+ drv->ready_input = de->ready_input ? de->ready_input : no_ready_input_callback;
+ drv->ready_output = de->ready_output ? de->ready_output : no_ready_output_callback;
+ drv->timeout = de->timeout ? de->timeout : no_timeout_callback;
+ drv->ready_async = de->ready_async;
+ if (de->extended_marker == ERL_DRV_EXTENDED_MARKER)
+ drv->process_exit = de->process_exit;
+ else
+ drv->process_exit = NULL;
+ if (de->minor_version >= 3/*R13A*/ && de->stop_select)
+ drv->stop_select = de->stop_select;
+ else
+ drv->stop_select = no_stop_select_callback;
+
+ if (!de->init)
+ return 0;
+ else {
+ int res;
+ int fpe_was_unmasked = erts_block_fpe();
+ res = (*de->init)();
+ erts_unblock_fpe(fpe_was_unmasked);
+ return res;
+ }
+}
+
+void
+erts_destroy_driver(erts_driver_t *drv)
+{
+#ifdef ERTS_SMP
+ if (drv->lock) {
+ erts_smp_mtx_destroy(drv->lock);
+ erts_free(ERTS_ALC_T_DRIVER_LOCK, drv->lock);
+ }
+#endif
+ erts_free(ERTS_ALC_T_DRIVER, drv);
+}
+
+/*
+ * Functions for maintaining a list of driver_entry struct
+ * Exposed in the driver interface, and therefore possibly locking directly.
+ */
+
+void add_driver_entry(ErlDrvEntry *drv){
+ void *rec_lock;
+ rec_lock = erts_smp_tsd_get(driver_list_lock_status_key);
+ /*
+ * Ignore result of erts_add_driver_entry, the init is not
+ * allowed to fail when drivers are added by drivers.
+ */
+ erts_add_driver_entry(drv, NULL, rec_lock != NULL);
+}
+
+int erts_add_driver_entry(ErlDrvEntry *de, DE_Handle *handle, int driver_list_locked)
+{
+ erts_driver_t *dp = erts_alloc(ERTS_ALC_T_DRIVER, sizeof(erts_driver_t));
+ int res;
+
+ if (!driver_list_locked) {
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+ }
+
+ dp->next = driver_list;
+ dp->prev = NULL;
+ if (driver_list != NULL) {
+ driver_list->prev = dp;
+ }
+ driver_list = dp;
+
+ if (!driver_list_locked) {
+ erts_smp_tsd_set(driver_list_lock_status_key, (void *) 1);
+ }
+
+ res = init_driver(dp, de, handle);
+
+ if (res != 0) {
+ /*
+ * Remove it all again...
+ */
+ driver_list = dp->next;
+ if (driver_list != NULL) {
+ driver_list->prev = NULL;
+ }
+ erts_destroy_driver(dp);
+ }
+
+ if (!driver_list_locked) {
+ erts_smp_tsd_set(driver_list_lock_status_key, NULL);
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ }
+ return res;
+}
+
+/* Not allowed for dynamic drivers */
+int remove_driver_entry(ErlDrvEntry *drv)
+{
+ erts_driver_t *dp;
+ void *rec_lock;
+
+ rec_lock = erts_smp_tsd_get(driver_list_lock_status_key);
+ if (rec_lock == NULL) {
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+ }
+ dp = driver_list;
+ while (dp && dp->entry != drv)
+ dp = dp->next;
+ if (dp) {
+ if (dp->handle) {
+ if (rec_lock == NULL) {
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ }
+ return -1;
+ }
+ if (dp->prev == NULL) {
+ driver_list = dp->next;
+ } else {
+ dp->prev->next = dp->next;
+ }
+ if (dp->next != NULL) {
+ dp->next->prev = dp->prev;
+ }
+ erts_destroy_driver(dp);
+ if (rec_lock == NULL) {
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ }
+ return 1;
+ }
+ if (rec_lock == NULL) {
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ }
+ return 0;
+}
+
+/* very useful function that can be used in entries that are not used
+ * so that not every driver writer must supply a personal version
+ */
+int null_func(void)
+{
+ return 0;
+}
+
+int
+erl_drv_putenv(char *key, char *value)
+{
+ return erts_write_env(key, value);
+}
+
+int
+erl_drv_getenv(char *key, char *value, size_t *value_size)
+{
+ return erts_sys_getenv(key, value, value_size);
+}