aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c876
1 files changed, 383 insertions, 493 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index b772783f64..4dcec356a9 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -57,15 +57,14 @@ static erts_smp_tsd_key_t driver_list_lock_status_key; /*stop recursive locks wh
static erts_smp_tsd_key_t driver_list_last_error_key; /* Save last DDLL error on a
per thread basis (for BC interfaces) */
-Port* erts_port; /* The port table */
+ErtsPTab erts_port erts_align_attribute(ERTS_CACHE_LINE_SIZE); /* The port table */
erts_smp_atomic_t erts_bytes_out; /* No bytes sent out of the system */
erts_smp_atomic_t erts_bytes_in; /* No bytes gotten into the system */
-Uint erts_max_ports;
-Uint erts_port_tab_index_mask;
-
const ErlDrvTermData driver_term_nil = (ErlDrvTermData)NIL;
+const Port erts_invalid_port = {{ERTS_INVALID_PORT}};
+
erts_driver_t vanilla_driver;
erts_driver_t spawn_driver;
erts_driver_t fd_driver;
@@ -89,36 +88,12 @@ static void driver_monitor_unlock_pdl(Port *p);
static ERTS_INLINE ErlIOQueue*
drvport2ioq(ErlDrvPort drvport)
{
- int ix = (int) drvport;
- erts_aint32_t state;
-
- if (ix < 0 || erts_max_ports <= ix)
- return NULL;
-
- state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
-
-#ifdef ERTS_ENABLE_LOCK_CHECK
-
- if (erts_get_scheduler_data()) {
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix]));
- ERTS_LC_ASSERT(!erts_port[ix].port_data_lock
- || erts_lc_mtx_is_locked(
- &erts_port[ix].port_data_lock->mtx));
- }
- else {
- ERTS_LC_ASSERT((state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
- || erts_port[ix].port_data_lock);
- ERTS_LC_ASSERT(!erts_port[ix].port_data_lock
- || erts_lc_mtx_is_locked(
- &erts_port[ix].port_data_lock->mtx));
- }
-
-#endif
-
+ Port *prt = erts_thr_drvport2port_raw(drvport);
+ erts_aint32_t state = erts_atomic32_read_nob(&prt->state);
if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
return NULL;
else
- return &erts_port[ix].ioq;
+ return &prt->ioq;
}
static ERTS_INLINE int
@@ -196,27 +171,13 @@ typedef struct line_buf_context {
dtrace_port_str((PORT), port_str);
#endif
-/* The 'number' field in a port now has two parts: the lowest bits
- contain the index in the port table, and the higher bits are a counter
- which is incremented each time we look for a free port and start from
- the beginning of the table. erts_max_ports is the number of file descriptors,
- rounded up to a power of 2.
- To get the index from a port, use the macro 'internal_port_index';
- 'port_number' returns the whole number field.
-*/
-
-static erts_smp_spinlock_t get_free_port_lck;
-static Uint last_port_num;
-static Uint port_num_mask;
-erts_smp_atomic32_t erts_ports_snapshot; /* Identifies the _next_ snapshot (not the ongoing) */
-
-
static ERTS_INLINE void
kill_port(Port *pp)
{
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ erts_ptab_delete_element(&erts_port, &pp->common); /* Time of death */
erts_port_task_free_port(pp);
- ASSERT(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD);
+ /* In non-smp case the port structure may have been deallocated now */
}
#ifdef ERTS_SMP
@@ -227,149 +188,194 @@ erts_lc_is_port_locked(Port *prt)
{
if (!prt)
return 0;
+ ERTS_SMP_LC_ASSERT(prt->lock);
return erts_smp_lc_mtx_is_locked(prt->lock);
}
#endif
#endif /* #ifdef ERTS_SMP */
-static int
-get_free_port(void)
-{
- Uint num;
- Uint tries = erts_max_ports;
- Port* port;
-
- erts_smp_spin_lock(&get_free_port_lck);
- num = last_port_num + 1;
- for (;; ++num) {
- erts_aint32_t act;
-
- port = &erts_port[num & erts_port_tab_index_mask];
-
- act = erts_smp_atomic32_read_nob(&port->state);
-
- while (act & ERTS_PORT_SFLG_FREE) {
- erts_aint32_t exp = act;
- act = erts_smp_atomic32_cmpxchg_relb(&port->state,
- ERTS_PORT_SFLG_INITIALIZING,
- exp);
- if (act == exp) {
- last_port_num = num;
- erts_smp_spin_unlock(&get_free_port_lck);
- ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&port->common.refc) == 0);
- erts_smp_atomic32_set_nob(&port->common.refc, 2); /* Port alive + lock */
- return num & port_num_mask;
- }
- }
+static void initq(Port* prt);
- if (--tries == 0) {
- erts_smp_spin_unlock(&get_free_port_lck);
- return -1;
- }
- }
+static void insert_port_struct(void *vprt, Eterm data)
+{
+ Port *prt = (Port *) vprt;
+ Eterm id = make_internal_port(data);
+#ifdef ERTS_SMP
+ ASSERT(prt->drv_ptr && prt->lock);
+ /*
+ * We are breaking lock order in the port specific locking
+ * case. This is, however, safe since the lock has not been
+ * published, yet.
+ */
+ if (!prt->drv_ptr->lock)
+ erts_mtx_init_locked_x(prt->lock, "port_lock", id);
+#endif
+ prt->common.id = id;
+ erts_atomic32_init_relb(&prt->state, ERTS_PORT_SFLG_INITIALIZING);
}
-/*
- * erts_test_next_port() is only used for testing.
- */
-Sint
-erts_test_next_port(int set, Uint next)
+static Port *create_port(char *name,
+ erts_driver_t *driver,
+ erts_mtx_t *driver_lock,
+ Eterm pid,
+ int *enop)
{
- Uint i, num;
- Sint res = -1;
+ Port *prt;
+ char *p;
+ size_t port_size, size;
+ erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED;
+#ifdef DEBUG
+ /* Make sure the debug flags survives until port is freed */
+ state |= ERTS_PORT_SFLG_PORT_DEBUG;
+#endif
- erts_smp_spin_lock(&get_free_port_lck);
- if (set) {
- last_port_num = (next - 1) & port_num_mask;
+#ifdef ERTS_SMP
+ if (!driver_lock) {
+ /* Align size for mutex following port struct */
+ port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
+ size += sizeof(erts_mtx_t);
}
- num = last_port_num + 1;
-
- for (i=0; i < erts_max_ports && res<0; ++i, ++num) {
-
- Port* port = &erts_port[num & erts_port_tab_index_mask];
+ else
+#endif
+ port_size = size = sizeof(Port);
- erts_aint32_t state = erts_smp_atomic32_read_nob(&port->state);
+ size += sys_strlen(name) + 1;
- if (state & ERTS_PORT_SFLG_FREE) {
- last_port_num = num - 1;
- res = num & port_num_mask;
- }
+ p = erts_alloc_fnf(ERTS_ALC_T_PORT, size);
+ if (!p) {
+ if (enop)
+ *enop = ENOMEM;
+ return NULL;
}
- erts_smp_spin_unlock(&get_free_port_lck);
- return res;
-}
-static void port_cleanup(Port *prt);
+ prt = (Port *) p;
+ p += port_size;
#ifdef ERTS_SMP
+ if (driver_lock) {
+ prt->lock = driver_lock;
+ erts_mtx_lock(driver_lock);
+ }
+ else {
+ prt->lock = (erts_mtx_t *) p;
+ p += sizeof(erts_mtx_t);
+ state |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
+ }
+ erts_smp_atomic_set_nob(&prt->run_queue,
+ (erts_aint_t) erts_get_runq_current(NULL));
+ prt->xports = NULL;
+#else
+ erts_atomic32_init_nob(&prt->refc, 1);
+ prt->cleanup = 0;
+#endif
+
+ prt->name = p;
+ sys_strcpy(p, name);
+ prt->drv_ptr = driver;
+ ERTS_P_LINKS(prt) = NULL;
+ ERTS_P_MONITORS(prt) = NULL;
+ prt->linebuf = NULL;
+ prt->bp = NULL;
+ prt->suspended = NULL;
+ prt->data = am_undefined;
+ prt->port_data_lock = NULL;
+ prt->control_flags = 0;
+ prt->bytes_in = 0;
+ prt->bytes_out = 0;
+ prt->dist_entry = NULL;
+ prt->connected = pid;
+ prt->common.u.alive.reg = NULL;
+#ifdef ERTS_SMP
+ prt->common.u.alive.ptimer = NULL;
+#else
+ sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer));
+#endif
+ erts_port_task_handle_init(&prt->timeout_task);
+ erts_port_task_init_sched(&prt->sched);
+ prt->psd = NULL;
+ prt->drv_data = (SWord) 0;
-static void
-sched_port_cleanup(void *vprt)
-{
- Port *prt = (Port *) vprt;
- erts_smp_mtx_lock(prt->lock);
- port_cleanup(prt);
-}
+ /* Set default tracing */
+ erts_get_default_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER_PROC(prt));
+
+ ASSERT(((char *) prt) == ((char *) &prt->common));
+ if (!erts_ptab_new_element(&erts_port,
+ &prt->common,
+ (void *) prt,
+ insert_port_struct)) {
+#ifdef ERTS_SMP
+ if (driver_lock)
+ erts_mtx_unlock(driver_lock);
#endif
+ if (enop)
+ *enop = 0;
+ return NULL;
+ }
+
+ ASSERT(prt == (Port *) (erts_ptab_pix2intptr_nob(
+ &erts_port,
+ internal_port_index(prt->common.id))));
+ initq(prt);
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+ erts_atomic32_set_relb(&prt->state, state);
+ return prt;
+}
+
+#ifndef ERTS_SMP
void
erts_port_cleanup(Port *prt)
{
-#ifdef ERTS_SMP
- if (erts_smp_mtx_trylock(prt->lock) == EBUSY)
- erts_schedule_misc_op(sched_port_cleanup, (void *) prt);
- else
-#endif
- port_cleanup(prt);
+ if (prt->drv_ptr && prt->drv_ptr->handle)
+ erts_ddll_dereference_driver(prt->drv_ptr->handle);
+ prt->drv_ptr = NULL;
+ erts_port_dec_refc(prt);
}
+#endif
void
-port_cleanup(Port *prt)
+erts_port_free(Port *prt)
{
-#ifdef ERTS_SMP
- erts_smp_mtx_t *mtx;
-#endif
#if defined(ERTS_SMP) || defined(ERTS_ENABLE_LOCK_CHECK)
- erts_aint32_t state = erts_smp_atomic32_read_nob(&prt->state);
+ erts_aint32_t state = erts_atomic32_read_nob(&prt->state);
#endif
- erts_driver_t *driver;
-
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- driver = prt->drv_ptr;
- prt->drv_ptr = NULL;
- ASSERT(driver);
-
- ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_FREE_SCHEDULED);
- ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG);
+ ERTS_LC_ASSERT(state & (ERTS_PORT_SFLG_FREE_SCHEDULED
+ | ERTS_PORT_SFLG_INITIALIZING));
+ ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG);
ERTS_LC_ASSERT(!(state & ERTS_PORT_SFLG_FREE));
- ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) == 0);
-
#ifdef ERTS_SMP
- mtx = prt->lock;
- ASSERT(mtx);
-
- prt->lock = NULL;
-
- erts_smp_mtx_unlock(mtx);
+ ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->common.refc) == 0);
+#else
+ ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->refc) == 0);
#endif
- erts_smp_atomic32_set_relb(&prt->state, ERTS_PORT_SFLG_FREE);
-
#ifdef ERTS_SMP
- if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) {
- erts_smp_mtx_destroy(mtx);
- erts_free(ERTS_ALC_T_PORT_LOCK, mtx);
- }
-#endif
+ ASSERT(prt->lock);
+ if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK)
+ erts_mtx_destroy(prt->lock);
- if (driver->handle)
- erts_ddll_dereference_driver(driver->handle);
+ /*
+ * We cannot dereference a driver using driver
+ * locking until here in smp case. Otherwise,
+ * the driver lock may still be in use by others.
+ *
+ * In the non-smp case we cannot do it here since
+ * this function may be called by non-scheduler
+ * threads. This is done in erts_port_cleanup()
+ * in the non-smp case.
+ */
+ if (prt->drv_ptr->handle)
+ erts_ddll_dereference_driver(prt->drv_ptr->handle);
+#endif
+ erts_atomic32_set_nob(&prt->state, ERTS_PORT_SFLG_FREE);
+ erts_free(ERTS_ALC_T_PORT, prt);
}
-
/*
** Initialize v_start to point to the small fixed vector.
** Once (reallocated) we never reset the pointer to the small vector
@@ -416,73 +422,7 @@ static void stopq(Port* prt)
if (prt->port_data_lock) {
driver_pdl_unlock(prt->port_data_lock);
driver_pdl_dec_refc(prt->port_data_lock);
- prt->port_data_lock = NULL;
- }
-}
-
-
-
-static void
-setup_port(Port* prt, Eterm pid, erts_driver_t *driver,
- ErlDrvData drv_data, char *name, erts_aint32_t xstate)
-{
- ErtsRunQueue *runq = erts_get_runq_current(NULL);
- char *new_name, *old_name;
-#ifdef DEBUG
- /* Make sure the debug flags survives until port is freed */
- xstate |= ERTS_PORT_SFLG_PORT_DEBUG;
-#endif
- ASSERT(runq);
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
-
-
- new_name = (char*) erts_alloc(ERTS_ALC_T_PORT_NAME, sys_strlen(name)+1);
- sys_strcpy(new_name, name);
- erts_smp_runq_lock(runq);
- prt->snapshot = erts_smp_atomic32_read_nob(&erts_ports_snapshot);
- old_name = prt->name;
- prt->name = new_name;
-#ifdef ERTS_SMP
- erts_smp_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq);
-#endif
- ASSERT(!prt->drv_ptr);
- prt->drv_ptr = driver;
- erts_smp_atomic32_set_relb(&prt->state,
- ERTS_PORT_SFLG_CONNECTED | xstate);
- erts_smp_runq_unlock(runq);
-#ifdef ERTS_SMP
- ASSERT(!prt->xports);
-#endif
- if (old_name) {
- erts_free(ERTS_ALC_T_PORT_NAME, (void *) old_name);
}
-
- prt->control_flags = 0;
- prt->connected = pid;
- prt->drv_data = (SWord) drv_data;
- prt->bytes_in = 0;
- prt->bytes_out = 0;
- prt->dist_entry = NULL;
- prt->common.u.alive.reg = NULL;
-#ifdef ERTS_SMP
- prt->common.u.alive.ptimer = NULL;
-#else
- sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer));
-#endif
- erts_port_task_handle_init(&prt->timeout_task);
- prt->suspended = NULL;
- sys_strcpy(prt->name, name);
- ERTS_P_LINKS(prt) = NULL;
- ERTS_P_MONITORS(prt) = NULL;
- prt->linebuf = NULL;
- prt->bp = NULL;
- prt->data = am_undefined;
- /* Set default tracing */
- erts_get_default_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER_PROC(prt));
-
- prt->psd = NULL;
-
- initq(prt);
}
void
@@ -493,7 +433,7 @@ erts_wake_process_later(Port *prt, Process *process)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (erts_smp_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD)
+ if (erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD)
return;
for (p = &(prt->suspended); *p != NULL; p = &((*p)->next))
@@ -513,36 +453,34 @@ erts_wake_process_later(Port *prt, Process *process)
(*error_number_ptr must contain either BADARG or SYSTEM_LIMIT).
The driver start function must obey the same conventions.
*/
-int
+Port *
erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
Eterm pid, /* Current process. */
char* name, /* Driver name. */
SysDriverOpts* opts, /* Options. */
- int *error_number_ptr) /* errno in case -2 is returned */
+ int *error_type_ptr, /* error type */
+ int *error_number_ptr) /* errno in case of error type -2 */
{
- int port_num;
- int port_ix;
+
+#undef ERTS_OPEN_DRIVER_RET
+#define ERTS_OPEN_DRIVER_RET(Prt, EType, ENo) \
+ do { \
+ if (error_type_ptr) \
+ *error_type_ptr = (EType); \
+ if (error_number_ptr) \
+ *error_number_ptr = (ENo); \
+ return (Prt); \
+ } while (0)
+
ErlDrvData drv_data = 0;
- erts_aint32_t xstate = 0;
Port *port;
int fpe_was_unmasked;
-
- if (error_number_ptr)
- *error_number_ptr = 0;
+ int error_type, error_number;
+ int port_errno = 0;
+ erts_mtx_t *driver_lock = NULL;
ERTS_SMP_CHK_NO_PROC_LOCKS;
- if ((port_num = get_free_port()) < 0) {
- if (error_number_ptr) {
- *error_number_ptr = SYSTEM_LIMIT;
- }
- return -3;
- }
-
- port_ix = port_num & erts_port_tab_index_mask;
- port = &erts_port[port_ix];
- port->common.id = make_internal_port(port_num);
-
erts_smp_mtx_lock(&erts_driver_list_lock);
if (!driver) {
for (driver = driver_list; driver; driver = driver->next) {
@@ -551,9 +489,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
}
if (!driver) {
erts_smp_mtx_unlock(&erts_driver_list_lock);
- if (error_number_ptr)
- *error_number_ptr = BADARG;
- return -3;
+ ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG);
}
}
if (driver == &spawn_driver) {
@@ -598,32 +534,11 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
if (driver == NULL || (driver != &spawn_driver && opts->exit_status)) {
erts_smp_mtx_unlock(&erts_driver_list_lock);
- if (error_number_ptr) {
- *error_number_ptr = BADARG;
- }
- /* Need to mark the port as free again */
- ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&port->common.refc) == 2);
- erts_smp_atomic32_set_nob(&port->common.refc, 0);
- erts_smp_atomic32_set_relb(&port->state, ERTS_PORT_SFLG_FREE);
- return -3;
+ ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG);
}
- /*
- * We'll set up the port before calling the start function,
- * to allow message sending and setting timers in the start function.
- */
-
#ifdef ERTS_SMP
- ASSERT(!port->lock);
- port->lock = driver->lock;
- if (!port->lock) {
- port->lock = erts_alloc(ERTS_ALC_T_PORT_LOCK,
- sizeof(erts_smp_mtx_t));
- erts_smp_mtx_init_x(port->lock,
- "port_lock",
- port->common.id);
- xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
- }
+ driver_lock = driver->lock;
#endif
if (driver->handle != NULL) {
@@ -632,18 +547,32 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
}
erts_smp_mtx_unlock(&erts_driver_list_lock);
-#ifdef ERTS_SMP
- erts_smp_mtx_lock(port->lock);
-#endif
+ /*
+ * We'll set up the port before calling the start function,
+ * to allow message sending and setting timers in the start function.
+ */
- setup_port(port, pid, driver, drv_data, name, xstate);
+ port = create_port(name, driver, driver_lock, pid, &port_errno);
+ if (!port) {
+ if (driver->handle) {
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+ erts_ddll_decrement_port_count(driver->handle);
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ erts_ddll_dereference_driver(driver->handle);
+ }
+ if (port_errno)
+ ERTS_OPEN_DRIVER_RET(NULL, -2, port_errno);
+ else
+ ERTS_OPEN_DRIVER_RET(NULL, -3, SYSTEM_LIMIT);
+ }
if (IS_TRACED_FL(port, F_TRACE_PORTS)) {
trace_port_open(port,
pid,
am_atom_put(port->name, strlen(port->name)));
}
-
+
+ error_number = error_type = 0;
if (driver->start) {
if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(port, am_in, am_start);
@@ -656,15 +585,28 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
}
#endif
fpe_was_unmasked = erts_block_fpe();
- drv_data = (*driver->start)((ErlDrvPort)(port_ix),
- name, opts);
+ drv_data = (*driver->start)((ErlDrvPort) port, name, opts);
+ if (((SWord) drv_data) == -1)
+ error_type = -1;
+ else if (((SWord) drv_data) == -2) {
+ /*
+ * We need to save errno quickly after the
+ * call to the 'start' callback before
+ * something else modify it.
+ */
+ error_type = -2;
+ error_number = errno;
+ }
+ else if (((SWord) drv_data) == -3) {
+ error_type = -3;
+ error_number = BADARG;
+ }
+
erts_unblock_fpe(fpe_was_unmasked);
port->caller = NIL;
if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(port, am_out, am_start);
}
- if (error_number_ptr && ((SWord) drv_data) == (SWord) -2)
- *error_number_ptr = errno;
#ifdef ERTS_SMP
if (port->xports)
erts_smp_xports_unlock(port);
@@ -672,15 +614,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
#endif
}
- if (((SWord)drv_data) == -1 ||
- ((SWord)drv_data) == -2 ||
- ((SWord)drv_data) == -3) {
- int res = (int) ((SWord) drv_data);
-
- if (res == -3 && error_number_ptr) {
- *error_number_ptr = BADARG;
- }
-
+ if (error_type) {
/*
* Must clean up the port.
*/
@@ -690,7 +624,6 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
erts_cancel_timer(&(port->common.u.alive.tm));
#endif
stopq(port);
- kill_port(port);
if (port->linebuf != NULL) {
erts_free(ERTS_ALC_T_LINEBUF,
(void *) port->linebuf);
@@ -701,11 +634,14 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
erts_ddll_decrement_port_count(driver->handle);
erts_smp_mtx_unlock(&erts_driver_list_lock);
}
+ kill_port(port);
erts_port_release(port);
- return res;
+ ERTS_OPEN_DRIVER_RET(NULL, error_type, error_number);
}
- port->drv_data = (SWord) drv_data;
- return port_ix;
+ port->drv_data = (UWord) drv_data;
+ ERTS_OPEN_DRIVER_RET(port, 0, 0);
+
+#undef ERTS_OPEN_DRIVER_RET
}
#ifdef ERTS_SMP
@@ -734,15 +670,21 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */
Port* port;
erts_driver_t *driver;
Process *rp;
- int port_num;
- Eterm port_id;
- erts_aint32_t xstate = 0;
+ erts_mtx_t *driver_lock = NULL;
ERTS_SMP_CHK_NO_PROC_LOCKS;
+ /* Need to be called from a scheduler thread */
+ if (!erts_get_scheduler_id())
+ return ERTS_INVALID_ERL_DRV_PORT;
+
creator_port = erts_drvport2port(creator_port_ix, NULL);
if (!creator_port)
- return (ErlDrvTermData) -1;
+ return ERTS_INVALID_ERL_DRV_PORT;
+
+ rp = erts_proc_lookup(pid);
+ if (!rp)
+ return ERTS_INVALID_ERL_DRV_PORT;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(creator_port));
@@ -750,55 +692,61 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */
erts_smp_mtx_lock(&erts_driver_list_lock);
if (!erts_ddll_driver_ok(driver->handle)) {
erts_smp_mtx_unlock(&erts_driver_list_lock);
- return (ErlDrvTermData) -1;
+ return ERTS_INVALID_ERL_DRV_PORT;
}
- rp = erts_pid2proc(NULL, 0, pid, ERTS_PROC_LOCK_LINK);
- if (!rp) {
- erts_smp_mtx_unlock(&erts_driver_list_lock);
- return (ErlDrvTermData) -1; /* pid does not exist */
+ if (driver->handle != NULL) {
+ erts_ddll_increment_port_count(driver->handle);
+ erts_ddll_reference_referenced_driver(driver->handle);
+ }
+
+#ifdef ERTS_SMP
+ driver_lock = driver->lock;
+#endif
+
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+
+ port = create_port(name, driver, driver_lock, pid, NULL);
+ if (!port) {
+ if (driver->handle) {
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+ erts_ddll_decrement_port_count(driver->handle);
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ erts_ddll_dereference_driver(driver->handle);
+ }
+ return ERTS_INVALID_ERL_DRV_PORT;
}
- if ((port_num = get_free_port()) < 0) {
- errno = SYSTEM_LIMIT;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(port));
+
+ erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
+ if (ERTS_PROC_IS_EXITING(rp)) {
erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- erts_smp_mtx_unlock(&erts_driver_list_lock);
- return (ErlDrvTermData) -1;
+ if (driver->handle) {
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+ erts_ddll_decrement_port_count(driver->handle);
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ }
+ kill_port(port);
+ erts_port_release(port);
+ return ERTS_INVALID_ERL_DRV_PORT;
}
- port_id = make_internal_port(port_num);
- port = &erts_port[port_num & erts_port_tab_index_mask];
+ erts_add_link(&ERTS_P_LINKS(port), LINK_PID, pid);
+ erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, port->common.id);
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
#ifdef ERTS_SMP
- ASSERT(!port->lock);
- port->lock = driver->lock;
- if (!port->lock) {
+ if (!driver_lock) {
ErtsXPortsList *xplp = xports_list_alloc();
xplp->port = port;
xplp->next = creator_port->xports;
creator_port->xports = xplp;
- port->lock = erts_alloc(ERTS_ALC_T_PORT_LOCK,
- sizeof(erts_smp_mtx_t));
- erts_smp_mtx_init_locked_x(port->lock, "port_lock", port_id);
- xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
}
-
#endif
- if (driver->handle != NULL) {
- erts_ddll_increment_port_count(driver->handle);
- erts_ddll_reference_referenced_driver(driver->handle);
- }
- erts_smp_mtx_unlock(&erts_driver_list_lock);
+ port->drv_data = (UWord) drv_data;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(port));
-
- setup_port(port, pid, driver, drv_data, name, xstate);
- port->common.id = port_id;
-
- erts_add_link(&ERTS_P_LINKS(port), LINK_PID, pid);
- erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, port_id);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- return port_num & erts_port_tab_index_mask;
+ return (ErlDrvPort) port;
}
#ifdef ERTS_SMP
@@ -1282,15 +1230,22 @@ int erts_write_to_port(Eterm caller_id, Port *p, Eterm list)
}
}
+#ifdef ERTS_SMP
+static void
+release_port(void *vport)
+{
+ erts_port_dec_refc((Port *) vport);
+}
+#endif
+
/* initialize the port array */
void init_io(void)
{
- int i;
ErlDrvEntry** dp;
char maxports[21]; /* enough for any 64-bit integer */
size_t maxportssize = sizeof(maxports);
Uint ports_bits = ERTS_PORTS_BITS;
- Sint port_extra_shift;
+ int port_tab_size;
#ifdef ERTS_SMP
init_xports_list_alloc();
@@ -1299,66 +1254,39 @@ void init_io(void)
pdl_init();
if (erts_sys_getenv("ERL_MAX_PORTS", maxports, &maxportssize) == 0)
- erts_max_ports = atoi(maxports);
+ port_tab_size = atoi(maxports);
else
- erts_max_ports = sys_max_files();
+ port_tab_size = sys_max_files();
- if (erts_max_ports > ERTS_MAX_PORTS)
- erts_max_ports = ERTS_MAX_PORTS;
- if (erts_max_ports < 1024)
- erts_max_ports = 1024;
+ if (port_tab_size > ERTS_MAX_PORTS)
+ port_tab_size = ERTS_MAX_PORTS;
+ if (port_tab_size < 1024)
+ port_tab_size = 1024;
if (erts_use_r9_pids_ports) {
ports_bits = ERTS_R9_PORTS_BITS;
- if (erts_max_ports > ERTS_MAX_R9_PORTS)
- erts_max_ports = ERTS_MAX_R9_PORTS;
+ if (port_tab_size > ERTS_MAX_R9_PORTS)
+ port_tab_size = ERTS_MAX_R9_PORTS;
}
- port_extra_shift = erts_fit_in_bits_int32(erts_max_ports - 1);
- port_num_mask = (1 << ports_bits) - 1;
-
- erts_port_tab_index_mask = ~(~((Uint) 0) << port_extra_shift);
- erts_max_ports = 1 << port_extra_shift;
-
erts_smp_mtx_init(&erts_driver_list_lock,"driver_list");
driver_list = NULL;
erts_smp_tsd_key_create(&driver_list_lock_status_key);
erts_smp_tsd_key_create(&driver_list_last_error_key);
- if (erts_max_ports * sizeof(Port) <= erts_max_ports) {
- /* More memory needed than the whole address space. */
- erts_alloc_enomem(ERTS_ALC_T_PORT_TABLE, ~((Uint) 0));
- }
-
- erts_port = (Port *) erts_alloc(ERTS_ALC_T_PORT_TABLE,
- erts_max_ports * sizeof(Port));
-
- erts_smp_atomic_init_nob(&erts_bytes_out, 0);
- erts_smp_atomic_init_nob(&erts_bytes_in, 0);
-
- for (i = 0; i < erts_max_ports; i++) {
- erts_port_task_init_sched(&erts_port[i].sched);
- erts_smp_atomic32_init_nob(&erts_port[i].common.refc, 0);
+ erts_ptab_init_table(&erts_port,
+ ERTS_ALC_T_PORT_TABLE,
#ifdef ERTS_SMP
- erts_port[i].lock = NULL;
- erts_port[i].xports = NULL;
- erts_smp_spinlock_init_x(&erts_port[i].state_lck, "port_state", make_small(i));
+ release_port,
+#else
+ NULL,
#endif
- ERTS_TRACER_PROC(&erts_port[i]) = NIL;
- ERTS_TRACE_FLAGS(&erts_port[i]) = 0;
-
- erts_port[i].drv_ptr = NULL;
- erts_smp_atomic32_init_nob(&erts_port[i].state, ERTS_PORT_SFLG_FREE);
- erts_port[i].name = NULL;
- ERTS_P_LINKS(&erts_port[i]) = NULL;
- ERTS_P_MONITORS(&erts_port[i]) = NULL;
- erts_port[i].linebuf = NULL;
- erts_port[i].port_data_lock = NULL;
- }
+ (ErtsPTabElementCommon *) &erts_invalid_port.common,
+ port_tab_size,
+ "port_table");
- erts_smp_atomic32_init_nob(&erts_ports_snapshot, (erts_aint32_t) 0);
- last_port_num = 0;
- erts_smp_spinlock_init(&get_free_port_lck, "get_free_port");
+ erts_smp_atomic_init_nob(&erts_bytes_out, 0);
+ erts_smp_atomic_init_nob(&erts_bytes_in, 0);
sys_init_io();
@@ -1769,7 +1697,7 @@ deliver_vec_message(Port* prt, /* Port */
if (!rp)
return;
- state = erts_smp_atomic32_read_nob(&prt->state);
+ state = erts_atomic32_read_nob(&prt->state);
/*
* Calculate the exact number of heap words needed.
*/
@@ -1907,7 +1835,7 @@ static void flush_port(Port *p)
ASSERT(!p->xports);
#endif
}
- if ((erts_smp_atomic32_read_nob(&p->state) & ERTS_PORT_SFLGS_DEAD) == 0
+ if ((erts_atomic32_read_nob(&p->state) & ERTS_PORT_SFLGS_DEAD) == 0
&& is_port_ioq_empty(p)) {
terminate_port(p);
}
@@ -1929,8 +1857,8 @@ terminate_port(Port *prt)
ASSERT(!ERTS_P_MONITORS(prt));
/* state may be altered by kill_port() below */
- state = erts_smp_atomic32_read_band_nob(&prt->state,
- ~ERTS_PORT_SFLG_SEND_CLOSED);
+ state = erts_atomic32_read_band_nob(&prt->state,
+ ~ERTS_PORT_SFLG_SEND_CLOSED);
if (state & ERTS_PORT_SFLG_SEND_CLOSED) {
send_closed_port_id = prt->common.id;
connected_id = prt->connected;
@@ -1981,6 +1909,8 @@ terminate_port(Port *prt)
if (prt->psd)
erts_free(ERTS_ALC_T_PRTSD, prt->psd);
+ ASSERT(prt->dist_entry == NULL);
+
kill_port(prt);
/*
@@ -1989,13 +1919,11 @@ terminate_port(Port *prt)
*/
if ((state & ERTS_PORT_SFLG_HALT)
&& (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0)) {
- erts_smp_port_unlock(prt); /* We will exit and never return */
+ erts_port_release(prt); /* We will exit and never return */
erl_exit_flush_async(erts_halt_code, "");
}
if (is_internal_port(send_closed_port_id))
deliver_result(send_closed_port_id, connected_id, am_closed);
-
- ASSERT(prt->dist_entry == NULL);
}
void
@@ -2130,7 +2058,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
}
#endif
- state = erts_smp_atomic32_read_nob(&p->state);
+ state = erts_atomic32_read_nob(&p->state);
if ((state & (ERTS_PORT_SFLGS_DEAD
| ERTS_PORT_SFLG_EXITING
| ERTS_PORT_SFLG_IMMORTAL))
@@ -2149,12 +2077,12 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
* Setting the port to not busy here, frees the list of pending
* processes and makes them runnable.
*/
- set_busy_port((ErlDrvPort)internal_port_index(p->common.id), 0);
+ set_busy_port((ErlDrvPort) p, 0);
if (p->common.u.alive.reg != NULL)
(void) erts_unregister_name(NULL, 0, p, p->common.u.alive.reg->name);
- state = erts_smp_atomic32_read_bor_relb(&p->state, ERTS_PORT_SFLG_EXITING);
+ state = erts_atomic32_read_bor_relb(&p->state, ERTS_PORT_SFLG_EXITING);
{
SweepContext sc = {p->common.id, rreason};
@@ -2174,16 +2102,16 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
erts_do_net_exits(p->dist_entry, rreason);
erts_deref_dist_entry(p->dist_entry);
p->dist_entry = NULL;
- erts_smp_atomic32_read_band_relb(&p->state,
- ~ERTS_PORT_SFLG_DISTRIBUTION);
+ erts_atomic32_read_band_relb(&p->state,
+ ~ERTS_PORT_SFLG_DISTRIBUTION);
}
if ((reason != am_kill) && !is_port_ioq_empty(p)) {
/* must turn exiting flag off */
- erts_smp_atomic32_read_bset_relb(&p->state,
- (ERTS_PORT_SFLG_EXITING
- | ERTS_PORT_SFLG_CLOSING),
- ERTS_PORT_SFLG_CLOSING);
+ erts_atomic32_read_bset_relb(&p->state,
+ (ERTS_PORT_SFLG_EXITING
+ | ERTS_PORT_SFLG_CLOSING),
+ ERTS_PORT_SFLG_CLOSING);
flush_port(p);
}
else {
@@ -2232,8 +2160,8 @@ void erts_port_command(Process *proc,
if ((pid = port->connected) == tp[1]) {
/* PID must be connected */
if (tp[2] == am_close) {
- erts_smp_atomic32_read_bor_relb(&port->state,
- ERTS_PORT_SFLG_SEND_CLOSED);
+ erts_atomic32_read_bor_relb(&port->state,
+ ERTS_PORT_SFLG_SEND_CLOSED);
erts_do_exit_port(port, pid, am_normal);
#ifdef USE_VM_PROBES
@@ -2456,16 +2384,15 @@ static void prt_one_lnk(ErtsLink *lnk, void *vprtd)
}
void
-print_port_info(int to, void *arg, int i)
+print_port_info(Port *p, int to, void *arg)
{
- Port* p = &erts_port[i];
- erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state);
+ erts_aint32_t state = erts_atomic32_read_nob(&p->state);
if (state & ERTS_PORT_SFLGS_DEAD)
return;
erts_print(to, arg, "=port:%T\n", p->common.id);
- erts_print(to, arg, "Slot: %d\n", i);
+ erts_print(to, arg, "Slot: %d\n", internal_port_index(p->common.id));
if (state & ERTS_PORT_SFLG_CONNECTED) {
erts_print(to, arg, "Connected: %T", p->connected);
erts_print(to, arg, "\n");
@@ -2505,43 +2432,46 @@ print_port_info(int to, void *arg, int i)
void
set_busy_port(ErlDrvPort port_num, int on)
{
+ Port *prt;
#ifdef USE_VM_PROBES
DTRACE_CHARBUF(port_str, 16);
#endif
ERTS_SMP_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num]));
+ prt = erts_drvport2port_raw(port_num);
+ if (!prt)
+ return;
if (on) {
- erts_smp_atomic32_read_bor_relb(&erts_port[port_num].state,
- ERTS_PORT_SFLG_PORT_BUSY);
+ erts_atomic32_read_bor_relb(&prt->state,
+ ERTS_PORT_SFLG_PORT_BUSY);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_busy)) {
erts_snprintf(port_str, sizeof(port_str),
- "%T", erts_port[port_num].id);
+ "%T", prt->id);
DTRACE1(port_busy, port_str);
}
#endif
} else {
- ErtsProcList* plp = erts_port[port_num].suspended;
- erts_smp_atomic32_read_band_relb(&erts_port[port_num].state,
- ~ERTS_PORT_SFLG_PORT_BUSY);
- erts_port[port_num].suspended = NULL;
+ ErtsProcList* plp = prt->suspended;
+ erts_atomic32_read_band_relb(&prt->state,
+ ~ERTS_PORT_SFLG_PORT_BUSY);
+ prt->suspended = NULL;
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_not_busy)) {
erts_snprintf(port_str, sizeof(port_str),
- "%T", erts_port[port_num].id);
+ "%T", prt->id);
DTRACE1(port_not_busy, port_str);
}
#endif
- if (erts_port[port_num].dist_entry) {
+ if (prt->dist_entry) {
/*
* Processes suspended on distribution ports are
* normally queued on the dist entry.
*/
- erts_dist_port_not_busy(&erts_port[port_num]);
+ erts_dist_port_not_busy(prt);
}
/*
@@ -2587,10 +2517,9 @@ set_busy_port(ErlDrvPort port_num, int on)
void set_port_control_flags(ErlDrvPort port_num, int flags)
{
-
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num]));
-
- erts_port[port_num].control_flags = flags;
+ Port *prt = erts_drvport2port_raw(port_num);
+ if (prt)
+ prt->control_flags = flags;
}
int get_port_flags(ErlDrvPort ix)
@@ -2649,7 +2578,7 @@ int async_ready(Port *p, void* data)
ERTS_SMP_CHK_NO_PROC_LOCKS;
if (p) {
- erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state);
+ erts_aint32_t state = erts_atomic32_read_nob(&p->state);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
ASSERT(!(state & ERTS_PORT_SFLGS_DEAD));
if (p->drv_ptr->ready_async != NULL) {
@@ -2697,7 +2626,7 @@ erts_stale_drv_select(Eterm port,
int deselect)
{
char *type;
- ErlDrvPort drv_port = internal_port_index(port);
+ ErlDrvPort drv_port = (ErlDrvPort) erts_port_lookup_raw(port);
ErtsPortNames *pnp = erts_get_port_names(port);
erts_dsprintf_buf_t *dsbufp;
@@ -2737,16 +2666,16 @@ erts_stale_drv_select(Eterm port,
ErtsPortNames *
erts_get_port_names(Eterm id)
{
+ Port *prt = erts_port_lookup_raw(id);
ErtsPortNames *pnp;
ASSERT(is_nil(id) || is_internal_port(id));
-
- if (is_not_internal_port(id)) {
+
+ if (!prt) {
pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, sizeof(ErtsPortNames));
pnp->name = NULL;
pnp->driver_name = NULL;
}
else {
- Port* prt = &erts_port[internal_port_index(id)];
int do_realloc = 1;
int len = -1;
size_t pnp_len = sizeof(ErtsPortNames);
@@ -2762,17 +2691,10 @@ erts_get_port_names(Eterm id)
pnp_len = sizeof(ErtsPortNames) + len;
pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, pnp_len);
}
- erts_smp_port_minor_lock(prt);
- if (id != prt->common.id) {
- len = nlen = 0;
- name = driver_name = NULL;
- }
- else {
- name = prt->name;
- len = nlen = name ? sys_strlen(name) + 1 : 0;
- driver_name = (prt->drv_ptr ? prt->drv_ptr->name : NULL);
- len += driver_name ? sys_strlen(driver_name) + 1 : 0;
- }
+ name = prt->name;
+ len = nlen = name ? sys_strlen(name) + 1 : 0;
+ driver_name = (prt->drv_ptr ? prt->drv_ptr->name : NULL);
+ len += driver_name ? sys_strlen(driver_name) + 1 : 0;
if (len <= pnp_len - sizeof(ErtsPortNames)) {
if (!name)
pnp->name = NULL;
@@ -2790,7 +2712,6 @@ erts_get_port_names(Eterm id)
}
do_realloc = 0;
}
- erts_smp_port_minor_unlock(prt);
} while (do_realloc);
}
return pnp;
@@ -2827,7 +2748,7 @@ ErlDrvTermData driver_mk_term_nil(void)
return driver_term_nil;
}
-void driver_report_exit(int ix, int status)
+void driver_report_exit(ErlDrvPort ix, int status)
{
Port* prt = erts_drvport2port(ix, NULL);
Eterm* hp;
@@ -2868,27 +2789,6 @@ void driver_report_exit(int ix, int status)
erts_smp_proc_dec_refc(rp);
}
-
-static ERTS_INLINE int
-deliver_term_check_port(ErlDrvPort drvport)
-{
- int res;
- int ix = (int) drvport;
- if (ix < 0 || erts_max_ports <= ix)
- res = -1; /* invalid */
- else {
- Port* prt = &erts_port[ix];
- erts_aint32_t state = erts_smp_atomic32_read_nob(&prt->state);
- if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP))
- res = 1; /* ok */
- else if (state & ERTS_PORT_SFLG_CLOSING)
- res = 0; /* closing */
- else
- res = -1; /* invalid (dead) */
- }
- return res;
-}
-
#define ERTS_B2T_STATES_DEF_STATES_SZ 5
#define ERTS_B2T_STATES_DEF_STATES_INC 100
@@ -2976,10 +2876,7 @@ cleanup_b2t_states(struct b2t_states__ *b2tsp)
*/
static int
-driver_deliver_term(ErlDrvPort port,
- Eterm to,
- ErlDrvTermData* data,
- int len)
+driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
{
#define ERTS_DDT_FAIL do { res = -1; goto done; } while (0)
Uint need = 0;
@@ -3181,11 +3078,8 @@ driver_deliver_term(ErlDrvPort port,
b2t.ix = 0;
/*
- * The term is OK. Go ahead and validate the port and process.
+ * The term is OK. Go ahead and validate the process.
*/
- res = deliver_term_check_port(port);
- if (res <= 0)
- goto done;
/*
* Increase refc on proc if done from a non-scheduler thread.
@@ -3449,25 +3343,29 @@ driver_deliver_term(ErlDrvPort port,
#undef ERTS_DDT_FAIL
}
-
int
-driver_output_term(ErlDrvPort ix, ErlDrvTermData* data, int len)
+driver_output_term(ErlDrvPort drvport, ErlDrvTermData* data, int len)
{
- Port* prt = erts_drvport2port(ix, NULL);
-
+ erts_aint32_t state;
+ Port* prt = erts_drvport2port(drvport, &state);
+ if (!prt)
+ return -1; /* invalid (dead) */
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
-
- if (prt == NULL)
- return -1;
- return driver_deliver_term(ix, prt->connected, data, len);
+ if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP))
+ return driver_deliver_term(prt->connected, data, len);
+ else if (state & ERTS_PORT_SFLG_CLOSING)
+ return 0; /* closing */
+ else
+ return -1; /* invalid (dead) */
}
int
driver_send_term(ErlDrvPort ix, ErlDrvTermData to, ErlDrvTermData* data, int len)
{
- return driver_deliver_term(ix, to, data, len);
+ /* driver_send_term() assume port is ok... */
+ return driver_deliver_term(to, data, len);
}
@@ -3817,6 +3715,7 @@ static ERTS_INLINE void pdl_destroy(ErlDrvPDL pdl)
{
ERTS_LC_ASSERT(driver_pdl_get_refc(pdl) == 0);
erts_mtx_destroy(&pdl->mtx);
+ erts_port_dec_refc(pdl->prt);
erts_free(ERTS_ALC_T_PORT_DATA_LOCK, pdl);
}
@@ -3861,6 +3760,8 @@ driver_pdl_create(ErlDrvPort dp)
sizeof(struct erl_drv_port_data_lock));
erts_mtx_init(&pdl->mtx, "port_data_lock");
pdl_init_refc(pdl);
+ erts_port_inc_refc(pp);
+ pdl->prt = pp;
pp->port_data_lock = pdl;
#ifdef HARDDEBUG
erts_fprintf(stderr, "driver_pdl_create(%T) -> 0x%08X\r\n",pp->common.id,(unsigned) pdl);
@@ -4317,7 +4218,7 @@ int driver_set_timer(ErlDrvPort ix, unsigned long t)
if (prt == NULL)
return -1;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
if (prt->drv_ptr->timeout == NULL)
return -1;
drv_cancel_timer(prt);
@@ -4424,7 +4325,7 @@ static int do_driver_monitor_process(Port *prt,
/*
* This can be called from a non scheduler thread iff a port_data_lock exists
*/
-int driver_monitor_process(ErlDrvPort port,
+int driver_monitor_process(ErlDrvPort drvport,
ErlDrvTermData process,
ErlDrvMonitor *monitor)
{
@@ -4434,15 +4335,12 @@ int driver_monitor_process(ErlDrvPort port,
#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
- int ix = (int) port;
- if (ix < 0 || erts_max_ports <= ix) {
- return -1;
- }
- prt = &erts_port[ix];
+
+ prt = erts_thr_drvport2port_raw(drvport);
DRV_MONITOR_LOCK_PDL(prt);
- state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
+ state = erts_atomic32_read_nob(&prt->state);
if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
DRV_MONITOR_UNLOCK_PDL(prt);
@@ -4509,7 +4407,7 @@ static int do_driver_demonitor_process(Port *prt, Eterm *buf,
return 0;
}
-int driver_demonitor_process(ErlDrvPort port,
+int driver_demonitor_process(ErlDrvPort drvport,
const ErlDrvMonitor *monitor)
{
Port *prt;
@@ -4518,15 +4416,12 @@ int driver_demonitor_process(ErlDrvPort port,
#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
- int ix = (int) port;
- if (ix < 0 || erts_max_ports <= ix) {
- return -1;
- }
- prt = &erts_port[ix];
+
+ prt = erts_thr_drvport2port_raw(drvport);
DRV_MONITOR_LOCK_PDL(prt);
- state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
+ state = erts_atomic32_read_nob(&prt->state);
if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
DRV_MONITOR_UNLOCK_PDL(prt);
@@ -4575,7 +4470,7 @@ static ErlDrvTermData do_driver_get_monitored_process(Port *prt, Eterm *buf,
}
-ErlDrvTermData driver_get_monitored_process(ErlDrvPort port,
+ErlDrvTermData driver_get_monitored_process(ErlDrvPort drvport,
const ErlDrvMonitor *monitor)
{
Port *prt;
@@ -4584,15 +4479,12 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort port,
#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
- int ix = (int) port;
- if (ix < 0 || erts_max_ports <= ix) {
- return driver_term_nil;
- }
- prt = &erts_port[ix];
+
+ prt = erts_thr_drvport2port_raw(drvport);
DRV_MONITOR_LOCK_PDL(prt);
- state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
+ state = erts_atomic32_read_nob(&prt->state);
if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
DRV_MONITOR_UNLOCK_PDL(prt);
return driver_term_nil;
@@ -4711,8 +4603,6 @@ int driver_exit(ErlDrvPort ix, int err)
if (prt == NULL)
return -1;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
-
rp = erts_pid2proc(NULL, 0, prt->connected, ERTS_PROC_LOCK_LINK);
if (rp) {
rlnk = erts_remove_link(&ERTS_P_LINKS(rp),prt->common.id);
@@ -5007,7 +4897,7 @@ no_event_callback(ErlDrvData drv_data, ErlDrvEvent event, ErlDrvEventData event_
{
Port *prt = get_current_port();
report_missing_drv_callback(prt, "Event", "event()");
- driver_event((ErlDrvPort) internal_port_index(prt->common.id), event, NULL);
+ driver_event((ErlDrvPort) prt, event, NULL);
}
static void
@@ -5015,7 +4905,7 @@ no_ready_input_callback(ErlDrvData drv_data, ErlDrvEvent event)
{
Port *prt = get_current_port();
report_missing_drv_callback(prt, "Input", "ready_input()");
- driver_select((ErlDrvPort) internal_port_index(prt->common.id), event,
+ driver_select((ErlDrvPort) prt, event,
(ERL_DRV_READ | ERL_DRV_USE_NO_CALLBACK), 0);
}
@@ -5024,7 +4914,7 @@ no_ready_output_callback(ErlDrvData drv_data, ErlDrvEvent event)
{
Port *prt = get_current_port();
report_missing_drv_callback(prt, "Output", "ready_output()");
- driver_select((ErlDrvPort) internal_port_index(prt->common.id), event,
+ driver_select((ErlDrvPort) prt, event,
(ERL_DRV_WRITE | ERL_DRV_USE_NO_CALLBACK), 0);
}
@@ -5059,13 +4949,13 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
drv->lock = NULL;
else {
drv->lock = erts_alloc(ERTS_ALC_T_DRIVER_LOCK,
- sizeof(erts_smp_mtx_t));
- erts_smp_mtx_init_x(drv->lock,
- "driver_lock",
+ sizeof(erts_mtx_t));
+ erts_mtx_init_x(drv->lock,
+ "driver_lock",
#if defined(ERTS_ENABLE_LOCK_CHECK) || defined(ERTS_ENABLE_LOCK_COUNT)
- am_atom_put(drv->name, sys_strlen(drv->name))
+ am_atom_put(drv->name, sys_strlen(drv->name))
#else
- NIL
+ NIL
#endif
);
}