aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-06-04 20:47:08 +0200
committerRickard Green <[email protected]>2012-12-03 21:18:04 +0100
commit3b523c25af0df45fbf68ab3cf50c0556f1d4e0a1 (patch)
treef930df53411010192f32f351e922ea195f28d84f /erts/emulator/beam/io.c
parent6da93c20472f5d13b34a40ca53cba4fe6f352d24 (diff)
downloadotp-3b523c25af0df45fbf68ab3cf50c0556f1d4e0a1.tar.gz
otp-3b523c25af0df45fbf68ab3cf50c0556f1d4e0a1.tar.bz2
otp-3b523c25af0df45fbf68ab3cf50c0556f1d4e0a1.zip
Atomic port state
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c343
1 files changed, 180 insertions, 163 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index f9e1b62f14..00ecf1fca2 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -90,35 +90,35 @@ static ERTS_INLINE ErlIOQueue*
drvport2ioq(ErlDrvPort drvport)
{
int ix = (int) drvport;
- Uint32 status;
+ erts_aint32_t state;
if (ix < 0 || erts_max_ports <= ix)
return NULL;
+ state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
+
+#ifdef ERTS_ENABLE_LOCK_CHECK
+
if (erts_get_scheduler_data()) {
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix]));
ERTS_LC_ASSERT(!erts_port[ix].port_data_lock
|| erts_lc_mtx_is_locked(
&erts_port[ix].port_data_lock->mtx));
-
- status = erts_port[ix].status;
}
else {
- erts_smp_port_state_lock(&erts_port[ix]);
- status = erts_port[ix].status;
- erts_smp_port_state_unlock(&erts_port[ix]);
-
- ERTS_LC_ASSERT((status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ ERTS_LC_ASSERT((state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
|| erts_port[ix].port_data_lock);
ERTS_LC_ASSERT(!erts_port[ix].port_data_lock
|| erts_lc_mtx_is_locked(
&erts_port[ix].port_data_lock->mtx));
-
}
- return ((status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
- ? NULL
- : &erts_port[ix].ioq);
+#endif
+
+ if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ return NULL;
+ else
+ return &erts_port[ix].ioq;
}
static ERTS_INLINE int
@@ -216,7 +216,7 @@ kill_port(Port *pp)
{
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
erts_port_task_free_port(pp);
- ASSERT(pp->status & ERTS_PORT_SFLGS_DEAD);
+ ASSERT(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD);
}
#ifdef ERTS_SMP
@@ -242,27 +242,32 @@ get_free_port(void)
erts_smp_spin_lock(&get_free_port_lck);
num = last_port_num + 1;
- for (;; ++num) {
+ for (;; ++num) {
+ erts_aint32_t act;
+
port = &erts_port[num & erts_port_tab_index_mask];
- erts_smp_port_state_lock(port);
- if (port->status & ERTS_PORT_SFLG_FREE) {
- last_port_num = num;
- erts_smp_spin_unlock(&get_free_port_lck);
- break;
+ act = erts_smp_atomic32_read_nob(&port->state);
+
+ while (act & ERTS_PORT_SFLG_FREE) {
+ erts_aint32_t exp = act;
+ act = erts_smp_atomic32_cmpxchg_relb(&port->state,
+ ERTS_PORT_SFLG_INITIALIZING,
+ exp);
+ if (act == exp) {
+ last_port_num = num;
+ erts_smp_spin_unlock(&get_free_port_lck);
+ ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 0);
+ erts_smp_atomic_set_nob(&port->refc, 2); /* Port alive + lock */
+ return num & port_num_mask;
+ }
}
- erts_smp_port_state_unlock(port);
if (--tries == 0) {
erts_smp_spin_unlock(&get_free_port_lck);
return -1;
}
}
- port->status = ERTS_PORT_SFLG_INITIALIZING;
- ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 0);
- erts_smp_atomic_set_nob(&port->refc, 2); /* Port alive + lock */
- erts_smp_port_state_unlock(port);
- return num & port_num_mask;
}
/*
@@ -284,13 +289,12 @@ erts_test_next_port(int set, Uint next)
Port* port = &erts_port[num & erts_port_tab_index_mask];
- erts_smp_port_state_lock(port);
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&port->state);
- if (port->status & ERTS_PORT_SFLG_FREE) {
+ if (state & ERTS_PORT_SFLG_FREE) {
last_port_num = num - 1;
res = num & port_num_mask;
}
- erts_smp_port_state_unlock(port);
}
erts_smp_spin_unlock(&get_free_port_lck);
return res;
@@ -326,38 +330,37 @@ void
port_cleanup(Port *prt)
{
#ifdef ERTS_SMP
- Uint32 port_specific;
erts_smp_mtx_t *mtx;
#endif
+#if defined(ERTS_SMP) || defined(ERTS_ENABLE_LOCK_CHECK)
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&prt->state);
+#endif
erts_driver_t *driver;
- erts_smp_port_state_lock(prt);
-
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
driver = prt->drv_ptr;
prt->drv_ptr = NULL;
ASSERT(driver);
- ASSERT(prt->status & ERTS_PORT_SFLG_FREE_SCHEDULED);
- ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&prt->refc) == 0);
+ ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_FREE_SCHEDULED);
+ ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG);
+ ERTS_LC_ASSERT(!(state & ERTS_PORT_SFLG_FREE));
- ASSERT(prt->status & ERTS_PORT_SFLG_PORT_DEBUG);
- ASSERT(!(prt->status & ERTS_PORT_SFLG_FREE));
- prt->status = ERTS_PORT_SFLG_FREE;
+ ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&prt->refc) == 0);
#ifdef ERTS_SMP
-
- port_specific = (prt->status & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK);
-
mtx = prt->lock;
ASSERT(mtx);
prt->lock = NULL;
- erts_smp_port_state_unlock(prt);
erts_smp_mtx_unlock(mtx);
+#endif
- if (port_specific) {
+ erts_smp_atomic32_set_relb(&prt->state, ERTS_PORT_SFLG_FREE);
+
+#ifdef ERTS_SMP
+ if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) {
erts_smp_mtx_destroy(mtx);
erts_free(ERTS_ALC_T_PORT_LOCK, mtx);
}
@@ -422,13 +425,13 @@ static void stopq(Port* prt)
static void
setup_port(Port* prt, Eterm pid, erts_driver_t *driver,
- ErlDrvData drv_data, char *name, Uint32 xstatus)
+ ErlDrvData drv_data, char *name, erts_aint32_t xstate)
{
ErtsRunQueue *runq = erts_get_runq_current(NULL);
char *new_name, *old_name;
#ifdef DEBUG
/* Make sure the debug flags survives until port is freed */
- xstatus |= ERTS_PORT_SFLG_PORT_DEBUG;
+ xstate |= ERTS_PORT_SFLG_PORT_DEBUG;
#endif
ASSERT(runq);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
@@ -437,8 +440,6 @@ setup_port(Port* prt, Eterm pid, erts_driver_t *driver,
new_name = (char*) erts_alloc(ERTS_ALC_T_PORT_NAME, sys_strlen(name)+1);
sys_strcpy(new_name, name);
erts_smp_runq_lock(runq);
- erts_smp_port_state_lock(prt);
- prt->status = ERTS_PORT_SFLG_CONNECTED | xstatus;
prt->snapshot = erts_smp_atomic32_read_nob(&erts_ports_snapshot);
old_name = prt->name;
prt->name = new_name;
@@ -447,7 +448,8 @@ setup_port(Port* prt, Eterm pid, erts_driver_t *driver,
#endif
ASSERT(!prt->drv_ptr);
prt->drv_ptr = driver;
- erts_smp_port_state_unlock(prt);
+ erts_smp_atomic32_set_relb(&prt->state,
+ ERTS_PORT_SFLG_CONNECTED | xstate);
erts_smp_runq_unlock(runq);
#ifdef ERTS_SMP
ASSERT(!prt->xports);
@@ -492,7 +494,7 @@ erts_wake_process_later(Port *prt, Process *process)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (prt->status & ERTS_PORT_SFLGS_DEAD)
+ if (erts_smp_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD)
return;
for (p = &(prt->suspended); *p != NULL; p = &((*p)->next))
@@ -522,7 +524,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
int port_num;
int port_ix;
ErlDrvData drv_data = 0;
- Uint32 xstatus = 0;
+ erts_aint32_t xstate = 0;
Port *port;
int fpe_was_unmasked;
@@ -601,11 +603,9 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
*error_number_ptr = BADARG;
}
/* Need to mark the port as free again */
- erts_smp_port_state_lock(port);
- port->status = ERTS_PORT_SFLG_FREE;
ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 2);
erts_smp_atomic_set_nob(&port->refc, 0);
- erts_smp_port_state_unlock(port);
+ erts_smp_atomic32_set_relb(&port->state, ERTS_PORT_SFLG_FREE);
return -3;
}
@@ -623,7 +623,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
erts_smp_mtx_init_x(port->lock,
"port_lock",
port->id);
- xstatus |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
+ xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
}
#endif
@@ -637,7 +637,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
erts_smp_mtx_lock(port->lock);
#endif
- setup_port(port, pid, driver, drv_data, name, xstatus);
+ setup_port(port, pid, driver, drv_data, name, xstate);
if (IS_TRACED_FL(port, F_TRACE_PORTS)) {
trace_port_open(port,
@@ -737,11 +737,11 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */
Process *rp;
int port_num;
Eterm port_id;
- Uint32 xstatus = 0;
+ erts_aint32_t xstate = 0;
ERTS_SMP_CHK_NO_PROC_LOCKS;
- creator_port = erts_drvport2port(creator_port_ix);
+ creator_port = erts_drvport2port(creator_port_ix, NULL);
if (!creator_port)
return (ErlDrvTermData) -1;
@@ -780,7 +780,7 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */
port->lock = erts_alloc(ERTS_ALC_T_PORT_LOCK,
sizeof(erts_smp_mtx_t));
erts_smp_mtx_init_locked_x(port->lock, "port_lock", port_id);
- xstatus |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
+ xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
}
#endif
@@ -793,7 +793,7 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(port));
- setup_port(port, pid, driver, drv_data, name, xstatus);
+ setup_port(port, pid, driver, drv_data, name, xstate);
port->id = port_id;
erts_add_link(&(port->nlinks), LINK_PID, pid);
@@ -1349,7 +1349,7 @@ void init_io(void)
ERTS_TRACE_FLAGS(&erts_port[i]) = 0;
erts_port[i].drv_ptr = NULL;
- erts_port[i].status = ERTS_PORT_SFLG_FREE;
+ erts_smp_atomic32_init_nob(&erts_port[i].state, ERTS_PORT_SFLG_FREE);
erts_port[i].name = NULL;
erts_port[i].nlinks = NULL;
erts_port[i].monitors = NULL;
@@ -1593,7 +1593,7 @@ deliver_result(Eterm sender, Eterm pid, Eterm res)
* len -- length of data
*/
-static void deliver_read_message(Port* prt, Eterm to,
+static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
char *hbuf, ErlDrvSizeT hlen,
char *buf, ErlDrvSizeT len, int eol)
{
@@ -1611,10 +1611,11 @@ static void deliver_read_message(Port* prt, Eterm to,
ERTS_SMP_CHK_NO_PROC_LOCKS;
need = 3 + 3 + 2*hlen;
- if (prt->status & ERTS_PORT_SFLG_LINEBUF_IO) {
+
+ if (state & ERTS_PORT_SFLG_LINEBUF_IO) {
need += 3;
}
- if (prt->status & ERTS_PORT_SFLG_BINARY_IO && buf != NULL) {
+ if ((state & ERTS_PORT_SFLG_BINARY_IO) && buf != NULL) {
need += PROC_BIN_SIZE;
} else {
need += 2*len;
@@ -1630,7 +1631,7 @@ static void deliver_read_message(Port* prt, Eterm to,
hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
listp = NIL;
- if ((prt->status & ERTS_PORT_SFLG_BINARY_IO) == 0) {
+ if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) {
listp = buf_to_intlist(&hp, buf, len, listp);
} else if (buf != NULL) {
ProcBin* pb;
@@ -1661,7 +1662,7 @@ static void deliver_read_message(Port* prt, Eterm to,
listp = buf_to_intlist(&hp, hbuf, hlen, listp);
}
- if (prt->status & ERTS_PORT_SFLG_LINEBUF_IO){
+ if (state & ERTS_PORT_SFLG_LINEBUF_IO){
listp = TUPLE2(hp, (eol) ? am_eol : am_noeol, listp);
hp += 3;
}
@@ -1686,7 +1687,8 @@ static void deliver_read_message(Port* prt, Eterm to,
* Deliver all lines in a line buffer, repeats calls to
* deliver_read_message, and takes the same parameters.
*/
-static void deliver_linebuf_message(Port* prt, Eterm to,
+static void deliver_linebuf_message(Port* prt, erts_aint_t state,
+ Eterm to,
char* hbuf, ErlDrvSizeT hlen,
char *buf, ErlDrvSizeT len)
{
@@ -1695,7 +1697,7 @@ static void deliver_linebuf_message(Port* prt, Eterm to,
if(init_linebuf_context(&lc,&(prt->linebuf), buf, len) < 0)
return;
while((ret = read_linebuf(&lc)) > LINEBUF_EMPTY)
- deliver_read_message(prt, to, hbuf, hlen, LINEBUF_DATA(lc),
+ deliver_read_message(prt, state, to, hbuf, hlen, LINEBUF_DATA(lc),
LINEBUF_DATALEN(lc), (ret == LINEBUF_EOL));
}
@@ -1706,19 +1708,24 @@ static void deliver_linebuf_message(Port* prt, Eterm to,
* Parameters:
* prt - Pointer to a Port structure for this port.
*/
-static void flush_linebuf_messages(Port *prt)
+static void flush_linebuf_messages(Port *prt, erts_aint32_t state)
{
LineBufContext lc;
int ret;
ERTS_SMP_LC_ASSERT(!prt || erts_lc_is_port_locked(prt));
- if(prt == NULL || !(prt->status & ERTS_PORT_SFLG_LINEBUF_IO))
+
+ if (!prt)
+ return;
+
+ if (!(state & ERTS_PORT_SFLG_LINEBUF_IO))
return;
if(init_linebuf_context(&lc,&(prt->linebuf), NULL, 0) < 0)
return;
while((ret = flush_linebuf(&lc)) > LINEBUF_EMPTY)
deliver_read_message(prt,
+ state,
prt->connected,
NULL,
0,
@@ -1747,6 +1754,7 @@ deliver_vec_message(Port* prt, /* Port */
ErlOffHeap *ohp;
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
+ erts_aint32_t state;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -1762,12 +1770,13 @@ deliver_vec_message(Port* prt, /* Port */
if (!rp)
return;
+ state = erts_smp_atomic32_read_nob(&prt->state);
/*
* Calculate the exact number of heap words needed.
*/
need = 3 + 3; /* Heap space for two tuples */
- if (prt->status & ERTS_PORT_SFLG_BINARY_IO) {
+ if (state & ERTS_PORT_SFLG_BINARY_IO) {
need += (2+PROC_BIN_SIZE)*vsize - 2 + hlen*2;
} else {
need += (hlen+csize)*2;
@@ -1778,7 +1787,7 @@ deliver_vec_message(Port* prt, /* Port */
listp = NIL;
iov += vsize;
- if ((prt->status & ERTS_PORT_SFLG_BINARY_IO) == 0) {
+ if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) {
Eterm* thp = hp;
while (vsize--) {
iov--;
@@ -1864,7 +1873,7 @@ static void deliver_bin_message(Port* prt, /* port */
/*
* Note.
*
- * The test for (p->status & ERTS_PORT_SFLGS_DEAD) == 0 is important since the
+ * The test for ERTS_PORT_SFLGS_DEAD is important since the
* driver's flush function might call driver_async, which when using no
* threads and being short circuited will notice that the io queue is empty
* (after calling the driver's async_ready) and recursively call
@@ -1899,7 +1908,8 @@ static void flush_port(Port *p)
ASSERT(!p->xports);
#endif
}
- if ((p->status & ERTS_PORT_SFLGS_DEAD) == 0 && is_port_ioq_empty(p)) {
+ if ((erts_smp_atomic32_read_nob(&p->state) & ERTS_PORT_SFLGS_DEAD) == 0
+ && is_port_ioq_empty(p)) {
terminate_port(p);
}
}
@@ -1911,7 +1921,7 @@ terminate_port(Port *prt)
Eterm send_closed_port_id;
Eterm connected_id = NIL /* Initialize to silence compiler */;
erts_driver_t *drv;
- int halt;
+ erts_aint32_t state;
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
@@ -1919,10 +1929,10 @@ terminate_port(Port *prt)
ASSERT(!prt->nlinks);
ASSERT(!prt->monitors);
- /* prt->status may be altered by kill_port()below */
- halt = (prt->status & ERTS_PORT_SFLG_HALT) != 0;
- if (prt->status & ERTS_PORT_SFLG_SEND_CLOSED) {
- erts_port_status_band_set(prt, ~ERTS_PORT_SFLG_SEND_CLOSED);
+ /* state may be altered by kill_port() below */
+ state = erts_smp_atomic32_read_band_nob(&prt->state,
+ ~ERTS_PORT_SFLG_SEND_CLOSED);
+ if (state & ERTS_PORT_SFLG_SEND_CLOSED) {
send_closed_port_id = prt->id;
connected_id = prt->connected;
}
@@ -1978,7 +1988,8 @@ terminate_port(Port *prt)
* We don't want to send the closed message until after the
* port has been removed from the port table (in kill_port()).
*/
- if (halt && (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0)) {
+ if ((state & ERTS_PORT_SFLG_HALT)
+ && (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0)) {
erts_smp_port_unlock(prt); /* We will exit and never return */
erl_exit_flush_async(erts_halt_code, "");
}
@@ -2100,6 +2111,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
{
ErtsLink *lnk;
Eterm rreason;
+ erts_aint32_t state;
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
@@ -2119,9 +2131,10 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
}
#endif
- if ((p->status & (ERTS_PORT_SFLGS_DEAD
- | ERTS_PORT_SFLG_EXITING
- | ERTS_PORT_SFLG_IMMORTAL))
+ state = erts_smp_atomic32_read_nob(&p->state);
+ if ((state & (ERTS_PORT_SFLGS_DEAD
+ | ERTS_PORT_SFLG_EXITING
+ | ERTS_PORT_SFLG_IMMORTAL))
|| ((reason == am_normal) &&
((from != p->connected) && (from != p->id)))) {
return;
@@ -2142,7 +2155,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
if (p->reg != NULL)
(void) erts_unregister_name(NULL, 0, p, p->reg->name);
- erts_port_status_bor_set(p, ERTS_PORT_SFLG_EXITING);
+ state = erts_smp_atomic32_read_bor_relb(&p->state, ERTS_PORT_SFLG_EXITING);
{
SweepContext sc = {p->id, rreason};
@@ -2158,17 +2171,20 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
}
DRV_MONITOR_UNLOCK_PDL(p);
- if ((p->status & ERTS_PORT_SFLG_DISTRIBUTION) && p->dist_entry) {
+ if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && p->dist_entry) {
erts_do_net_exits(p->dist_entry, rreason);
erts_deref_dist_entry(p->dist_entry);
- p->dist_entry = NULL;
- erts_port_status_band_set(p, ~ERTS_PORT_SFLG_DISTRIBUTION);
+ p->dist_entry = NULL;
+ erts_smp_atomic32_read_band_relb(&p->state,
+ ~ERTS_PORT_SFLG_DISTRIBUTION);
}
if ((reason != am_kill) && !is_port_ioq_empty(p)) {
- erts_port_status_bandor_set(p,
- ~ERTS_PORT_SFLG_EXITING, /* must turn it off */
- ERTS_PORT_SFLG_CLOSING);
+ /* must turn exiting flag off */
+ erts_smp_atomic32_read_bset_relb(&p->state,
+ (ERTS_PORT_SFLG_EXITING
+ | ERTS_PORT_SFLG_CLOSING),
+ ERTS_PORT_SFLG_CLOSING);
flush_port(p);
}
else {
@@ -2217,7 +2233,8 @@ void erts_port_command(Process *proc,
if ((pid = port->connected) == tp[1]) {
/* PID must be connected */
if (tp[2] == am_close) {
- erts_port_status_bor_set(port, ERTS_PORT_SFLG_SEND_CLOSED);
+ erts_smp_atomic32_read_bor_relb(&port->state,
+ ERTS_PORT_SFLG_SEND_CLOSED);
erts_do_exit_port(port, pid, am_normal);
#ifdef USE_VM_PROBES
@@ -2443,13 +2460,14 @@ void
print_port_info(int to, void *arg, int i)
{
Port* p = &erts_port[i];
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state);
- if (p->status & ERTS_PORT_SFLGS_DEAD)
+ if (state & ERTS_PORT_SFLGS_DEAD)
return;
erts_print(to, arg, "=port:%T\n", p->id);
erts_print(to, arg, "Slot: %d\n", i);
- if (p->status & ERTS_PORT_SFLG_CONNECTED) {
+ if (state & ERTS_PORT_SFLG_CONNECTED) {
erts_print(to, arg, "Connected: %T", p->connected);
erts_print(to, arg, "\n");
}
@@ -2497,8 +2515,8 @@ set_busy_port(ErlDrvPort port_num, int on)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num]));
if (on) {
- erts_port_status_bor_set(&erts_port[port_num],
- ERTS_PORT_SFLG_PORT_BUSY);
+ erts_smp_atomic32_read_bor_relb(&erts_port[port_num].state,
+ ERTS_PORT_SFLG_PORT_BUSY);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_busy)) {
erts_snprintf(port_str, sizeof(port_str),
@@ -2508,8 +2526,8 @@ set_busy_port(ErlDrvPort port_num, int on)
#endif
} else {
ErtsProcList* plp = erts_port[port_num].suspended;
- erts_port_status_band_set(&erts_port[port_num],
- ~ERTS_PORT_SFLG_PORT_BUSY);
+ erts_smp_atomic32_read_band_relb(&erts_port[port_num].state,
+ ~ERTS_PORT_SFLG_PORT_BUSY);
erts_port[port_num].suspended = NULL;
#ifdef USE_VM_PROBES
@@ -2576,19 +2594,27 @@ void set_port_control_flags(ErlDrvPort port_num, int flags)
erts_port[port_num].control_flags = flags;
}
-int get_port_flags(ErlDrvPort ix) {
- Port* prt = erts_drvport2port(ix);
+int get_port_flags(ErlDrvPort ix)
+{
+ int flags;
+ Port *prt;
+ erts_aint32_t state;
+
+ prt = erts_drvport2port(ix, &state);
+ if (!prt)
+ return 0;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (prt == NULL)
- return 0;
+ flags = 0;
+ if (state & ERTS_PORT_SFLG_BINARY_IO)
+ flags |= PORT_FLAG_BINARY;
+ if (state & ERTS_PORT_SFLG_LINEBUF_IO)
+ flags |= PORT_FLAG_LINE;
- return (prt->status & ERTS_PORT_SFLG_BINARY_IO ? PORT_FLAG_BINARY : 0)
- | (prt->status & ERTS_PORT_SFLG_LINEBUF_IO ? PORT_FLAG_LINE : 0);
+ return flags;
}
-
void erts_raw_port_command(Port* p, byte* buf, Uint len)
{
int fpe_was_unmasked;
@@ -2624,8 +2650,9 @@ int async_ready(Port *p, void* data)
ERTS_SMP_CHK_NO_PROC_LOCKS;
if (p) {
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
- ASSERT(!(p->status & ERTS_PORT_SFLGS_DEAD));
+ ASSERT(!(state & ERTS_PORT_SFLGS_DEAD));
if (p->drv_ptr->ready_async != NULL) {
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(driver_ready_async)) {
@@ -2641,7 +2668,7 @@ int async_ready(Port *p, void* data)
ASSERT(!p->xports);
#endif
}
- if ((p->status & ERTS_PORT_SFLG_CLOSING) && is_port_ioq_empty(p)) {
+ if ((state & ERTS_PORT_SFLG_CLOSING) && is_port_ioq_empty(p)) {
terminate_port(p);
}
}
@@ -2736,7 +2763,7 @@ erts_get_port_names(Eterm id)
pnp_len = sizeof(ErtsPortNames) + len;
pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, pnp_len);
}
- erts_smp_port_state_lock(prt);
+ erts_smp_port_minor_lock(prt);
if (id != prt->id) {
len = nlen = 0;
name = driver_name = NULL;
@@ -2764,7 +2791,7 @@ erts_get_port_names(Eterm id)
}
do_realloc = 0;
}
- erts_smp_port_state_unlock(prt);
+ erts_smp_port_minor_unlock(prt);
} while (do_realloc);
}
return pnp;
@@ -2803,7 +2830,7 @@ ErlDrvTermData driver_mk_term_nil(void)
void driver_report_exit(int ix, int status)
{
- Port* prt = erts_drvport2port(ix);
+ Port* prt = erts_drvport2port(ix, NULL);
Eterm* hp;
Eterm tuple;
Process *rp;
@@ -2852,14 +2879,13 @@ deliver_term_check_port(ErlDrvPort drvport)
res = -1; /* invalid */
else {
Port* prt = &erts_port[ix];
- erts_smp_port_state_lock(prt);
- if (!(prt->status & ERTS_PORT_SFLGS_INVALID_LOOKUP))
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&prt->state);
+ if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP))
res = 1; /* ok */
- else if (prt->status & ERTS_PORT_SFLG_CLOSING)
+ else if (state & ERTS_PORT_SFLG_CLOSING)
res = 0; /* closing */
else
res = -1; /* invalid (dead) */
- erts_smp_port_state_unlock(prt);
}
return res;
}
@@ -3428,7 +3454,7 @@ driver_deliver_term(ErlDrvPort port,
int
driver_output_term(ErlDrvPort ix, ErlDrvTermData* data, int len)
{
- Port* prt = erts_drvport2port(ix);
+ Port* prt = erts_drvport2port(ix, NULL);
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
@@ -3454,19 +3480,20 @@ driver_send_term(ErlDrvPort ix, ErlDrvTermData to, ErlDrvTermData* data, int len
int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
ErlDrvBinary* bin, ErlDrvSizeT offs, ErlDrvSizeT len)
{
- Port* prt = erts_drvport2port(ix);
+ erts_aint32_t state;
+ Port* prt = erts_drvport2port(ix, &state);
ERTS_SMP_CHK_NO_PROC_LOCKS;
if (prt == NULL)
return -1;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (prt->status & ERTS_PORT_SFLG_CLOSING)
+ if (state & ERTS_PORT_SFLG_CLOSING)
return 0;
prt->bytes_in += (hlen + len);
erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + len));
- if (prt->status & ERTS_PORT_SFLG_DISTRIBUTION) {
+ if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
return erts_net_message(prt,
prt->dist_entry,
(byte*) hbuf, hlen,
@@ -3488,7 +3515,8 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
char* buf, ErlDrvSizeT len)
{
- Port* prt = erts_drvport2port(ix);
+ erts_aint32_t state;
+ Port* prt = erts_drvport2port(ix, &state);
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -3497,12 +3525,12 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (prt->status & ERTS_PORT_SFLG_CLOSING)
+ if (state & ERTS_PORT_SFLG_CLOSING)
return 0;
prt->bytes_in += (hlen + len);
erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + len));
- if (prt->status & ERTS_PORT_SFLG_DISTRIBUTION) {
+ if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
if (len == 0)
return erts_net_message(prt,
prt->dist_entry,
@@ -3514,10 +3542,10 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
(byte*) hbuf, hlen,
(byte*) buf, len);
}
- else if(prt->status & ERTS_PORT_SFLG_LINEBUF_IO)
- deliver_linebuf_message(prt, prt->connected, hbuf, hlen, buf, len);
+ else if (state & ERTS_PORT_SFLG_LINEBUF_IO)
+ deliver_linebuf_message(prt, state, prt->connected, hbuf, hlen, buf, len);
else
- deliver_read_message(prt, prt->connected, hbuf, hlen, buf, len, 0);
+ deliver_read_message(prt, state, prt->connected, hbuf, hlen, buf, len, 0);
return 0;
}
@@ -3538,6 +3566,7 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
SysIOVec* iov;
ErlDrvBinary** binv;
Port* prt;
+ erts_aint32_t state;
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -3550,13 +3579,13 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
if (hlen < 0)
hlen = 0;
- prt = erts_drvport2port(ix);
+ prt = erts_drvport2port(ix, &state);
if (prt == NULL)
return -1;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (prt->status & ERTS_PORT_SFLG_CLOSING)
+ if (state & ERTS_PORT_SFLG_CLOSING)
return 0;
/* size > 0 ! */
@@ -3826,7 +3855,7 @@ ErlDrvPDL
driver_pdl_create(ErlDrvPort dp)
{
ErlDrvPDL pdl;
- Port *pp = erts_drvport2port(dp);
+ Port *pp = erts_drvport2port(dp, NULL);
if (!pp || pp->port_data_lock)
return NULL;
pdl = erts_alloc(ERTS_ALC_T_PORT_DATA_LOCK,
@@ -4283,7 +4312,7 @@ drv_cancel_timer(Port *prt)
int driver_set_timer(ErlDrvPort ix, unsigned long t)
{
- Port* prt = erts_drvport2port(ix);
+ Port* prt = erts_drvport2port(ix, NULL);
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -4310,7 +4339,7 @@ int driver_set_timer(ErlDrvPort ix, unsigned long t)
int driver_cancel_timer(ErlDrvPort ix)
{
- Port* prt = erts_drvport2port(ix);
+ Port* prt = erts_drvport2port(ix, NULL);
if (prt == NULL)
return -1;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
@@ -4322,7 +4351,7 @@ int driver_cancel_timer(ErlDrvPort ix)
int
driver_read_timer(ErlDrvPort ix, unsigned long* t)
{
- Port* prt = erts_drvport2port(ix);
+ Port* prt = erts_drvport2port(ix, NULL);
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -4400,8 +4429,10 @@ int driver_monitor_process(ErlDrvPort port,
{
Port *prt;
int ret;
- Uint32 status;
+ erts_aint32_t state;
+#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
ErtsSchedulerData *sched = erts_get_scheduler_data();
+#endif
int ix = (int) port;
if (ix < 0 || erts_max_ports <= ix) {
return -1;
@@ -4410,15 +4441,9 @@ int driver_monitor_process(ErlDrvPort port,
DRV_MONITOR_LOCK_PDL(prt);
- if (sched) {
- status = erts_port[ix].status;
- } else {
- erts_smp_port_state_lock(prt);
- status = erts_port[ix].status;
- erts_smp_port_state_unlock(prt);
- }
+ state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
- if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
+ if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
DRV_MONITOR_UNLOCK_PDL(prt);
return -1;
}
@@ -4488,8 +4513,10 @@ int driver_demonitor_process(ErlDrvPort port,
{
Port *prt;
int ret;
- Uint32 status;
+ erts_aint32_t state;
+#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
ErtsSchedulerData *sched = erts_get_scheduler_data();
+#endif
int ix = (int) port;
if (ix < 0 || erts_max_ports <= ix) {
return -1;
@@ -4498,15 +4525,9 @@ int driver_demonitor_process(ErlDrvPort port,
DRV_MONITOR_LOCK_PDL(prt);
- if (sched) {
- status = erts_port[ix].status;
- } else {
- erts_smp_port_state_lock(prt);
- status = erts_port[ix].status;
- erts_smp_port_state_unlock(prt);
- }
+ state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
- if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
+ if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
DRV_MONITOR_UNLOCK_PDL(prt);
return -1;
}
@@ -4558,8 +4579,10 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort port,
{
Port *prt;
ErlDrvTermData ret;
- Uint32 status;
+ erts_aint32_t state;
+#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
ErtsSchedulerData *sched = erts_get_scheduler_data();
+#endif
int ix = (int) port;
if (ix < 0 || erts_max_ports <= ix) {
return driver_term_nil;
@@ -4568,15 +4591,8 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort port,
DRV_MONITOR_LOCK_PDL(prt);
- if (sched) {
- status = erts_port[ix].status;
- } else {
- erts_smp_port_state_lock(prt);
- status = erts_port[ix].status;
- erts_smp_port_state_unlock(prt);
- }
-
- if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
+ state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
+ if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
DRV_MONITOR_UNLOCK_PDL(prt);
return driver_term_nil;
}
@@ -4651,7 +4667,8 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
static int
driver_failure_term(ErlDrvPort ix, Eterm term, int eof)
{
- Port* prt = erts_drvport2port(ix);
+ erts_aint32_t state;
+ Port* prt = erts_drvport2port(ix, &state);
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -4659,10 +4676,10 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof)
return -1;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
if (eof)
- flush_linebuf_messages(prt);
- if (prt->status & ERTS_PORT_SFLG_CLOSING) {
+ flush_linebuf_messages(prt, state);
+ if (state & ERTS_PORT_SFLG_CLOSING) {
terminate_port(prt);
- } else if (eof && (prt->status & ERTS_PORT_SFLG_SOFT_EOF)) {
+ } else if (eof && (state & ERTS_PORT_SFLG_SOFT_EOF)) {
deliver_result(prt->id, prt->connected, am_eof);
} else {
/* XXX UGLY WORK AROUND, Let do_exit_port terminate the port */
@@ -4684,7 +4701,7 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof)
*/
int driver_exit(ErlDrvPort ix, int err)
{
- Port* prt = erts_drvport2port(ix);
+ Port* prt = erts_drvport2port(ix, NULL);
Process* rp;
ErtsLink *lnk, *rlnk = NULL;
@@ -4757,14 +4774,14 @@ ErlDrvTermData driver_mk_atom(char* string)
ErlDrvTermData driver_mk_port(ErlDrvPort ix)
{
- Port* prt = erts_drvport2port(ix);
+ Port* prt = erts_drvport2port(ix, NULL);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
return (ErlDrvTermData) prt->id;
}
ErlDrvTermData driver_connected(ErlDrvPort ix)
{
- Port* prt = erts_drvport2port(ix);
+ Port* prt = erts_drvport2port(ix, NULL);
ERTS_SMP_CHK_NO_PROC_LOCKS;
if (prt == NULL)
return NIL;
@@ -4774,7 +4791,7 @@ ErlDrvTermData driver_connected(ErlDrvPort ix)
ErlDrvTermData driver_caller(ErlDrvPort ix)
{
- Port* prt = erts_drvport2port(ix);
+ Port* prt = erts_drvport2port(ix, NULL);
ERTS_SMP_CHK_NO_PROC_LOCKS;
if (prt == NULL)
return NIL;
@@ -4784,7 +4801,7 @@ ErlDrvTermData driver_caller(ErlDrvPort ix)
int driver_lock_driver(ErlDrvPort ix)
{
- Port* prt = erts_drvport2port(ix);
+ Port* prt = erts_drvport2port(ix, NULL);
DE_Handle* dh;
ERTS_SMP_CHK_NO_PROC_LOCKS;