diff options
author | Rickard Green <[email protected]> | 2012-06-04 20:47:08 +0200 |
---|---|---|
committer | Rickard Green <[email protected]> | 2012-12-03 21:18:04 +0100 |
commit | 3b523c25af0df45fbf68ab3cf50c0556f1d4e0a1 (patch) | |
tree | f930df53411010192f32f351e922ea195f28d84f /erts/emulator/beam/io.c | |
parent | 6da93c20472f5d13b34a40ca53cba4fe6f352d24 (diff) | |
download | otp-3b523c25af0df45fbf68ab3cf50c0556f1d4e0a1.tar.gz otp-3b523c25af0df45fbf68ab3cf50c0556f1d4e0a1.tar.bz2 otp-3b523c25af0df45fbf68ab3cf50c0556f1d4e0a1.zip |
Atomic port state
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r-- | erts/emulator/beam/io.c | 343 |
1 files changed, 180 insertions, 163 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index f9e1b62f14..00ecf1fca2 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -90,35 +90,35 @@ static ERTS_INLINE ErlIOQueue* drvport2ioq(ErlDrvPort drvport) { int ix = (int) drvport; - Uint32 status; + erts_aint32_t state; if (ix < 0 || erts_max_ports <= ix) return NULL; + state = erts_smp_atomic32_read_nob(&erts_port[ix].state); + +#ifdef ERTS_ENABLE_LOCK_CHECK + if (erts_get_scheduler_data()) { ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix])); ERTS_LC_ASSERT(!erts_port[ix].port_data_lock || erts_lc_mtx_is_locked( &erts_port[ix].port_data_lock->mtx)); - - status = erts_port[ix].status; } else { - erts_smp_port_state_lock(&erts_port[ix]); - status = erts_port[ix].status; - erts_smp_port_state_unlock(&erts_port[ix]); - - ERTS_LC_ASSERT((status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + ERTS_LC_ASSERT((state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) || erts_port[ix].port_data_lock); ERTS_LC_ASSERT(!erts_port[ix].port_data_lock || erts_lc_mtx_is_locked( &erts_port[ix].port_data_lock->mtx)); - } - return ((status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) - ? NULL - : &erts_port[ix].ioq); +#endif + + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + return NULL; + else + return &erts_port[ix].ioq; } static ERTS_INLINE int @@ -216,7 +216,7 @@ kill_port(Port *pp) { ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); erts_port_task_free_port(pp); - ASSERT(pp->status & ERTS_PORT_SFLGS_DEAD); + ASSERT(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD); } #ifdef ERTS_SMP @@ -242,27 +242,32 @@ get_free_port(void) erts_smp_spin_lock(&get_free_port_lck); num = last_port_num + 1; - for (;; ++num) { + for (;; ++num) { + erts_aint32_t act; + port = &erts_port[num & erts_port_tab_index_mask]; - erts_smp_port_state_lock(port); - if (port->status & ERTS_PORT_SFLG_FREE) { - last_port_num = num; - erts_smp_spin_unlock(&get_free_port_lck); - break; + act = erts_smp_atomic32_read_nob(&port->state); + + while (act & ERTS_PORT_SFLG_FREE) { + erts_aint32_t exp = act; + act = erts_smp_atomic32_cmpxchg_relb(&port->state, + ERTS_PORT_SFLG_INITIALIZING, + exp); + if (act == exp) { + last_port_num = num; + erts_smp_spin_unlock(&get_free_port_lck); + ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 0); + erts_smp_atomic_set_nob(&port->refc, 2); /* Port alive + lock */ + return num & port_num_mask; + } } - erts_smp_port_state_unlock(port); if (--tries == 0) { erts_smp_spin_unlock(&get_free_port_lck); return -1; } } - port->status = ERTS_PORT_SFLG_INITIALIZING; - ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 0); - erts_smp_atomic_set_nob(&port->refc, 2); /* Port alive + lock */ - erts_smp_port_state_unlock(port); - return num & port_num_mask; } /* @@ -284,13 +289,12 @@ erts_test_next_port(int set, Uint next) Port* port = &erts_port[num & erts_port_tab_index_mask]; - erts_smp_port_state_lock(port); + erts_aint32_t state = erts_smp_atomic32_read_nob(&port->state); - if (port->status & ERTS_PORT_SFLG_FREE) { + if (state & ERTS_PORT_SFLG_FREE) { last_port_num = num - 1; res = num & port_num_mask; } - erts_smp_port_state_unlock(port); } erts_smp_spin_unlock(&get_free_port_lck); return res; @@ -326,38 +330,37 @@ void port_cleanup(Port *prt) { #ifdef ERTS_SMP - Uint32 port_specific; erts_smp_mtx_t *mtx; #endif +#if defined(ERTS_SMP) || defined(ERTS_ENABLE_LOCK_CHECK) + erts_aint32_t state = erts_smp_atomic32_read_nob(&prt->state); +#endif erts_driver_t *driver; - erts_smp_port_state_lock(prt); - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); driver = prt->drv_ptr; prt->drv_ptr = NULL; ASSERT(driver); - ASSERT(prt->status & ERTS_PORT_SFLG_FREE_SCHEDULED); - ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&prt->refc) == 0); + ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_FREE_SCHEDULED); + ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG); + ERTS_LC_ASSERT(!(state & ERTS_PORT_SFLG_FREE)); - ASSERT(prt->status & ERTS_PORT_SFLG_PORT_DEBUG); - ASSERT(!(prt->status & ERTS_PORT_SFLG_FREE)); - prt->status = ERTS_PORT_SFLG_FREE; + ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&prt->refc) == 0); #ifdef ERTS_SMP - - port_specific = (prt->status & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK); - mtx = prt->lock; ASSERT(mtx); prt->lock = NULL; - erts_smp_port_state_unlock(prt); erts_smp_mtx_unlock(mtx); +#endif - if (port_specific) { + erts_smp_atomic32_set_relb(&prt->state, ERTS_PORT_SFLG_FREE); + +#ifdef ERTS_SMP + if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) { erts_smp_mtx_destroy(mtx); erts_free(ERTS_ALC_T_PORT_LOCK, mtx); } @@ -422,13 +425,13 @@ static void stopq(Port* prt) static void setup_port(Port* prt, Eterm pid, erts_driver_t *driver, - ErlDrvData drv_data, char *name, Uint32 xstatus) + ErlDrvData drv_data, char *name, erts_aint32_t xstate) { ErtsRunQueue *runq = erts_get_runq_current(NULL); char *new_name, *old_name; #ifdef DEBUG /* Make sure the debug flags survives until port is freed */ - xstatus |= ERTS_PORT_SFLG_PORT_DEBUG; + xstate |= ERTS_PORT_SFLG_PORT_DEBUG; #endif ASSERT(runq); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); @@ -437,8 +440,6 @@ setup_port(Port* prt, Eterm pid, erts_driver_t *driver, new_name = (char*) erts_alloc(ERTS_ALC_T_PORT_NAME, sys_strlen(name)+1); sys_strcpy(new_name, name); erts_smp_runq_lock(runq); - erts_smp_port_state_lock(prt); - prt->status = ERTS_PORT_SFLG_CONNECTED | xstatus; prt->snapshot = erts_smp_atomic32_read_nob(&erts_ports_snapshot); old_name = prt->name; prt->name = new_name; @@ -447,7 +448,8 @@ setup_port(Port* prt, Eterm pid, erts_driver_t *driver, #endif ASSERT(!prt->drv_ptr); prt->drv_ptr = driver; - erts_smp_port_state_unlock(prt); + erts_smp_atomic32_set_relb(&prt->state, + ERTS_PORT_SFLG_CONNECTED | xstate); erts_smp_runq_unlock(runq); #ifdef ERTS_SMP ASSERT(!prt->xports); @@ -492,7 +494,7 @@ erts_wake_process_later(Port *prt, Process *process) ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (prt->status & ERTS_PORT_SFLGS_DEAD) + if (erts_smp_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD) return; for (p = &(prt->suspended); *p != NULL; p = &((*p)->next)) @@ -522,7 +524,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ int port_num; int port_ix; ErlDrvData drv_data = 0; - Uint32 xstatus = 0; + erts_aint32_t xstate = 0; Port *port; int fpe_was_unmasked; @@ -601,11 +603,9 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ *error_number_ptr = BADARG; } /* Need to mark the port as free again */ - erts_smp_port_state_lock(port); - port->status = ERTS_PORT_SFLG_FREE; ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 2); erts_smp_atomic_set_nob(&port->refc, 0); - erts_smp_port_state_unlock(port); + erts_smp_atomic32_set_relb(&port->state, ERTS_PORT_SFLG_FREE); return -3; } @@ -623,7 +623,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ erts_smp_mtx_init_x(port->lock, "port_lock", port->id); - xstatus |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; + xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; } #endif @@ -637,7 +637,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ erts_smp_mtx_lock(port->lock); #endif - setup_port(port, pid, driver, drv_data, name, xstatus); + setup_port(port, pid, driver, drv_data, name, xstate); if (IS_TRACED_FL(port, F_TRACE_PORTS)) { trace_port_open(port, @@ -737,11 +737,11 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ Process *rp; int port_num; Eterm port_id; - Uint32 xstatus = 0; + erts_aint32_t xstate = 0; ERTS_SMP_CHK_NO_PROC_LOCKS; - creator_port = erts_drvport2port(creator_port_ix); + creator_port = erts_drvport2port(creator_port_ix, NULL); if (!creator_port) return (ErlDrvTermData) -1; @@ -780,7 +780,7 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ port->lock = erts_alloc(ERTS_ALC_T_PORT_LOCK, sizeof(erts_smp_mtx_t)); erts_smp_mtx_init_locked_x(port->lock, "port_lock", port_id); - xstatus |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; + xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; } #endif @@ -793,7 +793,7 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(port)); - setup_port(port, pid, driver, drv_data, name, xstatus); + setup_port(port, pid, driver, drv_data, name, xstate); port->id = port_id; erts_add_link(&(port->nlinks), LINK_PID, pid); @@ -1349,7 +1349,7 @@ void init_io(void) ERTS_TRACE_FLAGS(&erts_port[i]) = 0; erts_port[i].drv_ptr = NULL; - erts_port[i].status = ERTS_PORT_SFLG_FREE; + erts_smp_atomic32_init_nob(&erts_port[i].state, ERTS_PORT_SFLG_FREE); erts_port[i].name = NULL; erts_port[i].nlinks = NULL; erts_port[i].monitors = NULL; @@ -1593,7 +1593,7 @@ deliver_result(Eterm sender, Eterm pid, Eterm res) * len -- length of data */ -static void deliver_read_message(Port* prt, Eterm to, +static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to, char *hbuf, ErlDrvSizeT hlen, char *buf, ErlDrvSizeT len, int eol) { @@ -1611,10 +1611,11 @@ static void deliver_read_message(Port* prt, Eterm to, ERTS_SMP_CHK_NO_PROC_LOCKS; need = 3 + 3 + 2*hlen; - if (prt->status & ERTS_PORT_SFLG_LINEBUF_IO) { + + if (state & ERTS_PORT_SFLG_LINEBUF_IO) { need += 3; } - if (prt->status & ERTS_PORT_SFLG_BINARY_IO && buf != NULL) { + if ((state & ERTS_PORT_SFLG_BINARY_IO) && buf != NULL) { need += PROC_BIN_SIZE; } else { need += 2*len; @@ -1630,7 +1631,7 @@ static void deliver_read_message(Port* prt, Eterm to, hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks); listp = NIL; - if ((prt->status & ERTS_PORT_SFLG_BINARY_IO) == 0) { + if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) { listp = buf_to_intlist(&hp, buf, len, listp); } else if (buf != NULL) { ProcBin* pb; @@ -1661,7 +1662,7 @@ static void deliver_read_message(Port* prt, Eterm to, listp = buf_to_intlist(&hp, hbuf, hlen, listp); } - if (prt->status & ERTS_PORT_SFLG_LINEBUF_IO){ + if (state & ERTS_PORT_SFLG_LINEBUF_IO){ listp = TUPLE2(hp, (eol) ? am_eol : am_noeol, listp); hp += 3; } @@ -1686,7 +1687,8 @@ static void deliver_read_message(Port* prt, Eterm to, * Deliver all lines in a line buffer, repeats calls to * deliver_read_message, and takes the same parameters. */ -static void deliver_linebuf_message(Port* prt, Eterm to, +static void deliver_linebuf_message(Port* prt, erts_aint_t state, + Eterm to, char* hbuf, ErlDrvSizeT hlen, char *buf, ErlDrvSizeT len) { @@ -1695,7 +1697,7 @@ static void deliver_linebuf_message(Port* prt, Eterm to, if(init_linebuf_context(&lc,&(prt->linebuf), buf, len) < 0) return; while((ret = read_linebuf(&lc)) > LINEBUF_EMPTY) - deliver_read_message(prt, to, hbuf, hlen, LINEBUF_DATA(lc), + deliver_read_message(prt, state, to, hbuf, hlen, LINEBUF_DATA(lc), LINEBUF_DATALEN(lc), (ret == LINEBUF_EOL)); } @@ -1706,19 +1708,24 @@ static void deliver_linebuf_message(Port* prt, Eterm to, * Parameters: * prt - Pointer to a Port structure for this port. */ -static void flush_linebuf_messages(Port *prt) +static void flush_linebuf_messages(Port *prt, erts_aint32_t state) { LineBufContext lc; int ret; ERTS_SMP_LC_ASSERT(!prt || erts_lc_is_port_locked(prt)); - if(prt == NULL || !(prt->status & ERTS_PORT_SFLG_LINEBUF_IO)) + + if (!prt) + return; + + if (!(state & ERTS_PORT_SFLG_LINEBUF_IO)) return; if(init_linebuf_context(&lc,&(prt->linebuf), NULL, 0) < 0) return; while((ret = flush_linebuf(&lc)) > LINEBUF_EMPTY) deliver_read_message(prt, + state, prt->connected, NULL, 0, @@ -1747,6 +1754,7 @@ deliver_vec_message(Port* prt, /* Port */ ErlOffHeap *ohp; ErtsProcLocks rp_locks = 0; int scheduler = erts_get_scheduler_id() != 0; + erts_aint32_t state; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -1762,12 +1770,13 @@ deliver_vec_message(Port* prt, /* Port */ if (!rp) return; + state = erts_smp_atomic32_read_nob(&prt->state); /* * Calculate the exact number of heap words needed. */ need = 3 + 3; /* Heap space for two tuples */ - if (prt->status & ERTS_PORT_SFLG_BINARY_IO) { + if (state & ERTS_PORT_SFLG_BINARY_IO) { need += (2+PROC_BIN_SIZE)*vsize - 2 + hlen*2; } else { need += (hlen+csize)*2; @@ -1778,7 +1787,7 @@ deliver_vec_message(Port* prt, /* Port */ listp = NIL; iov += vsize; - if ((prt->status & ERTS_PORT_SFLG_BINARY_IO) == 0) { + if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) { Eterm* thp = hp; while (vsize--) { iov--; @@ -1864,7 +1873,7 @@ static void deliver_bin_message(Port* prt, /* port */ /* * Note. * - * The test for (p->status & ERTS_PORT_SFLGS_DEAD) == 0 is important since the + * The test for ERTS_PORT_SFLGS_DEAD is important since the * driver's flush function might call driver_async, which when using no * threads and being short circuited will notice that the io queue is empty * (after calling the driver's async_ready) and recursively call @@ -1899,7 +1908,8 @@ static void flush_port(Port *p) ASSERT(!p->xports); #endif } - if ((p->status & ERTS_PORT_SFLGS_DEAD) == 0 && is_port_ioq_empty(p)) { + if ((erts_smp_atomic32_read_nob(&p->state) & ERTS_PORT_SFLGS_DEAD) == 0 + && is_port_ioq_empty(p)) { terminate_port(p); } } @@ -1911,7 +1921,7 @@ terminate_port(Port *prt) Eterm send_closed_port_id; Eterm connected_id = NIL /* Initialize to silence compiler */; erts_driver_t *drv; - int halt; + erts_aint32_t state; ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); @@ -1919,10 +1929,10 @@ terminate_port(Port *prt) ASSERT(!prt->nlinks); ASSERT(!prt->monitors); - /* prt->status may be altered by kill_port()below */ - halt = (prt->status & ERTS_PORT_SFLG_HALT) != 0; - if (prt->status & ERTS_PORT_SFLG_SEND_CLOSED) { - erts_port_status_band_set(prt, ~ERTS_PORT_SFLG_SEND_CLOSED); + /* state may be altered by kill_port() below */ + state = erts_smp_atomic32_read_band_nob(&prt->state, + ~ERTS_PORT_SFLG_SEND_CLOSED); + if (state & ERTS_PORT_SFLG_SEND_CLOSED) { send_closed_port_id = prt->id; connected_id = prt->connected; } @@ -1978,7 +1988,8 @@ terminate_port(Port *prt) * We don't want to send the closed message until after the * port has been removed from the port table (in kill_port()). */ - if (halt && (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0)) { + if ((state & ERTS_PORT_SFLG_HALT) + && (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0)) { erts_smp_port_unlock(prt); /* We will exit and never return */ erl_exit_flush_async(erts_halt_code, ""); } @@ -2100,6 +2111,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) { ErtsLink *lnk; Eterm rreason; + erts_aint32_t state; ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); @@ -2119,9 +2131,10 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) } #endif - if ((p->status & (ERTS_PORT_SFLGS_DEAD - | ERTS_PORT_SFLG_EXITING - | ERTS_PORT_SFLG_IMMORTAL)) + state = erts_smp_atomic32_read_nob(&p->state); + if ((state & (ERTS_PORT_SFLGS_DEAD + | ERTS_PORT_SFLG_EXITING + | ERTS_PORT_SFLG_IMMORTAL)) || ((reason == am_normal) && ((from != p->connected) && (from != p->id)))) { return; @@ -2142,7 +2155,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) if (p->reg != NULL) (void) erts_unregister_name(NULL, 0, p, p->reg->name); - erts_port_status_bor_set(p, ERTS_PORT_SFLG_EXITING); + state = erts_smp_atomic32_read_bor_relb(&p->state, ERTS_PORT_SFLG_EXITING); { SweepContext sc = {p->id, rreason}; @@ -2158,17 +2171,20 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) } DRV_MONITOR_UNLOCK_PDL(p); - if ((p->status & ERTS_PORT_SFLG_DISTRIBUTION) && p->dist_entry) { + if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && p->dist_entry) { erts_do_net_exits(p->dist_entry, rreason); erts_deref_dist_entry(p->dist_entry); - p->dist_entry = NULL; - erts_port_status_band_set(p, ~ERTS_PORT_SFLG_DISTRIBUTION); + p->dist_entry = NULL; + erts_smp_atomic32_read_band_relb(&p->state, + ~ERTS_PORT_SFLG_DISTRIBUTION); } if ((reason != am_kill) && !is_port_ioq_empty(p)) { - erts_port_status_bandor_set(p, - ~ERTS_PORT_SFLG_EXITING, /* must turn it off */ - ERTS_PORT_SFLG_CLOSING); + /* must turn exiting flag off */ + erts_smp_atomic32_read_bset_relb(&p->state, + (ERTS_PORT_SFLG_EXITING + | ERTS_PORT_SFLG_CLOSING), + ERTS_PORT_SFLG_CLOSING); flush_port(p); } else { @@ -2217,7 +2233,8 @@ void erts_port_command(Process *proc, if ((pid = port->connected) == tp[1]) { /* PID must be connected */ if (tp[2] == am_close) { - erts_port_status_bor_set(port, ERTS_PORT_SFLG_SEND_CLOSED); + erts_smp_atomic32_read_bor_relb(&port->state, + ERTS_PORT_SFLG_SEND_CLOSED); erts_do_exit_port(port, pid, am_normal); #ifdef USE_VM_PROBES @@ -2443,13 +2460,14 @@ void print_port_info(int to, void *arg, int i) { Port* p = &erts_port[i]; + erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state); - if (p->status & ERTS_PORT_SFLGS_DEAD) + if (state & ERTS_PORT_SFLGS_DEAD) return; erts_print(to, arg, "=port:%T\n", p->id); erts_print(to, arg, "Slot: %d\n", i); - if (p->status & ERTS_PORT_SFLG_CONNECTED) { + if (state & ERTS_PORT_SFLG_CONNECTED) { erts_print(to, arg, "Connected: %T", p->connected); erts_print(to, arg, "\n"); } @@ -2497,8 +2515,8 @@ set_busy_port(ErlDrvPort port_num, int on) ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num])); if (on) { - erts_port_status_bor_set(&erts_port[port_num], - ERTS_PORT_SFLG_PORT_BUSY); + erts_smp_atomic32_read_bor_relb(&erts_port[port_num].state, + ERTS_PORT_SFLG_PORT_BUSY); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_busy)) { erts_snprintf(port_str, sizeof(port_str), @@ -2508,8 +2526,8 @@ set_busy_port(ErlDrvPort port_num, int on) #endif } else { ErtsProcList* plp = erts_port[port_num].suspended; - erts_port_status_band_set(&erts_port[port_num], - ~ERTS_PORT_SFLG_PORT_BUSY); + erts_smp_atomic32_read_band_relb(&erts_port[port_num].state, + ~ERTS_PORT_SFLG_PORT_BUSY); erts_port[port_num].suspended = NULL; #ifdef USE_VM_PROBES @@ -2576,19 +2594,27 @@ void set_port_control_flags(ErlDrvPort port_num, int flags) erts_port[port_num].control_flags = flags; } -int get_port_flags(ErlDrvPort ix) { - Port* prt = erts_drvport2port(ix); +int get_port_flags(ErlDrvPort ix) +{ + int flags; + Port *prt; + erts_aint32_t state; + + prt = erts_drvport2port(ix, &state); + if (!prt) + return 0; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (prt == NULL) - return 0; + flags = 0; + if (state & ERTS_PORT_SFLG_BINARY_IO) + flags |= PORT_FLAG_BINARY; + if (state & ERTS_PORT_SFLG_LINEBUF_IO) + flags |= PORT_FLAG_LINE; - return (prt->status & ERTS_PORT_SFLG_BINARY_IO ? PORT_FLAG_BINARY : 0) - | (prt->status & ERTS_PORT_SFLG_LINEBUF_IO ? PORT_FLAG_LINE : 0); + return flags; } - void erts_raw_port_command(Port* p, byte* buf, Uint len) { int fpe_was_unmasked; @@ -2624,8 +2650,9 @@ int async_ready(Port *p, void* data) ERTS_SMP_CHK_NO_PROC_LOCKS; if (p) { + erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); - ASSERT(!(p->status & ERTS_PORT_SFLGS_DEAD)); + ASSERT(!(state & ERTS_PORT_SFLGS_DEAD)); if (p->drv_ptr->ready_async != NULL) { #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_ready_async)) { @@ -2641,7 +2668,7 @@ int async_ready(Port *p, void* data) ASSERT(!p->xports); #endif } - if ((p->status & ERTS_PORT_SFLG_CLOSING) && is_port_ioq_empty(p)) { + if ((state & ERTS_PORT_SFLG_CLOSING) && is_port_ioq_empty(p)) { terminate_port(p); } } @@ -2736,7 +2763,7 @@ erts_get_port_names(Eterm id) pnp_len = sizeof(ErtsPortNames) + len; pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, pnp_len); } - erts_smp_port_state_lock(prt); + erts_smp_port_minor_lock(prt); if (id != prt->id) { len = nlen = 0; name = driver_name = NULL; @@ -2764,7 +2791,7 @@ erts_get_port_names(Eterm id) } do_realloc = 0; } - erts_smp_port_state_unlock(prt); + erts_smp_port_minor_unlock(prt); } while (do_realloc); } return pnp; @@ -2803,7 +2830,7 @@ ErlDrvTermData driver_mk_term_nil(void) void driver_report_exit(int ix, int status) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); Eterm* hp; Eterm tuple; Process *rp; @@ -2852,14 +2879,13 @@ deliver_term_check_port(ErlDrvPort drvport) res = -1; /* invalid */ else { Port* prt = &erts_port[ix]; - erts_smp_port_state_lock(prt); - if (!(prt->status & ERTS_PORT_SFLGS_INVALID_LOOKUP)) + erts_aint32_t state = erts_smp_atomic32_read_nob(&prt->state); + if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) res = 1; /* ok */ - else if (prt->status & ERTS_PORT_SFLG_CLOSING) + else if (state & ERTS_PORT_SFLG_CLOSING) res = 0; /* closing */ else res = -1; /* invalid (dead) */ - erts_smp_port_state_unlock(prt); } return res; } @@ -3428,7 +3454,7 @@ driver_deliver_term(ErlDrvPort port, int driver_output_term(ErlDrvPort ix, ErlDrvTermData* data, int len) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); @@ -3454,19 +3480,20 @@ driver_send_term(ErlDrvPort ix, ErlDrvTermData to, ErlDrvTermData* data, int len int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, ErlDrvBinary* bin, ErlDrvSizeT offs, ErlDrvSizeT len) { - Port* prt = erts_drvport2port(ix); + erts_aint32_t state; + Port* prt = erts_drvport2port(ix, &state); ERTS_SMP_CHK_NO_PROC_LOCKS; if (prt == NULL) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (prt->status & ERTS_PORT_SFLG_CLOSING) + if (state & ERTS_PORT_SFLG_CLOSING) return 0; prt->bytes_in += (hlen + len); erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + len)); - if (prt->status & ERTS_PORT_SFLG_DISTRIBUTION) { + if (state & ERTS_PORT_SFLG_DISTRIBUTION) { return erts_net_message(prt, prt->dist_entry, (byte*) hbuf, hlen, @@ -3488,7 +3515,8 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, char* buf, ErlDrvSizeT len) { - Port* prt = erts_drvport2port(ix); + erts_aint32_t state; + Port* prt = erts_drvport2port(ix, &state); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -3497,12 +3525,12 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (prt->status & ERTS_PORT_SFLG_CLOSING) + if (state & ERTS_PORT_SFLG_CLOSING) return 0; prt->bytes_in += (hlen + len); erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + len)); - if (prt->status & ERTS_PORT_SFLG_DISTRIBUTION) { + if (state & ERTS_PORT_SFLG_DISTRIBUTION) { if (len == 0) return erts_net_message(prt, prt->dist_entry, @@ -3514,10 +3542,10 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, (byte*) hbuf, hlen, (byte*) buf, len); } - else if(prt->status & ERTS_PORT_SFLG_LINEBUF_IO) - deliver_linebuf_message(prt, prt->connected, hbuf, hlen, buf, len); + else if (state & ERTS_PORT_SFLG_LINEBUF_IO) + deliver_linebuf_message(prt, state, prt->connected, hbuf, hlen, buf, len); else - deliver_read_message(prt, prt->connected, hbuf, hlen, buf, len, 0); + deliver_read_message(prt, state, prt->connected, hbuf, hlen, buf, len, 0); return 0; } @@ -3538,6 +3566,7 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, SysIOVec* iov; ErlDrvBinary** binv; Port* prt; + erts_aint32_t state; ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -3550,13 +3579,13 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, if (hlen < 0) hlen = 0; - prt = erts_drvport2port(ix); + prt = erts_drvport2port(ix, &state); if (prt == NULL) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (prt->status & ERTS_PORT_SFLG_CLOSING) + if (state & ERTS_PORT_SFLG_CLOSING) return 0; /* size > 0 ! */ @@ -3826,7 +3855,7 @@ ErlDrvPDL driver_pdl_create(ErlDrvPort dp) { ErlDrvPDL pdl; - Port *pp = erts_drvport2port(dp); + Port *pp = erts_drvport2port(dp, NULL); if (!pp || pp->port_data_lock) return NULL; pdl = erts_alloc(ERTS_ALC_T_PORT_DATA_LOCK, @@ -4283,7 +4312,7 @@ drv_cancel_timer(Port *prt) int driver_set_timer(ErlDrvPort ix, unsigned long t) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -4310,7 +4339,7 @@ int driver_set_timer(ErlDrvPort ix, unsigned long t) int driver_cancel_timer(ErlDrvPort ix) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); if (prt == NULL) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); @@ -4322,7 +4351,7 @@ int driver_cancel_timer(ErlDrvPort ix) int driver_read_timer(ErlDrvPort ix, unsigned long* t) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -4400,8 +4429,10 @@ int driver_monitor_process(ErlDrvPort port, { Port *prt; int ret; - Uint32 status; + erts_aint32_t state; +#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) ErtsSchedulerData *sched = erts_get_scheduler_data(); +#endif int ix = (int) port; if (ix < 0 || erts_max_ports <= ix) { return -1; @@ -4410,15 +4441,9 @@ int driver_monitor_process(ErlDrvPort port, DRV_MONITOR_LOCK_PDL(prt); - if (sched) { - status = erts_port[ix].status; - } else { - erts_smp_port_state_lock(prt); - status = erts_port[ix].status; - erts_smp_port_state_unlock(prt); - } + state = erts_smp_atomic32_read_nob(&erts_port[ix].state); - if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { DRV_MONITOR_UNLOCK_PDL(prt); return -1; } @@ -4488,8 +4513,10 @@ int driver_demonitor_process(ErlDrvPort port, { Port *prt; int ret; - Uint32 status; + erts_aint32_t state; +#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) ErtsSchedulerData *sched = erts_get_scheduler_data(); +#endif int ix = (int) port; if (ix < 0 || erts_max_ports <= ix) { return -1; @@ -4498,15 +4525,9 @@ int driver_demonitor_process(ErlDrvPort port, DRV_MONITOR_LOCK_PDL(prt); - if (sched) { - status = erts_port[ix].status; - } else { - erts_smp_port_state_lock(prt); - status = erts_port[ix].status; - erts_smp_port_state_unlock(prt); - } + state = erts_smp_atomic32_read_nob(&erts_port[ix].state); - if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { DRV_MONITOR_UNLOCK_PDL(prt); return -1; } @@ -4558,8 +4579,10 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort port, { Port *prt; ErlDrvTermData ret; - Uint32 status; + erts_aint32_t state; +#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) ErtsSchedulerData *sched = erts_get_scheduler_data(); +#endif int ix = (int) port; if (ix < 0 || erts_max_ports <= ix) { return driver_term_nil; @@ -4568,15 +4591,8 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort port, DRV_MONITOR_LOCK_PDL(prt); - if (sched) { - status = erts_port[ix].status; - } else { - erts_smp_port_state_lock(prt); - status = erts_port[ix].status; - erts_smp_port_state_unlock(prt); - } - - if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { + state = erts_smp_atomic32_read_nob(&erts_port[ix].state); + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { DRV_MONITOR_UNLOCK_PDL(prt); return driver_term_nil; } @@ -4651,7 +4667,8 @@ void erts_fire_port_monitor(Port *prt, Eterm ref) static int driver_failure_term(ErlDrvPort ix, Eterm term, int eof) { - Port* prt = erts_drvport2port(ix); + erts_aint32_t state; + Port* prt = erts_drvport2port(ix, &state); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -4659,10 +4676,10 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); if (eof) - flush_linebuf_messages(prt); - if (prt->status & ERTS_PORT_SFLG_CLOSING) { + flush_linebuf_messages(prt, state); + if (state & ERTS_PORT_SFLG_CLOSING) { terminate_port(prt); - } else if (eof && (prt->status & ERTS_PORT_SFLG_SOFT_EOF)) { + } else if (eof && (state & ERTS_PORT_SFLG_SOFT_EOF)) { deliver_result(prt->id, prt->connected, am_eof); } else { /* XXX UGLY WORK AROUND, Let do_exit_port terminate the port */ @@ -4684,7 +4701,7 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof) */ int driver_exit(ErlDrvPort ix, int err) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); Process* rp; ErtsLink *lnk, *rlnk = NULL; @@ -4757,14 +4774,14 @@ ErlDrvTermData driver_mk_atom(char* string) ErlDrvTermData driver_mk_port(ErlDrvPort ix) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); return (ErlDrvTermData) prt->id; } ErlDrvTermData driver_connected(ErlDrvPort ix) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); ERTS_SMP_CHK_NO_PROC_LOCKS; if (prt == NULL) return NIL; @@ -4774,7 +4791,7 @@ ErlDrvTermData driver_connected(ErlDrvPort ix) ErlDrvTermData driver_caller(ErlDrvPort ix) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); ERTS_SMP_CHK_NO_PROC_LOCKS; if (prt == NULL) return NIL; @@ -4784,7 +4801,7 @@ ErlDrvTermData driver_caller(ErlDrvPort ix) int driver_lock_driver(ErlDrvPort ix) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); DE_Handle* dh; ERTS_SMP_CHK_NO_PROC_LOCKS; |