aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c2709
1 files changed, 904 insertions, 1805 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 77dbe92241..5325480901 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1996-2016. All Rights Reserved.
+ * Copyright Ericsson AB 1996-2018. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -52,6 +52,8 @@
#include "erl_bif_unique.h"
#include "erl_hl_timer.h"
#include "erl_time.h"
+#include "erl_io_queue.h"
+#include "erl_proc_sig_queue.h"
extern ErlDrvEntry fd_driver_entry;
extern ErlDrvEntry vanilla_driver_entry;
@@ -59,13 +61,13 @@ extern ErlDrvEntry spawn_driver_entry;
#ifndef __WIN32__
extern ErlDrvEntry forker_driver_entry;
#endif
-extern ErlDrvEntry *driver_tab[]; /* table of static drivers, only used during initialization */
+extern ErtsStaticDriver driver_tab[]; /* table of static drivers, only used during initialization */
erts_driver_t *driver_list; /* List of all drivers, static and dynamic. */
-erts_smp_rwmtx_t erts_driver_list_lock; /* Mutex for driver list */
-static erts_smp_tsd_key_t driver_list_lock_status_key; /*stop recursive locks when calling
+erts_rwmtx_t erts_driver_list_lock; /* Mutex for driver list */
+static erts_tsd_key_t driver_list_lock_status_key; /*stop recursive locks when calling
driver init */
-static erts_smp_tsd_key_t driver_list_last_error_key; /* Save last DDLL error on a
+static erts_tsd_key_t driver_list_last_error_key; /* Save last DDLL error on a
per thread basis (for BC interfaces) */
ErtsPTab erts_port erts_align_attribute(ERTS_CACHE_LINE_SIZE); /* The port table */
@@ -93,22 +95,16 @@ static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *);
static void terminate_port(Port *p);
static void pdl_init(void);
static int driver_failure_term(ErlDrvPort ix, Eterm term, int eof);
-#ifdef ERTS_SMP
static void driver_monitor_lock_pdl(Port *p);
static void driver_monitor_unlock_pdl(Port *p);
#define DRV_MONITOR_LOOKUP_PORT_LOCK_PDL(Port) erts_thr_drvport2port((Port), 1)
#define DRV_MONITOR_LOCK_PDL(Port) driver_monitor_lock_pdl(Port)
#define DRV_MONITOR_UNLOCK_PDL(Port) driver_monitor_unlock_pdl(Port)
-#else
-#define DRV_MONITOR_LOOKUP_PORT_LOCK_PDL(Port) erts_thr_drvport2port((Port), 0)
-#define DRV_MONITOR_LOCK_PDL(Port) /* nothing */
-#define DRV_MONITOR_UNLOCK_PDL(Port) /* nothing */
-#endif
#define ERL_SMALL_IO_BIN_LIMIT (4*ERL_ONHEAP_BIN_LIMIT)
#define SMALL_WRITE_VEC 16
-static ERTS_INLINE ErlIOQueue*
+static ERTS_INLINE ErlPortIOQueue*
drvport2ioq(ErlDrvPort drvport)
{
Port *prt = erts_thr_drvport2port(drvport, 0);
@@ -121,13 +117,13 @@ static ERTS_INLINE int
is_port_ioq_empty(Port *pp)
{
int res;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
if (!pp->port_data_lock)
- res = (pp->ioq.size == 0);
+ res = (erts_ioq_size(&pp->ioq) == 0);
else {
ErlDrvPDL pdl = pp->port_data_lock;
erts_mtx_lock(&pdl->mtx);
- res = (pp->ioq.size == 0);
+ res = (erts_ioq_size(&pp->ioq) == 0);
erts_mtx_unlock(&pdl->mtx);
}
return res;
@@ -142,14 +138,14 @@ erts_is_port_ioq_empty(Port *pp)
Uint
erts_port_ioq_size(Port *pp)
{
- int res;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ ErlDrvSizeT res;
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
if (!pp->port_data_lock)
- res = pp->ioq.size;
+ res = erts_ioq_size(&pp->ioq);
else {
ErlDrvPDL pdl = pp->port_data_lock;
erts_mtx_lock(&pdl->mtx);
- res = pp->ioq.size;
+ res = erts_ioq_size(&pp->ioq);
erts_mtx_unlock(&pdl->mtx);
}
return (Uint) res;
@@ -211,14 +207,13 @@ dtrace_drvport_str(ErlDrvPort drvport, char *port_buf)
static ERTS_INLINE void
kill_port(Port *pp)
{
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
ERTS_TRACER_CLEAR(&ERTS_TRACER(pp));
erts_ptab_delete_element(&erts_port, &pp->common); /* Time of death */
erts_port_task_free_port(pp);
/* In non-smp case the port structure may have been deallocated now */
}
-#ifdef ERTS_SMP
#ifdef ERTS_ENABLE_LOCK_CHECK
int
@@ -226,12 +221,11 @@ erts_lc_is_port_locked(Port *prt)
{
if (!prt)
return 0;
- ERTS_SMP_LC_ASSERT(prt->lock);
- return erts_smp_lc_mtx_is_locked(prt->lock);
+ ERTS_LC_ASSERT(prt->lock);
+ return erts_lc_mtx_is_locked(prt->lock);
}
#endif
-#endif /* #ifdef ERTS_SMP */
static void initq(Port* prt);
@@ -255,32 +249,21 @@ static ERTS_INLINE void port_init_instr(Port *prt
* Stuff that need to be initialized with the port id
* in the instrumented case, but not in the normal case.
*/
-#ifdef ERTS_SMP
ASSERT(prt->drv_ptr && prt->lock);
if (!prt->drv_ptr->lock) {
- char *lock_str = "port_lock";
- erts_mtx_init_locked_x(prt->lock, lock_str, id,
-#ifdef ERTS_ENABLE_LOCK_COUNT
- (erts_lcnt_rt_options & ERTS_LCNT_OPT_PORTLOCK)
-#else
- 0
-#endif
- );
+ erts_mtx_init_locked(prt->lock, "port_lock", id, ERTS_LOCK_FLAGS_CATEGORY_IO);
}
-#endif
erts_port_task_init_sched(&prt->sched, id);
}
#if !ERTS_PORT_INIT_INSTR_NEED_ID
static ERTS_INLINE void port_init_instr_abort(Port *prt)
{
-#ifdef ERTS_SMP
ASSERT(prt->drv_ptr && prt->lock);
if (!prt->drv_ptr->lock) {
erts_mtx_unlock(prt->lock);
erts_mtx_destroy(prt->lock);
}
-#endif
erts_port_task_fini_sched(&prt->sched);
}
#endif
@@ -316,15 +299,12 @@ static Port *create_port(char *name,
erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED;
erts_aint32_t x_pts_flgs = 0;
-#ifdef ERTS_SMP
- ErtsRunQueue *runq;
if (!driver_lock) {
/* Align size for mutex following port struct */
port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
size += sizeof(erts_mtx_t);
}
else
-#endif
port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
#ifdef DEBUG
@@ -358,7 +338,6 @@ static Port *create_port(char *name,
p += busy_port_queue_size;
}
-#ifdef ERTS_SMP
if (driver_lock) {
prt->lock = driver_lock;
erts_mtx_lock(driver_lock);
@@ -368,17 +347,18 @@ static Port *create_port(char *name,
p += sizeof(erts_mtx_t);
state |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
}
- if (erts_get_scheduler_data())
- runq = erts_get_runq_current(NULL);
- else
- runq = ERTS_RUNQ_IX(0);
- erts_smp_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq);
+
+ {
+ ErtsRunQueue *runq;
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ if (esdp)
+ runq = erts_get_runq_current(esdp);
+ else
+ runq = ERTS_RUNQ_IX(0);
+ erts_init_runq_port(prt, runq);
+ }
prt->xports = NULL;
-#else
- erts_atomic32_init_nob(&prt->refc, 1);
- prt->cleanup = 0;
-#endif
erts_port_task_pre_init_sched(&prt->sched, busy_port_queue);
@@ -387,6 +367,7 @@ static Port *create_port(char *name,
prt->drv_ptr = driver;
ERTS_P_LINKS(prt) = NULL;
ERTS_P_MONITORS(prt) = NULL;
+ ERTS_P_LT_MONITORS(prt) = NULL;
prt->linebuf = NULL;
prt->suspended = NULL;
erts_init_port_data(prt);
@@ -394,12 +375,11 @@ static Port *create_port(char *name,
prt->control_flags = 0;
prt->bytes_in = 0;
prt->bytes_out = 0;
- prt->dist_entry = NULL;
ERTS_PORT_INIT_CONNECTED(prt, pid);
prt->common.u.alive.reg = NULL;
ERTS_PTMR_INIT(prt);
erts_port_task_handle_init(&prt->timeout_task);
- erts_smp_atomic_init_nob(&prt->psd, (erts_aint_t) NULL);
+ erts_atomic_init_nob(&prt->psd, (erts_aint_t) NULL);
prt->async_open_port = NULL;
prt->drv_data = (SWord) 0;
prt->os_pid = -1;
@@ -426,10 +406,8 @@ static Port *create_port(char *name,
#if !ERTS_PORT_INIT_INSTR_NEED_ID
port_init_instr_abort(prt);
#endif
-#ifdef ERTS_SMP
if (driver_lock)
erts_mtx_unlock(driver_lock);
-#endif
if (enop)
*enop = 0;
erts_free(ERTS_ALC_T_PORT, prt);
@@ -442,7 +420,7 @@ static Port *create_port(char *name,
initq(prt);
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
if (erts_port_schedule_all_ops)
x_pts_flgs |= ERTS_PTS_FLG_FORCE_SCHED;
@@ -451,29 +429,17 @@ static Port *create_port(char *name,
x_pts_flgs |= ERTS_PTS_FLG_PARALLELISM;
if (x_pts_flgs)
- erts_smp_atomic32_read_bor_nob(&prt->sched.flags, x_pts_flgs);
+ erts_atomic32_read_bor_nob(&prt->sched.flags, x_pts_flgs);
erts_atomic32_set_relb(&prt->state, state);
return prt;
}
-#ifndef ERTS_SMP
-void
-erts_port_cleanup(Port *prt)
-{
- if (prt->drv_ptr && prt->drv_ptr->handle)
- erts_ddll_dereference_driver(prt->drv_ptr->handle);
- prt->drv_ptr = NULL;
- erts_port_dec_refc(prt);
-}
-#endif
void
erts_port_free(Port *prt)
{
-#if defined(ERTS_SMP) || defined(DEBUG) || defined(ERTS_ENABLE_LOCK_CHECK)
erts_aint32_t state = erts_atomic32_read_nob(&prt->state);
-#endif
ERTS_LC_ASSERT(state & (ERTS_PORT_SFLG_INITIALIZING
| ERTS_PORT_SFLG_FREE));
ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG);
@@ -487,7 +453,6 @@ erts_port_free(Port *prt)
prt->async_open_port = NULL;
}
-#ifdef ERTS_SMP
ASSERT(prt->lock);
if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK)
erts_mtx_destroy(prt->lock);
@@ -504,7 +469,6 @@ erts_port_free(Port *prt)
*/
if (prt->drv_ptr->handle)
erts_ddll_dereference_driver(prt->drv_ptr->handle);
-#endif
erts_free(ERTS_ALC_T_PORT, prt);
}
@@ -515,41 +479,17 @@ erts_port_free(Port *prt)
*/
static void initq(Port* prt)
{
- ErlIOQueue* q = &prt->ioq;
-
ERTS_LC_ASSERT(!prt->port_data_lock);
-
- q->size = 0;
- q->v_head = q->v_tail = q->v_start = q->v_small;
- q->v_end = q->v_small + SMALL_IO_QUEUE;
- q->b_head = q->b_tail = q->b_start = q->b_small;
- q->b_end = q->b_small + SMALL_IO_QUEUE;
+ erts_ioq_init(&prt->ioq, ERTS_ALC_T_IOQ, 1);
}
static void stopq(Port* prt)
{
- ErlIOQueue* q;
- ErlDrvBinary** binp;
if (prt->port_data_lock)
driver_pdl_lock(prt->port_data_lock);
- q = &prt->ioq;
- binp = q->b_head;
-
- if (q->v_start != q->v_small)
- erts_free(ERTS_ALC_T_IOQ, (void *) q->v_start);
-
- while(binp < q->b_tail) {
- if (*binp != NULL)
- driver_free_binary(*binp);
- binp++;
- }
- if (q->b_start != q->b_small)
- erts_free(ERTS_ALC_T_IOQ, (void *) q->b_start);
- q->v_start = q->v_end = q->v_head = q->v_tail = NULL;
- q->b_start = q->b_end = q->b_head = q->b_tail = NULL;
- q->size = 0;
+ erts_ioq_clear(&prt->ioq);
if (prt->port_data_lock) {
driver_pdl_unlock(prt->port_data_lock);
@@ -563,7 +503,7 @@ erts_save_suspend_process_on_port(Port *prt, Process *process)
int saved;
erts_aint32_t flags;
erts_port_task_sched_lock(&prt->sched);
- flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ flags = erts_atomic32_read_nob(&prt->sched.flags);
saved = (flags & ERTS_PTS_FLGS_BUSY) && !(flags & ERTS_PTS_FLG_EXIT);
if (saved)
erts_proclist_store_last(&prt->suspended, erts_proclist_create(process));
@@ -607,16 +547,16 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
erts_mtx_t *driver_lock = NULL;
int cprt_flgs = 0;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
- erts_smp_rwmtx_rlock(&erts_driver_list_lock);
+ erts_rwmtx_rlock(&erts_driver_list_lock);
if (!driver) {
for (driver = driver_list; driver; driver = driver->next) {
if (sys_strcmp(driver->name, name) == 0)
break;
}
if (!driver) {
- erts_smp_rwmtx_runlock(&erts_driver_list_lock);
+ erts_rwmtx_runlock(&erts_driver_list_lock);
ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG);
}
}
@@ -649,7 +589,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
*/
for (d = driver_list; d; d = d->next) {
- if (strcmp(d->name, name) == 0 &&
+ if (sys_strcmp(d->name, name) == 0 &&
erts_ddll_driver_ok(d->handle)) {
driver = d;
break;
@@ -661,19 +601,17 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
}
if (driver == NULL || (driver != &spawn_driver && opts->exit_status)) {
- erts_smp_rwmtx_runlock(&erts_driver_list_lock);
+ erts_rwmtx_runlock(&erts_driver_list_lock);
ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG);
}
-#ifdef ERTS_SMP
driver_lock = driver->lock;
-#endif
if (driver->handle != NULL) {
erts_ddll_increment_port_count(driver->handle);
erts_ddll_reference_driver(driver->handle);
}
- erts_smp_rwmtx_runlock(&erts_driver_list_lock);
+ erts_rwmtx_runlock(&erts_driver_list_lock);
/*
* We'll set up the port before calling the start function,
@@ -686,9 +624,9 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
port = create_port(name, driver, driver_lock, cprt_flgs, pid, &port_errno);
if (!port) {
if (driver->handle) {
- erts_smp_rwmtx_rlock(&erts_driver_list_lock);
+ erts_rwmtx_rlock(&erts_driver_list_lock);
erts_ddll_decrement_port_count(driver->handle);
- erts_smp_rwmtx_runlock(&erts_driver_list_lock);
+ erts_rwmtx_runlock(&erts_driver_list_lock);
erts_ddll_dereference_driver(driver->handle);
}
if (port_errno)
@@ -701,7 +639,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
trace_port_open(port,
pid,
erts_atom_put((byte *) port->name,
- strlen(port->name),
+ sys_strlen(port->name),
ERTS_ATOM_ENC_LATIN1,
1));
}
@@ -756,11 +694,9 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(port, am_out, am_open);
}
-#ifdef ERTS_SMP
if (port->xports)
erts_port_handle_xports(port);
ASSERT(!port->xports);
-#endif
}
if (error_type) {
@@ -775,9 +711,9 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
port->linebuf = NULL;
}
if (driver->handle != NULL) {
- erts_smp_rwmtx_rlock(&erts_driver_list_lock);
+ erts_rwmtx_rlock(&erts_driver_list_lock);
erts_ddll_decrement_port_count(driver->handle);
- erts_smp_rwmtx_runlock(&erts_driver_list_lock);
+ erts_rwmtx_runlock(&erts_driver_list_lock);
}
kill_port(port);
erts_port_release(port);
@@ -789,7 +725,6 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
#undef ERTS_OPEN_DRIVER_RET
}
-#ifdef ERTS_SMP
struct ErtsXPortsList_ {
ErtsXPortsList *next;
@@ -798,7 +733,6 @@ struct ErtsXPortsList_ {
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(xports_list, ErtsXPortsList, 50, ERTS_ALC_T_XPORTS_LIST)
-#endif
/*
* Driver function to create new instances of a driver
@@ -815,10 +749,10 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */
Port *creator_port;
Port* port;
erts_driver_t *driver;
- Process *rp;
erts_mtx_t *driver_lock = NULL;
+ ErtsLinkData *ldp;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
/* Need to be called from a scheduler thread */
if (!erts_get_scheduler_id())
@@ -828,17 +762,13 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */
if (creator_port == ERTS_INVALID_ERL_DRV_PORT)
return ERTS_INVALID_ERL_DRV_PORT;
- rp = erts_proc_lookup(pid);
- if (!rp)
- return ERTS_INVALID_ERL_DRV_PORT;
-
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(creator_port));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(creator_port));
driver = creator_port->drv_ptr;
- erts_smp_rwmtx_rlock(&erts_driver_list_lock);
+ erts_rwmtx_rlock(&erts_driver_list_lock);
if (!erts_ddll_driver_ok(driver->handle)) {
- erts_smp_rwmtx_runlock(&erts_driver_list_lock);
- return ERTS_INVALID_ERL_DRV_PORT;
+ erts_rwmtx_runlock(&erts_driver_list_lock);
+ return ERTS_INVALID_ERL_DRV_PORT;
}
if (driver->handle != NULL) {
@@ -846,60 +776,57 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */
erts_ddll_reference_referenced_driver(driver->handle);
}
-#ifdef ERTS_SMP
driver_lock = driver->lock;
-#endif
- erts_smp_rwmtx_runlock(&erts_driver_list_lock);
+ erts_rwmtx_runlock(&erts_driver_list_lock);
/* Inherit parallelism flag from parent */
if (ERTS_PTS_FLG_PARALLELISM &
- erts_smp_atomic32_read_nob(&creator_port->sched.flags))
+ erts_atomic32_read_nob(&creator_port->sched.flags))
cprt_flgs |= ERTS_CREATE_PORT_FLAG_PARALLELISM;
port = create_port(name, driver, driver_lock, cprt_flgs, pid, NULL);
if (!port) {
if (driver->handle) {
- erts_smp_rwmtx_rlock(&erts_driver_list_lock);
+ erts_rwmtx_rlock(&erts_driver_list_lock);
erts_ddll_decrement_port_count(driver->handle);
- erts_smp_rwmtx_runlock(&erts_driver_list_lock);
+ erts_rwmtx_runlock(&erts_driver_list_lock);
erts_ddll_dereference_driver(driver->handle);
}
return ERTS_INVALID_ERL_DRV_PORT;
}
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(port));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(port));
- erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
- if (ERTS_PROC_IS_EXITING(rp)) {
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ ldp = erts_link_create(ERTS_LNK_TYPE_PORT,
+ port->common.id, pid);
+ ASSERT(ldp->a.other.item == pid);
+ ASSERT(ldp->b.other.item == port->common.id);
+ erts_link_tree_insert(&ERTS_P_LINKS(port), &ldp->a);
+
+ if (!erts_proc_sig_send_link(NULL, pid, &ldp->b)) {
+ erts_link_tree_delete(&ERTS_P_LINKS(port), &ldp->a);
+ erts_link_release_both(ldp);
if (driver->handle) {
- erts_smp_rwmtx_rlock(&erts_driver_list_lock);
+ erts_rwmtx_rlock(&erts_driver_list_lock);
erts_ddll_decrement_port_count(driver->handle);
- erts_smp_rwmtx_runlock(&erts_driver_list_lock);
+ erts_rwmtx_runlock(&erts_driver_list_lock);
}
kill_port(port);
erts_port_release(port);
return ERTS_INVALID_ERL_DRV_PORT;
}
- erts_add_link(&ERTS_P_LINKS(port), LINK_PID, pid);
- erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, port->common.id);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
-
-#ifdef ERTS_SMP
if (!driver_lock) {
ErtsXPortsList *xplp = xports_list_alloc();
xplp->port = port;
xplp->next = creator_port->xports;
creator_port->xports = xplp;
}
-#endif
port->drv_data = (UWord) drv_data;
return ERTS_Port2ErlDrvPort(port);
}
-#ifdef ERTS_SMP
int erts_port_handle_xports(Port *prt)
{
int reds = 0;
@@ -928,312 +855,6 @@ int erts_port_handle_xports(Port *prt)
prt->xports = NULL;
return reds;
}
-#endif
-
-/* Fills a possibly deep list of chars and binaries into vec
-** Small characters are first stored in the buffer buf of length ln
-** binaries found are copied and linked into msoh
-** Return vector length on succsess,
-** -1 on overflow
-** -2 on type error
-*/
-
-#ifdef DEBUG
-#define MAX_SYSIOVEC_IOVLEN (1ull << (32 - 1))
-#else
-#define MAX_SYSIOVEC_IOVLEN (1ull << (sizeof(((SysIOVec*)0)->iov_len) * 8 - 1))
-#endif
-
-static ERTS_INLINE void
-io_list_to_vec_set_vec(SysIOVec **iov, ErlDrvBinary ***binv,
- ErlDrvBinary *bin, byte *ptr, Uint len,
- int *vlen)
-{
- while (len > MAX_SYSIOVEC_IOVLEN) {
- (*iov)->iov_base = ptr;
- (*iov)->iov_len = MAX_SYSIOVEC_IOVLEN;
- ptr += MAX_SYSIOVEC_IOVLEN;
- len -= MAX_SYSIOVEC_IOVLEN;
- (*iov)++;
- (*vlen)++;
- *(*binv)++ = bin;
- }
- (*iov)->iov_base = ptr;
- (*iov)->iov_len = len;
- *(*binv)++ = bin;
- (*iov)++;
- (*vlen)++;
-}
-
-static int
-io_list_to_vec(Eterm obj, /* io-list */
- SysIOVec* iov, /* io vector */
- ErlDrvBinary** binv, /* binary reference vector */
- ErlDrvBinary* cbin, /* binary to store characters */
- ErlDrvSizeT bin_limit) /* small binaries limit */
-{
- DECLARE_ESTACK(s);
- Eterm* objp;
- byte *buf = (byte*)cbin->orig_bytes;
- Uint len = cbin->orig_size;
- Uint csize = 0;
- int vlen = 0;
- byte* cptr = buf;
-
- goto L_jump_start; /* avoid push */
-
- while (!ESTACK_ISEMPTY(s)) {
- obj = ESTACK_POP(s);
- L_jump_start:
- if (is_list(obj)) {
- L_iter_list:
- objp = list_val(obj);
- obj = CAR(objp);
- if (is_byte(obj)) {
- if (len == 0)
- goto L_overflow;
- *buf++ = unsigned_val(obj);
- csize++;
- len--;
- } else if (is_binary(obj)) {
- ESTACK_PUSH(s, CDR(objp));
- goto handle_binary;
- } else if (is_list(obj)) {
- ESTACK_PUSH(s, CDR(objp));
- goto L_iter_list; /* on head */
- } else if (!is_nil(obj)) {
- goto L_type_error;
- }
- obj = CDR(objp);
- if (is_list(obj))
- goto L_iter_list; /* on tail */
- else if (is_binary(obj)) {
- goto handle_binary;
- } else if (!is_nil(obj)) {
- goto L_type_error;
- }
- } else if (is_binary(obj)) {
- Eterm real_bin;
- Uint offset;
- Eterm* bptr;
- ErlDrvSizeT size;
- int bitoffs;
- int bitsize;
-
- handle_binary:
- size = binary_size(obj);
- ERTS_GET_REAL_BIN(obj, real_bin, offset, bitoffs, bitsize);
- ASSERT(bitsize == 0);
- bptr = binary_val(real_bin);
- if (*bptr == HEADER_PROC_BIN) {
- ProcBin* pb = (ProcBin *) bptr;
- if (bitoffs != 0) {
- if (len < size) {
- goto L_overflow;
- }
- erts_copy_bits(pb->bytes+offset, bitoffs, 1,
- (byte *) buf, 0, 1, size*8);
- csize += size;
- buf += size;
- len -= size;
- } else if (bin_limit && size < bin_limit) {
- if (len < size) {
- goto L_overflow;
- }
- sys_memcpy(buf, pb->bytes+offset, size);
- csize += size;
- buf += size;
- len -= size;
- } else {
- if (csize != 0) {
- io_list_to_vec_set_vec(&iov, &binv, cbin,
- cptr, csize, &vlen);
- cptr = buf;
- csize = 0;
- }
- if (pb->flags) {
- erts_emasculate_writable_binary(pb);
- }
- io_list_to_vec_set_vec(
- &iov, &binv, Binary2ErlDrvBinary(pb->val),
- pb->bytes+offset, size, &vlen);
- }
- } else {
- ErlHeapBin* hb = (ErlHeapBin *) bptr;
- if (len < size) {
- goto L_overflow;
- }
- copy_binary_to_buffer(buf, 0,
- ((byte *) hb->data)+offset, bitoffs,
- 8*size);
- csize += size;
- buf += size;
- len -= size;
- }
- } else if (!is_nil(obj)) {
- goto L_type_error;
- }
- }
-
- if (csize != 0) {
- io_list_to_vec_set_vec(&iov, &binv, cbin, cptr, csize, &vlen);
- }
-
- DESTROY_ESTACK(s);
- return vlen;
-
- L_type_error:
- DESTROY_ESTACK(s);
- return -2;
-
- L_overflow:
- DESTROY_ESTACK(s);
- return -1;
-}
-
-#define IO_LIST_VEC_COUNT(obj) \
-do { \
- Uint _size = binary_size(obj); \
- Eterm _real; \
- ERTS_DECLARE_DUMMY(Uint _offset); \
- int _bitoffs; \
- int _bitsize; \
- ERTS_GET_REAL_BIN(obj, _real, _offset, _bitoffs, _bitsize); \
- if (_bitsize != 0) goto L_type_error; \
- if (thing_subtag(*binary_val(_real)) == REFC_BINARY_SUBTAG && \
- _bitoffs == 0) { \
- b_size += _size; \
- if (b_size < _size) goto L_overflow_error; \
- in_clist = 0; \
- v_size++; \
- /* If iov_len is smaller then Uint we split the binary into*/ \
- /* multiple smaller (2GB) elements in the iolist.*/ \
- v_size += _size / MAX_SYSIOVEC_IOVLEN; \
- if (_size >= ERL_SMALL_IO_BIN_LIMIT) { \
- p_in_clist = 0; \
- p_v_size++; \
- } else { \
- p_c_size += _size; \
- if (!p_in_clist) { \
- p_in_clist = 1; \
- p_v_size++; \
- } \
- } \
- } else { \
- c_size += _size; \
- if (c_size < _size) goto L_overflow_error; \
- if (!in_clist) { \
- in_clist = 1; \
- v_size++; \
- } \
- p_c_size += _size; \
- if (!p_in_clist) { \
- p_in_clist = 1; \
- p_v_size++; \
- } \
- } \
-} while (0)
-
-
-/*
- * Returns 0 if successful and a non-zero value otherwise.
- *
- * Return values through pointers:
- * *vsize - SysIOVec size needed for a writev
- * *csize - Number of bytes not in binary (in the common binary)
- * *pvsize - SysIOVec size needed if packing small binaries
- * *pcsize - Number of bytes in the common binary if packing
- * *total_size - Total size of iolist in bytes
- */
-
-static int
-io_list_vec_len(Eterm obj, int* vsize, Uint* csize,
- Uint* pvsize, Uint* pcsize,
- ErlDrvSizeT* total_size)
-{
- DECLARE_ESTACK(s);
- Eterm* objp;
- Uint v_size = 0;
- Uint c_size = 0;
- Uint b_size = 0;
- Uint in_clist = 0;
- Uint p_v_size = 0;
- Uint p_c_size = 0;
- Uint p_in_clist = 0;
- Uint total;
-
- goto L_jump_start; /* avoid a push */
-
- while (!ESTACK_ISEMPTY(s)) {
- obj = ESTACK_POP(s);
- L_jump_start:
- if (is_list(obj)) {
- L_iter_list:
- objp = list_val(obj);
- obj = CAR(objp);
-
- if (is_byte(obj)) {
- c_size++;
- if (c_size == 0) {
- goto L_overflow_error;
- }
- if (!in_clist) {
- in_clist = 1;
- v_size++;
- }
- p_c_size++;
- if (!p_in_clist) {
- p_in_clist = 1;
- p_v_size++;
- }
- }
- else if (is_binary(obj)) {
- IO_LIST_VEC_COUNT(obj);
- }
- else if (is_list(obj)) {
- ESTACK_PUSH(s, CDR(objp));
- goto L_iter_list; /* on head */
- }
- else if (!is_nil(obj)) {
- goto L_type_error;
- }
-
- obj = CDR(objp);
- if (is_list(obj))
- goto L_iter_list; /* on tail */
- else if (is_binary(obj)) { /* binary tail is OK */
- IO_LIST_VEC_COUNT(obj);
- }
- else if (!is_nil(obj)) {
- goto L_type_error;
- }
- }
- else if (is_binary(obj)) {
- IO_LIST_VEC_COUNT(obj);
- }
- else if (!is_nil(obj)) {
- goto L_type_error;
- }
- }
-
- total = c_size + b_size;
- if (total < c_size) {
- goto L_overflow_error;
- }
- *total_size = (ErlDrvSizeT) total;
-
- DESTROY_ESTACK(s);
- *vsize = v_size;
- *csize = c_size;
- *pvsize = p_v_size;
- *pcsize = p_c_size;
- return 0;
-
- L_type_error:
- L_overflow_error:
- DESTROY_ESTACK(s);
- return 1;
-}
typedef enum {
ERTS_TRY_IMM_DRV_CALL_OK,
@@ -1280,12 +901,12 @@ try_imm_drv_call(ErtsTryImmDrvCallState *sp)
invalid_sched_flags |= ERTS_PTS_FLG_PARALLELISM;
if (sp->pre_chk_sched_flags) {
- sp->sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ sp->sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
if (sp->sched_flags & invalid_sched_flags)
return ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS;
}
- if (erts_smp_port_trylock(prt) == EBUSY)
+ if (erts_port_trylock(prt) == EBUSY)
return ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK;
invalid_state = sp->state;
@@ -1299,7 +920,7 @@ try_imm_drv_call(ErtsTryImmDrvCallState *sp)
if (prof_runnable_ports)
erts_port_task_sched_lock(&prt->sched);
- act = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ act = erts_atomic32_read_nob(&prt->sched.flags);
do {
erts_aint32_t new;
@@ -1311,7 +932,7 @@ try_imm_drv_call(ErtsTryImmDrvCallState *sp)
}
exp = act;
new = act | ERTS_PTS_FLG_EXEC_IMM;
- act = erts_smp_atomic32_cmpxchg_mb(&prt->sched.flags, new, exp);
+ act = erts_atomic32_cmpxchg_mb(&prt->sched.flags, new, exp);
} while (act != exp);
sp->sched_flags = act;
@@ -1333,14 +954,17 @@ try_imm_drv_call(ErtsTryImmDrvCallState *sp)
profile_runnable_proc(c_p, am_inactive);
reds_left_in = ERTS_BIF_REDS_LEFT(c_p);
- erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN);
+ erts_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN);
+
+ ASSERT((c_p->scheduler_data)->current_port == NULL);
+ (c_p->scheduler_data)->current_port = prt;
}
ASSERT(0 <= reds_left_in && reds_left_in <= CONTEXT_REDS);
sp->reds_left_in = reds_left_in;
prt->reds = CONTEXT_REDS - reds_left_in;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
if (prof_runnable_ports | IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) {
if (prof_runnable_ports && !(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
@@ -1378,9 +1002,9 @@ finalize_imm_drv_call(ErtsTryImmDrvCallState *sp)
if (prof_runnable_ports)
erts_port_task_sched_lock(&prt->sched);
- act = erts_smp_atomic32_read_band_mb(&prt->sched.flags,
+ act = erts_atomic32_read_band_mb(&prt->sched.flags,
~ERTS_PTS_FLG_EXEC_IMM);
- ERTS_SMP_LC_ASSERT(act & ERTS_PTS_FLG_EXEC_IMM);
+ ERTS_LC_ASSERT(act & ERTS_PTS_FLG_EXEC_IMM);
if (prof_runnable_ports | IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) {
if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS))
@@ -1395,7 +1019,10 @@ finalize_imm_drv_call(ErtsTryImmDrvCallState *sp)
erts_port_release(prt);
if (c_p) {
- erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MAIN);
+ ASSERT((c_p->scheduler_data)->current_port == prt);
+ (c_p->scheduler_data)->current_port = NULL;
+
+ erts_proc_lock(c_p, ERTS_PROC_LOCK_MAIN);
if (reds != (CONTEXT_REDS - sp->reds_left_in)) {
int bump_reds = reds - (CONTEXT_REDS - sp->reds_left_in);
@@ -1447,7 +1074,7 @@ finalize_force_imm_drv_call(ErtsTryImmDrvCallState *sp)
erts_unblock_fpe(sp->fpe_was_unmasked);
}
-#define ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE (REF_THING_SIZE + 3)
+#define ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE (ERTS_REF_THING_SIZE + 3)
static ERTS_INLINE void
queue_port_sched_op_reply(Process *rp,
@@ -1462,7 +1089,7 @@ queue_port_sched_op_reply(Process *rp,
ref= make_internal_ref(hp);
write_ref_thing(hp, ref_num[0], ref_num[1], ref_num[2]);
- hp += REF_THING_SIZE;
+ hp += ERTS_REF_THING_SIZE;
msg = TUPLE2(hp, ref, msg);
@@ -1506,12 +1133,12 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg, Port* prt)
prt);
if (rp_locks)
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
}
}
-ErtsPortOpResult
+static ErtsPortOpResult
erts_schedule_proc2port_signal(Process *c_p,
Port *prt,
Eterm caller,
@@ -1524,7 +1151,7 @@ erts_schedule_proc2port_signal(Process *c_p,
int sched_res;
if (!refp) {
if (c_p)
- erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN);
+ erts_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN);
}
else {
ASSERT(c_p);
@@ -1545,21 +1172,10 @@ erts_schedule_proc2port_signal(Process *c_p,
* otherwise, next receive will *not* work
* as expected!
*/
- erts_smp_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
-
- if (ERTS_PROC_PENDING_EXIT(c_p)) {
- /* need to exit caller instead */
- erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
- KILL_CATCHES(c_p);
- c_p->freason = EXC_EXIT;
- return ERTS_PORT_OP_CALLER_EXIT;
- }
-
- ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p);
- c_p->msg.save = c_p->msg.last;
- erts_smp_proc_unlock(c_p, (ERTS_PROC_LOCKS_MSG_RECEIVE
- | ERTS_PROC_LOCK_MAIN));
+ ERTS_RECV_MARK_SAVE(c_p);
+ ERTS_RECV_MARK_SET(c_p);
+ erts_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN);
}
@@ -1574,7 +1190,7 @@ erts_schedule_proc2port_signal(Process *c_p,
task_flags);
if (c_p)
- erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MAIN);
+ erts_proc_lock(c_p, ERTS_PROC_LOCK_MAIN);
if (sched_res != 0) {
if (refp) {
@@ -1585,9 +1201,9 @@ erts_schedule_proc2port_signal(Process *c_p,
* containing the reference created above...
*/
ASSERT(c_p);
- erts_smp_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
+ erts_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
JOIN_MESSAGE(c_p);
- erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
+ erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
*refp = NIL;
}
return ERTS_PORT_OP_DROPPED;
@@ -1617,29 +1233,12 @@ erts_schedule_port2port_signal(Eterm port_num, ErtsProc2PortSigData *sigdp,
static ERTS_INLINE void
send_badsig(Port *prt) {
- ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
- Process* rp;
Eterm connected = ERTS_PORT_GET_CONNECTED(prt);
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
ERTS_LC_ASSERT(erts_get_scheduler_id());
-
ASSERT(is_internal_pid(connected));
-
- rp = erts_proc_lookup_raw(connected);
- if (rp) {
- erts_smp_proc_lock(rp, rp_locks);
- if (!ERTS_PROC_IS_EXITING(rp))
- (void) erts_send_exit_signal(NULL,
- prt->common.id,
- rp,
- &rp_locks,
- am_badsig,
- NIL,
- NULL,
- 0);
- if (rp_locks)
- erts_smp_proc_unlock(rp, rp_locks);
- } /* exit sent */
+ erts_proc_sig_send_exit(NULL, prt->common.id, connected,
+ am_badsig, NIL, 0);
} /* send_badsig */
static void
@@ -1761,7 +1360,7 @@ call_driver_outputv(int bang_op,
ErlDrvSizeT size = evp->size;
ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)
|| ERTS_IS_CRASH_DUMPING);
@@ -1804,8 +1403,7 @@ cleanup_scheduled_outputv(ErlIOVec *ev, ErlDrvBinary *cbinp)
int i;
/* Need to free all binaries */
for (i = 1; i < ev->vsize; i++)
- if (ev->binv[i])
- driver_free_binary(ev->binv[i]);
+ driver_free_binary(ev->binv[i]);
if (cbinp)
driver_free_binary(cbinp);
}
@@ -1819,7 +1417,7 @@ port_sig_outputv(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *s
case ERTS_PROC2PORT_SIG_EXEC:
/* Execution of a scheduled outputv() call */
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP)
reply = am_badarg;
@@ -1875,7 +1473,7 @@ call_driver_output(int bang_op,
else {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)
|| ERTS_IS_CRASH_DUMPING);
#ifdef USE_VM_PROBES
@@ -1926,7 +1524,7 @@ port_sig_output(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *si
case ERTS_PROC2PORT_SIG_EXEC:
/* Execution of a scheduled output() call */
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP)
reply = am_badarg;
@@ -1968,21 +1566,19 @@ int
erts_port_output_async(Port *prt, Eterm from, Eterm list)
{
- ErtsPortOpResult res;
ErtsProc2PortSigData *sigdp;
erts_driver_t *drv = prt->drv_ptr;
size_t size;
int task_flags;
ErtsProc2PortSigCallback port_sig_callback;
- ErlDrvBinary *cbin = NULL;
- ErlIOVec *evp = NULL;
+ ErtsIOQBinary *cbin = NULL;
+ ErtsIOVec *evp = NULL;
char *buf = NULL;
ErtsPortTaskHandle *ns_pthp;
if (drv->outputv) {
- ErlIOVec ev;
SysIOVec* ivp;
- ErlDrvBinary** bvp;
+ ErtsIOQBinary** bvp;
int vsize;
Uint csize;
Uint pvsize;
@@ -1992,91 +1588,63 @@ erts_port_output_async(Port *prt, Eterm from, Eterm list)
char *ptr;
int i;
- Eterm* bptr = NULL;
- Uint offset;
-
- if (is_binary(list)) {
- /* We optimize for when we get a procbin without offset */
- Eterm real_bin;
- int bitoffs;
- int bitsize;
- ERTS_GET_REAL_BIN(list, real_bin, offset, bitoffs, bitsize);
- bptr = binary_val(real_bin);
- if (*bptr == HEADER_PROC_BIN && bitoffs == 0) {
- size = binary_size(list);
- vsize = 1;
- } else
- bptr = NULL;
- }
-
- if (!bptr) {
- if (io_list_vec_len(list, &vsize, &csize, &pvsize, &pcsize, &size))
- goto bad_value;
+ if (erts_ioq_iodata_vec_len(list, &vsize, &csize, &pvsize, &pcsize,
+ &size, ERL_SMALL_IO_BIN_LIMIT))
+ goto bad_value;
- /* To pack or not to pack (small binaries) ...? */
- if (vsize >= SMALL_WRITE_VEC) {
- /* Do pack */
- vsize = pvsize + 1;
- csize = pcsize;
- blimit = ERL_SMALL_IO_BIN_LIMIT;
- }
- cbin = driver_alloc_binary(csize);
+ /* To pack or not to pack (small binaries) ...? */
+ if (vsize >= SMALL_WRITE_VEC) {
+ /* Do pack */
+ vsize = pvsize + 1;
+ csize = pcsize;
+ blimit = ERL_SMALL_IO_BIN_LIMIT;
+ }
+ if (csize) {
+ cbin = (ErtsIOQBinary *)driver_alloc_binary(csize);
if (!cbin)
erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize));
}
-
iov_offset = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErlIOVec));
binv_offset = iov_offset;
binv_offset += ERTS_ALC_DATA_ALIGN_SIZE((vsize+1)*sizeof(SysIOVec));
alloc_size = binv_offset;
- alloc_size += (vsize+1)*sizeof(ErlDrvBinary *);
+ alloc_size += (vsize+1)*sizeof(ErtsIOQBinary *);
sigdp = erts_port_task_alloc_p2p_sig_data_extra(alloc_size, (void**)&ptr);
- evp = (ErlIOVec *) ptr;
- ivp = evp->iov = (SysIOVec *) (ptr + iov_offset);
- bvp = evp->binv = (ErlDrvBinary **) (ptr + binv_offset);
+ evp = (ErtsIOVec *) ptr;
+ ivp = evp->driver.iov = (SysIOVec *) (ptr + iov_offset);
+ bvp = evp->common.binv = (ErtsIOQBinary **) (ptr + binv_offset);
ivp[0].iov_base = NULL;
ivp[0].iov_len = 0;
bvp[0] = NULL;
- if (bptr) {
- ProcBin* pb = (ProcBin *) bptr;
-
- ivp[1].iov_base = pb->bytes+offset;
- ivp[1].iov_len = size;
- bvp[1] = Binary2ErlDrvBinary(pb->val);
-
- evp->vsize = 1;
- } else {
-
- evp->vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
- if (evp->vsize < 0) {
- if (evp != &ev)
- erts_free(ERTS_ALC_T_DRV_CMD_DATA, evp);
- driver_free_binary(cbin);
- goto bad_value;
- }
+ evp->driver.vsize = erts_ioq_iodata_to_vec(list, ivp+1, bvp+1, cbin,
+ blimit, 1);
+ if (evp->driver.vsize < 0) {
+ erts_free(ERTS_ALC_T_DRV_CMD_DATA, evp);
+ driver_free_binary(&cbin->driver);
+ goto bad_value;
}
#if 0
/* This assertion may say something useful, but it can
be falsified during the emulator test suites. */
ASSERT(evp->vsize == vsize);
#endif
- evp->vsize++;
- evp->size = size; /* total size */
+ evp->driver.vsize++;
+ evp->driver.size = size; /* total size */
/* Need to increase refc on all binaries */
- for (i = 1; i < evp->vsize; i++)
+ for (i = 1; i < evp->driver.vsize; i++)
if (bvp[i])
- driver_binary_inc_refc(bvp[i]);
+ driver_binary_inc_refc(&bvp[i]->driver);
sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
sigdp->u.outputv.from = from;
- sigdp->u.outputv.evp = evp;
- sigdp->u.outputv.cbinp = cbin;
+ sigdp->u.outputv.evp = &evp->driver;
+ sigdp->u.outputv.cbinp = &cbin->driver;
port_sig_callback = port_sig_outputv;
} else {
ErlDrvSizeT ERTS_DECLARE_DUMMY(r);
@@ -2102,26 +1670,18 @@ erts_port_output_async(Port *prt, Eterm from, Eterm list)
sigdp->u.output.size = size;
port_sig_callback = port_sig_output;
}
- sigdp->flags = 0;
ns_pthp = NULL;
task_flags = 0;
- res = erts_schedule_proc2port_signal(NULL,
- prt,
- ERTS_INVALID_PID,
- NULL,
- sigdp,
- task_flags,
- ns_pthp,
- port_sig_callback);
+ erts_schedule_proc2port_signal(NULL,
+ prt,
+ ERTS_INVALID_PID,
+ NULL,
+ sigdp,
+ task_flags,
+ ns_pthp,
+ port_sig_callback);
- if (res != ERTS_PORT_OP_SCHEDULED) {
- if (drv->outputv)
- cleanup_scheduled_outputv(evp, cbin);
- else
- cleanup_scheduled_output(buf);
- return 1;
- }
return 1;
bad_value:
@@ -2155,8 +1715,8 @@ erts_port_output(Process *c_p,
erts_aint32_t sched_flags, busy_flgs, invalid_flags;
int task_flags;
ErtsProc2PortSigCallback port_sig_callback;
- ErlDrvBinary *cbin = NULL;
- ErlIOVec *evp = NULL;
+ ErtsIOQBinary *cbin = NULL;
+ ErtsIOVec *evp = NULL;
char *buf = NULL;
int force_immediate_call = (flags & ERTS_PORT_SIG_FLG_FORCE_IMM_CALL);
int async_nosuspend;
@@ -2179,7 +1739,7 @@ erts_port_output(Process *c_p,
* Assumes caller have checked that port is valid...
*/
- sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
if (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))
return ((sched_flags & ERTS_PTS_FLG_EXIT)
? ERTS_PORT_OP_DROPPED
@@ -2202,11 +1762,11 @@ erts_port_output(Process *c_p,
}
#endif
if (drv->outputv) {
- ErlIOVec ev;
+ ErtsIOVec ev;
SysIOVec iv[SMALL_WRITE_VEC];
- ErlDrvBinary* bv[SMALL_WRITE_VEC];
+ ErtsIOQBinary* bv[SMALL_WRITE_VEC];
SysIOVec* ivp;
- ErlDrvBinary** bvp;
+ ErtsIOQBinary** bvp;
int vsize;
Uint csize;
Uint pvsize;
@@ -2214,18 +1774,19 @@ erts_port_output(Process *c_p,
Uint blimit;
size_t iov_offset, binv_offset, alloc_size;
- if (io_list_vec_len(list, &vsize, &csize, &pvsize, &pcsize, &size))
+ if (erts_ioq_iodata_vec_len(list, &vsize, &csize, &pvsize, &pcsize,
+ &size, ERL_SMALL_IO_BIN_LIMIT))
goto bad_value;
iov_offset = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErlIOVec));
binv_offset = iov_offset;
binv_offset += ERTS_ALC_DATA_ALIGN_SIZE((vsize+1)*sizeof(SysIOVec));
alloc_size = binv_offset;
- alloc_size += (vsize+1)*sizeof(ErlDrvBinary *);
+ alloc_size += (vsize+1)*sizeof(ErtsIOQBinary *);
if (try_call && vsize < SMALL_WRITE_VEC) {
- ivp = ev.iov = iv;
- bvp = ev.binv = bv;
+ ivp = ev.common.iov = iv;
+ bvp = ev.common.binv = bv;
evp = &ev;
}
else {
@@ -2236,9 +1797,9 @@ erts_port_output(Process *c_p,
sigdp = erts_port_task_alloc_p2p_sig_data_extra(
alloc_size, (void**)&ptr);
}
- evp = (ErlIOVec *) ptr;
- ivp = evp->iov = (SysIOVec *) (ptr + iov_offset);
- bvp = evp->binv = (ErlDrvBinary **) (ptr + binv_offset);
+ evp = (ErtsIOVec *) ptr;
+ ivp = evp->driver.iov = (SysIOVec *) (ptr + iov_offset);
+ bvp = evp->common.binv = (ErtsIOQBinary **) (ptr + binv_offset);
}
/* To pack or not to pack (small binaries) ...? */
@@ -2254,23 +1815,26 @@ erts_port_output(Process *c_p,
}
/* Use vsize and csize from now on */
- cbin = driver_alloc_binary(csize);
- if (!cbin)
- erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize));
+ if (csize) {
+ cbin = (ErtsIOQBinary *)driver_alloc_binary(csize);
+ if (!cbin)
+ erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize));
+ }
/* Element 0 is for driver usage to add header block */
ivp[0].iov_base = NULL;
ivp[0].iov_len = 0;
bvp[0] = NULL;
- evp->vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
- if (evp->vsize < 0) {
+ evp->driver.vsize = erts_ioq_iodata_to_vec(list, ivp+1, bvp+1,
+ cbin, blimit, 1);
+ if (evp->driver.vsize < 0) {
if (evp != &ev) {
if (try_call)
erts_free(ERTS_ALC_T_TMP, evp);
else
erts_port_task_free_p2p_sig_data(sigdp);
}
- driver_free_binary(cbin);
+ driver_free_binary(&cbin->driver);
goto bad_value;
}
#if 0
@@ -2278,19 +1842,19 @@ erts_port_output(Process *c_p,
be falsified during the emulator test suites. */
ASSERT(evp->vsize == vsize);
#endif
- evp->vsize++;
- evp->size = size; /* total size */
+ evp->driver.vsize++;
+ evp->driver.size = size; /* total size */
if (!try_call) {
int i;
/* Need to increase refc on all binaries */
- for (i = 1; i < evp->vsize; i++)
- if (bvp[i])
- driver_binary_inc_refc(bvp[i]);
+ for (i = 1; i < evp->driver.vsize; i++)
+ if (bvp[i])
+ driver_binary_inc_refc(&bvp[i]->driver);
}
else {
int i;
- ErlIOVec *new_evp;
+ ErtsIOVec *new_evp;
ErtsTryImmDrvCallResult try_call_res;
ErtsTryImmDrvCallState try_call_state
= ERTS_INIT_TRY_IMM_DRV_CALL_STATE(
@@ -2313,14 +1877,14 @@ erts_port_output(Process *c_p,
from,
prt,
drv,
- evp);
+ &evp->driver);
if (force_immediate_call)
finalize_force_imm_drv_call(&try_call_state);
else
finalize_imm_drv_call(&try_call_state);
/* Fall through... */
case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
- driver_free_binary(cbin);
+ driver_free_binary(&cbin->driver);
if (evp != &ev) {
ASSERT(!sigdp);
erts_free(ERTS_ALC_T_TMP, evp);
@@ -2334,7 +1898,7 @@ erts_port_output(Process *c_p,
sched_flags = try_call_state.sched_flags;
if (async_nosuspend
&& (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))) {
- driver_free_binary(cbin);
+ driver_free_binary(&cbin->driver);
if (evp != &ev) {
ASSERT(!sigdp);
erts_free(ERTS_ALC_T_TMP, evp);
@@ -2349,9 +1913,9 @@ erts_port_output(Process *c_p,
}
/* Need to increase refc on all binaries */
- for (i = 1; i < evp->vsize; i++)
+ for (i = 1; i < evp->driver.vsize; i++)
if (bvp[i])
- driver_binary_inc_refc(bvp[i]);
+ driver_binary_inc_refc(&bvp[i]->driver);
/* The port task and iovec is allocated in the
same structure as an optimization. This
@@ -2364,18 +1928,18 @@ erts_port_output(Process *c_p,
if (evp != &ev) {
/* Copy from TMP alloc to port task */
sys_memcpy((void *) new_evp, (void *) evp, alloc_size);
- new_evp->iov = (SysIOVec *) (((char *) new_evp)
- + iov_offset);
- bvp = new_evp->binv = (ErlDrvBinary **) (((char *) new_evp)
- + binv_offset);
+ new_evp->driver.iov = (SysIOVec *) (((char *) new_evp)
+ + iov_offset);
+ bvp = new_evp->common.binv = (ErtsIOQBinary **) (((char *) new_evp)
+ + binv_offset);
#ifdef DEBUG
- ASSERT(new_evp->vsize == evp->vsize);
- ASSERT(new_evp->size == evp->size);
- for (i = 0; i < evp->vsize; i++) {
- ASSERT(new_evp->iov[i].iov_len == evp->iov[i].iov_len);
- ASSERT(new_evp->iov[i].iov_base == evp->iov[i].iov_base);
- ASSERT(new_evp->binv[i] == evp->binv[i]);
+ ASSERT(new_evp->driver.vsize == evp->driver.vsize);
+ ASSERT(new_evp->driver.size == evp->driver.size);
+ for (i = 0; i < evp->driver.vsize; i++) {
+ ASSERT(new_evp->driver.iov[i].iov_len == evp->driver.iov[i].iov_len);
+ ASSERT(new_evp->driver.iov[i].iov_base == evp->driver.iov[i].iov_base);
+ ASSERT(new_evp->driver.binv[i] == evp->driver.binv[i]);
}
#endif
@@ -2384,24 +1948,24 @@ erts_port_output(Process *c_p,
else { /* from stack allocated structure; offsets may differ */
sys_memcpy((void *) new_evp, (void *) evp, sizeof(ErlIOVec));
- new_evp->iov = (SysIOVec *) (((char *) new_evp)
- + iov_offset);
- sys_memcpy((void *) new_evp->iov,
- (void *) evp->iov,
- evp->vsize * sizeof(SysIOVec));
- new_evp->binv = (ErlDrvBinary **) (((char *) new_evp)
- + binv_offset);
- sys_memcpy((void *) new_evp->binv,
- (void *) evp->binv,
- evp->vsize * sizeof(ErlDrvBinary *));
+ new_evp->driver.iov = (SysIOVec *) (((char *) new_evp)
+ + iov_offset);
+ sys_memcpy((void *) new_evp->driver.iov,
+ (void *) evp->driver.iov,
+ evp->driver.vsize * sizeof(SysIOVec));
+ new_evp->common.binv = (ErtsIOQBinary **) (((char *) new_evp)
+ + binv_offset);
+ sys_memcpy((void *) new_evp->common.binv,
+ (void *) evp->common.binv,
+ evp->driver.vsize * sizeof(ErtsIOQBinary *));
#ifdef DEBUG
- ASSERT(new_evp->vsize == evp->vsize);
- ASSERT(new_evp->size == evp->size);
- for (i = 0; i < evp->vsize; i++) {
- ASSERT(new_evp->iov[i].iov_len == evp->iov[i].iov_len);
- ASSERT(new_evp->iov[i].iov_base == evp->iov[i].iov_base);
- ASSERT(new_evp->binv[i] == evp->binv[i]);
+ ASSERT(new_evp->driver.vsize == evp->driver.vsize);
+ ASSERT(new_evp->driver.size == evp->driver.size);
+ for (i = 0; i < evp->driver.vsize; i++) {
+ ASSERT(new_evp->driver.iov[i].iov_len == evp->driver.iov[i].iov_len);
+ ASSERT(new_evp->driver.iov[i].iov_base == evp->driver.iov[i].iov_base);
+ ASSERT(new_evp->driver.binv[i] == evp->driver.binv[i]);
}
#endif
@@ -2412,8 +1976,8 @@ erts_port_output(Process *c_p,
sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
sigdp->u.outputv.from = from;
- sigdp->u.outputv.evp = evp;
- sigdp->u.outputv.cbinp = cbin;
+ sigdp->u.outputv.evp = &evp->driver;
+ sigdp->u.outputv.cbinp = &cbin->driver;
port_sig_callback = port_sig_outputv;
}
else {
@@ -2554,15 +2118,11 @@ erts_port_output(Process *c_p,
port_sig_callback);
if (res != ERTS_PORT_OP_SCHEDULED) {
- if (drv->outputv)
- cleanup_scheduled_outputv(evp, cbin);
- else
- cleanup_scheduled_output(buf);
return res;
}
if (!(flags & ERTS_PORT_SIG_FLG_FORCE)) {
- sched_flags = erts_smp_atomic32_read_acqb(&prt->sched.flags);
+ sched_flags = erts_atomic32_read_acqb(&prt->sched.flags);
if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)) {
if (async_nosuspend)
erts_port_task_tmp_handle_detach(ns_pthp);
@@ -2611,11 +2171,11 @@ call_deliver_port_exit(int bang_op,
}
if (broken_link) {
- ErtsLink *lnk = erts_remove_link(&ERTS_P_LINKS(prt), from);
- if (lnk)
- erts_destroy_link(lnk);
- else
+ ErtsLink *lnk = erts_link_tree_lookup(ERTS_P_LINKS(prt), from);
+ if (!lnk)
return ERTS_PORT_OP_DROPPED;
+ erts_link_tree_delete(&ERTS_P_LINKS(prt), lnk);
+ erts_link_release(lnk);
}
if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
@@ -2736,21 +2296,14 @@ erts_port_exit(Process *c_p,
&bp->off_heap);
}
- res = erts_schedule_proc2port_signal(c_p,
- prt,
- c_p ? c_p->common.id : from,
- refp,
- sigdp,
- 0,
- NULL,
- port_sig_exit);
-
- if (res == ERTS_PORT_OP_DROPPED) {
- if (bp)
- free_message_buffer(bp);
- }
-
- return res;
+ return erts_schedule_proc2port_signal(c_p,
+ prt,
+ c_p ? c_p->common.id : from,
+ refp,
+ sigdp,
+ 0,
+ NULL,
+ port_sig_exit);
}
static ErtsPortOpResult
@@ -2791,27 +2344,45 @@ set_port_connected(int bang_op,
#endif
}
else { /* Port BIF operation */
- Process *rp = erts_proc_lookup_raw(connect);
- if (!rp)
- return ERTS_PORT_OP_DROPPED;
- erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
- if (ERTS_PROC_IS_EXITING(rp)) {
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- return ERTS_PORT_OP_DROPPED;
- }
+ int created;
+ ErtsLink *lnk;
+
+ if (is_not_internal_pid(connect))
+ return ERTS_PORT_OP_DROPPED;
+
+ lnk = erts_link_tree_lookup_create(&ERTS_P_LINKS(prt), &created,
+ ERTS_LNK_TYPE_PORT, prt->common.id,
+ connect);
+ if (created) {
+ ErtsLinkData *ldp;
+ ErtsLink *olnk = erts_link_to_other(lnk, &ldp);
+ ASSERT(olnk->other.item == prt->common.id);
+ if (!erts_proc_sig_send_link(NULL, connect, olnk)) {
+ erts_link_tree_delete(&ERTS_P_LINKS(prt), lnk);
+ erts_link_release_both(ldp);
+ return ERTS_PORT_OP_DROPPED;
+ }
+ if (IS_TRACED_FL(prt, F_TRACE_PORTS))
+ trace_port(prt, am_getting_linked, connect);
+ }
- erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, prt->common.id);
- erts_add_link(&ERTS_P_LINKS(prt), LINK_PID, connect);
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(port_connect)) {
+ Eterm old_connected = ERTS_PORT_GET_CONNECTED(prt);
+ DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE);
+ DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);
+ DTRACE_CHARBUF(newprocess_str, DTRACE_TERM_BUF_SIZE);
- if (IS_TRACED_FL(rp, F_TRACE_PROCS))
- trace_proc(NULL, 0, rp, am_getting_linked, prt->common.id);
+ dtrace_pid_str(old_connected, process_str);
+ erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
+ "%T", prt->common.id);
+ dtrace_pid_str(connect, newprocess_str);
+ DTRACE4(port_connect, process_str, port_str, prt->name, newprocess_str);
+ }
+#endif
ERTS_PORT_SET_CONNECTED(prt, connect);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
-
- if (IS_TRACED_FL(prt, F_TRACE_PORTS))
- trace_port(prt, am_getting_linked, connect);
if (IS_TRACED_FL(prt, F_TRACE_RECEIVE))
trace_port_receive(prt, from, am_connect, connect);
if (IS_TRACED_FL(prt, F_TRACE_SEND)) {
@@ -2819,18 +2390,6 @@ set_port_connected(int bang_op,
trace_port_send(prt, from, TUPLE2(hp, prt->common.id, am_connected), 1);
}
-#ifdef USE_VM_PROBES
- if (DTRACE_ENABLED(port_connect)) {
- DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE);
- DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);
- DTRACE_CHARBUF(newprocess_str, DTRACE_TERM_BUF_SIZE);
-
- dtrace_pid_str(connect, process_str);
- erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id);
- dtrace_proc_str(rp, newprocess_str);
- DTRACE4(port_connect, process_str, port_str, prt->name, newprocess_str);
- }
-#endif
}
return ERTS_PORT_OP_DONE;
@@ -2918,13 +2477,24 @@ erts_port_connect(Process *c_p,
}
static void
-port_unlink(Port *prt, Eterm from)
+port_unlink(Port *prt, ErtsLink *lnk)
{
- ErtsLink *lnk = erts_remove_link(&ERTS_P_LINKS(prt), from);
- if (lnk) {
+ ErtsLinkData *ldp;
+ ErtsLink *dlnk, *llnk;
+
+ llnk = erts_link_to_other(lnk, &ldp);
+ dlnk = erts_link_tree_key_delete(&ERTS_P_LINKS(prt), llnk);
+ if (!dlnk)
+ erts_link_release(lnk);
+ else {
if (IS_TRACED_FL(prt, F_TRACE_PORTS))
- trace_port(prt, am_getting_unlinked, from);
- erts_destroy_link(lnk);
+ trace_port(prt, am_getting_unlinked, llnk->other.item);
+ if (dlnk == llnk)
+ erts_link_release_both(ldp);
+ else {
+ erts_link_release(lnk);
+ erts_link_release(dlnk);
+ }
}
}
@@ -2932,14 +2502,14 @@ static int
port_sig_unlink(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp)
{
if (op == ERTS_PROC2PORT_SIG_EXEC)
- port_unlink(prt, sigdp->u.unlink.from);
+ port_unlink(prt, sigdp->u.unlink.lnk);
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
return ERTS_PORT_REDS_UNLINK;
}
ErtsPortOpResult
-erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp)
+erts_port_unlink(Process *c_p, Port *prt, ErtsLink *lnk, Eterm *refp)
{
ErtsProc2PortSigData *sigdp;
ErtsTryImmDrvCallState try_call_state
@@ -2952,7 +2522,7 @@ erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp)
switch (try_imm_drv_call(&try_call_state)) {
case ERTS_TRY_IMM_DRV_CALL_OK:
- port_unlink(prt, from);
+ port_unlink(prt, lnk);
finalize_imm_drv_call(&try_call_state);
BUMP_REDS(c_p, ERTS_PORT_REDS_UNLINK);
return ERTS_PORT_OP_DONE;
@@ -2965,11 +2535,12 @@ erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp)
sigdp = erts_port_task_alloc_p2p_sig_data();
sigdp->flags = ERTS_P2P_SIG_TYPE_UNLINK;
- sigdp->u.unlink.from = from;
-
+ sigdp->u.unlink.port_id = prt->common.id;
+ sigdp->u.unlink.lnk = lnk;
+
return erts_schedule_proc2port_signal(c_p,
prt,
- c_p ? c_p->common.id : from,
+ c_p->common.id,
refp,
sigdp,
0,
@@ -2978,45 +2549,24 @@ erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp)
}
static void
-port_link_failure(Eterm port_id, Eterm linker)
+port_link_failure(Eterm port_id, ErtsLink *lnk)
{
- Process *rp;
- ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND;
- ASSERT(is_internal_pid(linker));
- rp = erts_pid2proc(NULL, 0, linker, rp_locks);
- if (rp) {
- ErtsLink *rlnk = erts_remove_link(&ERTS_P_LINKS(rp), port_id);
- if (rlnk) {
- int xres = erts_send_exit_signal(NULL,
- port_id,
- rp,
- &rp_locks,
- am_noproc,
- NIL,
- NULL,
- 0);
- if (xres >= 0) {
- /* We didn't exit the process and it is traced */
- if (IS_TRACED_FL(rp, F_TRACE_PROCS))
- trace_proc(NULL, 0, rp, am_getting_unlinked, port_id);
- }
- if (rp_locks)
- erts_smp_proc_unlock(rp, rp_locks);
- }
- }
+ erts_proc_sig_send_link_exit(NULL, port_id, lnk, am_noproc, NIL);
}
static void
-port_link(Port *prt, erts_aint32_t state, Eterm to)
+port_link(Port *prt, erts_aint32_t state, ErtsLink *lnk)
{
- if (IS_TRACED_FL(prt, F_TRACE_PORTS))
- trace_port(prt, am_getting_linked, to);
- if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) {
- erts_add_link(&ERTS_P_LINKS(prt), LINK_PID, to);
- } else {
- port_link_failure(prt->common.id, to);
- if (IS_TRACED_FL(prt, F_TRACE_PORTS))
- trace_port(prt, am_unlink, to);
+ if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP)
+ port_link_failure(prt->common.id, lnk);
+ else {
+ ErtsLink *rlnk;
+ rlnk = erts_link_tree_insert_addr_replace(&ERTS_P_LINKS(prt),
+ lnk);
+ if (rlnk)
+ erts_link_release(rlnk);
+ else if (IS_TRACED_FL(prt, F_TRACE_PORTS))
+ trace_port(prt, am_getting_linked, lnk->other.item);
}
}
@@ -3024,17 +2574,16 @@ static int
port_sig_link(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp)
{
if (op == ERTS_PROC2PORT_SIG_EXEC)
- port_link(prt, state, sigdp->u.link.to);
- else {
- port_link_failure(sigdp->u.link.port, sigdp->u.link.to);
- }
+ port_link(prt, state, sigdp->u.link.lnk);
+ else
+ port_link_failure(sigdp->u.link.port_id, sigdp->u.link.lnk);
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
return ERTS_PORT_REDS_LINK;
}
ErtsPortOpResult
-erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp)
+erts_port_link(Process *c_p, Port *prt, ErtsLink *lnk, Eterm *refp)
{
ErtsProc2PortSigData *sigdp;
ErtsTryImmDrvCallState try_call_state
@@ -3047,7 +2596,7 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp)
switch (try_imm_drv_call(&try_call_state)) {
case ERTS_TRY_IMM_DRV_CALL_OK:
- port_link(prt, try_call_state.state, to);
+ port_link(prt, try_call_state.state, lnk);
finalize_imm_drv_call(&try_call_state);
BUMP_REDS(c_p, ERTS_PORT_REDS_LINK);
return ERTS_PORT_OP_DONE;
@@ -3060,12 +2609,12 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp)
sigdp = erts_port_task_alloc_p2p_sig_data();
sigdp->flags = ERTS_P2P_SIG_TYPE_LINK;
- sigdp->u.link.port = prt->common.id;
- sigdp->u.link.to = to;
+ sigdp->u.link.port_id = prt->common.id;
+ sigdp->u.link.lnk = lnk;
return erts_schedule_proc2port_signal(c_p,
prt,
- c_p ? c_p->common.id : to,
+ c_p->common.id,
refp,
sigdp,
0,
@@ -3074,51 +2623,23 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp)
}
static void
-port_monitor_failure(Eterm port_id, Eterm origin, Eterm ref_DOWN)
+port_monitor_failure(Eterm port_id, ErtsMonitor *mon)
{
- Process *origin_p;
- ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK;
- ASSERT(is_internal_pid(origin));
-
- origin_p = erts_pid2proc(NULL, 0, origin, p_locks);
- if (! origin_p) { return; }
-
- /* Send the DOWN message immediately. Ref is made on the fly because
- * caller has never seen it yet. */
- erts_queue_monitor_message(origin_p, &p_locks, ref_DOWN,
- am_port, port_id, am_noproc);
- erts_smp_proc_unlock(origin_p, p_locks);
+ erts_proc_sig_send_monitor_down(mon, am_noproc);
}
/* Origin wants to monitor port Prt. State contains possible error, which has
* happened just before. Name is either NIL or an atom, if user monitors
* a port by name. Ref is premade reference that will be returned to user */
static void
-port_monitor(Port *prt, erts_aint32_t state, Eterm origin,
- Eterm name, Eterm ref)
+port_monitor(Port *prt, erts_aint32_t state, ErtsMonitor *mon)
{
- Eterm name_or_nil = is_atom(name) ? name : NIL;
-
- ASSERT(is_pid(origin));
- ASSERT(is_atom(name) || is_port(name) || name == NIL);
- ASSERT(is_internal_ref(ref));
-
- if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) {
- ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK;
-
- Process *origin_p = erts_pid2proc(NULL, 0, origin, p_locks);
- if (! origin_p) {
- goto failure;
- }
- erts_add_monitor(&ERTS_P_MONITORS(origin_p), MON_ORIGIN, ref,
- prt->common.id, name_or_nil);
- erts_add_monitor(&ERTS_P_MONITORS(prt), MON_TARGET, ref,
- origin, name_or_nil);
-
- erts_smp_proc_unlock(origin_p, p_locks);
- } else {
-failure:
- port_monitor_failure(prt->common.id, origin, ref);
+ ASSERT(prt);
+ if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP)
+ port_monitor_failure(prt->common.id, mon);
+ else {
+ ASSERT(erts_monitor_is_target(mon));
+ erts_monitor_list_insert(&ERTS_P_LT_MONITORS(prt), mon);
}
}
@@ -3126,24 +2647,11 @@ static int
port_sig_monitor(Port *prt, erts_aint32_t state, int op,
ErtsProc2PortSigData *sigdp)
{
- Eterm hp[REF_THING_SIZE];
- Eterm ref = make_internal_ref(&hp);
- write_ref_thing(hp, sigdp->ref[0], sigdp->ref[1], sigdp->ref[2]);
-
- if (op == ERTS_PROC2PORT_SIG_EXEC) {
- /* erts_add_monitor call inside port_monitor will copy ref from hp */
- port_monitor(prt, state,
- sigdp->u.monitor.origin,
- sigdp->u.monitor.name,
- ref);
- } else {
- port_monitor_failure(sigdp->u.monitor.name,
- sigdp->u.monitor.origin,
- ref);
- }
- if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) {
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
- }
+ if (op == ERTS_PROC2PORT_SIG_EXEC)
+ port_monitor(prt, state, sigdp->u.monitor.mon);
+ else
+ port_monitor_failure(sigdp->u.monitor.port_id,
+ sigdp->u.monitor.mon);
return ERTS_PORT_REDS_MONITOR;
}
@@ -3151,95 +2659,63 @@ port_sig_monitor(Port *prt, erts_aint32_t state, int op,
* a reference (ref may be rewritten to be used to serve additionally as a
* signal id). Name is atom if user monitors port by name or NIL */
ErtsPortOpResult
-erts_port_monitor(Process *origin, Port *port, Eterm name, Eterm *refp)
+erts_port_monitor(Process *c_p, Port *port, ErtsMonitor *mon)
{
ErtsProc2PortSigData *sigdp;
ErtsTryImmDrvCallState try_call_state
= ERTS_INIT_TRY_IMM_DRV_CALL_STATE(
- origin, port, ERTS_PORT_SFLGS_INVALID_LOOKUP,
+ c_p,
+ port,
+ ERTS_PORT_SFLGS_INVALID_LOOKUP,
0,
- 0, /* trap_ref is always set so !trap_ref always is false */
+ !0,
am_monitor);
- ASSERT(origin);
+ ASSERT(c_p);
ASSERT(port);
- ASSERT(is_atom(name) || is_port(name));
- ASSERT(refp);
+ ASSERT(mon);
switch (try_imm_drv_call(&try_call_state)) {
case ERTS_TRY_IMM_DRV_CALL_OK:
- port_monitor(port, try_call_state.state, origin->common.id, name, *refp);
+ port_monitor(port, try_call_state.state, mon);
finalize_imm_drv_call(&try_call_state);
- BUMP_REDS(origin, ERTS_PORT_REDS_MONITOR);
+ BUMP_REDS(c_p, ERTS_PORT_REDS_MONITOR);
return ERTS_PORT_OP_DONE;
case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
- return ERTS_PORT_OP_BADARG;
+ return ERTS_PORT_OP_DROPPED;
default:
break; /* Schedule call instead... */
}
sigdp = erts_port_task_alloc_p2p_sig_data();
sigdp->flags = ERTS_P2P_SIG_TYPE_MONITOR;
- sigdp->u.monitor.origin = origin->common.id;
- sigdp->u.monitor.name = name; /* either named monitor, or port id */
+ sigdp->u.monitor.port_id = port->common.id;
+ sigdp->u.monitor.mon = mon;
/* Ref contents will be initialized here */
- return erts_schedule_proc2port_signal(origin, port, origin->common.id,
- refp, sigdp, 0, NULL,
+ return erts_schedule_proc2port_signal(c_p,
+ port,
+ c_p->common.id,
+ NULL,
+ sigdp,
+ 0,
+ NULL,
port_sig_monitor);
}
-static void
-port_demonitor_failure(Eterm port_id, Eterm origin, Eterm ref)
-{
- Process *origin_p;
- ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK;
- ErtsMonitor *mon1;
- ASSERT(is_internal_pid(origin));
-
- origin_p = erts_pid2proc(NULL, 0, origin, rp_locks);
- if (! origin_p) { return; }
-
- /* do not send any DOWN messages, drop monitors on process */
- mon1 = erts_remove_monitor(&ERTS_P_MONITORS(origin_p), ref);
- if (mon1 != NULL) {
- erts_destroy_monitor(mon1);
- }
-
- erts_smp_proc_unlock(origin_p, rp_locks);
-}
-
/* Origin wants to demonitor port Prt. State contains possible error, which has
* happened just before. Ref is reference to monitor */
static void
-port_demonitor(Port *port, erts_aint32_t state, Eterm origin, Eterm ref)
+port_demonitor(Port *port, erts_aint32_t state, ErtsMonitor *mon)
{
- ASSERT(port);
- ASSERT(is_pid(origin));
- ASSERT(is_internal_ref(ref));
-
- if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) {
- ErtsProcLocks p_locks = ERTS_PROC_LOCK_LINK;
- Process *origin_p = erts_pid2proc(NULL, 0, origin, p_locks);
- if (origin_p) {
- ErtsMonitor *mon1 = erts_remove_monitor(&ERTS_P_MONITORS(origin_p),
- ref);
- if (mon1 != NULL) {
- erts_destroy_monitor(mon1);
- }
- }
- if (1) {
- ErtsMonitor *mon2 = erts_remove_monitor(&ERTS_P_MONITORS(port),
- ref);
- if (mon2 != NULL) {
- erts_destroy_monitor(mon2);
- }
- }
- if (origin_p) { /* when origin is dying, it won't be found */
- erts_smp_proc_unlock(origin_p, p_locks);
- }
- } else {
- port_demonitor_failure(port->common.id, origin, ref);
+ ErtsMonitorData *mdp = erts_monitor_to_data(mon);
+ ASSERT(port && mon);
+ ASSERT(erts_monitor_is_origin(mon));
+ if (!erts_monitor_is_in_table(&mdp->target))
+ erts_monitor_release(mon);
+ else {
+ erts_monitor_list_delete(&ERTS_P_LT_MONITORS(port), &mdp->target);
+ erts_monitor_release_both(mdp);
}
}
@@ -3247,73 +2723,47 @@ static int
port_sig_demonitor(Port *prt, erts_aint32_t state, int op,
ErtsProc2PortSigData *sigdp)
{
- Eterm hp[REF_THING_SIZE];
- Eterm ref = make_internal_ref(&hp);
- write_ref_thing(hp, sigdp->u.demonitor.ref[0],
- sigdp->u.demonitor.ref[1],
- sigdp->u.demonitor.ref[2]);
- if (op == ERTS_PROC2PORT_SIG_EXEC) {
- port_demonitor(prt, state, sigdp->u.demonitor.origin, ref);
- } else {
- port_demonitor_failure(sigdp->u.demonitor.name,
- sigdp->u.demonitor.origin,
- ref);
- }
- if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) {
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
- }
+ if (op == ERTS_PROC2PORT_SIG_EXEC)
+ port_demonitor(prt, state, sigdp->u.demonitor.mon);
+ else
+ erts_monitor_release(sigdp->u.demonitor.mon);
return ERTS_PORT_REDS_DEMONITOR;
}
-/* Removes monitor between origin and target, identified by ref.
- * Mode defines normal or relaxed demonitor rules (process is at death) */
-ErtsPortOpResult erts_port_demonitor(Process *origin, ErtsDemonitorMode mode,
- Port *target, Eterm ref,
- Eterm *trap_ref)
+ErtsPortOpResult
+erts_port_demonitor(Process *c_p, Port *prt, ErtsMonitor *mon)
{
- Process *c_p = mode == ERTS_PORT_DEMONITOR_NORMAL ? origin : NULL;
ErtsProc2PortSigData *sigdp;
ErtsTryImmDrvCallState try_call_state
= ERTS_INIT_TRY_IMM_DRV_CALL_STATE(
c_p,
- target, ERTS_PORT_SFLGS_INVALID_LOOKUP,
+ prt, ERTS_PORT_SFLGS_INVALID_LOOKUP,
0,
- !trap_ref,
+ !0,
am_demonitor);
- ASSERT(origin);
- ASSERT(target);
- ASSERT(is_internal_ref(ref));
+ ASSERT(c_p && prt && mon);
switch (try_imm_drv_call(&try_call_state)) {
case ERTS_TRY_IMM_DRV_CALL_OK:
- port_demonitor(target, try_call_state.state, origin->common.id, ref);
+ port_demonitor(prt, try_call_state.state, mon);
finalize_imm_drv_call(&try_call_state);
- if (mode == ERTS_PORT_DEMONITOR_NORMAL) {
- BUMP_REDS(origin, ERTS_PORT_REDS_DEMONITOR);
- }
+ BUMP_REDS(c_p, ERTS_PORT_REDS_DEMONITOR);
return ERTS_PORT_OP_DONE;
case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
- return ERTS_PORT_OP_BADARG;
+ return ERTS_PORT_OP_DROPPED;
default:
break; /* Schedule call instead... */
}
sigdp = erts_port_task_alloc_p2p_sig_data();
sigdp->flags = ERTS_P2P_SIG_TYPE_DEMONITOR;
- sigdp->u.demonitor.origin = origin->common.id;
- sigdp->u.demonitor.name = target->common.id;
- {
- RefThing *reft = ref_thing_ptr(ref);
- /* Start from 1 skip ref arity */
- sys_memcpy(sigdp->u.demonitor.ref,
- internal_thing_ref_numbers(reft),
- sizeof(sigdp->u.demonitor.ref));
- }
+ sigdp->u.demonitor.port_id = prt->common.id;
+ sigdp->u.demonitor.mon = mon;
/* Ref contents will be initialized here */
- return erts_schedule_proc2port_signal(c_p, target, origin->common.id,
- trap_ref, sigdp, 0, NULL,
+ return erts_schedule_proc2port_signal(c_p, prt, c_p->common.id,
+ NULL, sigdp, 0, NULL,
port_sig_demonitor);
}
@@ -3322,11 +2772,13 @@ init_ack_send_reply(Port *port, Eterm resp)
{
if (!is_internal_port(resp)) {
- Process *rp = erts_proc_lookup_raw(port->async_open_port->to);
- erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
- erts_remove_link(&ERTS_P_LINKS(port), port->async_open_port->to);
- erts_remove_link(&ERTS_P_LINKS(rp), port->common.id);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ Eterm proc = port->async_open_port->to;
+ ErtsLink *lnk = erts_link_tree_lookup(ERTS_P_LINKS(port),
+ proc);
+ if (lnk) {
+ erts_link_tree_delete(&ERTS_P_LINKS(port), lnk);
+ erts_proc_sig_send_unlink(NULL, lnk);
+ }
}
port_sched_op_reply(port->async_open_port->to,
port->async_open_port->ref,
@@ -3353,7 +2805,7 @@ erl_drv_init_ack(ErlDrvPort ix, ErlDrvData res) {
break;
case -2: {
char *str = erl_errno_id(errno);
- resp = erts_atom_put((byte *) str, strlen(str),
+ resp = erts_atom_put((byte *) str, sys_strlen(str),
ERTS_ATOM_ENC_LATIN1, 1);
break;
}
@@ -3388,11 +2840,11 @@ void erts_init_io(int port_tab_size,
int port_tab_size_ignore_files,
int legacy_port_tab)
{
- ErlDrvEntry** dp;
+ ErtsStaticDriver* dp;
UWord common_element_size;
- erts_smp_rwmtx_opt_t drv_list_rwmtx_opts = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER;
- drv_list_rwmtx_opts.type = ERTS_SMP_RWMTX_TYPE_EXTREMELY_FREQUENT_READ;
- drv_list_rwmtx_opts.lived = ERTS_SMP_RWMTX_LONG_LIVED;
+ erts_rwmtx_opt_t drv_list_rwmtx_opts = ERTS_RWMTX_OPT_DEFAULT_INITER;
+ drv_list_rwmtx_opts.type = ERTS_RWMTX_TYPE_EXTREMELY_FREQUENT_READ;
+ drv_list_rwmtx_opts.lived = ERTS_RWMTX_LONG_LIVED;
erts_atomic64_init_nob(&bytes_in, 0);
erts_atomic64_init_nob(&bytes_out, 0);
@@ -3400,11 +2852,9 @@ void erts_init_io(int port_tab_size,
common_element_size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
common_element_size += ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErtsPortTaskBusyPortQ));
common_element_size += 10; /* name */
-#ifdef ERTS_SMP
common_element_size += sizeof(erts_mtx_t);
init_xports_list_alloc();
-#endif
pdl_init();
@@ -3419,13 +2869,12 @@ void erts_init_io(int port_tab_size,
else if (port_tab_size < ERTS_MIN_PORTS)
port_tab_size = ERTS_MIN_PORTS;
- erts_smp_rwmtx_init_opt(&erts_driver_list_lock,
- &drv_list_rwmtx_opts,
- "driver_list");
+ erts_rwmtx_init_opt(&erts_driver_list_lock, &drv_list_rwmtx_opts, "driver_list", NIL,
+ ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_IO);
driver_list = NULL;
- erts_smp_tsd_key_create(&driver_list_lock_status_key,
+ erts_tsd_key_create(&driver_list_lock_status_key,
"erts_driver_list_lock_status_key");
- erts_smp_tsd_key_create(&driver_list_last_error_key,
+ erts_tsd_key_create(&driver_list_last_error_key,
"erts_driver_list_last_error_key");
erts_ptab_init_table(&erts_port,
@@ -3433,15 +2882,15 @@ void erts_init_io(int port_tab_size,
NULL,
(ErtsPTabElementCommon *) &erts_invalid_port.common,
port_tab_size,
- common_element_size, /* Doesn't need to be excact */
+ common_element_size, /* Doesn't need to be exact */
"port_table",
legacy_port_tab,
1);
sys_init_io();
- erts_smp_tsd_set(driver_list_lock_status_key, (void *) 1);
- erts_smp_rwmtx_rwlock(&erts_driver_list_lock);
+ erts_tsd_set(driver_list_lock_status_key, (void *) 1);
+ erts_rwmtx_rwlock(&erts_driver_list_lock);
init_driver(&fd_driver, &fd_driver_entry, NULL);
init_driver(&vanilla_driver, &vanilla_driver_entry, NULL);
@@ -3450,76 +2899,101 @@ void erts_init_io(int port_tab_size,
init_driver(&forker_driver, &forker_driver_entry, NULL);
#endif
erts_init_static_drivers();
- for (dp = driver_tab; *dp != NULL; dp++)
- erts_add_driver_entry(*dp, NULL, 1);
+ for (dp = driver_tab; dp->de != NULL; dp++)
+ erts_add_driver_entry(dp->de, NULL, 1, dp->taint);
- erts_smp_tsd_set(driver_list_lock_status_key, NULL);
- erts_smp_rwmtx_rwunlock(&erts_driver_list_lock);
+ erts_tsd_set(driver_list_lock_status_key, NULL);
+ erts_rwmtx_rwunlock(&erts_driver_list_lock);
}
-#if defined(ERTS_ENABLE_LOCK_COUNT) && defined(ERTS_SMP)
-static ERTS_INLINE void lcnt_enable_drv_lock_count(erts_driver_t *dp, int enable)
+#if defined(ERTS_ENABLE_LOCK_COUNT)
+static void lcnt_enable_driver_lock_count(erts_driver_t *dp, int enable)
{
if (dp->lock) {
- if (enable)
- erts_lcnt_init_lock_x(&dp->lock->lcnt,
- "driver_lock",
- ERTS_LCNT_LT_MUTEX,
- erts_atom_put((byte*)dp->name,
- sys_strlen(dp->name),
- ERTS_ATOM_ENC_LATIN1,
- 1));
-
- else
- erts_lcnt_destroy_lock(&dp->lock->lcnt);
-
+ if (enable) {
+ erts_lcnt_install_new_lock_info(&dp->lock->lcnt, "driver_lock",
+ dp->name_atom, ERTS_LOCK_TYPE_MUTEX | ERTS_LOCK_FLAGS_CATEGORY_IO);
+ } else {
+ erts_lcnt_uninstall(&dp->lock->lcnt);
+ }
}
}
-static ERTS_INLINE void lcnt_enable_port_lock_count(Port *prt, int enable)
+static void lcnt_enable_port_lock_count(Port *prt, int enable)
{
erts_aint32_t state = erts_atomic32_read_nob(&prt->state);
- if (!enable) {
- erts_lcnt_destroy_lock(&prt->sched.mtx.lcnt);
- if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK)
- erts_lcnt_destroy_lock(&prt->lock->lcnt);
+
+ if(enable) {
+ ErlDrvPDL pdl = prt->port_data_lock;
+
+ erts_lcnt_install_new_lock_info(&prt->sched.mtx.lcnt, "port_sched_lock",
+ prt->common.id, ERTS_LOCK_TYPE_MUTEX | ERTS_LOCK_FLAGS_CATEGORY_IO);
+
+ if(pdl) {
+ erts_lcnt_install_new_lock_info(&pdl->mtx.lcnt, "port_data_lock",
+ prt->common.id, ERTS_LOCK_TYPE_MUTEX | ERTS_LOCK_FLAGS_CATEGORY_IO);
+ }
+
+ if(state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) {
+ erts_lcnt_install_new_lock_info(&prt->lock->lcnt, "port_lock",
+ prt->common.id, ERTS_LOCK_TYPE_MUTEX | ERTS_LOCK_FLAGS_CATEGORY_IO);
+ }
+ } else {
+ erts_lcnt_uninstall(&prt->sched.mtx.lcnt);
+
+ if(prt->port_data_lock) {
+ erts_lcnt_uninstall(&prt->port_data_lock->mtx.lcnt);
+ }
+
+ if(state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) {
+ erts_lcnt_uninstall(&prt->lock->lcnt);
+ }
}
- else {
- erts_lcnt_init_lock_x(&prt->sched.mtx.lcnt,
- "port_sched_lock",
- ERTS_LCNT_LT_MUTEX,
- prt->common.id);
- if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK)
- erts_lcnt_init_lock_x(&prt->lock->lcnt,
- "port_lock",
- ERTS_LCNT_LT_MUTEX,
- prt->common.id);
+}
+
+void erts_lcnt_update_driver_locks(int enable) {
+ erts_driver_t *driver;
+
+ lcnt_enable_driver_lock_count(&vanilla_driver, enable);
+ lcnt_enable_driver_lock_count(&spawn_driver, enable);
+#ifndef __WIN32__
+ lcnt_enable_driver_lock_count(&forker_driver, enable);
+#endif
+ lcnt_enable_driver_lock_count(&fd_driver, enable);
+
+ erts_rwmtx_rlock(&erts_driver_list_lock);
+
+ for (driver = driver_list; driver; driver = driver->next) {
+ lcnt_enable_driver_lock_count(driver, enable);
}
+
+ erts_rwmtx_runlock(&erts_driver_list_lock);
}
-void erts_lcnt_enable_io_lock_count(int enable) {
- erts_driver_t *dp;
- int ix, max = erts_ptab_max(&erts_port);
- Port *prt;
+void erts_lcnt_update_port_locks(int enable) {
+ int i, max;
- for (ix = 0; ix < max; ix++) {
- if ((prt = erts_pix2port(ix)) != NULL) {
- lcnt_enable_port_lock_count(prt, enable);
+ max = erts_ptab_max(&erts_port);
+
+ for(i = 0; i < max; i++) {
+ int delay_handle;
+ Port *port;
+
+ delay_handle = erts_thr_progress_unmanaged_delay();
+ port = erts_pix2port(i);
+
+ if(port != NULL) {
+ lcnt_enable_port_lock_count(port, enable);
}
- } /* for all ports */
- lcnt_enable_drv_lock_count(&vanilla_driver, enable);
- lcnt_enable_drv_lock_count(&spawn_driver, enable);
-#ifndef __WIN32__
- lcnt_enable_drv_lock_count(&forker_driver, enable);
-#endif
- lcnt_enable_drv_lock_count(&fd_driver, enable);
- /* enable lock counting in all drivers */
- for (dp = driver_list; dp; dp = dp->next) {
- lcnt_enable_drv_lock_count(dp, enable);
+ if(delay_handle != ERTS_THR_PRGR_DHANDLE_MANAGED) {
+ erts_thr_progress_unmanaged_continue(delay_handle);
+ }
}
-} /* enable/disable lock counting of ports */
-#endif /* defined(ERTS_ENABLE_LOCK_COUNT) && defined(ERTS_SMP) */
+}
+
+#endif /* defined(ERTS_ENABLE_LOCK_COUNT) */
+
/*
* Buffering of data when using line oriented I/O on ports
*/
@@ -3698,12 +3172,10 @@ deliver_result(Port *prt, Eterm sender, Eterm pid, Eterm res)
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
ASSERT(!prt || prt->common.id == sender);
-#ifdef ERTS_SMP
- ASSERT(!prt || erts_lc_is_port_locked(prt));
-#endif
+ ERTS_LC_ASSERT(!prt || erts_lc_is_port_locked(prt));
ASSERT(is_internal_port(sender) && is_internal_pid(pid));
@@ -3731,7 +3203,7 @@ deliver_result(Port *prt, Eterm sender, Eterm pid, Eterm res)
erts_queue_message(rp, rp_locks, mp, tuple, sender);
if (rp_locks)
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
if (!scheduler)
erts_proc_dec_refc(rp);
@@ -3762,8 +3234,8 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
int scheduler = erts_get_scheduler_id() != 0;
int trace_send = IS_TRACED_FL(prt, F_TRACE_SEND);
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_CHK_NO_PROC_LOCKS;
need = 3 + 3 + 2*hlen;
@@ -3789,25 +3261,11 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) {
listp = buf_to_intlist(&hp, buf, len, listp);
} else if (buf != NULL) {
- ProcBin* pb;
- Binary* bptr;
-
- bptr = erts_bin_nrml_alloc(len);
- erts_refc_init(&bptr->refc, 1);
+ Binary* bptr = erts_bin_nrml_alloc(len);
sys_memcpy(bptr->orig_bytes, buf, len);
- pb = (ProcBin *) hp;
- pb->thing_word = HEADER_PROC_BIN;
- pb->size = len;
- pb->next = ohp->first;
- ohp->first = (struct erl_off_heap_header*)pb;
- pb->val = bptr;
- pb->bytes = (byte*) bptr->orig_bytes;
- pb->flags = 0;
+ listp = erts_build_proc_bin(ohp, hp, bptr);
hp += PROC_BIN_SIZE;
-
- OH_OVERHEAD(ohp, pb->size / sizeof(Eterm));
- listp = make_binary(pb);
}
/* Prepend the header */
@@ -3828,10 +3286,9 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
if (trace_send)
trace_port_send(prt, to, tuple, 1);
- ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
if (rp_locks)
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
if (!scheduler)
erts_proc_dec_refc(rp);
}
@@ -3866,7 +3323,7 @@ static void flush_linebuf_messages(Port *prt, erts_aint32_t state)
LineBufContext lc;
int ret;
- ERTS_SMP_LC_ASSERT(!prt || erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(!prt || erts_lc_is_port_locked(prt));
if (!prt)
return;
@@ -3910,8 +3367,8 @@ deliver_vec_message(Port* prt, /* Port */
erts_aint32_t state;
int trace_send = IS_TRACED_FL(prt, F_TRACE_SEND);
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_CHK_NO_PROC_LOCKS;
/*
* Check arguments for validity.
@@ -4000,9 +3457,8 @@ deliver_vec_message(Port* prt, /* Port */
if (IS_TRACED_FL(prt, F_TRACE_SEND))
trace_port_send(prt, to, tuple, 1);
- ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
if (!scheduler)
erts_proc_dec_refc(rp);
}
@@ -4037,8 +3493,8 @@ static void flush_port(Port *p)
{
int fpe_was_unmasked;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
+ ERTS_CHK_NO_PROC_LOCKS;
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(p));
if (p->drv_ptr->flush != NULL) {
ERTS_MSACC_PUSH_STATE_M();
@@ -4070,11 +3526,9 @@ static void flush_port(Port *p)
if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(p, am_out, am_flush);
}
-#ifdef ERTS_SMP
if (p->xports)
erts_port_handle_xports(p);
ASSERT(!p->xports);
-#endif
}
if ((erts_atomic32_read_nob(&p->state) & ERTS_PORT_SFLGS_DEAD) == 0
&& is_port_ioq_empty(p)) {
@@ -4092,11 +3546,12 @@ terminate_port(Port *prt)
erts_aint32_t state;
ErtsPrtSD *psd;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_CHK_NO_PROC_LOCKS;
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
ASSERT(!ERTS_P_LINKS(prt));
ASSERT(!ERTS_P_MONITORS(prt));
+ ASSERT(!ERTS_P_LT_MONITORS(prt));
/* state may be altered by kill_port() below */
state = erts_atomic32_read_band_nob(&prt->state,
@@ -4135,11 +3590,9 @@ terminate_port(Port *prt)
(*drv->stop)((ErlDrvData)prt->drv_data);
erts_unblock_fpe(fpe_was_unmasked);
ERTS_MSACC_POP_STATE_M();
-#ifdef ERTS_SMP
if (prt->xports)
erts_port_handle_xports(prt);
ASSERT(!prt->xports);
-#endif
}
if (is_internal_port(send_closed_port_id)
@@ -4147,9 +3600,9 @@ terminate_port(Port *prt)
trace_port_send(prt, connected_id, am_closed, 1);
if(drv->handle != NULL) {
- erts_smp_rwmtx_rlock(&erts_driver_list_lock);
+ erts_rwmtx_rlock(&erts_driver_list_lock);
erts_ddll_decrement_port_count(drv->handle);
- erts_smp_rwmtx_runlock(&erts_driver_list_lock);
+ erts_rwmtx_runlock(&erts_driver_list_lock);
}
stopq(prt); /* clear queue memory */
if(prt->linebuf != NULL){
@@ -4159,12 +3612,12 @@ terminate_port(Port *prt)
erts_cleanup_port_data(prt);
- psd = (ErtsPrtSD *) erts_smp_atomic_read_nob(&prt->psd);
+ ASSERT(erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY) == NULL);
+
+ psd = (ErtsPrtSD *) erts_atomic_read_nob(&prt->psd);
if (psd)
erts_free(ERTS_ALC_T_PRTSD, psd);
- ASSERT(prt->dist_entry == NULL);
-
kill_port(prt);
/*
@@ -4172,7 +3625,7 @@ terminate_port(Port *prt)
* port has been removed from the port table (in kill_port()).
*/
if ((state & ERTS_PORT_SFLG_HALT)
- && (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0)) {
+ && (erts_atomic32_dec_read_nob(&erts_halt_progress) == 0)) {
erts_port_release(prt); /* We will exit and never return */
erts_flush_async_exit(erts_halt_code, "");
}
@@ -4186,145 +3639,25 @@ erts_terminate_port(Port *pp)
terminate_port(pp);
}
-static void port_fire_one_monitor(ErtsMonitor *mon, void *ctx0);
-static void sweep_one_monitor(ErtsMonitor *mon, void *vpsc)
-{
- switch (mon->type) {
- case MON_ORIGIN: {
- ErtsMonitor *rmon;
- Process *rp;
-
- ASSERT(is_internal_pid(mon->pid));
- rp = erts_pid2proc(NULL, 0, mon->pid, ERTS_PROC_LOCK_LINK);
- if (!rp) {
- goto done;
- }
- rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- if (rmon == NULL) {
- goto done;
- }
- erts_destroy_monitor(rmon);
- } break;
- case MON_TARGET: {
- port_fire_one_monitor(mon, vpsc); /* forward call */
- } break;
- }
- done:
- erts_destroy_monitor(mon);
-}
-
-
-
typedef struct {
- Port *port;
+ Eterm port_id;
Eterm reason;
-} SweepContext;
+} ErtsPortExitContext;
-static void sweep_one_link(ErtsLink *lnk, void *vpsc)
+static void link_port_exit(ErtsLink *lnk, void *vpectxt)
{
- SweepContext *psc = vpsc;
- DistEntry *dep;
- Process *rp;
- Eterm port_id = psc->port->common.id;
-
- ASSERT(lnk->type == LINK_PID);
-
- if (IS_TRACED_FL(psc->port, F_TRACE_PORTS))
- trace_port(psc->port, am_unlink, lnk->pid);
-
- if (is_external_pid(lnk->pid)) {
- dep = external_pid_dist_entry(lnk->pid);
- if(dep != erts_this_dist_entry) {
- ErtsDistLinkData dld;
- ErtsDSigData dsd;
- int code;
- code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0);
- switch (code) {
- case ERTS_DSIG_PREP_NOT_ALIVE:
- case ERTS_DSIG_PREP_NOT_CONNECTED:
- break;
- case ERTS_DSIG_PREP_CONNECTED:
- erts_remove_dist_link(&dld, port_id, lnk->pid, dep);
- erts_destroy_dist_link(&dld);
- code = erts_dsig_send_exit(&dsd, port_id, lnk->pid,
- psc->reason);
- ASSERT(code == ERTS_DSIG_SEND_OK);
- break;
- default:
- ASSERT(! "Invalid dsig prepare result");
- break;
- }
- }
- } else {
- ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND;
- ASSERT(is_internal_pid(lnk->pid));
- rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks);
- if (rp) {
- ErtsLink *rlnk = erts_remove_link(&ERTS_P_LINKS(rp), port_id);
-
- if (rlnk) {
- int xres = erts_send_exit_signal(NULL,
- port_id,
- rp,
- &rp_locks,
- psc->reason,
- NIL,
- NULL,
- 0);
- if (xres >= 0) {
- if (rp_locks & ERTS_PROC_LOCKS_XSIG_SEND) {
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCKS_XSIG_SEND);
- rp_locks &= ~ERTS_PROC_LOCKS_XSIG_SEND;
- }
- /* We didn't exit the process and it is traced */
- if (IS_TRACED_FL(rp, F_TRACE_PROCS))
- trace_proc(NULL, 0, rp, am_getting_unlinked, port_id);
- }
- erts_destroy_link(rlnk);
- }
-
- erts_smp_proc_unlock(rp, rp_locks);
- }
- }
- erts_destroy_link(lnk);
+ ErtsPortExitContext *pectxt = vpectxt;
+ erts_proc_sig_send_link_exit(NULL, pectxt->port_id,
+ lnk, pectxt->reason, NIL);
}
-static void
-port_fire_one_monitor(ErtsMonitor *mon, void *ctx0)
+static void monitor_port_exit(ErtsMonitor *mon, void *vpectxt)
{
- Process *origin;
- ErtsProcLocks origin_locks;
-
- if (mon->type != MON_TARGET || ! is_pid(mon->pid)) {
- return;
- }
- /*
- * Proceed here if someone monitors us, we (port) are the target and
- * origin is some process
- */
- origin_locks = ERTS_PROC_LOCKS_MSG_SEND | ERTS_PROC_LOCK_LINK;
-
- origin = erts_pid2proc(NULL, 0, mon->pid, origin_locks);
- if (origin) {
- DeclareTmpHeapNoproc(lhp,3);
- SweepContext *ctx = (SweepContext *)ctx0;
- ErtsMonitor *rmon;
- Eterm watched = (is_atom(mon->name)
- ? TUPLE2(lhp, mon->name, erts_this_dist_entry->sysname)
- : ctx->port->common.id);
-
- erts_queue_monitor_message(origin, &origin_locks, mon->ref, am_port,
- watched, ctx->reason);
- UnUseTmpHeapNoproc(3);
-
- rmon = erts_remove_monitor(&ERTS_P_MONITORS(origin), mon->ref);
- erts_smp_proc_unlock(origin, origin_locks);
-
- if (rmon) {
- erts_destroy_monitor(rmon);
- }
- }
+ ErtsPortExitContext *pectxt = vpectxt;
+ if (erts_monitor_is_target(mon))
+ erts_proc_sig_send_monitor_down(mon, pectxt->reason);
+ else
+ erts_proc_sig_send_demonitor(mon);
}
/* 'from' is sending 'this_port' an exit signal, (this_port must be internal).
@@ -4341,12 +3674,15 @@ int
erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed,
int drop_normal)
{
- ErtsLink *lnk;
+ ErtsLink *links;
+ ErtsMonitor *monitors;
+ ErtsMonitor *lt_monitors;
Eterm modified_reason;
erts_aint32_t state, set_state_flags;
+ ErtsPortExitContext pectxt;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_CHK_NO_PROC_LOCKS;
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
modified_reason = (reason == am_kill) ? am_killed : reason;
@@ -4391,28 +3727,43 @@ erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed,
set_busy_port(ERTS_Port2ErlDrvPort(prt), 0);
+ links = ERTS_P_LINKS(prt);
+ ERTS_P_LINKS(prt) = NULL;
+ monitors = ERTS_P_MONITORS(prt);
+ ERTS_P_MONITORS(prt) = NULL;
+ lt_monitors = ERTS_P_LT_MONITORS(prt);
+ ERTS_P_LT_MONITORS(prt) = NULL;
+
if (prt->common.u.alive.reg != NULL)
(void) erts_unregister_name(NULL, 0, prt, prt->common.u.alive.reg->name);
- {
- SweepContext sc = {prt, modified_reason};
- lnk = ERTS_P_LINKS(prt);
- ERTS_P_LINKS(prt) = NULL;
- erts_sweep_links(lnk, &sweep_one_link, &sc);
+ pectxt.port_id = prt->common.id;
+ pectxt.reason = modified_reason;
+
+ if (links)
+ erts_monitor_tree_foreach_delete(&links,
+ link_port_exit,
+ (void *) &pectxt);
+
+ if (monitors || lt_monitors) {
+ DRV_MONITOR_LOCK_PDL(prt);
+ if (monitors)
+ erts_monitor_tree_foreach_delete(&monitors,
+ monitor_port_exit,
+ (void *) &pectxt);
+ if (lt_monitors)
+ erts_monitor_list_foreach_delete(&lt_monitors,
+ monitor_port_exit,
+ (void *) &pectxt);
+ DRV_MONITOR_UNLOCK_PDL(prt);
}
- DRV_MONITOR_LOCK_PDL(prt);
- {
- SweepContext ctx = {prt, modified_reason};
- ErtsMonitor *moni = ERTS_P_MONITORS(prt);
- ERTS_P_MONITORS(prt) = NULL;
- erts_sweep_monitors(moni, &sweep_one_monitor, &ctx);
- }
- DRV_MONITOR_UNLOCK_PDL(prt);
-
- if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && prt->dist_entry) {
- erts_do_net_exits(prt->dist_entry, modified_reason);
- erts_deref_dist_entry(prt->dist_entry);
- prt->dist_entry = NULL;
+
+ if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
+ ASSERT(dep);
+ erts_do_net_exits(dep, modified_reason);
+ erts_deref_dist_entry(dep);
+ erts_prtsd_set(prt, ERTS_PRTSD_DIST_ENTRY, NULL);
erts_atomic32_read_band_relb(&prt->state,
~ERTS_PORT_SFLG_DISTRIBUTION);
}
@@ -4560,8 +3911,7 @@ static void
cleanup_scheduled_control(Binary *binp, char *bufp)
{
if (binp) {
- if (erts_refc_dectest(&binp->refc, 0) == 0)
- erts_bin_free(binp);
+ erts_bin_release(binp);
}
else {
if (bufp)
@@ -4614,17 +3964,9 @@ write_port_control_result(int control_flags,
else {
dbin = (ErlDrvBinary *) resp_bufp;
if (dbin->orig_size > ERL_ONHEAP_BIN_LIMIT) {
- ProcBin* pb = (ProcBin *) *hpp;
+ res = erts_build_proc_bin(ohp, *hpp, ErlDrvBinary2Binary(dbin));
*hpp += PROC_BIN_SIZE;
- pb->thing_word = HEADER_PROC_BIN;
- pb->size = dbin->orig_size;
- pb->next = ohp->first;
- ohp->first = (struct erl_off_heap_header *) pb;
- pb->val = ErlDrvBinary2Binary(dbin);
- pb->bytes = (byte*) dbin->orig_bytes;
- pb->flags = 0;
- OH_OVERHEAD(ohp, dbin->orig_size / sizeof(Eterm));
- return make_binary(pb);
+ return res;
}
resp_bufp = dbin->orig_bytes;
resp_size = dbin->orig_size;
@@ -4708,7 +4050,7 @@ port_sig_control(Port *prt,
prt);
if (rp_locks)
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
goto done;
}
}
@@ -4761,7 +4103,7 @@ erts_port_control(Process* c_p,
int copy;
ErtsProc2PortSigData *sigdp;
- sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
if (sched_flags & ERTS_PTS_FLG_EXIT)
return ERTS_PORT_OP_BADARG;
@@ -4884,14 +4226,28 @@ erts_port_control(Process* c_p,
ASSERT(!tmp_alloced);
if (*ebinp == HEADER_SUB_BIN)
ebinp = binary_val(((ErlSubBin *) ebinp)->orig);
+
if (*ebinp != HEADER_PROC_BIN)
copy = 1;
else {
- binp = ((ProcBin *) ebinp)->val;
+ ProcBin *pb = (ProcBin *) ebinp;
+ int offset = bufp - pb->val->orig_bytes;
+
+ ASSERT(pb->val->orig_bytes <= bufp
+ && bufp + size <= pb->val->orig_bytes + pb->val->orig_size);
+
+ if (pb->flags) {
+ erts_emasculate_writable_binary(pb);
+
+ /* The procbin may have been reallocated, so update bufp */
+ bufp = pb->val->orig_bytes + offset;
+ }
+
+ binp = pb->val;
ASSERT(bufp <= bufp + size);
ASSERT(binp->orig_bytes <= bufp
&& bufp + size <= binp->orig_bytes + binp->orig_size);
- erts_refc_inc(&binp->refc, 1);
+ erts_refc_inc(&binp->intern.refc, 1);
}
}
@@ -4918,10 +4274,9 @@ erts_port_control(Process* c_p,
0,
NULL,
port_sig_control);
- if (res != ERTS_PORT_OP_SCHEDULED) {
- cleanup_scheduled_control(binp, bufp);
+ if (res != ERTS_PORT_OP_SCHEDULED)
return ERTS_PORT_OP_BADARG;
- }
+
return res;
}
@@ -5060,11 +4415,11 @@ port_sig_call(Port *prt,
prt);
if (rp_locks)
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
goto done;
}
if (rp_locks)
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
}
}
}
@@ -5098,7 +4453,7 @@ erts_port_call(Process* c_p,
erts_aint32_t sched_flags;
ErtsProc2PortSigData *sigdp;
- sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
if (sched_flags & ERTS_PTS_FLG_EXIT) {
return ERTS_PORT_OP_BADARG;
}
@@ -5211,10 +4566,9 @@ erts_port_call(Process* c_p,
0,
NULL,
port_sig_call);
- if (res != ERTS_PORT_OP_SCHEDULED) {
- cleanup_scheduled_call(bufp);
+ if (res != ERTS_PORT_OP_SCHEDULED)
return ERTS_PORT_OP_BADARG;
- }
+
return res;
}
@@ -5317,7 +4671,7 @@ port_sig_info(Port *prt,
prt);
}
if (rp_locks)
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
}
return ERTS_PORT_REDS_INFO;
}
@@ -5386,7 +4740,7 @@ typedef struct {
Uint sched_id;
Eterm pid;
Uint32 refn[ERTS_REF_NUMBERS];
- erts_smp_atomic32_t refc;
+ erts_atomic32_t refc;
} ErtsIOBytesReq;
static void
@@ -5415,7 +4769,7 @@ reply_io_bytes(void *vreq)
rp_locks = ERTS_PROC_LOCK_MAIN;
}
- hsz = 5 /* 4-tuple */ + REF_THING_SIZE;
+ hsz = 5 /* 4-tuple */ + ERTS_REF_THING_SIZE;
erts_bld_uint64(NULL, &hsz, in);
erts_bld_uint64(NULL, &hsz, out);
@@ -5424,7 +4778,7 @@ reply_io_bytes(void *vreq)
ref = make_internal_ref(hp);
write_ref_thing(hp, req->refn[0], req->refn[1], req->refn[2]);
- hp += REF_THING_SIZE;
+ hp += ERTS_REF_THING_SIZE;
ein = erts_bld_uint64(&hp, NULL, in);
eout = erts_bld_uint64(&hp, NULL, out);
@@ -5436,10 +4790,10 @@ reply_io_bytes(void *vreq)
if (req->sched_id == sched_id)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
if (rp_locks)
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
}
- if (erts_smp_atomic32_dec_read_nob(&req->refc) == 0)
+ if (erts_atomic32_dec_read_nob(&req->refc) == 0)
erts_free(ERTS_ALC_T_IOB_REQ, req);
}
@@ -5453,7 +4807,7 @@ erts_request_io_bytes(Process *c_p)
ErtsIOBytesReq *req = erts_alloc(ERTS_ALC_T_IOB_REQ,
sizeof(ErtsIOBytesReq));
- hp = HAlloc(c_p, REF_THING_SIZE);
+ hp = HAlloc(c_p, ERTS_REF_THING_SIZE);
ref = erts_sched_make_ref_in_buffer(esdp, hp);
refn = internal_ref_numbers(ref);
@@ -5462,16 +4816,14 @@ erts_request_io_bytes(Process *c_p)
req->refn[0] = refn[0];
req->refn[1] = refn[1];
req->refn[2] = refn[2];
- erts_smp_atomic32_init_nob(&req->refc,
+ erts_atomic32_init_nob(&req->refc,
(erts_aint32_t) erts_no_schedulers);
-#ifdef ERTS_SMP
if (erts_no_schedulers > 1)
erts_schedule_multi_misc_aux_work(1,
erts_no_schedulers,
reply_io_bytes,
(void *) req);
-#endif
reply_io_bytes((void *) req);
@@ -5480,24 +4832,115 @@ erts_request_io_bytes(Process *c_p)
typedef struct {
- int to;
+ fmtfn_t to;
void *arg;
} prt_one_lnk_data;
static void prt_one_monitor(ErtsMonitor *mon, void *vprtd)
{
+ ErtsMonitorData *mdp = erts_monitor_to_data(mon);
prt_one_lnk_data *prtd = (prt_one_lnk_data *) vprtd;
- erts_print(prtd->to, prtd->arg, "(%T,%T)", mon->pid,mon->ref);
+ if (mon->type == ERTS_MON_TYPE_RESOURCE && erts_monitor_is_target(mon))
+ erts_print(prtd->to, prtd->arg, "(%p,%T)", mon->other.ptr, mdp->ref);
+ else
+ erts_print(prtd->to, prtd->arg, "(%T,%T)", mon->other.item, mdp->ref);
}
static void prt_one_lnk(ErtsLink *lnk, void *vprtd)
{
prt_one_lnk_data *prtd = (prt_one_lnk_data *) vprtd;
- erts_print(prtd->to, prtd->arg, "%T", lnk->pid);
+ erts_print(prtd->to, prtd->arg, "%T", lnk->other.item);
+}
+
+static void dump_port_state(fmtfn_t to, void *arg, erts_aint32_t state)
+{
+ erts_aint32_t rest;
+ int unknown = 0;
+ char delim = ' ';
+
+ erts_print(to, arg, "State:");
+
+ rest = state;
+ while (rest) {
+ erts_aint32_t chk = (rest ^ (rest-1)) & rest; /* lowest set bit */
+ char* s;
+
+ rest &= ~chk;
+ switch (chk) {
+ case ERTS_PORT_SFLG_CONNECTED: s = "CONNECTED"; break;
+ case ERTS_PORT_SFLG_EXITING: s = "EXITING"; break;
+ case ERTS_PORT_SFLG_DISTRIBUTION: s = "DISTR"; break;
+ case ERTS_PORT_SFLG_BINARY_IO: s = "BINARY_IO"; break;
+ case ERTS_PORT_SFLG_SOFT_EOF: s = "SOFT_EOF"; break;
+ case ERTS_PORT_SFLG_CLOSING: s = "CLOSING"; break;
+ case ERTS_PORT_SFLG_SEND_CLOSED: s = "SEND_CLOSED"; break;
+ case ERTS_PORT_SFLG_LINEBUF_IO: s = "LINEBUF_IO"; break;
+ case ERTS_PORT_SFLG_FREE: s = "FREE"; break;
+ case ERTS_PORT_SFLG_INITIALIZING: s = "INITIALIZING"; break;
+ case ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK: s = "PORT_LOCK"; break;
+ case ERTS_PORT_SFLG_INVALID: s = "INVALID"; break;
+ case ERTS_PORT_SFLG_HALT: s = "HALT"; break;
+#ifdef DEBUG
+ case ERTS_PORT_SFLG_PORT_DEBUG: s = "DEBUG"; break;
+#endif
+ default:
+ unknown = 1;
+ continue;
+ }
+ erts_print(to, arg, "%c%s", delim, s);
+ delim = '|';
+ }
+ if (unknown || !state)
+ erts_print(to, arg, "%c0x%x\n", delim, state);
+ else
+ erts_print(to, arg, "\n");
+}
+
+static void dump_port_task_flags(fmtfn_t to, void *arg, Port* p)
+{
+ erts_aint32_t flags = erts_atomic32_read_nob(&p->sched.flags);
+ erts_aint32_t unknown = 0;
+ char delim = ' ';
+
+ if (!flags)
+ return;
+
+ erts_print(to, arg, "Task Flags:");
+
+ while (flags) {
+ erts_aint32_t chk = (flags ^ (flags-1)) & flags; /* lowest set bit */
+ char* s;
+
+ flags &= ~chk;
+ switch (chk) {
+ case ERTS_PTS_FLG_IN_RUNQ: s = "IN_RUNQ"; break;
+ case ERTS_PTS_FLG_EXEC: s = "EXEC"; break;
+ case ERTS_PTS_FLG_HAVE_TASKS: s = "HAVE_TASKS"; break;
+ case ERTS_PTS_FLG_EXIT: s = "EXIT"; break;
+ case ERTS_PTS_FLG_BUSY_PORT: s = "BUSY_PORT"; break;
+ case ERTS_PTS_FLG_BUSY_PORT_Q: s = "BUSY_Q"; break;
+ case ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q: s = "CHK_UNSET_BUSY_Q"; break;
+ case ERTS_PTS_FLG_HAVE_BUSY_TASKS: s = "BUSY_TASKS"; break;
+ case ERTS_PTS_FLG_HAVE_NS_TASKS: s = "NS_TASKS"; break;
+ case ERTS_PTS_FLG_PARALLELISM: s = "PARALLELISM"; break;
+ case ERTS_PTS_FLG_FORCE_SCHED: s = "FORCE_SCHED"; break;
+ case ERTS_PTS_FLG_EXITING: s = "EXITING"; break;
+ case ERTS_PTS_FLG_EXEC_IMM: s = "EXEC_IMM"; break;
+ default:
+ unknown |= chk;
+ continue;
+ }
+ erts_print(to, arg, "%c%s", delim, s);
+ delim = '|';
+ }
+ if (unknown)
+ erts_print(to, arg, "%cUNKNOWN(0x%x)\n", delim, unknown);
+ else
+ erts_print(to, arg, "\n");
}
void
-print_port_info(Port *p, int to, void *arg)
+print_port_info(Port *p, fmtfn_t to, void *arg)
{
erts_aint32_t state = erts_atomic32_read_nob(&p->state);
@@ -5505,6 +4948,8 @@ print_port_info(Port *p, int to, void *arg)
return;
erts_print(to, arg, "=port:%T\n", p->common.id);
+ dump_port_state(to, arg, state);
+ dump_port_task_flags(to, arg, p);
erts_print(to, arg, "Slot: %d\n", internal_port_index(p->common.id));
if (state & ERTS_PORT_SFLG_CONNECTED) {
erts_print(to, arg, "Connected: %T", ERTS_PORT_GET_CONNECTED(p));
@@ -5516,17 +4961,24 @@ print_port_info(Port *p, int to, void *arg)
prtd.to = to;
prtd.arg = arg;
erts_print(to, arg, "Links: ");
- erts_doforall_links(ERTS_P_LINKS(p), &prt_one_lnk, &prtd);
+ erts_link_tree_foreach(ERTS_P_LINKS(p), prt_one_lnk, (void *) &prtd);
erts_print(to, arg, "\n");
}
- if (ERTS_P_MONITORS(p)) {
+ if (ERTS_P_MONITORS(p) || ERTS_P_LT_MONITORS(p)) {
prt_one_lnk_data prtd;
prtd.to = to;
prtd.arg = arg;
erts_print(to, arg, "Monitors: ");
- erts_doforall_monitors(ERTS_P_MONITORS(p), &prt_one_monitor, &prtd);
+ erts_monitor_tree_foreach(ERTS_P_MONITORS(p), prt_one_monitor,
+ (void *) &prtd);
+ erts_monitor_list_foreach(ERTS_P_LT_MONITORS(p), prt_one_monitor,
+ (void *) &prtd);
erts_print(to, arg, "\n");
}
+ if (p->suspended) {
+ erts_print(to, arg, "Suspended: ");
+ erts_proclist_dump(to, arg, p->suspended);
+ }
if (p->common.u.alive.reg != NULL)
erts_print(to, arg, "Registered as: %T\n", p->common.u.alive.reg->name);
@@ -5544,6 +4996,14 @@ print_port_info(Port *p, int to, void *arg)
} else {
erts_print(to, arg, "Port controls linked-in driver: %s\n",p->name);
}
+ erts_print(to, arg, "Input: %beu\n", p->bytes_in);
+ erts_print(to, arg, "Output: %beu\n", p->bytes_out);
+ erts_print(to, arg, "Queue: %beu\n", erts_ioq_size(&p->ioq));
+ {
+ Eterm port_data = erts_port_data_read(p);
+ if (port_data != am_undefined)
+ erts_print(to, arg, "Port Data: %T\n", port_data);
+ }
}
void
@@ -5556,14 +5016,14 @@ set_busy_port(ErlDrvPort dprt, int on)
DTRACE_CHARBUF(port_str, 16);
#endif
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
prt = erts_drvport2port(dprt);
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return;
if (on) {
- flags = erts_smp_atomic32_read_bor_acqb(&prt->sched.flags,
+ flags = erts_atomic32_read_bor_acqb(&prt->sched.flags,
ERTS_PTS_FLG_BUSY_PORT);
if (flags & ERTS_PTS_FLG_BUSY_PORT)
return; /* Already busy */
@@ -5579,7 +5039,7 @@ set_busy_port(ErlDrvPort dprt, int on)
}
#endif
} else {
- flags = erts_smp_atomic32_read_band_acqb(&prt->sched.flags,
+ flags = erts_atomic32_read_band_acqb(&prt->sched.flags,
~ERTS_PTS_FLG_BUSY_PORT);
if (!(flags & ERTS_PTS_FLG_BUSY_PORT))
return; /* Already non-busy */
@@ -5591,7 +5051,7 @@ set_busy_port(ErlDrvPort dprt, int on)
DTRACE1(port_not_busy, port_str);
}
#endif
- if (prt->dist_entry) {
+ if (erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY) != NULL) {
/*
* Processes suspended on distribution ports are
* normally queued on the dist entry.
@@ -5673,7 +5133,7 @@ int get_port_flags(ErlDrvPort ix)
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return 0;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
flags = 0;
if (state & ERTS_PORT_SFLG_BINARY_IO)
@@ -5689,8 +5149,8 @@ void erts_raw_port_command(Port* p, byte* buf, Uint len)
int fpe_was_unmasked;
ERTS_MSACC_PUSH_STATE_M();
- ERTS_SMP_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
+ ERTS_CHK_NO_PROC_LOCKS;
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(p));
if (len > (Uint) INT_MAX)
erts_exit(ERTS_ABORT_EXIT,
@@ -5719,10 +5179,10 @@ int async_ready(Port *p, void* data)
{
int need_free = 1;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
if (p) {
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(p));
if (p->drv_ptr->ready_async != NULL) {
ERTS_MSACC_PUSH_AND_SET_STATE_M(ERTS_MSACC_STATE_PORT);
#ifdef USE_VM_PROBES
@@ -5791,24 +5251,17 @@ erts_stale_drv_select(Eterm port,
switch (mode) {
case ERL_DRV_READ | ERL_DRV_WRITE:
type = "Input/Output";
- goto deselect;
case ERL_DRV_WRITE:
type = "Output";
- goto deselect;
case ERL_DRV_READ:
type = "Input";
- deselect:
- if (deselect) {
- driver_select(drv_port, hndl,
- mode | ERL_DRV_USE_NO_CALLBACK,
- 0);
- }
- break;
default:
- type = "Event";
- if (deselect)
- driver_event(drv_port, hndl, NULL);
- break;
+ type = "";
+ }
+ if (deselect) {
+ driver_select(drv_port, hndl,
+ mode | ERL_DRV_USE_NO_CALLBACK,
+ 0);
}
dsbufp = erts_create_logger_dsbuf();
@@ -5907,8 +5360,8 @@ void driver_report_exit(ErlDrvPort ix, int status)
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_CHK_NO_PROC_LOCKS;
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
pid = ERTS_PORT_GET_CONNECTED(prt);
ASSERT(is_internal_pid(pid));
@@ -5928,10 +5381,9 @@ void driver_report_exit(ErlDrvPort ix, int status)
if (IS_TRACED_FL(prt, F_TRACE_SEND))
trace_port_send(prt, pid, tuple, 1);
- ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
if (!scheduler)
erts_proc_dec_refc(rp);
}
@@ -6385,22 +5837,12 @@ driver_deliver_term(Port *prt, Eterm to, ErlDrvTermData* data, int len)
mess = make_binary(hbp);
}
else {
- ProcBin* pbp;
+ Eterm* hp;
Binary* bp = erts_bin_nrml_alloc(size);
ASSERT(bufp);
- erts_refc_init(&bp->refc, 1);
sys_memcpy((void *) bp->orig_bytes, (void *) bufp, size);
- pbp = (ProcBin *) erts_produce_heap(&factory,
- PROC_BIN_SIZE, HEAP_EXTRA);
- pbp->thing_word = HEADER_PROC_BIN;
- pbp->size = size;
- pbp->next = factory.off_heap->first;
- factory.off_heap->first = (struct erl_off_heap_header*)pbp;
- pbp->val = bp;
- pbp->bytes = (byte*) bp->orig_bytes;
- pbp->flags = 0;
- OH_OVERHEAD(factory.off_heap, pbp->size / sizeof(Eterm));
- mess = make_binary(pbp);
+ hp = erts_produce_heap(&factory, PROC_BIN_SIZE, HEAP_EXTRA);
+ mess = erts_build_proc_bin(factory.off_heap, hp, bp);
}
ptr += 2;
break;
@@ -6544,8 +5986,6 @@ driver_deliver_term(Port *prt, Eterm to, ErlDrvTermData* data, int len)
from = prt->common.id;
}
- /* send message */
- ERL_MESSAGE_TOKEN(factory.message) = am_undefined;
erts_queue_message(rp, rp_locks, factory.message, mess, from);
}
else if (res == -2) {
@@ -6572,7 +6012,7 @@ driver_deliver_term(Port *prt, Eterm to, ErlDrvTermData* data, int len)
}
if (rp) {
if (rp_locks)
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
if (!scheduler)
erts_proc_dec_refc(rp);
}
@@ -6587,9 +6027,7 @@ static ERTS_INLINE int
deliver_term_check_port(ErlDrvTermData port_id, Eterm *connected_p,
Port **trace_prt)
{
-#ifdef ERTS_SMP
ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();
-#endif
erts_aint32_t state;
int res = 1;
Port *prt = erts_port_lookup_raw((Eterm) port_id);
@@ -6607,24 +6045,20 @@ deliver_term_check_port(ErlDrvTermData port_id, Eterm *connected_p,
goto done;
}
if (connected_p) {
-#ifdef ERTS_SMP
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
ETHR_MEMBAR(ETHR_LoadLoad);
-#endif
*connected_p = ERTS_PORT_GET_CONNECTED(prt);
}
done:
-#ifdef ERTS_SMP
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
- ERTS_SMP_LC_ASSERT(!prt || !erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(!prt || !erts_lc_is_port_locked(prt));
erts_thr_progress_unmanaged_continue(dhndl);
ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
} else
-#endif
if (res == 1) {
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
*trace_prt = prt;
}
return res;
@@ -6652,13 +6086,13 @@ driver_output_term(ErlDrvPort drvport, ErlDrvTermData* data, int len)
erts_aint32_t state;
Port* prt;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
/* NOTE! It *not* safe to access 'drvport' from unmanaged threads. */
prt = erts_drvport2port_state(drvport, &state);
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1; /* invalid (dead) */
- ERTS_SMP_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_CHK_NO_PROC_LOCKS;
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
if (state & ERTS_PORT_SFLG_CLOSING)
return 0;
@@ -6695,16 +6129,14 @@ driver_send_term(ErlDrvPort drvport,
* internal data representation for ErlDrvPort.
*/
Port* prt = NULL;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
-#ifdef ERTS_SMP
+ ERTS_CHK_NO_PROC_LOCKS;
if (erts_thr_progress_is_managed_thread())
-#endif
{
erts_aint32_t state;
prt = erts_drvport2port_state(drvport, &state);
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1; /* invalid (dead) */
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
if (state & ERTS_PORT_SFLG_CLOSING)
return 0;
}
@@ -6724,11 +6156,11 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
Port* prt = erts_drvport2port_state(ix, &state);
ErtsSchedulerData *esdp = erts_get_scheduler_data();
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
if (state & ERTS_PORT_SFLG_CLOSING)
return 0;
@@ -6738,8 +6170,12 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
else
erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len));
if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
+ DistEntry* dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
+ Uint32 conn_id = (Uint32)(UWord) erts_prtsd_get(prt, ERTS_PRTSD_CONN_ID);
+ erts_atomic64_inc_nob(&dep->in);
return erts_net_message(prt,
- prt->dist_entry,
+ dep,
+ conn_id,
(byte*) hbuf, hlen,
(byte*) (bin->orig_bytes+offs), len);
}
@@ -6763,12 +6199,12 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
Port* prt = erts_drvport2port_state(ix, &state);
ErtsSchedulerData *esdp = erts_get_scheduler_data();
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
if (state & ERTS_PORT_SFLG_CLOSING)
return 0;
@@ -6778,14 +6214,19 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
else
erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len));
if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
+ Uint32 conn_id = (Uint32)(UWord) erts_prtsd_get(prt, ERTS_PRTSD_CONN_ID);
+ erts_atomic64_inc_nob(&dep->in);
if (len == 0)
return erts_net_message(prt,
- prt->dist_entry,
+ dep,
+ conn_id,
NULL, 0,
(byte*) hbuf, hlen);
else
return erts_net_message(prt,
- prt->dist_entry,
+ dep,
+ conn_id,
(byte*) hbuf, hlen,
(byte*) buf, len);
}
@@ -6802,7 +6243,7 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
int driver_output(ErlDrvPort ix, char* buf, ErlDrvSizeT len)
{
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
return driver_output2(ix, NULL, 0, buf, len);
}
@@ -6818,7 +6259,7 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
erts_aint32_t state;
ErtsSchedulerData *esdp = erts_get_scheduler_data();
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
ASSERT(vec->size >= skip);
if (vec->size <= skip)
@@ -6829,7 +6270,7 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
if (state & ERTS_PORT_SFLG_CLOSING)
return 0;
@@ -6890,12 +6331,6 @@ ErlDrvSizeT driver_vec_to_buf(ErlIOVec *vec, char *buf, ErlDrvSizeT len)
return (orig_len - len);
}
-
-/*
- * - driver_alloc_binary() is thread safe (efile driver depend on it).
- * - driver_realloc_binary(), and driver_free_binary() are *not* thread safe.
- */
-
/*
* reference count on driver binaries...
*/
@@ -6904,21 +6339,21 @@ ErlDrvSInt
driver_binary_get_refc(ErlDrvBinary *dbp)
{
Binary* bp = ErlDrvBinary2Binary(dbp);
- return (ErlDrvSInt) erts_refc_read(&bp->refc, 1);
+ return (ErlDrvSInt) erts_refc_read(&bp->intern.refc, 1);
}
ErlDrvSInt
driver_binary_inc_refc(ErlDrvBinary *dbp)
{
Binary* bp = ErlDrvBinary2Binary(dbp);
- return (ErlDrvSInt) erts_refc_inctest(&bp->refc, 2);
+ return (ErlDrvSInt) erts_refc_inctest(&bp->intern.refc, 2);
}
ErlDrvSInt
driver_binary_dec_refc(ErlDrvBinary *dbp)
{
Binary* bp = ErlDrvBinary2Binary(dbp);
- return (ErlDrvSInt) erts_refc_dectest(&bp->refc, 1);
+ return (ErlDrvSInt) erts_refc_dectest(&bp->intern.refc, 1);
}
@@ -6934,30 +6369,18 @@ driver_alloc_binary(ErlDrvSizeT size)
bin = erts_bin_drv_alloc_fnf((Uint) size);
if (!bin)
return NULL; /* The driver write must take action */
- erts_refc_init(&bin->refc, 1);
return Binary2ErlDrvBinary(bin);
}
-/* Reallocate space hold by binary */
+/* Reallocate space held by binary */
ErlDrvBinary* driver_realloc_binary(ErlDrvBinary* bin, ErlDrvSizeT size)
{
Binary* oldbin;
Binary* newbin;
- if (!bin) {
- erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
- erts_dsprintf(dsbufp,
- "Bad use of driver_realloc_binary(%p, %lu): "
- "called with ",
- bin, (unsigned long)size);
- if (!bin) {
- erts_dsprintf(dsbufp, "NULL pointer as first argument");
- }
- erts_send_warning_to_logger_nogl(dsbufp);
- if (!bin)
- return driver_alloc_binary(size);
- }
+ if (!bin)
+ return driver_alloc_binary(size);
oldbin = ErlDrvBinary2Binary(bin);
newbin = (Binary *) erts_bin_realloc_fnf(oldbin, size);
@@ -6971,18 +6394,11 @@ ErlDrvBinary* driver_realloc_binary(ErlDrvBinary* bin, ErlDrvSizeT size)
void driver_free_binary(ErlDrvBinary* dbin)
{
Binary *bin;
- if (!dbin) {
- erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
- erts_dsprintf(dsbufp,
- "Bad use of driver_free_binary(%p): called with "
- "NULL pointer as argument", dbin);
- erts_send_warning_to_logger_nogl(dsbufp);
+ if (!dbin)
return;
- }
bin = ErlDrvBinary2Binary(dbin);
- if (erts_refc_dectest(&bin->refc, 0) == 0)
- erts_bin_free(bin);
+ erts_bin_release(bin);
}
@@ -7068,7 +6484,6 @@ static ERTS_INLINE void pdl_destroy(ErlDrvPDL pdl)
erts_free(ERTS_ALC_T_PORT_DATA_LOCK, pdl);
}
-#ifdef ERTS_SMP
static void driver_monitor_lock_pdl(Port *p) {
if (p->port_data_lock) {
@@ -7077,7 +6492,7 @@ static void driver_monitor_lock_pdl(Port *p) {
/* Now we either have the port lock or the port_data_lock */
ERTS_LC_ASSERT(!p->port_data_lock
|| erts_lc_mtx_is_locked(&(p->port_data_lock->mtx)));
- ERTS_SMP_LC_ASSERT(p->port_data_lock
+ ERTS_LC_ASSERT(p->port_data_lock
|| erts_lc_is_port_locked(p));
}
@@ -7085,14 +6500,13 @@ static void driver_monitor_unlock_pdl(Port *p) {
/* We should either have the port lock or the port_data_lock */
ERTS_LC_ASSERT(!p->port_data_lock
|| erts_lc_mtx_is_locked(&(p->port_data_lock->mtx)));
- ERTS_SMP_LC_ASSERT(p->port_data_lock
+ ERTS_LC_ASSERT(p->port_data_lock
|| erts_lc_is_port_locked(p));
if (p->port_data_lock) {
driver_pdl_unlock(p->port_data_lock);
}
}
-#endif
/*
* exported driver_pdl_* functions ...
@@ -7107,7 +6521,7 @@ driver_pdl_create(ErlDrvPort dp)
return NULL;
pdl = erts_alloc(ERTS_ALC_T_PORT_DATA_LOCK,
sizeof(struct erl_drv_port_data_lock));
- erts_mtx_init_x(&pdl->mtx, "port_data_lock", pp->common.id, 1);
+ erts_mtx_init(&pdl->mtx, "port_data_lock", pp->common.id, ERTS_LOCK_FLAGS_CATEGORY_IO);
pdl_init_refc(pdl);
erts_port_inc_refc(pp);
pdl->prt = pp;
@@ -7171,307 +6585,51 @@ driver_pdl_dec_refc(ErlDrvPDL pdl)
return refc;
}
-/* expand queue to hold n elements in tail or head */
-static int expandq(ErlIOQueue* q, int n, int tail)
-/* tail: 0 if make room in head, make room in tail otherwise */
-{
- int h_sz; /* room before header */
- int t_sz; /* room after tail */
- int q_sz; /* occupied */
- int nvsz;
- SysIOVec* niov;
- ErlDrvBinary** nbinv;
-
- h_sz = q->v_head - q->v_start;
- t_sz = q->v_end - q->v_tail;
- q_sz = q->v_tail - q->v_head;
-
- if (tail && (n <= t_sz)) /* do we need to expand tail? */
- return 0;
- else if (!tail && (n <= h_sz)) /* do we need to expand head? */
- return 0;
- else if (n > (h_sz + t_sz)) { /* need to allocate */
- /* we may get little extra but it ok */
- nvsz = (q->v_end - q->v_start) + n;
-
- niov = erts_alloc_fnf(ERTS_ALC_T_IOQ, nvsz * sizeof(SysIOVec));
- if (!niov)
- return -1;
- nbinv = erts_alloc_fnf(ERTS_ALC_T_IOQ, nvsz * sizeof(ErlDrvBinary**));
- if (!nbinv) {
- erts_free(ERTS_ALC_T_IOQ, (void *) niov);
- return -1;
- }
- if (tail) {
- sys_memcpy(niov, q->v_head, q_sz*sizeof(SysIOVec));
- if (q->v_start != q->v_small)
- erts_free(ERTS_ALC_T_IOQ, (void *) q->v_start);
- q->v_start = niov;
- q->v_end = niov + nvsz;
- q->v_head = q->v_start;
- q->v_tail = q->v_head + q_sz;
-
- sys_memcpy(nbinv, q->b_head, q_sz*sizeof(ErlDrvBinary*));
- if (q->b_start != q->b_small)
- erts_free(ERTS_ALC_T_IOQ, (void *) q->b_start);
- q->b_start = nbinv;
- q->b_end = nbinv + nvsz;
- q->b_head = q->b_start;
- q->b_tail = q->b_head + q_sz;
- }
- else {
- sys_memcpy(niov+nvsz-q_sz, q->v_head, q_sz*sizeof(SysIOVec));
- if (q->v_start != q->v_small)
- erts_free(ERTS_ALC_T_IOQ, (void *) q->v_start);
- q->v_start = niov;
- q->v_end = niov + nvsz;
- q->v_tail = q->v_end;
- q->v_head = q->v_tail - q_sz;
-
- sys_memcpy(nbinv+nvsz-q_sz, q->b_head, q_sz*sizeof(ErlDrvBinary*));
- if (q->b_start != q->b_small)
- erts_free(ERTS_ALC_T_IOQ, (void *) q->b_start);
- q->b_start = nbinv;
- q->b_end = nbinv + nvsz;
- q->b_tail = q->b_end;
- q->b_head = q->b_tail - q_sz;
- }
- }
- else if (tail) { /* move to beginning to make room in tail */
- sys_memmove(q->v_start, q->v_head, q_sz*sizeof(SysIOVec));
- q->v_head = q->v_start;
- q->v_tail = q->v_head + q_sz;
- sys_memmove(q->b_start, q->b_head, q_sz*sizeof(ErlDrvBinary*));
- q->b_head = q->b_start;
- q->b_tail = q->b_head + q_sz;
- }
- else { /* move to end to make room */
- sys_memmove(q->v_end-q_sz, q->v_head, q_sz*sizeof(SysIOVec));
- q->v_tail = q->v_end;
- q->v_head = q->v_tail-q_sz;
- sys_memmove(q->b_end-q_sz, q->b_head, q_sz*sizeof(ErlDrvBinary*));
- q->b_tail = q->b_end;
- q->b_head = q->b_tail-q_sz;
- }
-
- return 0;
-}
-
-
-
/* Put elements from vec at q tail */
int driver_enqv(ErlDrvPort ix, ErlIOVec* vec, ErlDrvSizeT skip)
{
- int n;
- size_t len;
- ErlDrvSizeT size;
- SysIOVec* iov;
- ErlDrvBinary** binv;
- ErlDrvBinary* b;
- ErlIOQueue* q = drvport2ioq(ix);
-
- if (q == NULL)
- return -1;
-
- ASSERT(vec->size >= skip); /* debug only */
- if (vec->size <= skip)
- return 0;
- size = vec->size - skip;
-
- iov = vec->iov;
- binv = vec->binv;
- n = vec->vsize;
-
- /* we use do here to strip iov_len=0 from beginning */
- do {
- len = iov->iov_len;
- if (len <= skip) {
- skip -= len;
- iov++;
- binv++;
- n--;
- }
- else {
- iov->iov_base = ((char *)(iov->iov_base)) + skip;
- iov->iov_len -= skip;
- skip = 0;
- }
- } while(skip > 0);
-
- if (q->v_tail + n >= q->v_end)
- expandq(q, n, 1);
-
- /* Queue and reference all binaries (remove zero length items) */
- while(n--) {
- if ((len = iov->iov_len) > 0) {
- if ((b = *binv) == NULL) { /* speical case create binary ! */
- b = driver_alloc_binary(len);
- sys_memcpy(b->orig_bytes, iov->iov_base, len);
- *q->b_tail++ = b;
- q->v_tail->iov_len = len;
- q->v_tail->iov_base = b->orig_bytes;
- q->v_tail++;
- }
- else {
- driver_binary_inc_refc(b);
- *q->b_tail++ = b;
- *q->v_tail++ = *iov;
- }
- }
- iov++;
- binv++;
- }
- q->size += size; /* update total size in queue */
- return 0;
+ ASSERT(vec->size >= skip);
+ return erts_ioq_enqv(drvport2ioq(ix), (ErtsIOVec*)vec, skip);
}
/* Put elements from vec at q head */
int driver_pushqv(ErlDrvPort ix, ErlIOVec* vec, ErlDrvSizeT skip)
{
- int n;
- size_t len;
- ErlDrvSizeT size;
- SysIOVec* iov;
- ErlDrvBinary** binv;
- ErlDrvBinary* b;
- ErlIOQueue* q = drvport2ioq(ix);
-
- if (q == NULL)
- return -1;
-
- if (vec->size <= skip)
- return 0;
- size = vec->size - skip;
-
- iov = vec->iov;
- binv = vec->binv;
- n = vec->vsize;
-
- /* we use do here to strip iov_len=0 from beginning */
- do {
- len = iov->iov_len;
- if (len <= skip) {
- skip -= len;
- iov++;
- binv++;
- n--;
- }
- else {
- iov->iov_base = ((char *)(iov->iov_base)) + skip;
- iov->iov_len -= skip;
- skip = 0;
- }
- } while(skip > 0);
-
- if (q->v_head - n < q->v_start)
- expandq(q, n, 0);
-
- /* Queue and reference all binaries (remove zero length items) */
- iov += (n-1); /* move to end */
- binv += (n-1); /* move to end */
- while(n--) {
- if ((len = iov->iov_len) > 0) {
- if ((b = *binv) == NULL) { /* speical case create binary ! */
- b = driver_alloc_binary(len);
- sys_memcpy(b->orig_bytes, iov->iov_base, len);
- *--q->b_head = b;
- q->v_head--;
- q->v_head->iov_len = len;
- q->v_head->iov_base = b->orig_bytes;
- }
- else {
- driver_binary_inc_refc(b);
- *--q->b_head = b;
- *--q->v_head = *iov;
- }
- }
- iov--;
- binv--;
- }
- q->size += size; /* update total size in queue */
- return 0;
+ ASSERT(vec->size >= skip);
+ return erts_ioq_pushqv(drvport2ioq(ix), (ErtsIOVec*)vec, skip);
}
-
/*
** Remove size bytes from queue head
** Return number of bytes that remain in queue
*/
ErlDrvSizeT driver_deq(ErlDrvPort ix, ErlDrvSizeT size)
{
- ErlIOQueue* q = drvport2ioq(ix);
- ErlDrvSizeT len;
-
- if ((q == NULL) || (q->size < size))
- return -1;
- q->size -= size;
- while (size > 0) {
- ASSERT(q->v_head != q->v_tail);
-
- len = q->v_head->iov_len;
- if (len <= size) {
- size -= len;
- driver_free_binary(*q->b_head);
- *q->b_head++ = NULL;
- q->v_head++;
- }
- else {
- q->v_head->iov_base = ((char *)(q->v_head->iov_base)) + size;
- q->v_head->iov_len -= size;
- size = 0;
- }
- }
-
- /* restart pointers (optimised for enq) */
- if (q->v_head == q->v_tail) {
- q->v_head = q->v_tail = q->v_start;
- q->b_head = q->b_tail = q->b_start;
- }
- return q->size;
+ ErlPortIOQueue *q = drvport2ioq(ix);
+ if (erts_ioq_deq(q, size) == -1)
+ return -1;
+ return erts_ioq_size(q);
}
-ErlDrvSizeT driver_peekqv(ErlDrvPort ix, ErlIOVec *ev) {
- ErlIOQueue *q = drvport2ioq(ix);
- ASSERT(ev);
-
- if (! q) {
- return (ErlDrvSizeT) -1;
- } else {
- if ((ev->vsize = q->v_tail - q->v_head) == 0) {
- ev->size = 0;
- ev->iov = NULL;
- ev->binv = NULL;
- } else {
- ev->size = q->size;
- ev->iov = q->v_head;
- ev->binv = q->b_head;
- }
- return q->size;
- }
+ErlDrvSizeT driver_peekqv(ErlDrvPort ix, ErlIOVec *ev)
+{
+ return erts_ioq_peekqv(drvport2ioq(ix), (ErtsIOVec*)ev);
}
SysIOVec* driver_peekq(ErlDrvPort ix, int* vlenp) /* length of io-vector */
{
- ErlIOQueue* q = drvport2ioq(ix);
-
- if (q == NULL) {
- *vlenp = -1;
- return NULL;
- }
- if ((*vlenp = (q->v_tail - q->v_head)) == 0)
- return NULL;
- return q->v_head;
+ return erts_ioq_peekq(drvport2ioq(ix), vlenp);
}
ErlDrvSizeT driver_sizeq(ErlDrvPort ix)
{
- ErlIOQueue* q = drvport2ioq(ix);
+ ErlPortIOQueue *q = drvport2ioq(ix);
if (q == NULL)
- return (size_t) -1;
- return q->size;
+ return (ErlDrvSizeT) -1;
+ return erts_ioq_size(q);
}
@@ -7551,7 +6709,7 @@ int driver_set_timer(ErlDrvPort ix, unsigned long t)
{
Port* prt = erts_drvport2port(ix);
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
@@ -7568,7 +6726,7 @@ int driver_cancel_timer(ErlDrvPort ix)
Port* prt = erts_drvport2port(ix);
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
erts_cancel_port_timer(prt);
return 0;
}
@@ -7579,11 +6737,11 @@ driver_read_timer(ErlDrvPort ix, unsigned long* t)
Port* prt = erts_drvport2port(ix);
Sint64 left;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
left = erts_read_port_timer(prt);
if (left < 0)
@@ -7598,7 +6756,7 @@ int
driver_get_now(ErlDrvNowData *now_data)
{
Uint mega,secs,micro;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
if (now_data == NULL) {
return -1;
@@ -7632,41 +6790,47 @@ erl_drv_convert_time_unit(ErlDrvTime val,
(int) to);
}
-static void ref_to_driver_monitor(Eterm ref, ErlDrvMonitor *mon)
+void erts_ref_to_driver_monitor(Eterm ref, ErlDrvMonitor *mon)
{
- RefThing *refp;
- ASSERT(is_internal_ref(ref));
- ERTS_CT_ASSERT(sizeof(RefThing) <= sizeof(ErlDrvMonitor));
- refp = ref_thing_ptr(ref);
- memset(mon,0,sizeof(ErlDrvMonitor));
- memcpy(mon,refp,sizeof(RefThing));
+ ERTS_CT_ASSERT(ERTS_REF_THING_SIZE*sizeof(Uint) <= sizeof(ErlDrvMonitor));
+ ASSERT(is_internal_ordinary_ref(ref));
+ sys_memcpy((void *) mon, (void *) internal_ref_val(ref),
+ ERTS_REF_THING_SIZE*sizeof(Uint));
}
+Eterm erts_driver_monitor_to_ref(Eterm *hp, const ErlDrvMonitor *mon)
+{
+ Eterm ref;
+ ERTS_CT_ASSERT(ERTS_REF_THING_SIZE*sizeof(Uint) <= sizeof(ErlDrvMonitor));
+ sys_memcpy((void *) hp, (void *) mon, ERTS_REF_THING_SIZE*sizeof(Uint));
+ ref = make_internal_ref(hp);
+ ASSERT(is_internal_ordinary_ref(ref));
+ return ref;
+}
static int do_driver_monitor_process(Port *prt,
- Eterm *buf,
ErlDrvTermData process,
ErlDrvMonitor *monitor)
{
- Process *rp;
+ Eterm buf[ERTS_REF_THING_SIZE];
Eterm ref;
+ ErtsMonitorData *mdp;
- if (prt->drv_ptr->process_exit == NULL) {
+ if (!prt->drv_ptr->process_exit)
return -1;
- }
- rp = erts_pid2proc_opt(NULL, 0,
- (Eterm) process, ERTS_PROC_LOCK_LINK,
- ERTS_P2P_FLG_ALLOW_OTHER_X);
- if (!rp) {
- return 1;
- }
ref = erts_make_ref_in_buffer(buf);
- erts_add_monitor(&ERTS_P_MONITORS(prt), MON_ORIGIN, ref, rp->common.id, NIL);
- erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, ref, prt->common.id, NIL);
+ mdp = erts_monitor_create(ERTS_MON_TYPE_PORT, ref,
+ prt->common.id, process, NIL);
+
+ if (!erts_proc_sig_send_monitor(&mdp->target, process)) {
+ erts_monitor_release_both(mdp);
+ return 1;
+ }
+
+ erts_monitor_tree_insert(&ERTS_P_MONITORS(prt), &mdp->origin);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- ref_to_driver_monitor(ref,monitor);
+ erts_ref_to_driver_monitor(ref,monitor);
return 0;
}
@@ -7679,7 +6843,7 @@ int driver_monitor_process(ErlDrvPort drvport,
{
Port *prt;
int ret;
-#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
+#if defined(ERTS_ENABLE_LOCK_CHECK)
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
@@ -7689,51 +6853,26 @@ int driver_monitor_process(ErlDrvPort drvport,
/* Now (in SMP) we should have either the port lock (if we have a scheduler) or the port data lock
(if we're a driver thread) */
- ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
- {
- DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
- UseTmpHeapNoproc(REF_THING_SIZE);
- ret = do_driver_monitor_process(prt,buf,process,monitor);
- UnUseTmpHeapNoproc(REF_THING_SIZE);
- }
+ ERTS_LC_ASSERT((sched != NULL || prt->port_data_lock));
+ ret = do_driver_monitor_process(prt,process,monitor);
DRV_MONITOR_UNLOCK_PDL(prt);
return ret;
}
-static int do_driver_demonitor_process(Port *prt, Eterm *buf,
- const ErlDrvMonitor *monitor)
+static int do_driver_demonitor_process(Port *prt, const ErlDrvMonitor *monitor)
{
- Process *rp;
+ Eterm heap[ERTS_REF_THING_SIZE];
Eterm ref;
ErtsMonitor *mon;
- Eterm to;
- memcpy(buf,monitor,sizeof(Eterm)*REF_THING_SIZE);
- ref = make_internal_ref(buf);
- mon = erts_lookup_monitor(ERTS_P_MONITORS(prt), ref);
- if (mon == NULL) {
- return 1;
- }
- ASSERT(mon->type == MON_ORIGIN);
- to = mon->pid;
- ASSERT(is_internal_pid(to));
- rp = erts_pid2proc_opt(NULL,
- 0,
- to,
- ERTS_PROC_LOCK_LINK,
- ERTS_P2P_FLG_ALLOW_OTHER_X);
- mon = erts_remove_monitor(&ERTS_P_MONITORS(prt), ref);
- if (mon) {
- erts_destroy_monitor(mon);
- }
- if (rp) {
- ErtsMonitor *rmon;
- rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- if (rmon != NULL) {
- erts_destroy_monitor(rmon);
- }
- }
+ ref = erts_driver_monitor_to_ref(heap, monitor);
+
+ mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(prt), ref);
+ if (!mon || !erts_monitor_is_origin(mon))
+ return 1;
+
+ erts_monitor_tree_delete(&ERTS_P_MONITORS(prt), mon);
+ erts_proc_sig_send_demonitor(mon);
return 0;
}
@@ -7742,7 +6881,7 @@ int driver_demonitor_process(ErlDrvPort drvport,
{
Port *prt;
int ret;
-#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
+#if defined(ERTS_ENABLE_LOCK_CHECK)
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
@@ -7752,34 +6891,26 @@ int driver_demonitor_process(ErlDrvPort drvport,
/* Now we should have either the port lock (if we have a scheduler) or the port data lock
(if we're a driver thread) */
- ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
- {
- DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
- UseTmpHeapNoproc(REF_THING_SIZE);
- ret = do_driver_demonitor_process(prt,buf,monitor);
- UnUseTmpHeapNoproc(REF_THING_SIZE);
- }
+ ERTS_LC_ASSERT((sched != NULL || prt->port_data_lock));
+ ret = do_driver_demonitor_process(prt,monitor);
DRV_MONITOR_UNLOCK_PDL(prt);
return ret;
}
-static ErlDrvTermData do_driver_get_monitored_process(Port *prt, Eterm *buf,
- const ErlDrvMonitor *monitor)
+static ErlDrvTermData do_driver_get_monitored_process(Port *prt,const ErlDrvMonitor *monitor)
{
Eterm ref;
ErtsMonitor *mon;
- Eterm to;
+ Eterm heap[ERTS_REF_THING_SIZE];
+
+ ref = erts_driver_monitor_to_ref(heap, monitor);
- memcpy(buf,monitor,sizeof(Eterm)*REF_THING_SIZE);
- ref = make_internal_ref(buf);
- mon = erts_lookup_monitor(ERTS_P_MONITORS(prt), ref);
- if (mon == NULL) {
+ mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(prt), ref);
+ if (!mon || !erts_monitor_is_origin(mon))
return driver_term_nil;
- }
- ASSERT(mon->type == MON_ORIGIN);
- to = mon->pid;
- ASSERT(is_internal_pid(to));
- return (ErlDrvTermData) to;
+
+ ASSERT(is_internal_pid(mon->other.item));
+ return (ErlDrvTermData) mon->other.item;
}
@@ -7788,7 +6919,7 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort drvport,
{
Port *prt;
ErlDrvTermData ret;
-#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
+#if defined(ERTS_ENABLE_LOCK_CHECK)
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
@@ -7798,42 +6929,40 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort drvport,
/* Now we should have either the port lock (if we have a scheduler) or the port data lock
(if we're a driver thread) */
- ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
- {
- DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
- UseTmpHeapNoproc(REF_THING_SIZE);
- ret = do_driver_get_monitored_process(prt,buf,monitor);
- UnUseTmpHeapNoproc(REF_THING_SIZE);
- }
+ ERTS_LC_ASSERT((sched != NULL || prt->port_data_lock));
+ ret = do_driver_get_monitored_process(prt,monitor);
DRV_MONITOR_UNLOCK_PDL(prt);
return ret;
}
-
int driver_compare_monitors(const ErlDrvMonitor *monitor1,
const ErlDrvMonitor *monitor2)
{
- return memcmp(monitor1,monitor2,sizeof(ErlDrvMonitor));
+ return sys_memcmp((void *) monitor1, (void *) monitor2,
+ ERTS_REF_THING_SIZE*sizeof(Eterm));
}
-void erts_fire_port_monitor(Port *prt, Eterm ref)
+void erts_fire_port_monitor(Port *prt, ErtsMonitor *tmon)
{
- ErtsMonitor *rmon;
+ ErtsMonitorData *mdp;
void (*callback)(ErlDrvData drv_data, ErlDrvMonitor *monitor);
ErlDrvMonitor drv_monitor;
int fpe_was_unmasked;
ERTS_MSACC_PUSH_STATE_M();
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- ASSERT(prt->drv_ptr != NULL);
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ASSERT(prt->drv_ptr != NULL);
+ ASSERT(erts_monitor_is_target(tmon));
+ mdp = erts_monitor_to_data(tmon);
DRV_MONITOR_LOCK_PDL(prt);
- if (erts_lookup_monitor(ERTS_P_MONITORS(prt), ref) == NULL) {
+ if (!erts_monitor_is_in_table(&mdp->origin)) {
DRV_MONITOR_UNLOCK_PDL(prt);
+ erts_monitor_release(tmon);
return;
}
callback = prt->drv_ptr->process_exit;
ASSERT(callback != NULL);
- ref_to_driver_monitor(ref,&drv_monitor);
+ erts_ref_to_driver_monitor(mdp->ref,&drv_monitor);
ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
DRV_MONITOR_UNLOCK_PDL(prt);
#ifdef USE_VM_PROBES
@@ -7857,11 +6986,9 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
DRV_MONITOR_LOCK_PDL(prt);
ERTS_MSACC_POP_STATE_M();
/* remove monitor *after* callback */
- rmon = erts_remove_monitor(&ERTS_P_MONITORS(prt), ref);
+ erts_monitor_tree_delete(&ERTS_P_MONITORS(prt), &mdp->origin);
DRV_MONITOR_UNLOCK_PDL(prt);
- if (rmon) {
- erts_destroy_monitor(rmon);
- }
+ erts_monitor_release_both(mdp);
}
@@ -7871,11 +6998,11 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof)
erts_aint32_t state;
Port* prt = erts_drvport2port_state(ix, &state);
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
if (prt->async_open_port)
init_ack_send_reply(prt, prt->common.id);
@@ -7906,34 +7033,19 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof)
int driver_exit(ErlDrvPort ix, int err)
{
Port* prt = erts_drvport2port(ix);
- Process* rp;
- ErtsLink *lnk, *rlnk = NULL;
+ ErtsLink *lnk;
Eterm connected;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
connected = ERTS_PORT_GET_CONNECTED(prt);
- rp = erts_pid2proc(NULL, 0, connected, ERTS_PROC_LOCK_LINK);
- if (rp) {
- rlnk = erts_remove_link(&ERTS_P_LINKS(rp),prt->common.id);
- }
-
- lnk = erts_remove_link(&ERTS_P_LINKS(prt), connected);
-
-#ifdef ERTS_SMP
- if (rp)
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
-#endif
-
- if (rlnk != NULL) {
- erts_destroy_link(rlnk);
- }
-
- if (lnk != NULL) {
- erts_destroy_link(lnk);
+ lnk = erts_link_tree_lookup(ERTS_P_LINKS(prt), connected);
+ if (lnk) {
+ erts_link_tree_delete(&ERTS_P_LINKS(prt), lnk);
+ erts_proc_sig_send_unlink(NULL, lnk);
}
if (err == 0)
@@ -7956,7 +7068,7 @@ int driver_failure_atom(ErlDrvPort ix, char* string)
{
return driver_failure_term(ix,
erts_atom_put((byte *) string,
- strlen(string),
+ sys_strlen(string),
ERTS_ATOM_ENC_LATIN1,
1),
0);
@@ -7980,7 +7092,7 @@ ErlDrvTermData driver_mk_atom(char* string)
sys_strlen(string),
ERTS_ATOM_ENC_LATIN1,
1);
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
return (ErlDrvTermData) am;
}
@@ -7989,27 +7101,27 @@ ErlDrvTermData driver_mk_port(ErlDrvPort ix)
Port* prt = erts_drvport2port(ix);
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return (ErlDrvTermData) NIL;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
return (ErlDrvTermData) prt->common.id;
}
ErlDrvTermData driver_connected(ErlDrvPort ix)
{
Port* prt = erts_drvport2port(ix);
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return NIL;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
return ERTS_PORT_GET_CONNECTED(prt);
}
ErlDrvTermData driver_caller(ErlDrvPort ix)
{
Port* prt = erts_drvport2port(ix);
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return NIL;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
return prt->caller;
}
@@ -8018,20 +7130,20 @@ int driver_lock_driver(ErlDrvPort ix)
Port* prt = erts_drvport2port(ix);
DE_Handle* dh;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
- erts_smp_rwmtx_rwlock(&erts_driver_list_lock);
+ erts_rwmtx_rwlock(&erts_driver_list_lock);
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
if ((dh = (DE_Handle*)prt->drv_ptr->handle ) == NULL) {
- erts_smp_rwmtx_rwunlock(&erts_driver_list_lock);
+ erts_rwmtx_rwunlock(&erts_driver_list_lock);
return -1;
}
erts_ddll_lock_driver(dh, prt->drv_ptr->name);
- erts_smp_rwmtx_rwunlock(&erts_driver_list_lock);
+ erts_rwmtx_rwunlock(&erts_driver_list_lock);
return 0;
}
@@ -8039,9 +7151,9 @@ int driver_lock_driver(ErlDrvPort ix)
static int maybe_lock_driver_list(void)
{
void *rec_lock;
- rec_lock = erts_smp_tsd_get(driver_list_lock_status_key);
+ rec_lock = erts_tsd_get(driver_list_lock_status_key);
if (rec_lock == 0) {
- erts_smp_rwmtx_rwlock(&erts_driver_list_lock);
+ erts_rwmtx_rwlock(&erts_driver_list_lock);
return 1;
}
return 0;
@@ -8049,7 +7161,7 @@ static int maybe_lock_driver_list(void)
static void maybe_unlock_driver_list(int doit)
{
if (doit) {
- erts_smp_rwmtx_rwunlock(&erts_driver_list_lock);
+ erts_rwmtx_rwunlock(&erts_driver_list_lock);
}
}
/*
@@ -8072,7 +7184,7 @@ void *driver_dl_open(char * path)
{
void *ptr;
int res;
- int *last_error_p = erts_smp_tsd_get(driver_list_last_error_key);
+ int *last_error_p = erts_tsd_get(driver_list_last_error_key);
int locked = maybe_lock_driver_list();
if ((res = erts_sys_ddll_open(path, &ptr, NULL)) == 0) {
maybe_unlock_driver_list(locked);
@@ -8080,7 +7192,7 @@ void *driver_dl_open(char * path)
} else {
if (!last_error_p) {
last_error_p = erts_alloc(ERTS_ALC_T_DDLL_ERRCODES, sizeof(int));
- erts_smp_tsd_set(driver_list_last_error_key,last_error_p);
+ erts_tsd_set(driver_list_last_error_key,last_error_p);
}
*last_error_p = res;
maybe_unlock_driver_list(locked);
@@ -8092,7 +7204,7 @@ void *driver_dl_sym(void * handle, char *func_name)
{
void *ptr;
int res;
- int *last_error_p = erts_smp_tsd_get(driver_list_lock_status_key);
+ int *last_error_p = erts_tsd_get(driver_list_lock_status_key);
int locked = maybe_lock_driver_list();
if ((res = erts_sys_ddll_sym(handle, func_name, &ptr)) == 0) {
maybe_unlock_driver_list(locked);
@@ -8100,7 +7212,7 @@ void *driver_dl_sym(void * handle, char *func_name)
} else {
if (!last_error_p) {
last_error_p = erts_alloc(ERTS_ALC_T_DDLL_ERRCODES, sizeof(int));
- erts_smp_tsd_set(driver_list_lock_status_key,last_error_p);
+ erts_tsd_set(driver_list_lock_status_key,last_error_p);
}
*last_error_p = res;
maybe_unlock_driver_list(locked);
@@ -8120,7 +7232,7 @@ int driver_dl_close(void *handle)
char *driver_dl_error(void)
{
char *res;
- int *last_error_p = erts_smp_tsd_get(driver_list_lock_status_key);
+ int *last_error_p = erts_tsd_get(driver_list_lock_status_key);
int locked = maybe_lock_driver_list();
res = erts_ddll_error((last_error_p != NULL) ? (*last_error_p) : ERL_DE_ERROR_UNSPECIFIED);
maybe_unlock_driver_list(locked);
@@ -8158,20 +7270,8 @@ driver_system_info(ErlDrvSysInfo *sip, size_t si_size)
sip->driver_minor_version = ERL_DRV_EXTENDED_MINOR_VERSION;
sip->erts_version = ERLANG_VERSION;
sip->otp_release = ERLANG_OTP_RELEASE;
- sip->thread_support =
-#ifdef USE_THREADS
- 1
-#else
- 0
-#endif
- ;
- sip->smp_support =
-#ifdef ERTS_SMP
- 1
-#else
- 0
-#endif
- ;
+ sip->thread_support = 1;
+ sip->smp_support = 1;
}
@@ -8197,11 +7297,7 @@ driver_system_info(ErlDrvSysInfo *sip, size_t si_size)
*/
if (si_size >= ERL_DRV_SYS_INFO_SIZE(dirty_scheduler_support)) {
sip->dirty_scheduler_support =
-#ifdef ERTS_DIRTY_SCHEDULERS
1
-#else
- 0
-#endif
;
}
@@ -8228,14 +7324,6 @@ no_output_callback(ErlDrvData drv_data, char *buf, ErlDrvSizeT len)
}
static void
-no_event_callback(ErlDrvData drv_data, ErlDrvEvent event, ErlDrvEventData event_data)
-{
- Port *prt = get_current_port();
- report_missing_drv_callback(prt, "Event", "event()");
- driver_event(ERTS_Port2ErlDrvPort(prt), event, NULL);
-}
-
-static void
no_ready_input_callback(ErlDrvData drv_data, ErlDrvEvent event)
{
Port *prt = get_current_port();
@@ -8269,38 +7357,31 @@ no_stop_select_callback(ErlDrvEvent event, void* private)
}
#define IS_DRIVER_VERSION_GE(DE,MAJOR,MINOR) \
- ((DE)->major_version >= (MAJOR) && (DE)->minor_version >= (MINOR))
+ ((DE)->major_version > (MAJOR) || \
+ ((DE)->major_version == (MAJOR) && (DE)->minor_version >= (MINOR)))
static int
init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
{
+ drv->name_atom = erts_atom_put((byte*)de->driver_name,
+ sys_strlen(de->driver_name),
+ ERTS_ATOM_ENC_LATIN1, 1);
drv->name = de->driver_name;
+
ASSERT(de->extended_marker == ERL_DRV_EXTENDED_MARKER);
ASSERT(de->major_version >= 2);
drv->version.major = de->major_version;
drv->version.minor = de->minor_version;
drv->flags = de->driver_flags;
drv->handle = handle;
-#ifdef ERTS_SMP
- if (drv->flags & ERL_DRV_FLAG_USE_PORT_LOCKING)
- drv->lock = NULL;
- else {
- drv->lock = erts_alloc(ERTS_ALC_T_DRIVER_LOCK,
- sizeof(erts_mtx_t));
- erts_mtx_init_x(drv->lock,
- "driver_lock",
-#if defined(ERTS_ENABLE_LOCK_CHECK) || defined(ERTS_ENABLE_LOCK_COUNT)
- erts_atom_put((byte *) drv->name,
- sys_strlen(drv->name),
- ERTS_ATOM_ENC_LATIN1,
- 1),
-#else
- NIL,
-#endif
- 1
- );
+ if (drv->flags & ERL_DRV_FLAG_USE_PORT_LOCKING) {
+ drv->lock = NULL;
+ } else {
+ drv->lock = erts_alloc(ERTS_ALC_T_DRIVER_LOCK, sizeof(erts_mtx_t));
+
+ erts_mtx_init(drv->lock, "driver_lock", drv->name_atom,
+ ERTS_LOCK_FLAGS_CATEGORY_IO);
}
-#endif
drv->entry = de;
drv->start = de->start;
@@ -8311,7 +7392,6 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
drv->outputv = de->outputv;
drv->control = de->control;
drv->call = de->call;
- drv->event = de->event ? de->event : no_event_callback;
drv->ready_input = de->ready_input ? de->ready_input : no_ready_input_callback;
drv->ready_output = de->ready_output ? de->ready_output : no_ready_output_callback;
drv->timeout = de->timeout ? de->timeout : no_timeout_callback;
@@ -8343,12 +7423,10 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
void
erts_destroy_driver(erts_driver_t *drv)
{
-#ifdef ERTS_SMP
if (drv->lock) {
- erts_smp_mtx_destroy(drv->lock);
+ erts_mtx_destroy(drv->lock);
erts_free(ERTS_ALC_T_DRIVER_LOCK, drv->lock);
}
-#endif
erts_free(ERTS_ALC_T_DRIVER, drv);
}
@@ -8359,21 +7437,22 @@ erts_destroy_driver(erts_driver_t *drv)
void add_driver_entry(ErlDrvEntry *drv){
void *rec_lock;
- rec_lock = erts_smp_tsd_get(driver_list_lock_status_key);
+ rec_lock = erts_tsd_get(driver_list_lock_status_key);
/*
* Ignore result of erts_add_driver_entry, the init is not
* allowed to fail when drivers are added by drivers.
*/
- erts_add_driver_entry(drv, NULL, rec_lock != NULL);
+ erts_add_driver_entry(drv, NULL, rec_lock != NULL, 0);
}
-int erts_add_driver_entry(ErlDrvEntry *de, DE_Handle *handle, int driver_list_locked)
+int erts_add_driver_entry(ErlDrvEntry *de, DE_Handle *handle,
+ int driver_list_locked, int taint)
{
erts_driver_t *dp = erts_alloc(ERTS_ALC_T_DRIVER, sizeof(erts_driver_t));
- int res;
+ int err = 0;
if (!driver_list_locked) {
- erts_smp_rwmtx_rwlock(&erts_driver_list_lock);
+ erts_rwmtx_rwlock(&erts_driver_list_lock);
}
dp->next = driver_list;
@@ -8384,12 +7463,18 @@ int erts_add_driver_entry(ErlDrvEntry *de, DE_Handle *handle, int driver_list_lo
driver_list = dp;
if (!driver_list_locked) {
- erts_smp_tsd_set(driver_list_lock_status_key, (void *) 1);
+ erts_tsd_set(driver_list_lock_status_key, (void *) 1);
}
- res = init_driver(dp, de, handle);
+ if (!err) {
+ err = init_driver(dp, de, handle);
- if (res != 0) {
+ if (taint) {
+ erts_add_taint(dp->name_atom);
+ }
+ }
+
+ if (err) {
/*
* Remove it all again...
*/
@@ -8401,10 +7486,10 @@ int erts_add_driver_entry(ErlDrvEntry *de, DE_Handle *handle, int driver_list_lo
}
if (!driver_list_locked) {
- erts_smp_tsd_set(driver_list_lock_status_key, NULL);
- erts_smp_rwmtx_rwunlock(&erts_driver_list_lock);
+ erts_tsd_set(driver_list_lock_status_key, NULL);
+ erts_rwmtx_rwunlock(&erts_driver_list_lock);
}
- return res;
+ return err;
}
/* Not allowed for dynamic drivers */
@@ -8413,9 +7498,9 @@ int remove_driver_entry(ErlDrvEntry *drv)
erts_driver_t *dp;
void *rec_lock;
- rec_lock = erts_smp_tsd_get(driver_list_lock_status_key);
+ rec_lock = erts_tsd_get(driver_list_lock_status_key);
if (rec_lock == NULL) {
- erts_smp_rwmtx_rwlock(&erts_driver_list_lock);
+ erts_rwmtx_rwlock(&erts_driver_list_lock);
}
dp = driver_list;
while (dp && dp->entry != drv)
@@ -8423,7 +7508,7 @@ int remove_driver_entry(ErlDrvEntry *drv)
if (dp) {
if (dp->handle) {
if (rec_lock == NULL) {
- erts_smp_rwmtx_rwunlock(&erts_driver_list_lock);
+ erts_rwmtx_rwunlock(&erts_driver_list_lock);
}
return -1;
}
@@ -8437,12 +7522,12 @@ int remove_driver_entry(ErlDrvEntry *drv)
}
erts_destroy_driver(dp);
if (rec_lock == NULL) {
- erts_smp_rwmtx_rwunlock(&erts_driver_list_lock);
+ erts_rwmtx_rwunlock(&erts_driver_list_lock);
}
return 1;
}
if (rec_lock == NULL) {
- erts_smp_rwmtx_rwunlock(&erts_driver_list_lock);
+ erts_rwmtx_rwunlock(&erts_driver_list_lock);
}
return 0;
}
@@ -8458,13 +7543,27 @@ int null_func(void)
int
erl_drv_putenv(const char *key, char *value)
{
- return erts_sys_putenv_raw((char*)key, value);
+ switch (erts_sys_explicit_8bit_putenv((char*)key, value)) {
+ case -1: /* Insufficient buffer space */
+ return 1;
+ case 1: /* Success */
+ return 0;
+ default: /* Not found */
+ return -1;
+ }
}
int
erl_drv_getenv(const char *key, char *value, size_t *value_size)
{
- return erts_sys_getenv_raw((char*)key, value, value_size);
+ switch (erts_sys_explicit_8bit_getenv((char*)key, value, value_size)) {
+ case -1: /* Insufficient buffer space */
+ return 1;
+ case 1: /* Success */
+ return 0;
+ default: /* Not found */
+ return -1;
+ }
}
/* get heart_port