From 50cb7c24f061fd3d7df5970d8202f47c470a4047 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Wed, 15 Aug 2012 01:28:55 +0200 Subject: Use ptab functionality also for ports --- erts/emulator/beam/atom.names | 1 + erts/emulator/beam/beam_emu.c | 5 +- erts/emulator/beam/bif.c | 100 +-- erts/emulator/beam/break.c | 9 +- erts/emulator/beam/dist.c | 25 +- erts/emulator/beam/dist.h | 2 +- erts/emulator/beam/erl_alloc.c | 5 +- erts/emulator/beam/erl_alloc.h | 2 + erts/emulator/beam/erl_alloc.types | 6 +- erts/emulator/beam/erl_async.c | 9 + erts/emulator/beam/erl_bif_ddll.c | 45 +- erts/emulator/beam/erl_bif_info.c | 30 +- erts/emulator/beam/erl_bif_port.c | 78 ++- erts/emulator/beam/erl_bif_trace.c | 59 +- erts/emulator/beam/erl_db_util.c | 11 +- erts/emulator/beam/erl_driver.h | 2 +- erts/emulator/beam/erl_lock_check.c | 17 +- erts/emulator/beam/erl_lock_check.h | 1 + erts/emulator/beam/erl_monitors.c | 7 +- erts/emulator/beam/erl_node_container_utils.h | 11 +- erts/emulator/beam/erl_node_tables.c | 36 +- erts/emulator/beam/erl_node_tables.h | 3 +- erts/emulator/beam/erl_port.h | 421 +++++++++---- erts/emulator/beam/erl_port_task.c | 100 ++- erts/emulator/beam/erl_process.c | 61 +- erts/emulator/beam/erl_process.h | 12 +- erts/emulator/beam/erl_ptab.c | 20 +- erts/emulator/beam/erl_ptab.h | 4 - erts/emulator/beam/erl_sys_driver.h | 1 - erts/emulator/beam/erl_trace.c | 8 +- erts/emulator/beam/global.h | 13 +- erts/emulator/beam/io.c | 876 +++++++++++--------------- erts/emulator/beam/register.c | 24 +- erts/emulator/beam/register.h | 19 +- erts/emulator/hipe/hipe_bif_list.m4 | 1 + erts/emulator/sys/common/erl_check_io.c | 15 +- erts/emulator/sys/unix/sys.c | 41 +- erts/emulator/sys/vxworks/sys.c | 22 +- erts/emulator/sys/win32/sys.c | 143 ++--- 39 files changed, 1104 insertions(+), 1141 deletions(-) diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 201719507f..d6d8633f6f 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -419,6 +419,7 @@ atom pid atom port atom ports atom port_count +atom port_limit atom print atom priority atom private diff --git a/erts/emulator/beam/beam_emu.c b/erts/emulator/beam/beam_emu.c index 3fb2fbd01c..d8ccbacc26 100644 --- a/erts/emulator/beam/beam_emu.c +++ b/erts/emulator/beam/beam_emu.c @@ -1660,6 +1660,7 @@ void process_main(void) reg[0] = r(0); result = erl_send(c_p, r(0), x(1)); PreFetch(0, next); + ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); ERTS_SMP_REQ_PROC_MAIN_LOCK(c_p); PROCESS_MAIN_CHK_LOCKS(c_p); if (c_p->mbuf || MSO(c_p).overhead >= BIN_VHEAP_SZ(c_p)) { @@ -2636,6 +2637,7 @@ void process_main(void) reg[0] = r(0); result = (*bf)(c_p, reg, I); ASSERT(!ERTS_PROC_IS_EXITING(c_p) || is_non_value(result)); + ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); ERTS_HOLE_CHECK(c_p); ERTS_SMP_REQ_PROC_MAIN_LOCK(c_p); PROCESS_MAIN_CHK_LOCKS(c_p); @@ -3370,7 +3372,6 @@ void process_main(void) PROCESS_MAIN_CHK_LOCKS(c_p); bif_nif_arity = I[-1]; ERTS_SMP_UNREQ_PROC_MAIN_LOCK(c_p); - ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); ASSERT(!ERTS_PROC_IS_EXITING(c_p)); { @@ -3415,7 +3416,6 @@ void process_main(void) bif_nif_arity = I[-1]; ASSERT(bif_nif_arity <= 3); ERTS_SMP_UNREQ_PROC_MAIN_LOCK(c_p); - ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); reg[0] = r(0); { Eterm (*bf)(Process*, Eterm*, BeamInstr*) = vbf; @@ -4646,7 +4646,6 @@ void process_main(void) ERTS_SMP_UNREQ_PROC_MAIN_LOCK(c_p); flags = erts_call_trace(c_p, ep->code, ep->match_prog_set, reg, 0, &ERTS_TRACER_PROC(c_p)); - ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); ERTS_SMP_REQ_PROC_MAIN_LOCK(c_p); PROCESS_MAIN_CHK_LOCKS(c_p); ASSERT(!ERTS_PROC_IS_EXITING(c_p)); diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 84513c1b0e..5ea441ad81 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -37,6 +37,8 @@ #include "erl_db_util.h" #include "register.h" #include "erl_thr_progress.h" +#define ERTS_PTAB_WANT_BIF_IMPL__ +#include "erl_ptab.h" static Export* flush_monitor_message_trap = NULL; static Export* set_cpu_topology_trap = NULL; @@ -155,7 +157,10 @@ BIF_RETTYPE link_1(BIF_ALIST_1) } if (is_internal_port(BIF_ARG_1)) { - Port *pt = erts_id2port(BIF_ARG_1, BIF_P, ERTS_PROC_LOCK_MAIN); + Port *pt = erts_id2port_sflgs(BIF_ARG_1, + BIF_P, + ERTS_PROC_LOCK_MAIN, + ERTS_PORT_SFLGS_INVALID_LOOKUP); if (!pt) { goto res_no_proc; } @@ -167,7 +172,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1) /* else: already linked */ erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK); - erts_smp_port_unlock(pt); + erts_port_release(pt); BIF_RET(am_true); } else if (is_external_port(BIF_ARG_1) @@ -948,7 +953,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) #ifdef ERTS_SMP if (ERTS_PROC_PENDING_EXIT(BIF_P)) { if (pt) - erts_smp_port_unlock(pt); + erts_port_release(pt); goto handle_pending_exit; } #endif @@ -959,7 +964,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) if (pt) { rl = erts_remove_link(&ERTS_P_LINKS(pt), BIF_P->common.id); - erts_smp_port_unlock(pt); + erts_port_release(pt); if (rl) erts_destroy_link(rl); } @@ -1341,7 +1346,7 @@ BIF_RETTYPE exit_2(BIF_ALIST_2) if (is_internal_port(BIF_ARG_1)) { Port *prt; erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); - prt = erts_id2port(BIF_ARG_1, NULL, 0); + prt = erts_id2port(BIF_ARG_1); if (prt) { erts_do_exit_port(prt, BIF_P->common.id, BIF_ARG_2); erts_port_release(prt); @@ -1894,7 +1899,10 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) { if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) { profile_runnable_proc(p, am_inactive); } - pt = erts_id2port(to, p, ERTS_PROC_LOCK_MAIN); + pt = erts_id2port_sflgs(to, + p, + ERTS_PROC_LOCK_MAIN, + ERTS_PORT_SFLGS_INVALID_LOOKUP); port_common: ERTS_SMP_LC_ASSERT(!pt || erts_lc_is_port_locked(pt)); @@ -1908,7 +1916,7 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) { profile_runnable_port(pt, am_active); } - state = erts_smp_atomic32_read_nob(&pt->state); + state = erts_atomic32_read_nob(&pt->state); /* XXX let port_command handle the busy stuff !!! */ if (state & ERTS_PORT_SFLG_PORT_BUSY) { if (suspend) { @@ -3510,75 +3518,25 @@ BIF_RETTYPE garbage_collect_message_area_0(BIF_ALIST_0) #endif } + /**********************************************************************/ -/* Return a list of active ports */ +/* + * The erlang:processes/0 BIF. + */ -BIF_RETTYPE ports_0(BIF_ALIST_0) +BIF_RETTYPE processes_0(BIF_ALIST_0) { - Eterm res = NIL; - Eterm* port_buf = erts_alloc(ERTS_ALC_T_TMP, - sizeof(Eterm)*erts_max_ports); - Eterm* pp = port_buf; - Eterm* dead_ports; - int alive, dead; - Uint32 next_ss; - int i; - - /* To get a consistent snapshot... - * We add alive ports from start of the buffer - * while dying ports are added from the other end by the killing threads. - */ - - erts_smp_mtx_lock(&ports_snapshot_mtx); /* One snapshot at a time */ - - erts_smp_atomic_set_nob(&erts_dead_ports_ptr, - (erts_aint_t) (port_buf + erts_max_ports)); - - next_ss = erts_smp_atomic32_inc_read_relb(&erts_ports_snapshot); - - for (i = erts_max_ports-1; i >= 0; i--) { - Port* prt = &erts_port[i]; - erts_aint32_t state = erts_smp_atomic32_read_acqb(&prt->state); - if (!(state & ERTS_PORT_SFLGS_DEAD)) { - erts_smp_port_minor_lock(prt); - state = erts_smp_atomic32_read_nob(&prt->state); - if (!(state & ERTS_PORT_SFLGS_DEAD) && prt->snapshot != next_ss) { - ASSERT(prt->snapshot == next_ss - 1); - *pp++ = prt->common.id; - prt->snapshot = next_ss; /* Consumed by this snapshot */ - } - erts_smp_port_minor_unlock(prt); - } - } - - dead_ports = (Eterm*)erts_smp_atomic_xchg_nob(&erts_dead_ports_ptr, - (erts_aint_t) NULL); - erts_smp_mtx_unlock(&ports_snapshot_mtx); - - ASSERT(pp <= dead_ports); - - alive = pp - port_buf; - dead = port_buf + erts_max_ports - dead_ports; - - ASSERT((alive+dead) <= erts_max_ports); - - if (alive+dead > 0) { - erts_aint_t i; - Eterm *hp = HAlloc(BIF_P, (alive+dead)*2); - - for (i = 0; i < alive; i++) { - res = CONS(hp, port_buf[i], res); - hp += 2; - } - for (i = 0; i < dead; i++) { - res = CONS(hp, dead_ports[i], res); - hp += 2; - } - } + return erts_ptab_list(BIF_P, &erts_proc); +} - erts_free(ERTS_ALC_T_TMP, port_buf); +/**********************************************************************/ +/* + * The erlang:ports/0 BIF. + */ - BIF_RET(res); +BIF_RETTYPE ports_0(BIF_ALIST_0) +{ + return erts_ptab_list(BIF_P, &erts_port); } /**********************************************************************/ diff --git a/erts/emulator/beam/break.c b/erts/emulator/beam/break.c index fba734623a..ad05316b39 100644 --- a/erts/emulator/beam/break.c +++ b/erts/emulator/beam/break.c @@ -61,9 +61,12 @@ extern char* erts_system_version[]; static void port_info(int to, void *to_arg) { - int i; - for (i = 0; i < erts_max_ports; i++) - print_port_info(to, to_arg, i); + int i, max = erts_ptab_max(&erts_port); + for (i = 0; i < max; i++) { + Port *p = erts_pix2port(i); + if (p) + print_port_info(p, to, to_arg); + } } void diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 7aef26aa21..13715034cb 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -144,7 +144,7 @@ create_cache(DistEntry *dep) ERTS_SMP_LC_ASSERT( is_internal_port(dep->cid) - && erts_lc_is_port_locked(&erts_port[internal_port_index(dep->cid)])); + && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); ASSERT(!dep->cache); dep->cache = cp = (ErtsAtomCache*) erts_alloc(ERTS_ALC_T_DCACHE, @@ -421,9 +421,9 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) ASSERT(is_internal_port(prt_id)); erts_smp_rwmtx_rwunlock(&erts_dist_table_rwmtx); - prt = erts_id2port(prt_id, NULL, 0); + prt = erts_id2port(prt_id); if (prt) { - ASSERT(erts_smp_atomic32_read_nob(&prt->state) + ASSERT(erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLG_DISTRIBUTION); ASSERT(prt->dist_entry); /* will call do_net_exists !!! */ @@ -455,7 +455,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) erts_smp_de_rwlock(dep); ERTS_SMP_LC_ASSERT(is_internal_port(dep->cid) - && erts_lc_is_port_locked(&erts_port[internal_port_index(dep->cid)])); + && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); if (erts_port_task_is_scheduled(&dep->dist_cmd)) erts_port_task_abort(dep->cid, &dep->dist_cmd); @@ -1957,7 +1957,7 @@ erts_dist_command(Port *prt, int reds_limit) dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; - state = erts_smp_atomic32_read_nob(&prt->state); + state = erts_atomic32_read_nob(&prt->state); if (reds > reds_limit) goto preempted; @@ -1978,7 +1978,7 @@ erts_dist_command(Port *prt, int reds_limit) obufsize += size_obuf(fob); foq.first = foq.first->next; free_dist_obuf(fob); - state = erts_smp_atomic32_read_nob(&prt->state); + state = erts_atomic32_read_nob(&prt->state); preempt = reds > reds_limit || (state & ERTS_PORT_SFLGS_DEAD); if (state & ERTS_PORT_SFLG_PORT_BUSY) break; @@ -2060,7 +2060,7 @@ erts_dist_command(Port *prt, int reds_limit) obufsize += size_obuf(fob); oq.first = oq.first->next; free_dist_obuf(fob); - state = erts_smp_atomic32_read_nob(&prt->state); + state = erts_atomic32_read_nob(&prt->state); preempt = reds > reds_limit || (state & ERTS_PORT_SFLGS_DEAD); if ((state & ERTS_PORT_SFLG_PORT_BUSY) && oq.first && !preempt) goto finalize_only; @@ -2580,10 +2580,13 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) else if (!dep) goto system_limit; /* Should never happen!!! */ - pp = erts_id2port(BIF_ARG_2, BIF_P, ERTS_PROC_LOCK_MAIN); + pp = erts_id2port_sflgs(BIF_ARG_2, + BIF_P, + ERTS_PROC_LOCK_MAIN, + ERTS_PORT_SFLGS_INVALID_LOOKUP); erts_smp_de_rwlock(dep); - if (!pp || (erts_smp_atomic32_read_nob(&pp->state) + if (!pp || (erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_EXITING)) goto badarg; @@ -2613,7 +2616,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) if (pp->dist_entry || is_not_nil(dep->cid)) goto badarg; - erts_smp_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); + erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); pp->dist_entry = dep; @@ -2658,7 +2661,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) } if (pp) - erts_smp_port_unlock(pp); + erts_port_release(pp); return ret; diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 98c069da8a..7de8786c4a 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -187,7 +187,7 @@ void erts_schedule_dist_command(Port *prt, DistEntry *dist_entry) if (prt) { ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - ASSERT((erts_smp_atomic32_read_nob(&prt->state) + ASSERT((erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD) == 0); ASSERT(prt->dist_entry); diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c index d0c0a13400..e8a9c11034 100644 --- a/erts/emulator/beam/erl_alloc.c +++ b/erts/emulator/beam/erl_alloc.c @@ -2273,6 +2273,7 @@ erts_allocated_areas(int *print_to_p, void *print_to_arg, void *proc) int i, length; Uint reserved_atom_space, atom_space; int max_processes = erts_ptab_max(&erts_proc); + int max_ports = erts_ptab_max(&erts_port); if (proc) { ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN @@ -2303,8 +2304,8 @@ erts_allocated_areas(int *print_to_p, void *print_to_arg, void *proc) values[i].arity = 2; values[i].name = "static"; - values[i].ui[0] = - erts_max_ports*sizeof(Port) /* Port table */ + values[i].ui[0] = + max_ports*sizeof(Port) /* Port table */ + erts_timer_wheel_memory_size() /* Timer wheel */ #ifdef SYS_TMP_BUF_SIZE + SYS_TMP_BUF_SIZE /* tmp_buf in sys on vxworks & ose */ diff --git a/erts/emulator/beam/erl_alloc.h b/erts/emulator/beam/erl_alloc.h index e475f9d8a2..ba5ec9c367 100644 --- a/erts/emulator/beam/erl_alloc.h +++ b/erts/emulator/beam/erl_alloc.h @@ -267,6 +267,8 @@ typedef void (*erts_alloc_verify_func_t)(Allctr_t *); erts_alloc_verify_func_t erts_alloc_get_verify_unused_temp_alloc(Allctr_t **allctr); +#define ERTS_ALC_DATA_ALIGN_SIZE(SZ) \ + (((((SZ) - 1) / 8) + 1) * 8) #define ERTS_ALC_CACHE_LINE_ALIGN_SIZE(SZ) \ (((((SZ) - 1) / ERTS_CACHE_LINE_SIZE) + 1) * ERTS_CACHE_LINE_SIZE) diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 4ad0c41b50..92ba15214e 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -146,6 +146,7 @@ class SYSTEM system_data type SBMBC SBMBC SYSTEM small_block_mbc type PROC FIXED_SIZE PROCESSES proc +type PORT DRIVER SYSTEM port type ATOM LONG_LIVED ATOM atom_entry type MODULE LONG_LIVED CODE module_entry type REG_PROC STANDARD PROCESSES reg_proc @@ -188,7 +189,7 @@ type PORT_TABLE LONG_LIVED SYSTEM port_tab type TIMER_WHEEL LONG_LIVED SYSTEM timer_wheel type DRV DRIVER SYSTEM drv_internal type DRV_BINARY BINARY BINARIES drv_binary -type DRIVER STANDARD SYSTEM driver +type DRIVER DRIVER SYSTEM driver type NIF DRIVER SYSTEM nif_internal type BINARY BINARY BINARIES binary type NBIF_TABLE SYSTEM SYSTEM nbif_tab @@ -196,7 +197,6 @@ type ARG_REG STANDARD PROCESSES arg_reg type PROC_DICT STANDARD PROCESSES proc_dict type CALLS_BUF STANDARD PROCESSES calls_buf type BPD STANDARD SYSTEM bpd -type PORT_NAME STANDARD SYSTEM port_name type LINEBUF STANDARD SYSTEM line_buf type IOQ STANDARD SYSTEM io_queue type BITS_BUF STANDARD SYSTEM bits_buf @@ -236,7 +236,7 @@ type PORT_TASK SHORT_LIVED SYSTEM port_task type PORT_TASKQ SHORT_LIVED SYSTEM port_task_queue type MISC_OP_LIST SHORT_LIVED SYSTEM misc_op_list type PORT_NAMES SHORT_LIVED SYSTEM port_names -type PORT_DATA_LOCK STANDARD SYSTEM port_data_lock +type PORT_DATA_LOCK DRIVER SYSTEM port_data_lock type NODES_MON STANDARD PROCESSES nodes_monitor type PTAB_LIST_DEL SHORT_LIVED PROCESSES ptab_list_deleted_el type PTAB_LIST_CNKI SHORT_LIVED PROCESSES ptab_list_chunk_info diff --git a/erts/emulator/beam/erl_async.c b/erts/emulator/beam/erl_async.c index 8d3bea82c1..d8adaedc91 100644 --- a/erts/emulator/beam/erl_async.c +++ b/erts/emulator/beam/erl_async.c @@ -375,10 +375,15 @@ static ERTS_INLINE ErtsAsync *async_get(ErtsThrQ_t *q, static ERTS_INLINE void call_async_ready(ErtsAsync *a) { +#if ERTS_USE_ASYNC_READY_Q Port *p = erts_id2port_sflgs(a->port, NULL, 0, ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); +#else + Port *p = erts_thr_id2port_sflgs(a->port, + ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); +#endif if (!p) { if (a->async_free) a->async_free(a->async_data); @@ -388,7 +393,11 @@ static ERTS_INLINE void call_async_ready(ErtsAsync *a) if (a->async_free) a->async_free(a->async_data); } +#if ERTS_USE_ASYNC_READY_Q erts_port_release(p); +#else + erts_thr_port_release(p); +#endif } if (a->hndl) erts_ddll_dereference_driver(a->hndl); diff --git a/erts/emulator/beam/erl_bif_ddll.c b/erts/emulator/beam/erl_bif_ddll.c index a120a3e5f2..e18b1f0159 100644 --- a/erts/emulator/beam/erl_bif_ddll.c +++ b/erts/emulator/beam/erl_bif_ddll.c @@ -117,48 +117,27 @@ static void ddll_no_more_references(void *vdh); static void kill_ports_driver_unloaded(DE_Handle *dh) { - int ix; + int ix, max = erts_ptab_max(&erts_port); - for (ix = 0; ix < erts_max_ports; ix++) { - Port* prt = &erts_port[ix]; + for (ix = 0; ix < max; ix++) { erts_aint32_t state; - state = erts_smp_atomic32_read_acqb(&prt->state); - if (state & FREE_PORT_FLAGS) + Port* prt = erts_pix2port(ix); + if (!prt) continue; - erts_smp_port_minor_lock(prt); + ERTS_SMP_DATA_DEPENDENCY_READ_MEMORY_BARRIER; - if (prt->drv_ptr->handle != dh) { - erts_smp_port_minor_unlock(prt); + state = erts_atomic32_read_nob(&prt->state); + if (state & FREE_PORT_FLAGS) continue; - } - erts_smp_atomic32_inc_nob(&prt->common.refc); - erts_smp_port_minor_unlock(prt); - - state = erts_smp_atomic32_read_acqb(&prt->state); - if (!(state & FREE_PORT_FLAGS)) { -#if DDLL_SMP - if (state & ERTS_PORT_SFLG_INITIALIZING) { - /* Extremely rare busy wait */ - do { - ERTS_SPIN_BODY; - state = erts_smp_atomic32_read_acqb(&prt->state); - } while (state & ERTS_PORT_SFLG_INITIALIZING); - } + erts_smp_port_lock(prt); - erts_smp_mtx_lock(prt->lock); + state = erts_atomic32_read_nob(&prt->state); + if (!(state & ERTS_PORT_SFLGS_DEAD) && prt->drv_ptr->handle == dh) + driver_failure_atom((ErlDrvPort) prt, "driver_unloaded"); - if (prt->drv_ptr->handle == dh) { - state = erts_smp_atomic32_read_acqb(&prt->state); - if (!(state & ERTS_PORT_SFLGS_DEAD)) - driver_failure_atom(ix, "driver_unloaded"); - } -#else - driver_failure_atom(ix, "driver_unloaded"); -#endif - erts_port_release(prt); - } + erts_port_release(prt); } } diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 847a0c33e9..0586a89041 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -2189,6 +2189,10 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1) BIF_RET(make_small(erts_ptab_count(&erts_proc))); } else if (BIF_ARG_1 == am_process_limit) { BIF_RET(make_small(erts_ptab_max(&erts_proc))); + } else if (BIF_ARG_1 == am_port_count) { + BIF_RET(make_small(erts_ptab_count(&erts_port))); + } else if (BIF_ARG_1 == am_port_limit) { + BIF_RET(make_small(erts_ptab_max(&erts_port))); } else if (BIF_ARG_1 == am_info || BIF_ARG_1 == am_procs || BIF_ARG_1 == am_loaded @@ -2840,7 +2844,10 @@ static BIF_RETTYPE port_info(Process* p, Eterm portid, Eterm item) int count; if (is_internal_port(portid)) - prt = erts_id2port(portid, p, ERTS_PROC_LOCK_MAIN); + prt = erts_id2port_sflgs(portid, + p, + ERTS_PROC_LOCK_MAIN, + ERTS_PORT_SFLGS_INVALID_LOOKUP); else if (is_atom(portid)) erts_whereis_name(p, ERTS_PROC_LOCK_MAIN, portid, NULL, 0, 0, &prt); @@ -2975,7 +2982,7 @@ static BIF_RETTYPE port_info(Process* p, Eterm portid, Eterm item) #ifndef ERTS_SMP res = am_false; #else - if (erts_smp_atomic32_read_nob(&prt->state) + if (erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) { DECL_AM(port_level); ASSERT(prt->drv_ptr->flags @@ -2999,7 +3006,7 @@ static BIF_RETTYPE port_info(Process* p, Eterm portid, Eterm item) done: - erts_smp_port_unlock(prt); + erts_port_release(prt); return ret; } @@ -3357,9 +3364,8 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) Eterm res; if (ERTS_IS_ATOM_STR("next_pid", BIF_ARG_1)) res = erts_ptab_test_next_id(&erts_proc, 0, 0); - else { - res = erts_test_next_port(0, 0); - } + else + res = erts_ptab_test_next_id(&erts_port, 0, 0); if (res < 0) BIF_RET(am_false); BIF_RET(erts_make_integer(res, BIF_P)); @@ -3466,11 +3472,14 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) } else if(is_internal_port(tp[2])) { Eterm res; - Port *p = erts_id2port(tp[2], BIF_P, ERTS_PROC_LOCK_MAIN); + Port *p = erts_id2port_sflgs(tp[2], + BIF_P, + ERTS_PROC_LOCK_MAIN, + ERTS_PORT_SFLGS_INVALID_LOOKUP); if(!p) BIF_RET(am_undefined); res = make_link_list(BIF_P, ERTS_P_LINKS(p), NIL); - erts_smp_port_unlock(p); + erts_port_release(p); BIF_RET(res); } else if(is_node_name_atom(tp[2])) { @@ -3714,9 +3723,8 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2) if (ERTS_IS_ATOM_STR("next_pid", BIF_ARG_1)) res = erts_ptab_test_next_id(&erts_proc, 1, next); - else { - res = erts_test_next_port(1, next); - } + else + res = erts_ptab_test_next_id(&erts_port, 1, next); if (res < 0) BIF_RET(am_false); BIF_RET(erts_make_integer(res, BIF_P)); diff --git a/erts/emulator/beam/erl_bif_port.c b/erts/emulator/beam/erl_bif_port.c index 2b7760f556..31151dc9f5 100644 --- a/erts/emulator/beam/erl_bif_port.c +++ b/erts/emulator/beam/erl_bif_port.c @@ -42,7 +42,7 @@ #include "erl_bits.h" #include "dtrace-wrapper.h" -static int open_port(Process* p, Eterm name, Eterm settings, int *err_nump); +static Port *open_port(Process* p, Eterm name, Eterm settings, int *err_typep, int *err_nump); static byte* convert_environment(Process* p, Eterm env); static char **convert_args(Eterm); static void free_args(char **); @@ -54,16 +54,17 @@ port_call(Process* p, Eterm arg1, Eterm arg2, Eterm arg3); BIF_RETTYPE open_port_2(BIF_ALIST_2) { - int port_num; - Eterm port_val; + Port *port; + Eterm port_id; char *str; - int err_num; + int err_type, err_num; - if ((port_num = open_port(BIF_P, BIF_ARG_1, BIF_ARG_2, &err_num)) < 0) { - if (port_num == -3) { + port = open_port(BIF_P, BIF_ARG_1, BIF_ARG_2, &err_type, &err_num); + if (!port) { + if (err_type == -3) { ASSERT(err_num == BADARG || err_num == SYSTEM_LIMIT); BIF_ERROR(BIF_P, err_num); - } else if (port_num == -2) { + } else if (err_type == -2) { str = erl_errno_id(err_num); } else { str = "einval"; @@ -74,15 +75,15 @@ BIF_RETTYPE open_port_2(BIF_ALIST_2) erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_LINK); - port_val = erts_port[port_num].common.id; - erts_add_link(&ERTS_P_LINKS(&erts_port[port_num]), LINK_PID, BIF_P->common.id); - erts_add_link(&ERTS_P_LINKS(BIF_P), LINK_PID, port_val); + port_id = port->common.id; + erts_add_link(&ERTS_P_LINKS(port), LINK_PID, BIF_P->common.id); + erts_add_link(&ERTS_P_LINKS(BIF_P), LINK_PID, port_id); erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK); - erts_port_release(&erts_port[port_num]); + erts_port_release(port); - BIF_RET(port_val); + BIF_RET(port_id); } /**************************************************************************** @@ -112,7 +113,10 @@ id_or_name2port(Process *c_p, Eterm id) { Port *port; if (is_not_atom(id)) - port = erts_id2port(id, c_p, ERTS_PROC_LOCK_MAIN); + port = erts_id2port_sflgs(id, + c_p, + ERTS_PROC_LOCK_MAIN, + ERTS_PORT_SFLGS_INVALID_LOOKUP); else erts_whereis_name(c_p, ERTS_PROC_LOCK_MAIN, id, NULL, 0, 0, &port); return port; @@ -164,7 +168,7 @@ do_port_command(Process *BIF_P, Eterm arg1, Eterm arg2, Eterm arg3, ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_NOTSUP); } else if (!(flags & ERTS_PORT_COMMAND_FLAG_FORCE) - && (erts_smp_atomic32_read_nob(&p->state) & ERTS_PORT_SFLG_PORT_BUSY)) { + && (erts_atomic32_read_nob(&p->state) & ERTS_PORT_SFLG_PORT_BUSY)) { if (flags & ERTS_PORT_COMMAND_FLAG_NOSUSPEND) { ERTS_BIF_PREP_RET(res, am_false); } @@ -538,7 +542,7 @@ BIF_RETTYPE port_connect_2(BIF_ALIST_2) rp = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN, pid, ERTS_PROC_LOCK_LINK); if (!rp) { - erts_smp_port_unlock(prt); + erts_port_release(prt); ERTS_SMP_ASSERT_IS_NOT_EXITING(BIF_P); goto error; } @@ -549,7 +553,7 @@ BIF_RETTYPE port_connect_2(BIF_ALIST_2) erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); prt->connected = pid; /* internal pid */ - erts_smp_port_unlock(prt); + erts_port_release(prt); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_connect)) { DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); @@ -591,7 +595,7 @@ BIF_RETTYPE port_set_data_2(BIF_ALIST_2) hp = bp->mem; prt->data = copy_struct(data, size, &hp, &bp->off_heap); } - erts_smp_port_unlock(prt); + erts_port_release(prt); BIF_RET(am_true); } @@ -612,7 +616,7 @@ BIF_RETTYPE port_get_data_1(BIF_ALIST_1) Eterm* hp = HAlloc(BIF_P, prt->bp->used_size); res = copy_struct(prt->data, prt->bp->used_size, &hp, &MSO(BIF_P)); } - erts_smp_port_unlock(prt); + erts_port_release(prt); BIF_RET(res); } @@ -625,11 +629,10 @@ BIF_RETTYPE port_get_data_1(BIF_ALIST_1) * either BADARG or SYSTEM_LIMIT). */ -static int -open_port(Process* p, Eterm name, Eterm settings, int *err_nump) +static Port * +open_port(Process* p, Eterm name, Eterm settings, int *err_typep, int *err_nump) { -#define OPEN_PORT_ERROR(VAL) do { port_num = (VAL); goto do_return; } while (0) - int i, port_num; + int i; Eterm option; Uint arity; Eterm* tp; @@ -641,6 +644,7 @@ open_port(Process* p, Eterm name, Eterm settings, int *err_nump) Eterm edir = NIL; byte dir[MAXPATHLEN]; erts_aint32_t sflgs = 0; + Port *port; /* These are the defaults */ opts.packet_bytes = 0; @@ -923,38 +927,40 @@ open_port(Process* p, Eterm name, Eterm settings, int *err_nump) erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN); - port_num = erts_open_driver(driver, p->common.id, name_buf, &opts, err_nump); + port = erts_open_driver(driver, p->common.id, name_buf, &opts, err_typep, err_nump); #ifdef USE_VM_PROBES - if (port_num >= 0 && DTRACE_ENABLED(port_open)) { + if (port && DTRACE_ENABLED(port_open)) { DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); dtrace_proc_str(p, process_str); - erts_snprintf(port_str, sizeof(port_str), "%T", erts_port[port_num].id); + erts_snprintf(port_str, sizeof(port_str), "%T", port->common.id); DTRACE3(port_open, process_str, name_buf, port_str); } #endif erts_smp_proc_lock(p, ERTS_PROC_LOCK_MAIN); - if (port_num < 0) { - DEBUGF(("open_driver returned %d(%d)\n", port_num, *err_nump)); + if (!port) { + DEBUGF(("open_driver returned (%d:%d)\n", + err_typep ? *err_typep : 4711, + err_nump ? *err_nump : 4711)); if (IS_TRACED_FL(p, F_TRACE_SCHED_PROCS)) { trace_virtual_sched(p, am_in); } - OPEN_PORT_ERROR(port_num); + goto do_return; } if (IS_TRACED_FL(p, F_TRACE_SCHED_PROCS)) { trace_virtual_sched(p, am_in); } - if (linebuf && erts_port[port_num].linebuf == NULL){ - erts_port[port_num].linebuf = allocate_linebuf(linebuf); + if (linebuf && port->linebuf == NULL){ + port->linebuf = allocate_linebuf(linebuf); sflgs |= ERTS_PORT_SFLG_LINEBUF_IO; } if (sflgs) - erts_smp_atomic32_read_bor_relb(&erts_port[port_num].state, sflgs); + erts_atomic32_read_bor_relb(&port->state, sflgs); do_return: if (name_buf) @@ -965,13 +971,15 @@ open_port(Process* p, Eterm name, Eterm settings, int *err_nump) if (opts.wd && opts.wd != ((char *)dir)) { erts_free(ERTS_ALC_T_TMP, (void *) opts.wd); } - return port_num; + return port; badarg: - *err_nump = BADARG; - OPEN_PORT_ERROR(-3); + if (err_typep) + *err_typep = -3; + if (err_nump) + *err_nump = BADARG; + port = NULL; goto do_return; -#undef OPEN_PORT_ERROR } /* Arguments can be given i unicode and as raw binaries, convert filename is used to convert */ diff --git a/erts/emulator/beam/erl_bif_trace.c b/erts/emulator/beam/erl_bif_trace.c index d94b05fc25..60d5d039ed 100644 --- a/erts/emulator/beam/erl_bif_trace.c +++ b/erts/emulator/beam/erl_bif_trace.c @@ -151,14 +151,12 @@ trace_pattern(Process* p, Eterm MFA, Eterm Pattern, Eterm flaglist) } } else if (is_internal_port(meta_tracer_pid)) { Port *meta_tracer_port; - meta_tracer_proc = NULL; - if (internal_port_index(meta_tracer_pid) >= erts_max_ports) + meta_tracer_proc = NULL; + meta_tracer_port = (erts_port_lookup( + meta_tracer_pid, + ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP)); + if (!meta_tracer_port) goto error; - meta_tracer_port = - &erts_port[internal_port_index(meta_tracer_pid)]; - if (INVALID_TRACER_PORT(meta_tracer_port, meta_tracer_pid)) { - goto error; - } } else { goto error; } @@ -479,14 +477,14 @@ Eterm trace_3(BIF_ALIST_3) ? ERTS_PROC_LOCKS_ALL_MINOR : ERTS_PROC_LOCKS_ALL)); } else if (is_internal_port(tracer)) { - Port *tracer_port = erts_id2port(tracer, p, ERTS_PROC_LOCK_MAIN); - if (!erts_is_valid_tracer_port(tracer)) { - if (tracer_port) - erts_smp_port_unlock(tracer_port); + Port *tracer_port = erts_id2port_sflgs(tracer, + p, + ERTS_PROC_LOCK_MAIN, + ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP); + if (!tracer_port) goto error; - } ERTS_TRACE_FLAGS(tracer_port) |= F_TRACER; - erts_smp_port_unlock(tracer_port); + erts_port_release(tracer_port); } else goto error; @@ -519,12 +517,15 @@ Eterm trace_3(BIF_ALIST_3) if (pid_spec == tracer) goto error; - tracee_port = erts_id2port(pid_spec, p, ERTS_PROC_LOCK_MAIN); + tracee_port = erts_id2port_sflgs(pid_spec, + p, + ERTS_PROC_LOCK_MAIN, + ERTS_PORT_SFLGS_INVALID_LOOKUP); if (!tracee_port) goto error; if (tracer != NIL && port_already_traced(p, tracee_port, tracer)) { - erts_smp_port_unlock(tracee_port); + erts_port_release(tracee_port); goto already_traced; } @@ -538,7 +539,7 @@ Eterm trace_3(BIF_ALIST_3) else if (tracer != NIL) ERTS_TRACER_PROC(tracee_port) = tracer; - erts_smp_port_unlock(tracee_port); + erts_port_release(tracee_port); matches = 1; } else if (is_pid(pid_spec)) { @@ -677,15 +678,21 @@ Eterm trace_3(BIF_ALIST_3) } } if (ports || mods) { + int max = erts_ptab_max(&erts_port); /* tracing of ports */ - for (i = 0; i < erts_max_ports; i++) { - Port *tracee_port = &erts_port[i]; + for (i = 0; i < max; i++) { erts_aint32_t state; - state = erts_smp_atomic32_read_nob(&tracee_port->state); - if (state & ERTS_PORT_SFLGS_DEAD) continue; + Port *tracee_port = erts_pix2port(i); + if (!tracee_port) + continue; + state = erts_atomic32_read_nob(&tracee_port->state); + if (state & ERTS_PORT_SFLGS_DEAD) + continue; if (tracer != NIL) { - if (tracee_port->common.id == tracer) continue; - if (port_already_traced(NULL, tracee_port, tracer)) continue; + if (tracee_port->common.id == tracer) + continue; + if (port_already_traced(NULL, tracee_port, tracer)) + continue; } if (on) ERTS_TRACE_FLAGS(tracee_port) |= mask; @@ -2147,9 +2154,11 @@ BIF_RETTYPE system_profile_2(BIF_ALIST_2) if (!profiler_p) goto error; } else if (is_internal_port(profiler)) { - if (internal_port_index(profiler) >= erts_max_ports) goto error; - profiler_port = &erts_port[internal_port_index(profiler)]; - if (INVALID_TRACER_PORT(profiler_port, profiler)) goto error; + profiler_port = (erts_port_lookup( + profiler, + ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP)); + if (!profiler_port) + goto error; } else { goto error; } diff --git a/erts/emulator/beam/erl_db_util.c b/erts/emulator/beam/erl_db_util.c index d89190e3c1..67c2d0eb7f 100644 --- a/erts/emulator/beam/erl_db_util.c +++ b/erts/emulator/beam/erl_db_util.c @@ -182,12 +182,13 @@ set_match_trace(Process *tracee_p, Eterm fail_term, Eterm tracer, } } else if (is_internal_port(tracer)) { Port *tracer_port = - erts_id2port(tracer, tracee_p, ERTS_PROC_LOCKS_ALL); + erts_id2port_sflgs(tracer, + tracee_p, + ERTS_PROC_LOCKS_ALL, + ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP); if (tracer_port) { - if (! INVALID_TRACER_PORT(tracer_port, tracer)) { - ret = set_tracee_flags(tracee_p, tracer, d_flags, e_flags); - } - erts_smp_port_unlock(tracer_port); + ret = set_tracee_flags(tracee_p, tracer, d_flags, e_flags); + erts_port_release(tracer_port); } } else { ASSERT(is_nil(tracer)); diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h index 1ae9a211d7..1dc43419fe 100644 --- a/erts/emulator/beam/erl_driver.h +++ b/erts/emulator/beam/erl_driver.h @@ -210,8 +210,8 @@ typedef struct erl_drv_binary { typedef struct _erl_drv_data* ErlDrvData; /* Data to be used by the driver itself. */ #ifndef ERL_SYS_DRV typedef struct _erl_drv_event* ErlDrvEvent; /* An event to be selected on. */ -typedef struct _erl_drv_port* ErlDrvPort; /* A port descriptor. */ #endif +typedef struct _erl_drv_port* ErlDrvPort; /* A port descriptor. */ typedef struct _erl_drv_port* ErlDrvThreadData; /* Thread data. */ #if !defined(__WIN32__) && !defined(_WIN32) && !defined(_WIN32_) && !defined(USE_SELECT) diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c index 1db7b27412..06203a706d 100644 --- a/erts/emulator/beam/erl_lock_check.c +++ b/erts/emulator/beam/erl_lock_check.c @@ -111,9 +111,6 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "efile_drv", "address" }, #if defined(ENABLE_CHILD_WAITER_THREAD) || defined(ERTS_SMP) { "child_status", NULL }, -#endif -#ifdef __WIN32__ - { "sys_driver_data_lock", NULL }, #endif { "drv_ev_state_grow", NULL, }, { "drv_ev_state", "address" }, @@ -159,7 +156,7 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "port_taskq_pre_alloc_lock", "address" }, { "proclist_pre_alloc_lock", "address" }, { "port_tasks_lock", NULL }, - { "get_free_port", NULL }, + { "port_table", NULL }, { "port_state", "address" }, { "xports_list_pre_alloc_lock", "address" }, { "inet_buffer_stack_lock", NULL }, @@ -248,6 +245,7 @@ typedef struct { typedef struct erts_lc_locked_locks_t_ erts_lc_locked_locks_t; struct erts_lc_locked_locks_t_ { char *thread_name; + int emu_thread; erts_tid_t tid; erts_lc_locked_locks_t *next; erts_lc_locked_locks_t *prev; @@ -365,6 +363,7 @@ create_locked_locks(char *thread_name) if (!l_lcks->thread_name) lc_abort(); + l_lcks->emu_thread = 0; l_lcks->tid = erts_thr_self(); l_lcks->required.first = NULL; l_lcks->required.last = NULL; @@ -672,7 +671,7 @@ erts_lc_set_thread_name(char *thread_name) { erts_lc_locked_locks_t *l_lcks = get_my_locked_locks(); if (!l_lcks) - (void) create_locked_locks(thread_name); + l_lcks = create_locked_locks(thread_name); else { ASSERT(l_lcks->thread_name); free((void *) l_lcks->thread_name); @@ -680,6 +679,14 @@ erts_lc_set_thread_name(char *thread_name) if (!l_lcks->thread_name) lc_abort(); } + l_lcks->emu_thread = 1; +} + +int +erts_lc_is_emu_thr(void) +{ + erts_lc_locked_locks_t *l_lcks = get_my_locked_locks(); + return l_lcks->emu_thread; } int diff --git a/erts/emulator/beam/erl_lock_check.h b/erts/emulator/beam/erl_lock_check.h index b67f36fa06..e76bcf556e 100644 --- a/erts/emulator/beam/erl_lock_check.h +++ b/erts/emulator/beam/erl_lock_check.h @@ -102,6 +102,7 @@ void erts_lc_unrequire_lock_flg(erts_lc_lock_t *lck, Uint16 op_flags); void erts_lc_require_lock(erts_lc_lock_t *lck); void erts_lc_unrequire_lock(erts_lc_lock_t *lck); +int erts_lc_is_emu_thr(void); #define ERTS_LC_ASSERT(A) \ ((void) ((A) ? 1 : erts_lc_assert_failed(__FILE__, __LINE__, #A))) diff --git a/erts/emulator/beam/erl_monitors.c b/erts/emulator/beam/erl_monitors.c index 450aad975d..63175c44d6 100644 --- a/erts/emulator/beam/erl_monitors.c +++ b/erts/emulator/beam/erl_monitors.c @@ -985,12 +985,15 @@ Eterm erts_debug_dump_links_1(BIF_ALIST_1) Process *rp; DistEntry *dep; if (is_internal_port(pid)) { - Port *rport = erts_id2port(pid, p, ERTS_PROC_LOCK_MAIN); + Port *rport = erts_id2port_sflgs(pid, + p, + ERTS_PROC_LOCK_MAIN, + ERTS_PORT_SFLGS_INVALID_LOOKUP); if (rport) { erts_printf("Dumping port links----------------------\n"); erts_dump_links(ERTS_P_LINKS(rport), 0); erts_printf("Links dumped----------------------------\n"); - erts_smp_port_unlock(rport); + erts_port_release(rport); BIF_RET(am_true); } else { BIF_ERROR(p,BADARG); diff --git a/erts/emulator/beam/erl_node_container_utils.h b/erts/emulator/beam/erl_node_container_utils.h index 62fb277eef..7796cbfc2b 100644 --- a/erts/emulator/beam/erl_node_container_utils.h +++ b/erts/emulator/beam/erl_node_container_utils.h @@ -197,8 +197,10 @@ extern ErtsPTab erts_proc; * Ports * \* */ -#define internal_port_index(x) (internal_port_data((x)) \ - & erts_port_tab_index_mask) +extern ErtsPTab erts_port; + +#define internal_port_index(x) erts_ptab_data2ix(&erts_port, \ + internal_port_data((x))) #define internal_port_node_name(x) (internal_port_node((x))->sysname) #define external_port_node_name(x) (external_port_node((x))->sysname) @@ -237,7 +239,7 @@ extern ErtsPTab erts_proc; #define is_not_port(x) (!is_port(x)) /* Highest port-ID part in a term of type Port - Not necessarily the same as the variable erts_max_ports + Not necessarily the same as current maximum port table size which defines the maximum number of simultaneous Ports in the Erlang node. ERTS_MAX_PORTS is a hard upper limit. */ @@ -249,6 +251,9 @@ extern ErtsPTab erts_proc; #define ERTS_R9_PORTS_BITS (_PORT_R9_NUM_SIZE) #define ERTS_PORTS_BITS (_PORT_NUM_SIZE) + +#define ERTS_INVALID_PORT make_internal_port(ERTS_MAX_PORT_DATA) + /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\ * Refs * \* */ diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 3b400ce0de..67f413ef28 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -1274,7 +1274,7 @@ setup_reference_table(void) ErlHeapFragment *hfp; DistEntry *dep; HashInfo hi; - int i, max_processes; + int i, max; DeclareTmpHeapNoproc(heap,3); inserted_bins = NULL; @@ -1309,9 +1309,9 @@ setup_reference_table(void) #endif UnUseTmpHeapNoproc(3); - max_processes = erts_ptab_max(&erts_proc); + max = erts_ptab_max(&erts_proc); /* Insert all processes */ - for (i = 0; i < max_processes; i++) { + for (i = 0; i < max; i++) { Process *proc = erts_pix2proc(i); if (proc) { ErlMessage *msg; @@ -1383,25 +1383,33 @@ setup_reference_table(void) #endif /* Insert all ports */ - for (i = 0; i < erts_max_ports; i++) { - erts_aint32_t state = erts_smp_atomic32_read_nob(&erts_port[i].state); + max = erts_ptab_max(&erts_port); + for (i = 0; i < max; i++) { + erts_aint32_t state; + Port *prt; + + prt = erts_pix2port(i); + if (!prt) + continue; + + state = erts_atomic32_read_nob(&prt->state); if (state & ERTS_PORT_SFLGS_DEAD) continue; /* Insert links */ - if (ERTS_P_LINKS(&erts_port[i])) - insert_links(ERTS_P_LINKS(&erts_port[i]), erts_port[i].common.id); + if (ERTS_P_LINKS(prt)) + insert_links(ERTS_P_LINKS(prt), prt->common.id); /* Insert monitors */ - if (ERTS_P_MONITORS(&erts_port[i])) - insert_monitors(ERTS_P_MONITORS(&erts_port[i]), erts_port[i].common.id); + if (ERTS_P_MONITORS(prt)) + insert_monitors(ERTS_P_MONITORS(prt), prt->common.id); /* Insert port data */ - for(hfp = erts_port[i].bp; hfp; hfp = hfp->next) - insert_offheap(&(hfp->off_heap), HEAP_REF, erts_port[i].common.id); + for(hfp = prt->bp; hfp; hfp = hfp->next) + insert_offheap(&(hfp->off_heap), HEAP_REF, prt->common.id); /* Insert controller */ - if (erts_port[i].dist_entry) - insert_dist_entry(erts_port[i].dist_entry, + if (prt->dist_entry) + insert_dist_entry(prt->dist_entry, CTRL_REF, - erts_port[i].common.id, + prt->common.id, 0); } diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index 4a015bdef9..3a74888522 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -100,7 +100,6 @@ typedef struct { */ struct erl_link; -struct port; typedef struct dist_entry_ { HashBucket hash_bucket; /* Hash bucket */ @@ -141,7 +140,7 @@ typedef struct dist_entry_ { erts_smp_atomic_t dist_cmd_scheduled; ErtsPortTaskHandle dist_cmd; - Uint (*send)(struct port *prt, ErtsDistOutputBuf *obuf); + Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); struct cache* cache; /* The atom cache */ } DistEntry; diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h index 1a5bc636e2..fb34a6da2d 100644 --- a/erts/emulator/beam/erl_port.h +++ b/erts/emulator/beam/erl_port.h @@ -17,15 +17,21 @@ * %CopyrightEnd% */ -#ifndef ERL_PORT_H__ +#ifndef ERL_PORT_TYPE__ +#define ERL_PORT_TYPE__ +typedef struct _erl_drv_port Port; +#endif + +#if !defined(ERL_PORT_H__) && !defined(ERL_PORT_GET_PORT_TYPE_ONLY__) #define ERL_PORT_H__ -typedef struct port Port; #include "erl_port_task.h" #include "erl_ptab.h" +#include "erl_thr_progress.h" typedef struct erts_driver_t_ erts_driver_t; +#define ERTS_INVALID_ERL_DRV_PORT ((ErlDrvPort) (SWord) -1) #define SMALL_IO_QUEUE 5 /* Number of fixed elements */ typedef struct { @@ -90,16 +96,18 @@ typedef struct ErtsXPortsList_ ErtsXPortsList; * */ -struct port { +struct _erl_drv_port { ErtsPTabElementCommon common; /* *Need* to be first in struct */ ErtsPortTaskSched sched; ErtsPortTaskHandle timeout_task; #ifdef ERTS_SMP - erts_smp_mtx_t *lock; + erts_mtx_t *lock; ErtsXPortsList *xports; erts_smp_atomic_t run_queue; - erts_smp_spinlock_t state_lck; /* protects: id, status, snapshot */ +#else + erts_atomic32_t refc; + int cleanup; #endif Eterm connected; /* A connected process */ Eterm caller; /* Current caller. */ @@ -116,15 +124,20 @@ struct port { ErtsProcList *suspended; /* List of suspended processes. */ LineBuf *linebuf; /* Buffer to hold data not ready for process to get (line oriented I/O)*/ - erts_smp_atomic32_t state; /* Status and type flags */ + erts_atomic32_t state; /* Status and type flags */ int control_flags; /* Flags for port_control() */ - erts_aint32_t snapshot; /* Next snapshot that port should be part of */ ErlDrvPDL port_data_lock; ErtsPrtSD *psd; /* Port specific data */ }; +struct erl_drv_port_data_lock { + erts_mtx_t mtx; + erts_atomic_t refc; + Port *prt; +}; + ERTS_GLB_INLINE ErtsRunQueue *erts_port_runq(Port *prt); #if ERTS_GLB_INLINE_INCL_FUNC_DEF @@ -183,35 +196,6 @@ erts_prtsd_set(Port *prt, int ix, void *data) #endif -/* arrays that get malloced at startup */ -extern Port* erts_port; - -extern Uint erts_max_ports; -extern Uint erts_port_tab_index_mask; -extern erts_smp_atomic32_t erts_ports_snapshot; -extern erts_smp_atomic_t erts_dead_ports_ptr; - -ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt); - -#if ERTS_GLB_INLINE_INCL_FUNC_DEF - -ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt) -{ - ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&prt->state_lck)); - if (prt->snapshot != erts_smp_atomic32_read_acqb(&erts_ports_snapshot)) { - /* Dead ports are added from the end of the snapshot buffer */ - Eterm* tombstone; - tombstone = (Eterm*) erts_smp_atomic_add_read_nob(&erts_dead_ports_ptr, - -(erts_aint_t)sizeof(Eterm)); - ASSERT(tombstone+1 != NULL); - ASSERT(prt->snapshot == erts_smp_atomic32_read_nob(&erts_ports_snapshot) - 1); - *tombstone = prt->common.id; - } - /*else no ongoing snapshot or port was already included or created after snapshot */ -} - -#endif - extern erts_smp_atomic_t erts_bytes_out; /* no bytes written out */ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ @@ -263,8 +247,11 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ | ERTS_PORT_SFLG_PORT_BUSY \ | ERTS_PORT_SFLG_DISTRIBUTION) - +void print_port_info(Port *, int, void *); +void erts_port_free(Port *); +#ifndef ERTS_SMP void erts_port_cleanup(Port *); +#endif void erts_fire_port_monitor(Port *prt, Eterm ref); #ifdef ERTS_SMP void erts_smp_xports_unlock(Port *); @@ -274,8 +261,9 @@ void erts_smp_xports_unlock(Port *); int erts_lc_is_port_locked(Port *); #endif -ERTS_GLB_INLINE void erts_smp_port_minor_lock(Port*); -ERTS_GLB_INLINE void erts_smp_port_minor_unlock(Port*); +ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt); +ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt); +ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc); ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt); ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt); @@ -283,64 +271,71 @@ ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt); #if ERTS_GLB_INLINE_INCL_FUNC_DEF -ERTS_GLB_INLINE void -erts_smp_port_minor_lock(Port* prt) +ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt) { #ifdef ERTS_SMP - erts_smp_spin_lock(&prt->state_lck); + erts_ptab_inc_refc(&prt->common); +#else + erts_atomic32_inc_nob(&prt->refc); #endif } -ERTS_GLB_INLINE void -erts_smp_port_minor_unlock(Port *prt) +ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt) { #ifdef ERTS_SMP - erts_smp_spin_unlock(&prt->state_lck); + int referred = erts_ptab_dec_test_refc(&prt->common); + if (!referred) + erts_port_free(prt); +#else + int refc = erts_atomic32_dec_read_nob(&prt->refc); + if (refc == 0) + erts_port_free(prt); #endif } +ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc) +{ +#ifdef ERTS_SMP + int referred = erts_ptab_add_test_refc(&prt->common, add_refc); + if (!referred) + erts_port_free(prt); +#else + int refc = erts_atomic32_add_read_nob(&prt->refc, add_refc); + if (refc == 0) + erts_port_free(prt); +#endif +} ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt) { - int res; - - ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) > 0); - erts_smp_atomic32_inc_nob(&prt->common.refc); - #ifdef ERTS_SMP - res = erts_smp_mtx_trylock(prt->lock); - if (res == EBUSY) { - erts_smp_atomic32_dec_nob(&prt->common.refc); - } + /* *Need* to be a managed thread */ + ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + return erts_mtx_trylock(prt->lock); #else - res = 0; + return 0; #endif - - return res; } ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt) { - ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) > 0); - erts_smp_atomic32_inc_nob(&prt->common.refc); #ifdef ERTS_SMP - erts_smp_mtx_lock(prt->lock); + /* *Need* to be a managed thread */ + ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + erts_mtx_lock(prt->lock); #endif } ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt) { - erts_aint32_t refc; #ifdef ERTS_SMP - erts_smp_mtx_unlock(prt->lock); + /* *Need* to be a managed thread */ + ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + erts_mtx_unlock(prt->lock); #endif - refc = erts_smp_atomic32_dec_read_nob(&prt->common.refc); - ASSERT(refc >= 0); - if (refc == 0) - erts_port_cleanup(prt); } #endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ @@ -348,7 +343,7 @@ erts_smp_port_unlock(Port *prt) #define ERTS_INVALID_PORT_OPT(PP, ID, FLGS) \ (!(PP) \ - || (erts_smp_atomic32_read_nob(&(PP)->state) & (FLGS)) \ + || (erts_atomic32_read_nob(&(PP)->state) & (FLGS)) \ || (PP)->common.id != (ID)) /* port lookup */ @@ -365,125 +360,301 @@ erts_smp_port_unlock(Port *prt) #define ERTS_PORT_SCHED_ID(P, ID) \ ((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID))) +extern const Port erts_invalid_port; +#define ERTS_PORT_LOCK_BUSY ((Port *) &erts_invalid_port) + #ifdef ERTS_SMP Port *erts_de2port(DistEntry *, Process *, ErtsProcLocks); #endif -#define erts_id2port(ID, P, PL) \ - erts_id2port_sflgs((ID), (P), (PL), ERTS_PORT_SFLGS_INVALID_LOOKUP) - -ERTS_GLB_INLINE Port*erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32); +ERTS_GLB_INLINE Port *erts_pix2port(int); +ERTS_GLB_INLINE Port *erts_port_lookup_raw(Eterm); +ERTS_GLB_INLINE Port *erts_port_lookup(Eterm, Uint32); +ERTS_GLB_INLINE Port*erts_id2port(Eterm id); +ERTS_GLB_INLINE Port *erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32); ERTS_GLB_INLINE void erts_port_release(Port *); -ERTS_GLB_INLINE Port*erts_drvport2port(ErlDrvPort, erts_aint32_t *); -ERTS_GLB_INLINE Port*erts_drvportid2port(Eterm); -ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm id); -ERTS_GLB_INLINE int erts_is_port_alive(Eterm id); -ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm id); +#ifdef ERTS_SMP +ERTS_GLB_INLINE Port *erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs); +ERTS_GLB_INLINE void erts_thr_port_release(Port *prt); +#endif +ERTS_GLB_INLINE Port *erts_thr_drvport2port_raw(ErlDrvPort); +ERTS_GLB_INLINE Port *erts_drvport2port_raw(ErlDrvPort drvport); +ERTS_GLB_INLINE Port *erts_drvport2port(ErlDrvPort, erts_aint32_t *); +ERTS_GLB_INLINE Port *erts_drvportid2port(Eterm); +ERTS_GLB_INLINE Eterm erts_drvport2id(ErlDrvPort); +ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm); +ERTS_GLB_INLINE int erts_is_port_alive(Eterm); +ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm); #if ERTS_GLB_INLINE_INCL_FUNC_DEF +ERTS_GLB_INLINE Port *erts_pix2port(int ix) +{ + Port *prt; + ASSERT(0 <= ix && ix < erts_ptab_max(&erts_port)); + prt = (Port *) erts_ptab_pix2intptr_nob(&erts_port, ix); + return prt == ERTS_PORT_LOCK_BUSY ? NULL : prt; +} + +ERTS_GLB_INLINE Port * +erts_port_lookup_raw(Eterm id) +{ + Port *prt; + + ERTS_SMP_LC_ASSERT(erts_thr_progress_lc_is_delaying()); + + if (is_not_internal_port(id)) + return NULL; + + prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port, + internal_port_index(id)); + return prt && prt->common.id == id ? prt : NULL; +} + +ERTS_GLB_INLINE Port * +erts_port_lookup(Eterm id, Uint32 invalid_sflgs) +{ + Port *prt = erts_port_lookup_raw(id); + erts_aint32_t state = erts_atomic32_read_nob(&prt->state); + return (state & invalid_sflgs) ? NULL : prt; +} + + ERTS_GLB_INLINE Port* -erts_id2port_sflgs(Eterm id, Process *c_p, ErtsProcLocks c_p_locks, Uint32 sflgs) +erts_id2port(Eterm id) +{ + erts_aint32_t state; + Port *prt; + + /* Only allowed to be called from managed threads */ + ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + + if (is_not_internal_port(id)) + return NULL; + + prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port, + internal_port_index(id)); + + if (!prt || prt->common.id != id) + return NULL; + + erts_smp_port_lock(prt); + state = erts_atomic32_read_nob(&prt->state); + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) { + erts_smp_port_unlock(prt); + return NULL; + } + + return prt; +} + + +ERTS_GLB_INLINE Port* +erts_id2port_sflgs(Eterm id, + Process *c_p, ErtsProcLocks c_p_locks, + Uint32 invalid_sflgs) { #ifdef ERTS_SMP int no_proc_locks = !c_p || !c_p_locks; #endif + erts_aint32_t state; Port *prt; + /* Only allowed to be called from managed threads */ + ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + if (is_not_internal_port(id)) return NULL; - prt = &erts_port[internal_port_index(id)]; + prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port, + internal_port_index(id)); - erts_smp_port_minor_lock(prt); - if (ERTS_INVALID_PORT_OPT(prt, id, sflgs)) { - erts_smp_port_minor_unlock(prt); - prt = NULL; + if (!prt || prt->common.id != id) + return NULL; + +#ifdef ERTS_SMP + if (no_proc_locks) + erts_smp_port_lock(prt); + else if (erts_smp_port_trylock(prt) == EBUSY) { + /* Unlock process locks, and acquire locks in lock order... */ + erts_smp_proc_unlock(c_p, c_p_locks); + erts_smp_port_lock(prt); + erts_smp_proc_lock(c_p, c_p_locks); + } +#endif + state = erts_atomic32_read_nob(&prt->state); + if (state & invalid_sflgs) { +#ifdef ERTS_SMP + erts_smp_port_unlock(prt); +#endif + return NULL; } - else { - erts_smp_atomic32_inc_nob(&prt->common.refc); - erts_smp_port_minor_unlock(prt); + return prt; +} + +ERTS_GLB_INLINE void +erts_port_release(Port *prt) +{ + /* Only allowed to be called from managed threads */ + ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); #ifdef ERTS_SMP - if (no_proc_locks) - erts_smp_mtx_lock(prt->lock); - else if (erts_smp_mtx_trylock(prt->lock) == EBUSY) { - /* Unlock process locks, and acquire locks in lock order... */ - erts_smp_proc_unlock(c_p, c_p_locks); - erts_smp_mtx_lock(prt->lock); - erts_smp_proc_lock(c_p, c_p_locks); + erts_smp_port_unlock(prt); +#else + if (prt->cleanup) { + prt->cleanup = 0; + erts_port_cleanup(prt); + } +#endif +} + +#ifdef ERTS_SMP + +/* + * erts_thr_id2port_sflgs() and erts_thr_port_release() can + * be used by unmanaged threads in the SMP case. + */ +ERTS_GLB_INLINE Port * +erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs) +{ + Port *prt; + ErtsThrPrgrDelayHandle dhndl; + + if (is_not_internal_port(id)) + return NULL; + + dhndl = erts_thr_progress_unmanaged_delay(); + + prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port, + internal_port_index(id)); + + if (!prt || prt->common.id != id) { + erts_thr_progress_unmanaged_continue(dhndl); + prt = NULL; + } + else { + erts_aint32_t state; + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) { + erts_port_inc_refc(prt); + erts_thr_progress_unmanaged_continue(dhndl); } - /* The id may not have changed... */ - ERTS_SMP_LC_ASSERT(prt->common.id == id); - /* ... but state may have... */ - if (erts_smp_atomic32_read_nob(&prt->state) & sflgs) { - erts_smp_port_unlock(prt); /* Also decrements common.refc... */ + erts_mtx_lock(prt->lock); + state = erts_atomic32_read_nob(&prt->state); + if (state & invalid_sflgs) { + erts_mtx_unlock(prt->lock); + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + erts_port_dec_refc(prt); prt = NULL; } -#endif - } return prt; } ERTS_GLB_INLINE void -erts_port_release(Port *prt) +erts_thr_port_release(Port *prt) { - erts_smp_port_unlock(prt); + erts_mtx_unlock(prt->lock); +#ifdef ERTS_SMP + if (!erts_thr_progress_is_managed_thread()) + erts_port_dec_refc(prt); +#endif +} + +#endif + +ERTS_GLB_INLINE Port* +erts_thr_drvport2port_raw(ErlDrvPort drvport) +{ +#if ERTS_ENABLE_LOCK_CHECK + int emu_thread = erts_lc_is_emu_thr(); +#endif + if (drvport == ERTS_INVALID_ERL_DRV_PORT) + return NULL; + else { + Port *prt = (Port *) drvport; +#if ERTS_ENABLE_LOCK_CHECK + if (emu_thread) { + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_LC_ASSERT(!prt->port_data_lock + || erts_lc_mtx_is_locked(&prt->port_data_lock->mtx)); + } + else { + ERTS_LC_ASSERT(prt->port_data_lock); + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&prt->port_data_lock->mtx)); + } +#endif + return prt; + } +} + +ERTS_GLB_INLINE Port* +erts_drvport2port_raw(ErlDrvPort drvport) +{ + ERTS_LC_ASSERT(erts_lc_is_emu_thr()); + if (drvport == ERTS_INVALID_ERL_DRV_PORT) + return NULL; + else { + Port *prt = (Port *) drvport; + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + return prt; + } } ERTS_GLB_INLINE Port* erts_drvport2port(ErlDrvPort drvport, erts_aint32_t *statep) { - int ix = (int) drvport; + Port *prt = erts_drvport2port_raw(drvport); erts_aint32_t state; - if (ix < 0 || erts_max_ports <= ix) + if (!prt) return NULL; - state = erts_smp_atomic32_read_nob(&erts_port[ix].state); + state = erts_atomic32_read_nob(&prt->state); if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) return NULL; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix])); if (statep) *statep = state; - return &erts_port[ix]; + return prt; } ERTS_GLB_INLINE Port* erts_drvportid2port(Eterm id) { - int ix; + Port *prt; erts_aint32_t state; if (is_not_internal_port(id)) return NULL; - ix = (int) internal_port_index(id); - if (erts_max_ports <= ix) + prt = (Port *) erts_ptab_pix2intptr_nob(&erts_port, + internal_port_index(id)); + if (!prt) return NULL; - state = erts_smp_atomic32_read_nob(&erts_port[ix].state); - if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + if (prt->common.id != id) return NULL; - if (erts_port[ix].common.id != id) + state = erts_atomic32_read_nob(&prt->state); + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) return NULL; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix])); - return &erts_port[ix]; + return prt; +} + +ERTS_GLB_INLINE Eterm +erts_drvport2id(ErlDrvPort drvport) +{ + Port *prt = erts_drvport2port_raw(drvport); + if (!prt) + return am_undefined; + else + return prt->common.id; } ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm id) { - if (is_not_internal_port(id)) + Port *prt = erts_port_lookup_raw(id); + if (prt) + return (Uint32) erts_atomic32_read_acqb(&prt->state); + else return ERTS_PORT_SFLG_INVALID; - else { - erts_aint32_t state; - int ix = internal_port_index(id); - if (erts_max_ports <= ix) - return ERTS_PORT_SFLG_INVALID; - state = erts_smp_atomic32_read_ddrb(&erts_port[ix].state); - if (erts_port[ix].common.id != id) - return ERTS_PORT_SFLG_INVALID; - return state; - } } ERTS_GLB_INLINE int diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index 6eb8c4869c..112d27c94e 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -51,12 +51,9 @@ #define ERTS_PORT_TASK_INVALID_PORT(P, ID) \ - ((erts_smp_atomic32_read_acqb(&(P)->state) & ERTS_PORT_SFLGS_DEAD) \ + ((erts_atomic32_read_acqb(&(P)->state) & ERTS_PORT_SFLGS_DEAD) \ || (P)->common.id != (ID)) -#define ERTS_PORT_IS_IN_RUNQ(RQ, P) \ - ((P)->sched.next || (P)->sched.prev || (RQ)->ports.start == (P)) - #ifdef USE_VM_PROBES #define DTRACE_DRIVER(PROBE_NAME, PP) \ if (DTRACE_ENABLED(driver_ready_input)) { \ @@ -76,7 +73,6 @@ erts_smp_atomic_t erts_port_task_outstanding_io_tasks; struct ErtsPortTaskQueue_ { ErtsPortTask *first; ErtsPortTask *last; - Port *port; }; struct ErtsPortTask_ { @@ -275,7 +271,6 @@ port_taskq_init(ErtsPortTaskQueue *ptqp, Port *pp) if (ptqp) { ptqp->first = NULL; ptqp->last = NULL; - ptqp->port = pp; } return ptqp; } @@ -429,7 +424,10 @@ erts_port_task_abort(Eterm id, ErtsPortTaskHandle *pthp) ErtsPortTask *ptp; Port *pp; - pp = &erts_port[internal_port_index(id)]; + pp = erts_port_lookup_raw(id); + if (!pp) + return 1; + runq = erts_port_runq(pp); if (!runq) return 1; @@ -512,7 +510,10 @@ erts_port_task_schedule(Eterm id, ptp = port_task_alloc(); ASSERT(is_internal_port(id)); - pp = &erts_port[internal_port_index(id)]; + pp = erts_port_lookup_raw(id); + if (!pp) + return -1; + runq = erts_port_runq(pp); if (!runq || ERTS_PORT_TASK_INVALID_PORT(pp, id)) { @@ -616,7 +617,7 @@ erts_port_task_free_port(Port *pp) ErtsPortTaskQueue *ptqp; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); - ASSERT(!(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); + ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); runq = erts_port_runq(pp); ASSERT(runq); ERTS_PT_CHK_PRES_PORTQ(runq, pp); @@ -627,14 +628,10 @@ erts_port_task_free_port(Port *pp) ErtsPortTask *ptp; enqueue_free: ptp = port_task_alloc(); - erts_smp_port_minor_lock(pp); - erts_smp_atomic32_read_bset_relb(&pp->state, - (ERTS_PORT_SFLG_CLOSING - | ERTS_PORT_SFLG_FREE_SCHEDULED), - ERTS_PORT_SFLG_FREE_SCHEDULED); - erts_may_save_closed_port(pp); - erts_smp_port_minor_unlock(pp); - ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&pp->common.refc) > 1); + erts_atomic32_read_bset_relb(&pp->state, + (ERTS_PORT_SFLG_CLOSING + | ERTS_PORT_SFLG_FREE_SCHEDULED), + ERTS_PORT_SFLG_FREE_SCHEDULED); ptp->type = ERTS_PORT_TASK_FREE; ptp->event = (ErlDrvEvent) -1; ptp->event_data = NULL; @@ -651,20 +648,18 @@ erts_port_task_free_port(Port *pp) goto enqueue_free; } ASSERT(!pp->sched.taskq); - erts_smp_port_minor_lock(pp); - erts_smp_atomic32_read_bset_relb(&pp->state, - (ERTS_PORT_SFLG_CLOSING - | ERTS_PORT_SFLG_FREE_SCHEDULED), - ERTS_PORT_SFLG_FREE_SCHEDULED); - erts_may_save_closed_port(pp); - erts_smp_port_minor_unlock(pp); - erts_smp_atomic32_dec_nob(&pp->common.refc); /* Not alive */ - ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&pp->common.refc) > 0); /* Lock */ + erts_atomic32_read_bset_relb(&pp->state, + (ERTS_PORT_SFLG_CLOSING + | ERTS_PORT_SFLG_FREE_SCHEDULED), + ERTS_PORT_SFLG_FREE_SCHEDULED); handle_remaining_tasks(runq, pp); /* May release runq lock */ ASSERT(!pp->sched.exe_taskq && (!ptqp || !ptqp->first)); pp->sched.taskq = NULL; ERTS_PT_CHK_PRES_PORTQ(runq, pp); erts_smp_runq_unlock(runq); +#ifndef ERTS_SMP + pp->cleanup = 1; +#endif } } @@ -703,14 +698,21 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) goto done; } + if (erts_smp_port_trylock(pp) == EBUSY) { + erts_smp_runq_unlock(runq); + erts_smp_port_lock(pp); + erts_smp_runq_lock(runq); + } + ASSERT(pp->sched.in_runq); pp->sched.in_runq = 0; + if (!pp->sched.taskq) { if (erts_system_profile_flags.runnable_ports) profile_runnable_port(pp, am_inactive); res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) != (erts_aint_t) 0); - goto done; + goto release_port_done; } *curr_port_pp = pp; @@ -721,12 +723,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) ASSERT(!pp->sched.exe_taskq); pp->sched.exe_taskq = ptqp; - - if (erts_smp_port_trylock(pp) == EBUSY) { - erts_smp_runq_unlock(runq); - erts_smp_port_lock(pp); - erts_smp_runq_lock(runq); - } if (erts_sched_stat.enabled) { ErtsSchedulerData *esdp = erts_get_scheduler_data(); @@ -771,31 +767,31 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) erts_smp_runq_lock(runq); erts_unblock_fpe(fpe_was_unmasked); - ASSERT(erts_smp_atomic32_read_nob(&pp->state) + ASSERT(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_FREE_SCHEDULED); if (ptqp->first || (pp->sched.taskq && pp->sched.taskq->first)) handle_remaining_tasks(runq, pp); ASSERT(!ptqp->first && (!pp->sched.taskq || !pp->sched.taskq->first)); - erts_smp_atomic32_dec_nob(&pp->common.refc); /* Not alive */ - ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&pp->common.refc) > 0); /* Lock */ port_task_free(ptp); if (pp->sched.taskq) port_taskq_free(pp->sched.taskq); pp->sched.taskq = NULL; - +#ifndef ERTS_SMP + pp->cleanup = 1; +#endif goto tasks_done; case ERTS_PORT_TASK_TIMEOUT: reds += ERTS_PORT_REDS_TIMEOUT; - if (!(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)) { + if (!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)) { DTRACE_DRIVER(driver_timeout, pp); (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data); } break; case ERTS_PORT_TASK_INPUT: reds += ERTS_PORT_REDS_INPUT; - ASSERT((erts_smp_atomic32_read_nob(&pp->state) + ASSERT((erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_input, pp); /* NOTE some windows drivers use ->ready_input for input and output */ @@ -804,7 +800,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) break; case ERTS_PORT_TASK_OUTPUT: reds += ERTS_PORT_REDS_OUTPUT; - ASSERT((erts_smp_atomic32_read_nob(&pp->state) + ASSERT((erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_output, pp); (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->event); @@ -812,7 +808,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) break; case ERTS_PORT_TASK_EVENT: reds += ERTS_PORT_REDS_EVENT; - ASSERT((erts_smp_atomic32_read_nob(&pp->state) + ASSERT((erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_event, pp); (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, ptp->event, ptp->event_data); @@ -828,7 +824,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) break; } - if ((erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_CLOSING) + if ((erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_CLOSING) && erts_is_port_ioq_empty(pp)) { reds += ERTS_PORT_REDS_TERMINATE; erts_terminate_port(pp); @@ -879,7 +875,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) ErtsRunQueue *xrunq; #endif - ASSERT(!(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); + ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); ASSERT(pp->sched.taskq->first); #ifdef ERTS_SMP @@ -922,23 +918,9 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) { trace_sched_ports(pp, am_out); } -#ifndef ERTS_SMP + +release_port_done: erts_port_release(pp); -#else - { - erts_aint32_t refc; - erts_smp_mtx_unlock(pp->lock); - refc = erts_smp_atomic32_dec_read_nob(&pp->common.refc); - ASSERT(refc >= 0); - if (refc == 0) { - erts_smp_runq_unlock(runq); - erts_port_cleanup(pp); /* Might aquire runq lock */ - erts_smp_runq_lock(runq); - res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) - != (erts_aint_t) 0); - } - } -#endif done: ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index f9ff3c4bd5..e6fe4ccdc1 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -43,7 +43,6 @@ #include "erl_thr_queue.h" #include "erl_async.h" #include "dtrace-wrapper.h" -#define ERTS_PTAB_WANT_BIF_IMPL__ #include "erl_ptab.h" #define ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED (2000*CONTEXT_REDS) @@ -1482,37 +1481,32 @@ handle_reap_ports(ErtsAuxWorkData *awdp, erts_aint32_t aux_work) unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_REAP_PORTS); awdp->esdp->run_queue->halt_in_progress = 1; if (erts_smp_atomic32_dec_read_acqb(&erts_halt_progress) == 0) { - int i; + int i, max = erts_ptab_max(&erts_port); erts_smp_atomic32_set_nob(&erts_halt_progress, 1); - for (i = 0; i < erts_max_ports; i++) { - Port *prt = &erts_port[i]; - erts_aint32_t state = erts_smp_atomic32_read_acqb(&prt->state); + for (i = 0; i < max; i++) { + erts_aint32_t state; + Port *prt = erts_pix2port(i); + if (!prt) + continue; + state = erts_atomic32_read_acqb(&prt->state); if (state & (ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP | ERTS_PORT_SFLG_HALT)) continue; - erts_smp_port_minor_lock(prt); + /* We need to set the halt flag - get the port lock */ -#ifdef ERTS_SMP - erts_smp_atomic32_inc_nob(&prt->common.refc); -#endif - erts_smp_port_minor_unlock(prt); -#ifdef ERTS_SMP - erts_smp_mtx_lock(prt->lock); -#endif - state = erts_smp_atomic32_read_acqb(&prt->state); - if (state & (ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP - | ERTS_PORT_SFLG_HALT)) { - erts_port_release(prt); - continue; - } - state = erts_smp_atomic32_read_bor_relb(&prt->state, + + erts_smp_port_lock(prt); + + state = erts_atomic32_read_nob(&prt->state); + if (!(state & (ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP + | ERTS_PORT_SFLG_HALT))) { + state = erts_atomic32_read_bor_relb(&prt->state, ERTS_PORT_SFLG_HALT); - erts_smp_atomic32_inc_nob(&erts_halt_progress); - if (state & (ERTS_PORT_SFLG_EXITING|ERTS_PORT_SFLG_CLOSING)) { - erts_port_release(prt); - continue; + erts_smp_atomic32_inc_nob(&erts_halt_progress); + if (!(state & (ERTS_PORT_SFLG_EXITING|ERTS_PORT_SFLG_CLOSING))) + erts_do_exit_port(prt, prt->common.id, am_killed); } - erts_do_exit_port(prt, prt->common.id, am_killed); + erts_port_release(prt); } if (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0) { @@ -4304,6 +4298,11 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online) erts_smp_atomic32_init_relb(&erts_halt_progress, -1); erts_halt_code = 0; + +#if !defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK) + erts_lc_set_thread_name("scheduler 1"); +#endif + } ErtsRunQueue * @@ -8112,7 +8111,7 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) ASSERT(mon->type == MON_TARGET); ASSERT(is_pid(mon->pid) || is_internal_port(mon->pid)); if (is_internal_port(mon->pid)) { - Port *prt = erts_id2port(mon->pid, NULL, 0); + Port *prt = erts_id2port(mon->pid); if (prt == NULL) { goto done; } @@ -8199,7 +8198,7 @@ static void doit_exit_link(ErtsLink *lnk, void *vpcontext) switch(lnk->type) { case LINK_PID: if(is_internal_port(item)) { - Port *prt = erts_id2port(item, NULL, 0); + Port *prt = erts_id2port(item); if (prt) { rlnk = erts_remove_link(&ERTS_P_LINKS(prt), p->common.id); if (rlnk) @@ -8757,14 +8756,6 @@ stack_element_dump(int to, void *to_arg, Process* p, Eterm* sp, int yreg) return yreg; } -/* - * The processes/0 BIF. - */ -BIF_RETTYPE processes_0(BIF_ALIST_0) -{ - return erts_ptab_list(BIF_P, &erts_proc); -} - /* * A nice system halt closing all open port goes as follows: * 1) This function schedules the aux work ERTS_SSI_AUX_WORK_REAP_PORTS diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 6f882c5e57..f5b7921820 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -42,6 +42,9 @@ typedef struct process Process; #include "erl_process_lock.h" /* Only pull out important types... */ #undef ERTS_PROCESS_LOCK_ONLY_PROC_LOCK_TYPE__ +#define ERL_PORT_GET_PORT_TYPE_ONLY__ +#include "erl_port.h" +#undef ERL_PORT_GET_PORT_TYPE_ONLY__ #include "erl_vm.h" #include "erl_smp.h" #include "erl_message.h" @@ -66,7 +69,6 @@ typedef struct process Process; #undef ERL_THR_PROGRESS_TSD_TYPE_ONLY struct ErtsNodesMonitor_; -struct port; #define ERTS_MAX_NO_OF_SCHEDULERS 1024 @@ -378,8 +380,8 @@ struct ErtsRunQueue_ { struct { ErtsRunQueueInfo info; - struct port *start; - struct port *end; + Port *start; + Port *end; } ports; }; @@ -476,7 +478,7 @@ struct ErtsSchedulerData_ { ErtsSchedulerSleepInfo *ssi; Process *current_process; Uint no; /* Scheduler number */ - struct port *current_port; + Port *current_port; ErtsRunQueue *run_queue; int virtual_reds; int cpu_id; /* >= 0 when bound */ @@ -1219,7 +1221,7 @@ Eterm erts_sched_stat_term(Process *p, int total); void erts_free_proc(Process *); -void erts_suspend(Process*, ErtsProcLocks, struct port*); +void erts_suspend(Process*, ErtsProcLocks, Port*); void erts_resume(Process*, ErtsProcLocks); int erts_resume_processes(ErtsProcList *); diff --git a/erts/emulator/beam/erl_ptab.c b/erts/emulator/beam/erl_ptab.c index 8195a350fb..8cab1de9da 100644 --- a/erts/emulator/beam/erl_ptab.c +++ b/erts/emulator/beam/erl_ptab.c @@ -660,11 +660,9 @@ erts_ptab_delete_element(ErtsPTab *ptab, erts_ptab_rwunlock(ptab); } -#ifdef ERTS_SMP erts_schedule_thr_prgr_later_op(ptab->r.o.release_element, (void *) ptab_el, &ptab_el->u.release); -#endif } /* @@ -1151,16 +1149,18 @@ ptab_list_bif_engine(Process *c_p, Eterm *res_accp, Binary *mbp) conses = ptlbdp->pid_ix; } - hp = HAlloc(c_p, conses*2); - ERTS_PTAB_LIST_DBG_SAVE_HEAP_ALLOC(ptlbdp, hp, conses*2); + if (conses) { + hp = HAlloc(c_p, conses*2); + ERTS_PTAB_LIST_DBG_SAVE_HEAP_ALLOC(ptlbdp, hp, conses*2); - for (ix = ptlbdp->pid_ix - 1; ix >= min_ix; ix--) { - ERTS_PTAB_LIST_ASSERT(erts_ptab_is_valid_id(ptlbdp->pid[ix])); - res = CONS(hp, ptlbdp->pid[ix], res); - hp += 2; - } + for (ix = ptlbdp->pid_ix - 1; ix >= min_ix; ix--) { + ERTS_PTAB_LIST_ASSERT(erts_ptab_is_valid_id(ptlbdp->pid[ix])); + res = CONS(hp, ptlbdp->pid[ix], res); + hp += 2; + } - ERTS_PTAB_LIST_DBG_VERIFY_HEAP_ALLOC_USED(ptlbdp, hp); + ERTS_PTAB_LIST_DBG_VERIFY_HEAP_ALLOC_USED(ptlbdp, hp); + } ptlbdp->pid_ix = min_ix; if (min_ix == 0) diff --git a/erts/emulator/beam/erl_ptab.h b/erts/emulator/beam/erl_ptab.h index b65db330e5..b2ae829d62 100644 --- a/erts/emulator/beam/erl_ptab.h +++ b/erts/emulator/beam/erl_ptab.h @@ -53,8 +53,6 @@ typedef struct { Eterm id; #ifdef ERTS_SMP erts_atomic32_t refc; -#else - erts_smp_atomic32_t refc; /* Temporary solution during dev; to be removed! */ #endif Eterm tracer_proc; Uint trace_flags; @@ -73,9 +71,7 @@ typedef struct { } alive; /* --- While being released --- */ -#ifdef ERTS_SMP ErtsThrPrgrLaterOp release; -#endif } u; } ErtsPTabElementCommon; diff --git a/erts/emulator/beam/erl_sys_driver.h b/erts/emulator/beam/erl_sys_driver.h index d429d0ce96..b991a2840c 100644 --- a/erts/emulator/beam/erl_sys_driver.h +++ b/erts/emulator/beam/erl_sys_driver.h @@ -31,7 +31,6 @@ #define ERL_SYS_DRV typedef long ErlDrvEvent; /* An event to be selected on. */ -typedef long ErlDrvPort; /* A port descriptor. */ /* typedef struct _SysDriverOpts SysDriverOpts; defined in sys.h */ diff --git a/erts/emulator/beam/erl_trace.c b/erts/emulator/beam/erl_trace.c index dd2a121165..991e573c14 100644 --- a/erts/emulator/beam/erl_trace.c +++ b/erts/emulator/beam/erl_trace.c @@ -3442,10 +3442,8 @@ sys_msg_dispatcher_func(void *unused) goto queue_proc_msg; } else if (is_internal_port(receiver)) { - port = erts_id2port_sflgs(receiver, - NULL, - 0, - ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP); + port = erts_thr_id2port_sflgs(receiver, + ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP); if (!port) goto failure; else { @@ -3459,7 +3457,7 @@ sys_msg_dispatcher_func(void *unused) #ifdef DEBUG_PRINTOUTS erts_fprintf(stderr, "delivered\n"); #endif - erts_port_release(port); + erts_thr_port_release(port); if (smqp->bp) free_message_buffer(smqp->bp); } diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index d93ac723a4..ae31ab54b5 100644 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -884,11 +884,6 @@ int erts_global_garbage_collect(Process*, int, Eterm*, int); /* io.c */ -struct erl_drv_port_data_lock { - erts_mtx_t mtx; - erts_atomic_t refc; -}; - typedef struct { char *name; char *driver_name; @@ -901,22 +896,18 @@ typedef struct { int erts_add_driver_entry(ErlDrvEntry *drv, DE_Handle *handle, int driver_list_locked); void erts_destroy_driver(erts_driver_t *drv); void erts_wake_process_later(Port*, Process*); -int erts_open_driver(erts_driver_t*, Eterm, char*, SysDriverOpts*, int *); +Port *erts_open_driver(erts_driver_t*, Eterm, char*, SysDriverOpts*, int *, int *); int erts_is_port_ioq_empty(Port *); void erts_terminate_port(Port *); -void close_port(Eterm); void init_io(void); -void cleanup_io(void); void erts_do_exit_port(Port *, Eterm, Eterm); void erts_port_command(Process *, Eterm, Port *, Eterm); Eterm erts_port_control(Process*, Port*, Uint, Eterm); int erts_write_to_port(Eterm caller_id, Port *p, Eterm list); -void print_port_info(int, void *, int); void erts_raw_port_command(Port*, byte*, Uint); -void driver_report_exit(int, int); +void driver_report_exit(ErlDrvPort, int); LineBuf* allocate_linebuf(int); int async_ready(Port *, void*); -Sint erts_test_next_port(int, Uint); ErtsPortNames *erts_get_port_names(Eterm); void erts_free_port_names(ErtsPortNames *); Uint erts_port_ioq_size(Port *pp); diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index b772783f64..4dcec356a9 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -57,15 +57,14 @@ static erts_smp_tsd_key_t driver_list_lock_status_key; /*stop recursive locks wh static erts_smp_tsd_key_t driver_list_last_error_key; /* Save last DDLL error on a per thread basis (for BC interfaces) */ -Port* erts_port; /* The port table */ +ErtsPTab erts_port erts_align_attribute(ERTS_CACHE_LINE_SIZE); /* The port table */ erts_smp_atomic_t erts_bytes_out; /* No bytes sent out of the system */ erts_smp_atomic_t erts_bytes_in; /* No bytes gotten into the system */ -Uint erts_max_ports; -Uint erts_port_tab_index_mask; - const ErlDrvTermData driver_term_nil = (ErlDrvTermData)NIL; +const Port erts_invalid_port = {{ERTS_INVALID_PORT}}; + erts_driver_t vanilla_driver; erts_driver_t spawn_driver; erts_driver_t fd_driver; @@ -89,36 +88,12 @@ static void driver_monitor_unlock_pdl(Port *p); static ERTS_INLINE ErlIOQueue* drvport2ioq(ErlDrvPort drvport) { - int ix = (int) drvport; - erts_aint32_t state; - - if (ix < 0 || erts_max_ports <= ix) - return NULL; - - state = erts_smp_atomic32_read_nob(&erts_port[ix].state); - -#ifdef ERTS_ENABLE_LOCK_CHECK - - if (erts_get_scheduler_data()) { - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix])); - ERTS_LC_ASSERT(!erts_port[ix].port_data_lock - || erts_lc_mtx_is_locked( - &erts_port[ix].port_data_lock->mtx)); - } - else { - ERTS_LC_ASSERT((state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) - || erts_port[ix].port_data_lock); - ERTS_LC_ASSERT(!erts_port[ix].port_data_lock - || erts_lc_mtx_is_locked( - &erts_port[ix].port_data_lock->mtx)); - } - -#endif - + Port *prt = erts_thr_drvport2port_raw(drvport); + erts_aint32_t state = erts_atomic32_read_nob(&prt->state); if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) return NULL; else - return &erts_port[ix].ioq; + return &prt->ioq; } static ERTS_INLINE int @@ -196,27 +171,13 @@ typedef struct line_buf_context { dtrace_port_str((PORT), port_str); #endif -/* The 'number' field in a port now has two parts: the lowest bits - contain the index in the port table, and the higher bits are a counter - which is incremented each time we look for a free port and start from - the beginning of the table. erts_max_ports is the number of file descriptors, - rounded up to a power of 2. - To get the index from a port, use the macro 'internal_port_index'; - 'port_number' returns the whole number field. -*/ - -static erts_smp_spinlock_t get_free_port_lck; -static Uint last_port_num; -static Uint port_num_mask; -erts_smp_atomic32_t erts_ports_snapshot; /* Identifies the _next_ snapshot (not the ongoing) */ - - static ERTS_INLINE void kill_port(Port *pp) { ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); + erts_ptab_delete_element(&erts_port, &pp->common); /* Time of death */ erts_port_task_free_port(pp); - ASSERT(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD); + /* In non-smp case the port structure may have been deallocated now */ } #ifdef ERTS_SMP @@ -227,149 +188,194 @@ erts_lc_is_port_locked(Port *prt) { if (!prt) return 0; + ERTS_SMP_LC_ASSERT(prt->lock); return erts_smp_lc_mtx_is_locked(prt->lock); } #endif #endif /* #ifdef ERTS_SMP */ -static int -get_free_port(void) -{ - Uint num; - Uint tries = erts_max_ports; - Port* port; - - erts_smp_spin_lock(&get_free_port_lck); - num = last_port_num + 1; - for (;; ++num) { - erts_aint32_t act; - - port = &erts_port[num & erts_port_tab_index_mask]; - - act = erts_smp_atomic32_read_nob(&port->state); - - while (act & ERTS_PORT_SFLG_FREE) { - erts_aint32_t exp = act; - act = erts_smp_atomic32_cmpxchg_relb(&port->state, - ERTS_PORT_SFLG_INITIALIZING, - exp); - if (act == exp) { - last_port_num = num; - erts_smp_spin_unlock(&get_free_port_lck); - ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&port->common.refc) == 0); - erts_smp_atomic32_set_nob(&port->common.refc, 2); /* Port alive + lock */ - return num & port_num_mask; - } - } +static void initq(Port* prt); - if (--tries == 0) { - erts_smp_spin_unlock(&get_free_port_lck); - return -1; - } - } +static void insert_port_struct(void *vprt, Eterm data) +{ + Port *prt = (Port *) vprt; + Eterm id = make_internal_port(data); +#ifdef ERTS_SMP + ASSERT(prt->drv_ptr && prt->lock); + /* + * We are breaking lock order in the port specific locking + * case. This is, however, safe since the lock has not been + * published, yet. + */ + if (!prt->drv_ptr->lock) + erts_mtx_init_locked_x(prt->lock, "port_lock", id); +#endif + prt->common.id = id; + erts_atomic32_init_relb(&prt->state, ERTS_PORT_SFLG_INITIALIZING); } -/* - * erts_test_next_port() is only used for testing. - */ -Sint -erts_test_next_port(int set, Uint next) +static Port *create_port(char *name, + erts_driver_t *driver, + erts_mtx_t *driver_lock, + Eterm pid, + int *enop) { - Uint i, num; - Sint res = -1; + Port *prt; + char *p; + size_t port_size, size; + erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED; +#ifdef DEBUG + /* Make sure the debug flags survives until port is freed */ + state |= ERTS_PORT_SFLG_PORT_DEBUG; +#endif - erts_smp_spin_lock(&get_free_port_lck); - if (set) { - last_port_num = (next - 1) & port_num_mask; +#ifdef ERTS_SMP + if (!driver_lock) { + /* Align size for mutex following port struct */ + port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); + size += sizeof(erts_mtx_t); } - num = last_port_num + 1; - - for (i=0; i < erts_max_ports && res<0; ++i, ++num) { - - Port* port = &erts_port[num & erts_port_tab_index_mask]; + else +#endif + port_size = size = sizeof(Port); - erts_aint32_t state = erts_smp_atomic32_read_nob(&port->state); + size += sys_strlen(name) + 1; - if (state & ERTS_PORT_SFLG_FREE) { - last_port_num = num - 1; - res = num & port_num_mask; - } + p = erts_alloc_fnf(ERTS_ALC_T_PORT, size); + if (!p) { + if (enop) + *enop = ENOMEM; + return NULL; } - erts_smp_spin_unlock(&get_free_port_lck); - return res; -} -static void port_cleanup(Port *prt); + prt = (Port *) p; + p += port_size; #ifdef ERTS_SMP + if (driver_lock) { + prt->lock = driver_lock; + erts_mtx_lock(driver_lock); + } + else { + prt->lock = (erts_mtx_t *) p; + p += sizeof(erts_mtx_t); + state |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; + } + erts_smp_atomic_set_nob(&prt->run_queue, + (erts_aint_t) erts_get_runq_current(NULL)); + prt->xports = NULL; +#else + erts_atomic32_init_nob(&prt->refc, 1); + prt->cleanup = 0; +#endif + + prt->name = p; + sys_strcpy(p, name); + prt->drv_ptr = driver; + ERTS_P_LINKS(prt) = NULL; + ERTS_P_MONITORS(prt) = NULL; + prt->linebuf = NULL; + prt->bp = NULL; + prt->suspended = NULL; + prt->data = am_undefined; + prt->port_data_lock = NULL; + prt->control_flags = 0; + prt->bytes_in = 0; + prt->bytes_out = 0; + prt->dist_entry = NULL; + prt->connected = pid; + prt->common.u.alive.reg = NULL; +#ifdef ERTS_SMP + prt->common.u.alive.ptimer = NULL; +#else + sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer)); +#endif + erts_port_task_handle_init(&prt->timeout_task); + erts_port_task_init_sched(&prt->sched); + prt->psd = NULL; + prt->drv_data = (SWord) 0; -static void -sched_port_cleanup(void *vprt) -{ - Port *prt = (Port *) vprt; - erts_smp_mtx_lock(prt->lock); - port_cleanup(prt); -} + /* Set default tracing */ + erts_get_default_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER_PROC(prt)); + + ASSERT(((char *) prt) == ((char *) &prt->common)); + if (!erts_ptab_new_element(&erts_port, + &prt->common, + (void *) prt, + insert_port_struct)) { +#ifdef ERTS_SMP + if (driver_lock) + erts_mtx_unlock(driver_lock); #endif + if (enop) + *enop = 0; + return NULL; + } + + ASSERT(prt == (Port *) (erts_ptab_pix2intptr_nob( + &erts_port, + internal_port_index(prt->common.id)))); + initq(prt); + + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + + erts_atomic32_set_relb(&prt->state, state); + return prt; +} + +#ifndef ERTS_SMP void erts_port_cleanup(Port *prt) { -#ifdef ERTS_SMP - if (erts_smp_mtx_trylock(prt->lock) == EBUSY) - erts_schedule_misc_op(sched_port_cleanup, (void *) prt); - else -#endif - port_cleanup(prt); + if (prt->drv_ptr && prt->drv_ptr->handle) + erts_ddll_dereference_driver(prt->drv_ptr->handle); + prt->drv_ptr = NULL; + erts_port_dec_refc(prt); } +#endif void -port_cleanup(Port *prt) +erts_port_free(Port *prt) { -#ifdef ERTS_SMP - erts_smp_mtx_t *mtx; -#endif #if defined(ERTS_SMP) || defined(ERTS_ENABLE_LOCK_CHECK) - erts_aint32_t state = erts_smp_atomic32_read_nob(&prt->state); + erts_aint32_t state = erts_atomic32_read_nob(&prt->state); #endif - erts_driver_t *driver; - - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - driver = prt->drv_ptr; - prt->drv_ptr = NULL; - ASSERT(driver); - - ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_FREE_SCHEDULED); - ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG); + ERTS_LC_ASSERT(state & (ERTS_PORT_SFLG_FREE_SCHEDULED + | ERTS_PORT_SFLG_INITIALIZING)); + ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG); ERTS_LC_ASSERT(!(state & ERTS_PORT_SFLG_FREE)); - ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) == 0); - #ifdef ERTS_SMP - mtx = prt->lock; - ASSERT(mtx); - - prt->lock = NULL; - - erts_smp_mtx_unlock(mtx); + ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->common.refc) == 0); +#else + ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->refc) == 0); #endif - erts_smp_atomic32_set_relb(&prt->state, ERTS_PORT_SFLG_FREE); - #ifdef ERTS_SMP - if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) { - erts_smp_mtx_destroy(mtx); - erts_free(ERTS_ALC_T_PORT_LOCK, mtx); - } -#endif + ASSERT(prt->lock); + if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) + erts_mtx_destroy(prt->lock); - if (driver->handle) - erts_ddll_dereference_driver(driver->handle); + /* + * We cannot dereference a driver using driver + * locking until here in smp case. Otherwise, + * the driver lock may still be in use by others. + * + * In the non-smp case we cannot do it here since + * this function may be called by non-scheduler + * threads. This is done in erts_port_cleanup() + * in the non-smp case. + */ + if (prt->drv_ptr->handle) + erts_ddll_dereference_driver(prt->drv_ptr->handle); +#endif + erts_atomic32_set_nob(&prt->state, ERTS_PORT_SFLG_FREE); + erts_free(ERTS_ALC_T_PORT, prt); } - /* ** Initialize v_start to point to the small fixed vector. ** Once (reallocated) we never reset the pointer to the small vector @@ -416,73 +422,7 @@ static void stopq(Port* prt) if (prt->port_data_lock) { driver_pdl_unlock(prt->port_data_lock); driver_pdl_dec_refc(prt->port_data_lock); - prt->port_data_lock = NULL; - } -} - - - -static void -setup_port(Port* prt, Eterm pid, erts_driver_t *driver, - ErlDrvData drv_data, char *name, erts_aint32_t xstate) -{ - ErtsRunQueue *runq = erts_get_runq_current(NULL); - char *new_name, *old_name; -#ifdef DEBUG - /* Make sure the debug flags survives until port is freed */ - xstate |= ERTS_PORT_SFLG_PORT_DEBUG; -#endif - ASSERT(runq); - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - - - new_name = (char*) erts_alloc(ERTS_ALC_T_PORT_NAME, sys_strlen(name)+1); - sys_strcpy(new_name, name); - erts_smp_runq_lock(runq); - prt->snapshot = erts_smp_atomic32_read_nob(&erts_ports_snapshot); - old_name = prt->name; - prt->name = new_name; -#ifdef ERTS_SMP - erts_smp_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq); -#endif - ASSERT(!prt->drv_ptr); - prt->drv_ptr = driver; - erts_smp_atomic32_set_relb(&prt->state, - ERTS_PORT_SFLG_CONNECTED | xstate); - erts_smp_runq_unlock(runq); -#ifdef ERTS_SMP - ASSERT(!prt->xports); -#endif - if (old_name) { - erts_free(ERTS_ALC_T_PORT_NAME, (void *) old_name); } - - prt->control_flags = 0; - prt->connected = pid; - prt->drv_data = (SWord) drv_data; - prt->bytes_in = 0; - prt->bytes_out = 0; - prt->dist_entry = NULL; - prt->common.u.alive.reg = NULL; -#ifdef ERTS_SMP - prt->common.u.alive.ptimer = NULL; -#else - sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer)); -#endif - erts_port_task_handle_init(&prt->timeout_task); - prt->suspended = NULL; - sys_strcpy(prt->name, name); - ERTS_P_LINKS(prt) = NULL; - ERTS_P_MONITORS(prt) = NULL; - prt->linebuf = NULL; - prt->bp = NULL; - prt->data = am_undefined; - /* Set default tracing */ - erts_get_default_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER_PROC(prt)); - - prt->psd = NULL; - - initq(prt); } void @@ -493,7 +433,7 @@ erts_wake_process_later(Port *prt, Process *process) ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (erts_smp_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD) + if (erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD) return; for (p = &(prt->suspended); *p != NULL; p = &((*p)->next)) @@ -513,36 +453,34 @@ erts_wake_process_later(Port *prt, Process *process) (*error_number_ptr must contain either BADARG or SYSTEM_LIMIT). The driver start function must obey the same conventions. */ -int +Port * erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ Eterm pid, /* Current process. */ char* name, /* Driver name. */ SysDriverOpts* opts, /* Options. */ - int *error_number_ptr) /* errno in case -2 is returned */ + int *error_type_ptr, /* error type */ + int *error_number_ptr) /* errno in case of error type -2 */ { - int port_num; - int port_ix; + +#undef ERTS_OPEN_DRIVER_RET +#define ERTS_OPEN_DRIVER_RET(Prt, EType, ENo) \ + do { \ + if (error_type_ptr) \ + *error_type_ptr = (EType); \ + if (error_number_ptr) \ + *error_number_ptr = (ENo); \ + return (Prt); \ + } while (0) + ErlDrvData drv_data = 0; - erts_aint32_t xstate = 0; Port *port; int fpe_was_unmasked; - - if (error_number_ptr) - *error_number_ptr = 0; + int error_type, error_number; + int port_errno = 0; + erts_mtx_t *driver_lock = NULL; ERTS_SMP_CHK_NO_PROC_LOCKS; - if ((port_num = get_free_port()) < 0) { - if (error_number_ptr) { - *error_number_ptr = SYSTEM_LIMIT; - } - return -3; - } - - port_ix = port_num & erts_port_tab_index_mask; - port = &erts_port[port_ix]; - port->common.id = make_internal_port(port_num); - erts_smp_mtx_lock(&erts_driver_list_lock); if (!driver) { for (driver = driver_list; driver; driver = driver->next) { @@ -551,9 +489,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ } if (!driver) { erts_smp_mtx_unlock(&erts_driver_list_lock); - if (error_number_ptr) - *error_number_ptr = BADARG; - return -3; + ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG); } } if (driver == &spawn_driver) { @@ -598,32 +534,11 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ if (driver == NULL || (driver != &spawn_driver && opts->exit_status)) { erts_smp_mtx_unlock(&erts_driver_list_lock); - if (error_number_ptr) { - *error_number_ptr = BADARG; - } - /* Need to mark the port as free again */ - ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&port->common.refc) == 2); - erts_smp_atomic32_set_nob(&port->common.refc, 0); - erts_smp_atomic32_set_relb(&port->state, ERTS_PORT_SFLG_FREE); - return -3; + ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG); } - /* - * We'll set up the port before calling the start function, - * to allow message sending and setting timers in the start function. - */ - #ifdef ERTS_SMP - ASSERT(!port->lock); - port->lock = driver->lock; - if (!port->lock) { - port->lock = erts_alloc(ERTS_ALC_T_PORT_LOCK, - sizeof(erts_smp_mtx_t)); - erts_smp_mtx_init_x(port->lock, - "port_lock", - port->common.id); - xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; - } + driver_lock = driver->lock; #endif if (driver->handle != NULL) { @@ -632,18 +547,32 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ } erts_smp_mtx_unlock(&erts_driver_list_lock); -#ifdef ERTS_SMP - erts_smp_mtx_lock(port->lock); -#endif + /* + * We'll set up the port before calling the start function, + * to allow message sending and setting timers in the start function. + */ - setup_port(port, pid, driver, drv_data, name, xstate); + port = create_port(name, driver, driver_lock, pid, &port_errno); + if (!port) { + if (driver->handle) { + erts_smp_mtx_lock(&erts_driver_list_lock); + erts_ddll_decrement_port_count(driver->handle); + erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_ddll_dereference_driver(driver->handle); + } + if (port_errno) + ERTS_OPEN_DRIVER_RET(NULL, -2, port_errno); + else + ERTS_OPEN_DRIVER_RET(NULL, -3, SYSTEM_LIMIT); + } if (IS_TRACED_FL(port, F_TRACE_PORTS)) { trace_port_open(port, pid, am_atom_put(port->name, strlen(port->name))); } - + + error_number = error_type = 0; if (driver->start) { if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) { trace_sched_ports_where(port, am_in, am_start); @@ -656,15 +585,28 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ } #endif fpe_was_unmasked = erts_block_fpe(); - drv_data = (*driver->start)((ErlDrvPort)(port_ix), - name, opts); + drv_data = (*driver->start)((ErlDrvPort) port, name, opts); + if (((SWord) drv_data) == -1) + error_type = -1; + else if (((SWord) drv_data) == -2) { + /* + * We need to save errno quickly after the + * call to the 'start' callback before + * something else modify it. + */ + error_type = -2; + error_number = errno; + } + else if (((SWord) drv_data) == -3) { + error_type = -3; + error_number = BADARG; + } + erts_unblock_fpe(fpe_was_unmasked); port->caller = NIL; if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) { trace_sched_ports_where(port, am_out, am_start); } - if (error_number_ptr && ((SWord) drv_data) == (SWord) -2) - *error_number_ptr = errno; #ifdef ERTS_SMP if (port->xports) erts_smp_xports_unlock(port); @@ -672,15 +614,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ #endif } - if (((SWord)drv_data) == -1 || - ((SWord)drv_data) == -2 || - ((SWord)drv_data) == -3) { - int res = (int) ((SWord) drv_data); - - if (res == -3 && error_number_ptr) { - *error_number_ptr = BADARG; - } - + if (error_type) { /* * Must clean up the port. */ @@ -690,7 +624,6 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ erts_cancel_timer(&(port->common.u.alive.tm)); #endif stopq(port); - kill_port(port); if (port->linebuf != NULL) { erts_free(ERTS_ALC_T_LINEBUF, (void *) port->linebuf); @@ -701,11 +634,14 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ erts_ddll_decrement_port_count(driver->handle); erts_smp_mtx_unlock(&erts_driver_list_lock); } + kill_port(port); erts_port_release(port); - return res; + ERTS_OPEN_DRIVER_RET(NULL, error_type, error_number); } - port->drv_data = (SWord) drv_data; - return port_ix; + port->drv_data = (UWord) drv_data; + ERTS_OPEN_DRIVER_RET(port, 0, 0); + +#undef ERTS_OPEN_DRIVER_RET } #ifdef ERTS_SMP @@ -734,15 +670,21 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ Port* port; erts_driver_t *driver; Process *rp; - int port_num; - Eterm port_id; - erts_aint32_t xstate = 0; + erts_mtx_t *driver_lock = NULL; ERTS_SMP_CHK_NO_PROC_LOCKS; + /* Need to be called from a scheduler thread */ + if (!erts_get_scheduler_id()) + return ERTS_INVALID_ERL_DRV_PORT; + creator_port = erts_drvport2port(creator_port_ix, NULL); if (!creator_port) - return (ErlDrvTermData) -1; + return ERTS_INVALID_ERL_DRV_PORT; + + rp = erts_proc_lookup(pid); + if (!rp) + return ERTS_INVALID_ERL_DRV_PORT; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(creator_port)); @@ -750,55 +692,61 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ erts_smp_mtx_lock(&erts_driver_list_lock); if (!erts_ddll_driver_ok(driver->handle)) { erts_smp_mtx_unlock(&erts_driver_list_lock); - return (ErlDrvTermData) -1; + return ERTS_INVALID_ERL_DRV_PORT; } - rp = erts_pid2proc(NULL, 0, pid, ERTS_PROC_LOCK_LINK); - if (!rp) { - erts_smp_mtx_unlock(&erts_driver_list_lock); - return (ErlDrvTermData) -1; /* pid does not exist */ + if (driver->handle != NULL) { + erts_ddll_increment_port_count(driver->handle); + erts_ddll_reference_referenced_driver(driver->handle); + } + +#ifdef ERTS_SMP + driver_lock = driver->lock; +#endif + + erts_smp_mtx_unlock(&erts_driver_list_lock); + + port = create_port(name, driver, driver_lock, pid, NULL); + if (!port) { + if (driver->handle) { + erts_smp_mtx_lock(&erts_driver_list_lock); + erts_ddll_decrement_port_count(driver->handle); + erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_ddll_dereference_driver(driver->handle); + } + return ERTS_INVALID_ERL_DRV_PORT; } - if ((port_num = get_free_port()) < 0) { - errno = SYSTEM_LIMIT; + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(port)); + + erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK); + if (ERTS_PROC_IS_EXITING(rp)) { erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - erts_smp_mtx_unlock(&erts_driver_list_lock); - return (ErlDrvTermData) -1; + if (driver->handle) { + erts_smp_mtx_lock(&erts_driver_list_lock); + erts_ddll_decrement_port_count(driver->handle); + erts_smp_mtx_unlock(&erts_driver_list_lock); + } + kill_port(port); + erts_port_release(port); + return ERTS_INVALID_ERL_DRV_PORT; } - port_id = make_internal_port(port_num); - port = &erts_port[port_num & erts_port_tab_index_mask]; + erts_add_link(&ERTS_P_LINKS(port), LINK_PID, pid); + erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, port->common.id); + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); #ifdef ERTS_SMP - ASSERT(!port->lock); - port->lock = driver->lock; - if (!port->lock) { + if (!driver_lock) { ErtsXPortsList *xplp = xports_list_alloc(); xplp->port = port; xplp->next = creator_port->xports; creator_port->xports = xplp; - port->lock = erts_alloc(ERTS_ALC_T_PORT_LOCK, - sizeof(erts_smp_mtx_t)); - erts_smp_mtx_init_locked_x(port->lock, "port_lock", port_id); - xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; } - #endif - if (driver->handle != NULL) { - erts_ddll_increment_port_count(driver->handle); - erts_ddll_reference_referenced_driver(driver->handle); - } - erts_smp_mtx_unlock(&erts_driver_list_lock); + port->drv_data = (UWord) drv_data; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(port)); - - setup_port(port, pid, driver, drv_data, name, xstate); - port->common.id = port_id; - - erts_add_link(&ERTS_P_LINKS(port), LINK_PID, pid); - erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, port_id); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - return port_num & erts_port_tab_index_mask; + return (ErlDrvPort) port; } #ifdef ERTS_SMP @@ -1282,15 +1230,22 @@ int erts_write_to_port(Eterm caller_id, Port *p, Eterm list) } } +#ifdef ERTS_SMP +static void +release_port(void *vport) +{ + erts_port_dec_refc((Port *) vport); +} +#endif + /* initialize the port array */ void init_io(void) { - int i; ErlDrvEntry** dp; char maxports[21]; /* enough for any 64-bit integer */ size_t maxportssize = sizeof(maxports); Uint ports_bits = ERTS_PORTS_BITS; - Sint port_extra_shift; + int port_tab_size; #ifdef ERTS_SMP init_xports_list_alloc(); @@ -1299,66 +1254,39 @@ void init_io(void) pdl_init(); if (erts_sys_getenv("ERL_MAX_PORTS", maxports, &maxportssize) == 0) - erts_max_ports = atoi(maxports); + port_tab_size = atoi(maxports); else - erts_max_ports = sys_max_files(); + port_tab_size = sys_max_files(); - if (erts_max_ports > ERTS_MAX_PORTS) - erts_max_ports = ERTS_MAX_PORTS; - if (erts_max_ports < 1024) - erts_max_ports = 1024; + if (port_tab_size > ERTS_MAX_PORTS) + port_tab_size = ERTS_MAX_PORTS; + if (port_tab_size < 1024) + port_tab_size = 1024; if (erts_use_r9_pids_ports) { ports_bits = ERTS_R9_PORTS_BITS; - if (erts_max_ports > ERTS_MAX_R9_PORTS) - erts_max_ports = ERTS_MAX_R9_PORTS; + if (port_tab_size > ERTS_MAX_R9_PORTS) + port_tab_size = ERTS_MAX_R9_PORTS; } - port_extra_shift = erts_fit_in_bits_int32(erts_max_ports - 1); - port_num_mask = (1 << ports_bits) - 1; - - erts_port_tab_index_mask = ~(~((Uint) 0) << port_extra_shift); - erts_max_ports = 1 << port_extra_shift; - erts_smp_mtx_init(&erts_driver_list_lock,"driver_list"); driver_list = NULL; erts_smp_tsd_key_create(&driver_list_lock_status_key); erts_smp_tsd_key_create(&driver_list_last_error_key); - if (erts_max_ports * sizeof(Port) <= erts_max_ports) { - /* More memory needed than the whole address space. */ - erts_alloc_enomem(ERTS_ALC_T_PORT_TABLE, ~((Uint) 0)); - } - - erts_port = (Port *) erts_alloc(ERTS_ALC_T_PORT_TABLE, - erts_max_ports * sizeof(Port)); - - erts_smp_atomic_init_nob(&erts_bytes_out, 0); - erts_smp_atomic_init_nob(&erts_bytes_in, 0); - - for (i = 0; i < erts_max_ports; i++) { - erts_port_task_init_sched(&erts_port[i].sched); - erts_smp_atomic32_init_nob(&erts_port[i].common.refc, 0); + erts_ptab_init_table(&erts_port, + ERTS_ALC_T_PORT_TABLE, #ifdef ERTS_SMP - erts_port[i].lock = NULL; - erts_port[i].xports = NULL; - erts_smp_spinlock_init_x(&erts_port[i].state_lck, "port_state", make_small(i)); + release_port, +#else + NULL, #endif - ERTS_TRACER_PROC(&erts_port[i]) = NIL; - ERTS_TRACE_FLAGS(&erts_port[i]) = 0; - - erts_port[i].drv_ptr = NULL; - erts_smp_atomic32_init_nob(&erts_port[i].state, ERTS_PORT_SFLG_FREE); - erts_port[i].name = NULL; - ERTS_P_LINKS(&erts_port[i]) = NULL; - ERTS_P_MONITORS(&erts_port[i]) = NULL; - erts_port[i].linebuf = NULL; - erts_port[i].port_data_lock = NULL; - } + (ErtsPTabElementCommon *) &erts_invalid_port.common, + port_tab_size, + "port_table"); - erts_smp_atomic32_init_nob(&erts_ports_snapshot, (erts_aint32_t) 0); - last_port_num = 0; - erts_smp_spinlock_init(&get_free_port_lck, "get_free_port"); + erts_smp_atomic_init_nob(&erts_bytes_out, 0); + erts_smp_atomic_init_nob(&erts_bytes_in, 0); sys_init_io(); @@ -1769,7 +1697,7 @@ deliver_vec_message(Port* prt, /* Port */ if (!rp) return; - state = erts_smp_atomic32_read_nob(&prt->state); + state = erts_atomic32_read_nob(&prt->state); /* * Calculate the exact number of heap words needed. */ @@ -1907,7 +1835,7 @@ static void flush_port(Port *p) ASSERT(!p->xports); #endif } - if ((erts_smp_atomic32_read_nob(&p->state) & ERTS_PORT_SFLGS_DEAD) == 0 + if ((erts_atomic32_read_nob(&p->state) & ERTS_PORT_SFLGS_DEAD) == 0 && is_port_ioq_empty(p)) { terminate_port(p); } @@ -1929,8 +1857,8 @@ terminate_port(Port *prt) ASSERT(!ERTS_P_MONITORS(prt)); /* state may be altered by kill_port() below */ - state = erts_smp_atomic32_read_band_nob(&prt->state, - ~ERTS_PORT_SFLG_SEND_CLOSED); + state = erts_atomic32_read_band_nob(&prt->state, + ~ERTS_PORT_SFLG_SEND_CLOSED); if (state & ERTS_PORT_SFLG_SEND_CLOSED) { send_closed_port_id = prt->common.id; connected_id = prt->connected; @@ -1981,6 +1909,8 @@ terminate_port(Port *prt) if (prt->psd) erts_free(ERTS_ALC_T_PRTSD, prt->psd); + ASSERT(prt->dist_entry == NULL); + kill_port(prt); /* @@ -1989,13 +1919,11 @@ terminate_port(Port *prt) */ if ((state & ERTS_PORT_SFLG_HALT) && (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0)) { - erts_smp_port_unlock(prt); /* We will exit and never return */ + erts_port_release(prt); /* We will exit and never return */ erl_exit_flush_async(erts_halt_code, ""); } if (is_internal_port(send_closed_port_id)) deliver_result(send_closed_port_id, connected_id, am_closed); - - ASSERT(prt->dist_entry == NULL); } void @@ -2130,7 +2058,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) } #endif - state = erts_smp_atomic32_read_nob(&p->state); + state = erts_atomic32_read_nob(&p->state); if ((state & (ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_EXITING | ERTS_PORT_SFLG_IMMORTAL)) @@ -2149,12 +2077,12 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) * Setting the port to not busy here, frees the list of pending * processes and makes them runnable. */ - set_busy_port((ErlDrvPort)internal_port_index(p->common.id), 0); + set_busy_port((ErlDrvPort) p, 0); if (p->common.u.alive.reg != NULL) (void) erts_unregister_name(NULL, 0, p, p->common.u.alive.reg->name); - state = erts_smp_atomic32_read_bor_relb(&p->state, ERTS_PORT_SFLG_EXITING); + state = erts_atomic32_read_bor_relb(&p->state, ERTS_PORT_SFLG_EXITING); { SweepContext sc = {p->common.id, rreason}; @@ -2174,16 +2102,16 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) erts_do_net_exits(p->dist_entry, rreason); erts_deref_dist_entry(p->dist_entry); p->dist_entry = NULL; - erts_smp_atomic32_read_band_relb(&p->state, - ~ERTS_PORT_SFLG_DISTRIBUTION); + erts_atomic32_read_band_relb(&p->state, + ~ERTS_PORT_SFLG_DISTRIBUTION); } if ((reason != am_kill) && !is_port_ioq_empty(p)) { /* must turn exiting flag off */ - erts_smp_atomic32_read_bset_relb(&p->state, - (ERTS_PORT_SFLG_EXITING - | ERTS_PORT_SFLG_CLOSING), - ERTS_PORT_SFLG_CLOSING); + erts_atomic32_read_bset_relb(&p->state, + (ERTS_PORT_SFLG_EXITING + | ERTS_PORT_SFLG_CLOSING), + ERTS_PORT_SFLG_CLOSING); flush_port(p); } else { @@ -2232,8 +2160,8 @@ void erts_port_command(Process *proc, if ((pid = port->connected) == tp[1]) { /* PID must be connected */ if (tp[2] == am_close) { - erts_smp_atomic32_read_bor_relb(&port->state, - ERTS_PORT_SFLG_SEND_CLOSED); + erts_atomic32_read_bor_relb(&port->state, + ERTS_PORT_SFLG_SEND_CLOSED); erts_do_exit_port(port, pid, am_normal); #ifdef USE_VM_PROBES @@ -2456,16 +2384,15 @@ static void prt_one_lnk(ErtsLink *lnk, void *vprtd) } void -print_port_info(int to, void *arg, int i) +print_port_info(Port *p, int to, void *arg) { - Port* p = &erts_port[i]; - erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state); + erts_aint32_t state = erts_atomic32_read_nob(&p->state); if (state & ERTS_PORT_SFLGS_DEAD) return; erts_print(to, arg, "=port:%T\n", p->common.id); - erts_print(to, arg, "Slot: %d\n", i); + erts_print(to, arg, "Slot: %d\n", internal_port_index(p->common.id)); if (state & ERTS_PORT_SFLG_CONNECTED) { erts_print(to, arg, "Connected: %T", p->connected); erts_print(to, arg, "\n"); @@ -2505,43 +2432,46 @@ print_port_info(int to, void *arg, int i) void set_busy_port(ErlDrvPort port_num, int on) { + Port *prt; #ifdef USE_VM_PROBES DTRACE_CHARBUF(port_str, 16); #endif ERTS_SMP_CHK_NO_PROC_LOCKS; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num])); + prt = erts_drvport2port_raw(port_num); + if (!prt) + return; if (on) { - erts_smp_atomic32_read_bor_relb(&erts_port[port_num].state, - ERTS_PORT_SFLG_PORT_BUSY); + erts_atomic32_read_bor_relb(&prt->state, + ERTS_PORT_SFLG_PORT_BUSY); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_busy)) { erts_snprintf(port_str, sizeof(port_str), - "%T", erts_port[port_num].id); + "%T", prt->id); DTRACE1(port_busy, port_str); } #endif } else { - ErtsProcList* plp = erts_port[port_num].suspended; - erts_smp_atomic32_read_band_relb(&erts_port[port_num].state, - ~ERTS_PORT_SFLG_PORT_BUSY); - erts_port[port_num].suspended = NULL; + ErtsProcList* plp = prt->suspended; + erts_atomic32_read_band_relb(&prt->state, + ~ERTS_PORT_SFLG_PORT_BUSY); + prt->suspended = NULL; #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_not_busy)) { erts_snprintf(port_str, sizeof(port_str), - "%T", erts_port[port_num].id); + "%T", prt->id); DTRACE1(port_not_busy, port_str); } #endif - if (erts_port[port_num].dist_entry) { + if (prt->dist_entry) { /* * Processes suspended on distribution ports are * normally queued on the dist entry. */ - erts_dist_port_not_busy(&erts_port[port_num]); + erts_dist_port_not_busy(prt); } /* @@ -2587,10 +2517,9 @@ set_busy_port(ErlDrvPort port_num, int on) void set_port_control_flags(ErlDrvPort port_num, int flags) { - - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num])); - - erts_port[port_num].control_flags = flags; + Port *prt = erts_drvport2port_raw(port_num); + if (prt) + prt->control_flags = flags; } int get_port_flags(ErlDrvPort ix) @@ -2649,7 +2578,7 @@ int async_ready(Port *p, void* data) ERTS_SMP_CHK_NO_PROC_LOCKS; if (p) { - erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state); + erts_aint32_t state = erts_atomic32_read_nob(&p->state); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); ASSERT(!(state & ERTS_PORT_SFLGS_DEAD)); if (p->drv_ptr->ready_async != NULL) { @@ -2697,7 +2626,7 @@ erts_stale_drv_select(Eterm port, int deselect) { char *type; - ErlDrvPort drv_port = internal_port_index(port); + ErlDrvPort drv_port = (ErlDrvPort) erts_port_lookup_raw(port); ErtsPortNames *pnp = erts_get_port_names(port); erts_dsprintf_buf_t *dsbufp; @@ -2737,16 +2666,16 @@ erts_stale_drv_select(Eterm port, ErtsPortNames * erts_get_port_names(Eterm id) { + Port *prt = erts_port_lookup_raw(id); ErtsPortNames *pnp; ASSERT(is_nil(id) || is_internal_port(id)); - - if (is_not_internal_port(id)) { + + if (!prt) { pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, sizeof(ErtsPortNames)); pnp->name = NULL; pnp->driver_name = NULL; } else { - Port* prt = &erts_port[internal_port_index(id)]; int do_realloc = 1; int len = -1; size_t pnp_len = sizeof(ErtsPortNames); @@ -2762,17 +2691,10 @@ erts_get_port_names(Eterm id) pnp_len = sizeof(ErtsPortNames) + len; pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, pnp_len); } - erts_smp_port_minor_lock(prt); - if (id != prt->common.id) { - len = nlen = 0; - name = driver_name = NULL; - } - else { - name = prt->name; - len = nlen = name ? sys_strlen(name) + 1 : 0; - driver_name = (prt->drv_ptr ? prt->drv_ptr->name : NULL); - len += driver_name ? sys_strlen(driver_name) + 1 : 0; - } + name = prt->name; + len = nlen = name ? sys_strlen(name) + 1 : 0; + driver_name = (prt->drv_ptr ? prt->drv_ptr->name : NULL); + len += driver_name ? sys_strlen(driver_name) + 1 : 0; if (len <= pnp_len - sizeof(ErtsPortNames)) { if (!name) pnp->name = NULL; @@ -2790,7 +2712,6 @@ erts_get_port_names(Eterm id) } do_realloc = 0; } - erts_smp_port_minor_unlock(prt); } while (do_realloc); } return pnp; @@ -2827,7 +2748,7 @@ ErlDrvTermData driver_mk_term_nil(void) return driver_term_nil; } -void driver_report_exit(int ix, int status) +void driver_report_exit(ErlDrvPort ix, int status) { Port* prt = erts_drvport2port(ix, NULL); Eterm* hp; @@ -2868,27 +2789,6 @@ void driver_report_exit(int ix, int status) erts_smp_proc_dec_refc(rp); } - -static ERTS_INLINE int -deliver_term_check_port(ErlDrvPort drvport) -{ - int res; - int ix = (int) drvport; - if (ix < 0 || erts_max_ports <= ix) - res = -1; /* invalid */ - else { - Port* prt = &erts_port[ix]; - erts_aint32_t state = erts_smp_atomic32_read_nob(&prt->state); - if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) - res = 1; /* ok */ - else if (state & ERTS_PORT_SFLG_CLOSING) - res = 0; /* closing */ - else - res = -1; /* invalid (dead) */ - } - return res; -} - #define ERTS_B2T_STATES_DEF_STATES_SZ 5 #define ERTS_B2T_STATES_DEF_STATES_INC 100 @@ -2976,10 +2876,7 @@ cleanup_b2t_states(struct b2t_states__ *b2tsp) */ static int -driver_deliver_term(ErlDrvPort port, - Eterm to, - ErlDrvTermData* data, - int len) +driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) { #define ERTS_DDT_FAIL do { res = -1; goto done; } while (0) Uint need = 0; @@ -3181,11 +3078,8 @@ driver_deliver_term(ErlDrvPort port, b2t.ix = 0; /* - * The term is OK. Go ahead and validate the port and process. + * The term is OK. Go ahead and validate the process. */ - res = deliver_term_check_port(port); - if (res <= 0) - goto done; /* * Increase refc on proc if done from a non-scheduler thread. @@ -3449,25 +3343,29 @@ driver_deliver_term(ErlDrvPort port, #undef ERTS_DDT_FAIL } - int -driver_output_term(ErlDrvPort ix, ErlDrvTermData* data, int len) +driver_output_term(ErlDrvPort drvport, ErlDrvTermData* data, int len) { - Port* prt = erts_drvport2port(ix, NULL); - + erts_aint32_t state; + Port* prt = erts_drvport2port(drvport, &state); + if (!prt) + return -1; /* invalid (dead) */ ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - - if (prt == NULL) - return -1; - return driver_deliver_term(ix, prt->connected, data, len); + if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) + return driver_deliver_term(prt->connected, data, len); + else if (state & ERTS_PORT_SFLG_CLOSING) + return 0; /* closing */ + else + return -1; /* invalid (dead) */ } int driver_send_term(ErlDrvPort ix, ErlDrvTermData to, ErlDrvTermData* data, int len) { - return driver_deliver_term(ix, to, data, len); + /* driver_send_term() assume port is ok... */ + return driver_deliver_term(to, data, len); } @@ -3817,6 +3715,7 @@ static ERTS_INLINE void pdl_destroy(ErlDrvPDL pdl) { ERTS_LC_ASSERT(driver_pdl_get_refc(pdl) == 0); erts_mtx_destroy(&pdl->mtx); + erts_port_dec_refc(pdl->prt); erts_free(ERTS_ALC_T_PORT_DATA_LOCK, pdl); } @@ -3861,6 +3760,8 @@ driver_pdl_create(ErlDrvPort dp) sizeof(struct erl_drv_port_data_lock)); erts_mtx_init(&pdl->mtx, "port_data_lock"); pdl_init_refc(pdl); + erts_port_inc_refc(pp); + pdl->prt = pp; pp->port_data_lock = pdl; #ifdef HARDDEBUG erts_fprintf(stderr, "driver_pdl_create(%T) -> 0x%08X\r\n",pp->common.id,(unsigned) pdl); @@ -4317,7 +4218,7 @@ int driver_set_timer(ErlDrvPort ix, unsigned long t) if (prt == NULL) return -1; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + if (prt->drv_ptr->timeout == NULL) return -1; drv_cancel_timer(prt); @@ -4424,7 +4325,7 @@ static int do_driver_monitor_process(Port *prt, /* * This can be called from a non scheduler thread iff a port_data_lock exists */ -int driver_monitor_process(ErlDrvPort port, +int driver_monitor_process(ErlDrvPort drvport, ErlDrvTermData process, ErlDrvMonitor *monitor) { @@ -4434,15 +4335,12 @@ int driver_monitor_process(ErlDrvPort port, #if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) ErtsSchedulerData *sched = erts_get_scheduler_data(); #endif - int ix = (int) port; - if (ix < 0 || erts_max_ports <= ix) { - return -1; - } - prt = &erts_port[ix]; + + prt = erts_thr_drvport2port_raw(drvport); DRV_MONITOR_LOCK_PDL(prt); - state = erts_smp_atomic32_read_nob(&erts_port[ix].state); + state = erts_atomic32_read_nob(&prt->state); if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { DRV_MONITOR_UNLOCK_PDL(prt); @@ -4509,7 +4407,7 @@ static int do_driver_demonitor_process(Port *prt, Eterm *buf, return 0; } -int driver_demonitor_process(ErlDrvPort port, +int driver_demonitor_process(ErlDrvPort drvport, const ErlDrvMonitor *monitor) { Port *prt; @@ -4518,15 +4416,12 @@ int driver_demonitor_process(ErlDrvPort port, #if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) ErtsSchedulerData *sched = erts_get_scheduler_data(); #endif - int ix = (int) port; - if (ix < 0 || erts_max_ports <= ix) { - return -1; - } - prt = &erts_port[ix]; + + prt = erts_thr_drvport2port_raw(drvport); DRV_MONITOR_LOCK_PDL(prt); - state = erts_smp_atomic32_read_nob(&erts_port[ix].state); + state = erts_atomic32_read_nob(&prt->state); if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { DRV_MONITOR_UNLOCK_PDL(prt); @@ -4575,7 +4470,7 @@ static ErlDrvTermData do_driver_get_monitored_process(Port *prt, Eterm *buf, } -ErlDrvTermData driver_get_monitored_process(ErlDrvPort port, +ErlDrvTermData driver_get_monitored_process(ErlDrvPort drvport, const ErlDrvMonitor *monitor) { Port *prt; @@ -4584,15 +4479,12 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort port, #if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) ErtsSchedulerData *sched = erts_get_scheduler_data(); #endif - int ix = (int) port; - if (ix < 0 || erts_max_ports <= ix) { - return driver_term_nil; - } - prt = &erts_port[ix]; + + prt = erts_thr_drvport2port_raw(drvport); DRV_MONITOR_LOCK_PDL(prt); - state = erts_smp_atomic32_read_nob(&erts_port[ix].state); + state = erts_atomic32_read_nob(&prt->state); if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { DRV_MONITOR_UNLOCK_PDL(prt); return driver_term_nil; @@ -4711,8 +4603,6 @@ int driver_exit(ErlDrvPort ix, int err) if (prt == NULL) return -1; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - rp = erts_pid2proc(NULL, 0, prt->connected, ERTS_PROC_LOCK_LINK); if (rp) { rlnk = erts_remove_link(&ERTS_P_LINKS(rp),prt->common.id); @@ -5007,7 +4897,7 @@ no_event_callback(ErlDrvData drv_data, ErlDrvEvent event, ErlDrvEventData event_ { Port *prt = get_current_port(); report_missing_drv_callback(prt, "Event", "event()"); - driver_event((ErlDrvPort) internal_port_index(prt->common.id), event, NULL); + driver_event((ErlDrvPort) prt, event, NULL); } static void @@ -5015,7 +4905,7 @@ no_ready_input_callback(ErlDrvData drv_data, ErlDrvEvent event) { Port *prt = get_current_port(); report_missing_drv_callback(prt, "Input", "ready_input()"); - driver_select((ErlDrvPort) internal_port_index(prt->common.id), event, + driver_select((ErlDrvPort) prt, event, (ERL_DRV_READ | ERL_DRV_USE_NO_CALLBACK), 0); } @@ -5024,7 +4914,7 @@ no_ready_output_callback(ErlDrvData drv_data, ErlDrvEvent event) { Port *prt = get_current_port(); report_missing_drv_callback(prt, "Output", "ready_output()"); - driver_select((ErlDrvPort) internal_port_index(prt->common.id), event, + driver_select((ErlDrvPort) prt, event, (ERL_DRV_WRITE | ERL_DRV_USE_NO_CALLBACK), 0); } @@ -5059,13 +4949,13 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle) drv->lock = NULL; else { drv->lock = erts_alloc(ERTS_ALC_T_DRIVER_LOCK, - sizeof(erts_smp_mtx_t)); - erts_smp_mtx_init_x(drv->lock, - "driver_lock", + sizeof(erts_mtx_t)); + erts_mtx_init_x(drv->lock, + "driver_lock", #if defined(ERTS_ENABLE_LOCK_CHECK) || defined(ERTS_ENABLE_LOCK_COUNT) - am_atom_put(drv->name, sys_strlen(drv->name)) + am_atom_put(drv->name, sys_strlen(drv->name)) #else - NIL + NIL #endif ); } diff --git a/erts/emulator/beam/register.c b/erts/emulator/beam/register.c index feae745749..757e2800e6 100644 --- a/erts/emulator/beam/register.c +++ b/erts/emulator/beam/register.c @@ -182,7 +182,7 @@ int erts_register_name(Process *c_p, Eterm name, Eterm id) return res; erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN); if (is_internal_port(id)) { - port = erts_id2port(id, NULL, 0); + port = erts_id2port(id); if (!port) goto done; } @@ -241,7 +241,7 @@ int erts_register_name(Process *c_p, Eterm name, Eterm id) done: reg_write_unlock(); if (port) - erts_smp_port_unlock(port); + erts_port_release(port); if (c_p != proc) { if (proc) erts_smp_proc_unlock(proc, ERTS_PROC_LOCK_MAIN); @@ -397,16 +397,14 @@ erts_whereis_name(Process *c_p, if (!rp || !rp->pt) *port = NULL; else { -#ifndef ERTS_SMP - erts_smp_atomic32_inc_nob(&rp->pt->common.refc); -#else +#ifdef ERTS_SMP if (pending_port == rp->pt) pending_port = NULL; else { if (pending_port) { /* Ahh! Registered port changed while reg lock was unlocked... */ - erts_smp_port_unlock(pending_port); + erts_port_release(pending_port); pending_port = NULL; } @@ -418,7 +416,7 @@ erts_whereis_name(Process *c_p, current_c_p_locks = 0; } reg_read_unlock(); - pending_port = erts_id2port(id, NULL, 0); + pending_port = erts_id2port(id); goto restart; } } @@ -432,7 +430,7 @@ erts_whereis_name(Process *c_p, if (c_p && !current_c_p_locks) erts_smp_proc_lock(c_p, c_p_locks); if (pending_port) - erts_smp_port_unlock(pending_port); + erts_port_release(pending_port); #endif reg_read_unlock(); @@ -506,12 +504,10 @@ int erts_unregister_name(Process *c_p, if ((rp = (RegProc*) hash_get(&process_reg, (void*) &r)) != NULL) { if (rp->pt) { if (port != rp->pt) { -#ifndef ERTS_SMP - erts_smp_atomic32_inc_nob(&rp->pt->common.refc); -#else +#ifdef ERTS_SMP if (port) { ASSERT(port != c_prt); - erts_smp_port_unlock(port); + erts_port_release(port); port = NULL; } @@ -523,7 +519,7 @@ int erts_unregister_name(Process *c_p, current_c_p_locks = 0; } reg_write_unlock(); - port = erts_id2port(id, NULL, 0); + port = erts_id2port(id); goto restart; } #endif @@ -569,7 +565,7 @@ int erts_unregister_name(Process *c_p, reg_write_unlock(); if (c_prt != port) { if (port) { - erts_smp_port_unlock(port); + erts_port_release(port); } if (c_prt) { erts_smp_port_lock(c_prt); diff --git a/erts/emulator/beam/register.h b/erts/emulator/beam/register.h index 38e8cfbf28..7170463375 100644 --- a/erts/emulator/beam/register.h +++ b/erts/emulator/beam/register.h @@ -24,26 +24,19 @@ #ifndef __REGPROC_H__ #define __REGPROC_H__ -#ifndef __SYS_H__ #include "sys.h" -#endif - -#ifndef __HASH_H__ #include "hash.h" -#endif - -#ifndef __PROCESS_H__ #include "erl_process.h" -#endif - -struct port; +#define ERL_PORT_GET_PORT_TYPE_ONLY__ +#include "erl_port.h" +#undef ERL_PORT_GET_PORT_TYPE_ONLY__ typedef struct reg_proc { HashBucket bucket; /* MUST BE LOCATED AT TOP OF STRUCT!!! */ Process *p; /* The process registered (only one of this and 'pt' is non-NULL */ - struct port *pt; /* The port registered */ + Port *pt; /* The port registered */ Eterm name; /* Atom name */ } RegProc; @@ -55,12 +48,12 @@ int erts_register_name(Process *, Eterm, Eterm); Eterm erts_whereis_name_to_id(Process *, Eterm); void erts_whereis_name(Process *, ErtsProcLocks, Eterm, Process**, ErtsProcLocks, int, - struct port**); + Port**); Process *erts_whereis_process(Process *, ErtsProcLocks, Eterm, ErtsProcLocks, int); -int erts_unregister_name(Process *, ErtsProcLocks, struct port *, Eterm); +int erts_unregister_name(Process *, ErtsProcLocks, Port *, Eterm); #endif diff --git a/erts/emulator/hipe/hipe_bif_list.m4 b/erts/emulator/hipe/hipe_bif_list.m4 index 942fa0c5cb..8a430bf561 100644 --- a/erts/emulator/hipe/hipe_bif_list.m4 +++ b/erts/emulator/hipe/hipe_bif_list.m4 @@ -145,6 +145,7 @@ * Zero-arity BIFs that can fail. */ standard_bif_interface_0(nbif_processes_0, processes_0) +standard_bif_interface_0(nbif_ports_0, ports_0) /* * BIFs and primops that may do a GC (change heap limit and walk the native stack). diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c index 5d814e0164..9ef861bfe4 100644 --- a/erts/emulator/sys/common/erl_check_io.c +++ b/erts/emulator/sys/common/erl_check_io.c @@ -201,17 +201,6 @@ static void event_large_fd_error(ErlDrvPort, ErtsSysFdType, ErlDrvEventData); #endif static void steal_pending_stop_select(erts_dsprintf_buf_t*, ErlDrvPort, ErtsDrvEventState*, int mode, int on); -static ERTS_INLINE Eterm -drvport2id(ErlDrvPort dp) -{ - Port *pp = erts_drvport2port(dp, NULL); - if (pp) - return pp->common.id; - else { - ASSERT(0); - return am_undefined; - } -} #ifdef ERTS_SMP ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(removed_fd, struct removed_fd, 64, ERTS_ALC_T_FD_LIST) @@ -491,7 +480,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, int on) { void (*stop_select_fn)(ErlDrvEvent, void*) = NULL; - Eterm id = drvport2id(ix); + Eterm id = erts_drvport2id(ix); ErtsSysFdType fd = (ErtsSysFdType) e; ErtsPollEvents ctl_events = (ErtsPollEvents) 0; ErtsPollEvents new_events, old_events; @@ -718,7 +707,7 @@ ERTS_CIO_EXPORT(driver_event)(ErlDrvPort ix, ErtsPollEvents events; ErtsPollEvents add_events; ErtsPollEvents remove_events; - Eterm id = drvport2id(ix); + Eterm id = erts_drvport2id(ix); ErtsDrvEventState *state; int do_wake = 0; int ret; diff --git a/erts/emulator/sys/unix/sys.c b/erts/emulator/sys/unix/sys.c index 57ffb9f151..ea98af26bb 100644 --- a/erts/emulator/sys/unix/sys.c +++ b/erts/emulator/sys/unix/sys.c @@ -1023,7 +1023,8 @@ void fini_getenv_state(GETENV_STATE *state) /* This data is shared by these drivers - initialized by spawn_init() */ static struct driver_data { - int port_num, ofd, packet_bytes; + ErlDrvPort port_num; + int ofd, packet_bytes; ErtsSysReportExit *report_exit; int pid; int alive; @@ -1137,7 +1138,7 @@ static RETSIGTYPE onchld(int signum) #endif } -static int set_driver_data(int port_num, +static int set_driver_data(ErlDrvPort port_num, int ifd, int ofd, int packet_bytes, @@ -1153,7 +1154,7 @@ static int set_driver_data(int port_num, report_exit = erts_alloc(ERTS_ALC_T_PRT_REP_EXIT, sizeof(ErtsSysReportExit)); report_exit->next = report_exit_list; - report_exit->port = erts_port[port_num].common.id; + report_exit->port = erts_drvport2id(port_num); report_exit->pid = pid; report_exit->ifd = read_write & DO_READ ? ifd : -1; report_exit->ofd = read_write & DO_WRITE ? ofd : -1; @@ -1234,7 +1235,7 @@ static void close_pipes(int ifd[2], int ofd[2], int read_write) } } -static void init_fd_data(int fd, int prt) +static void init_fd_data(int fd, ErlDrvPort port_num) { fd_data[fd].buf = NULL; fd_data[fd].cpos = NULL; @@ -1922,7 +1923,7 @@ static void clear_fd_data(int fd) fd_data[fd].psz = 0; } -static void nbio_stop_fd(int prt, int fd) +static void nbio_stop_fd(ErlDrvPort prt, int fd) { driver_select(prt,fd,DO_READ|DO_WRITE,0); clear_fd_data(fd); @@ -1970,7 +1971,8 @@ static ErlDrvData vanilla_start(ErlDrvPort port_num, char* name, static void stop(ErlDrvData fd) { - int prt, ofd; + ErlDrvPort prt; + int ofd; prt = driver_data[(int)(long)fd].port_num; nbio_stop_fd(prt, (int)(long)fd); @@ -1983,7 +1985,7 @@ static void stop(ErlDrvData fd) CHLD_STAT_LOCK; - /* Mark as unused. Maybe resetting the 'port_num' slot is better? */ + /* Mark as unused. */ driver_data[(int)(long)fd].pid = -1; CHLD_STAT_UNLOCK; @@ -1999,7 +2001,7 @@ static void stop(ErlDrvData fd) static void outputv(ErlDrvData e, ErlIOVec* ev) { int fd = (int)(long)e; - int ix = driver_data[fd].port_num; + ErlDrvPort ix = driver_data[fd].port_num; int pb = driver_data[fd].packet_bytes; int ofd = driver_data[fd].ofd; ssize_t n; @@ -2049,7 +2051,7 @@ static void outputv(ErlDrvData e, ErlIOVec* ev) static void output(ErlDrvData e, char* buf, ErlDrvSizeT len) { int fd = (int)(long)e; - int ix = driver_data[fd].port_num; + ErlDrvPort ix = driver_data[fd].port_num; int pb = driver_data[fd].packet_bytes; int ofd = driver_data[fd].ofd; ssize_t n; @@ -2100,7 +2102,7 @@ static void output(ErlDrvData e, char* buf, ErlDrvSizeT len) return; /* 0; */ } -static int port_inp_failure(int port_num, int ready_fd, int res) +static int port_inp_failure(ErlDrvPort port_num, int ready_fd, int res) /* Result: 0 (eof) or -1 (error) */ { int err = errno; @@ -2150,7 +2152,7 @@ static int port_inp_failure(int port_num, int ready_fd, int res) static void ready_input(ErlDrvData e, ErlDrvEvent ready_fd) { int fd = (int)(long)e; - int port_num; + ErlDrvPort port_num; int packet_bytes; int res; Uint h; @@ -2273,7 +2275,7 @@ static void ready_input(ErlDrvData e, ErlDrvEvent ready_fd) static void ready_output(ErlDrvData e, ErlDrvEvent ready_fd) { int fd = (int)(long)e; - int ix = driver_data[fd].port_num; + ErlDrvPort ix = driver_data[fd].port_num; int n; struct iovec* iv; int vsize; @@ -2558,19 +2560,20 @@ report_exit_status(ErtsSysReportExit *rep, int status) Port *pp; #ifdef ERTS_SMP CHLD_STAT_UNLOCK; -#endif + pp = erts_thr_id2port_sflgs(rep->port, + ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); + CHLD_STAT_LOCK; +#else pp = erts_id2port_sflgs(rep->port, NULL, 0, ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); -#ifdef ERTS_SMP - CHLD_STAT_LOCK; #endif if (pp) { if (rep->ifd >= 0) { driver_data[rep->ifd].alive = 0; driver_data[rep->ifd].status = status; - (void) driver_select((ErlDrvPort) internal_port_index(pp->common.id), + (void) driver_select((ErlDrvPort) pp, rep->ifd, (ERL_DRV_READ|ERL_DRV_USE), 1); @@ -2578,12 +2581,16 @@ report_exit_status(ErtsSysReportExit *rep, int status) if (rep->ofd >= 0) { driver_data[rep->ofd].alive = 0; driver_data[rep->ofd].status = status; - (void) driver_select((ErlDrvPort) internal_port_index(pp->common.id), + (void) driver_select((ErlDrvPort) pp, rep->ofd, (ERL_DRV_WRITE|ERL_DRV_USE), 1); } +#ifdef ERTS_SMP + erts_thr_port_release(pp); +#else erts_port_release(pp); +#endif } erts_free(ERTS_ALC_T_PRT_REP_EXIT, rep); } diff --git a/erts/emulator/sys/vxworks/sys.c b/erts/emulator/sys/vxworks/sys.c index 739b026fb1..205c123ffc 100644 --- a/erts/emulator/sys/vxworks/sys.c +++ b/erts/emulator/sys/vxworks/sys.c @@ -522,7 +522,8 @@ static Uint tmp_buf_size; /* This data is shared by these drivers - initialized by spawn_init() */ static struct driver_data { - int port_num, ofd, packet_bytes, report_exit; + ErlDrvPort port_num; + int ofd, packet_bytes, report_exit; int exitcode, exit_reported; /* For returning of exit codes. */ } *driver_data; /* indexed by fd */ @@ -678,7 +679,7 @@ static int pre_set_driver_data(int ifd, int ofd, ** Set up the driver_data structure, it may have been initiated ** partly by the function above, but we dont care. */ -static int set_driver_data(int port_num, int ifd, int ofd, +static int set_driver_data(ErlDrvPort port_num, int ifd, int ofd, int packet_bytes, int read_write, int report_exit) { @@ -908,7 +909,7 @@ static void close_pipes(int ifd[2], int ofd[2], int read_write) } } -static void init_fd_data(int fd, int port_unused_argument) +static void init_fd_data(int fd, ErlDrvPort port_unused_argument) { SET_NONBLOCKING(fd); fd_data[fd].pending = NULL; @@ -1062,7 +1063,7 @@ static void clear_fd_data(int fd) fd_data[fd].cpos = NULL; } -static void nbio_stop_fd(int port_num, int fd) +static void nbio_stop_fd(ErlDrvPort port_num, int fd) { Pend *p, *p1; @@ -1132,7 +1133,8 @@ vanilla_start(ErlDrvPort port_num, char *name, SysDriverOpts* opts) static void stop(ErlDrvData drv_data) { - int port_num, ofd; + ErlDrvPort port_num; + int ofd; int fd = (int) drv_data; port_num = driver_data[fd].port_num; @@ -1146,7 +1148,7 @@ static void stop(ErlDrvData drv_data) } } -static int sched_write(int port_num,int fd, char *buf, int len, int pb) +static int sched_write(ErlDrvPort port_num,int fd, char *buf, int len, int pb) { Pend *p, *p2, *p3; int p_bytes = len; @@ -1189,7 +1191,8 @@ static int sched_write(int port_num,int fd, char *buf, int len, int pb) /* Fd is the value returned as drv_data by the start func */ static void output(ErlDrvData drv_data, char *buf, ErlDrvSizeT len) { - int buf_done, port_num, wval, pb, ofd; + ErlDrvPort port_num; + int buf_done, wval, pb, ofd; byte lb[4]; struct iovec iv[2]; int fd = (int) drv_data; @@ -1271,7 +1274,7 @@ static int ensure_header(int fd,char *buf,int packet_size, int sofar) return(res); } -static int port_inp_failure(int port_num, int ready_fd, int res) +static int port_inp_failure(ErlDrvPort port_num, int ready_fd, int res) { (void) driver_select(port_num, ready_fd, ERL_DRV_READ|ERL_DRV_WRITE, 0); clear_fd_data(ready_fd); @@ -1302,7 +1305,8 @@ static int port_inp_failure(int port_num, int ready_fd, int res) static void ready_input(ErlDrvData drv_data, ErlDrvEvent drv_event) { - int port_num, packet_bytes, res; + ErlDrvPort port_num; + int packet_bytes, res; Uint h = 0; char *buf; int fd = (int) drv_data; diff --git a/erts/emulator/sys/win32/sys.c b/erts/emulator/sys/win32/sys.c index b106f0932d..c4861b92a2 100755 --- a/erts/emulator/sys/win32/sys.c +++ b/erts/emulator/sys/win32/sys.c @@ -87,9 +87,6 @@ static erts_smp_tsd_key_t win32_errstr_key; static erts_smp_atomic_t pipe_creation_counter; -static erts_smp_mtx_t sys_driver_data_lock; - - /* Results from application_type(_w) is one of */ #define APPL_NONE 0 #define APPL_DOS 1 @@ -97,7 +94,6 @@ static erts_smp_mtx_t sys_driver_data_lock; #define APPL_WIN32 3 static int driver_write(long, HANDLE, byte*, int); -static void common_stop(int); static int create_file_thread(struct async_io* aio, int mode); #ifdef ERTS_SMP static void close_active_handle(ErlDrvPort, HANDLE handle); @@ -115,9 +111,6 @@ BOOL WINAPI ctrl_handler(DWORD dwCtrlType); #define PORT_BUFSIZ 4096 -#define PORT_FREE (-1) -#define PORT_EXITING (-2) - #define DRV_BUF_ALLOC(SZ) \ erts_alloc_fnf(ERTS_ALC_T_DRV_DATA_BUF, (SZ)) #define DRV_BUF_REALLOC(P, SZ) \ @@ -454,7 +447,7 @@ typedef struct driver_data { byte *inbuf; /* Buffer to use for overlapped read. */ int outBufSize; /* Size of output buffer. */ byte *outbuf; /* Buffer to use for overlapped write. */ - ErlDrvPort port_num; /* The port number. */ + ErlDrvPort port_num; /* The port handle. */ int packet_bytes; /* 0: continous stream, 1, 2, or 4: the number * of bytes in the packet header. */ @@ -464,8 +457,6 @@ typedef struct driver_data { int report_exit; /* Do report exit status for the port */ } DriverData; -static DriverData* driver_data; /* Pointer to array of driver data. */ - /* Driver interfaces */ static ErlDrvData spawn_start(ErlDrvPort, char*, SysDriverOpts*); static ErlDrvData fd_start(ErlDrvPort, char*, SysDriverOpts*); @@ -577,67 +568,53 @@ struct erl_drv_entry vanilla_driver_entry = { */ static DriverData* -new_driver_data(int port_num, int packet_bytes, int wait_objs_required, int use_threads) +new_driver_data(ErlDrvPort port_num, int packet_bytes, int wait_objs_required, int use_threads) { DriverData* dp; - - erts_smp_mtx_lock(&sys_driver_data_lock); - DEBUGF(("new_driver_data(port_num %d, pb %d)\n", - port_num, packet_bytes)); + DEBUGF(("new_driver_data(%p, pb %d)\n", port_num, packet_bytes)); + dp = driver_alloc(sizeof(DriverData)); + if (!dp) + return NULL; /* * We used to test first at all that there is enough room in the * array used by WaitForMultipleObjects(), but that is not necessary * any more, since driver_select() can't fail. */ - /* - * Search for a free slot. - */ + dp->bytesInBuffer = 0; + dp->totalNeeded = packet_bytes; + dp->inBufSize = PORT_BUFSIZ; + dp->inbuf = DRV_BUF_ALLOC(dp->inBufSize); + if (dp->inbuf == NULL) + goto buf_alloc_error; + erts_smp_atomic_add_nob(&sys_misc_mem_sz, dp->inBufSize); + dp->outBufSize = 0; + dp->outbuf = NULL; + dp->port_num = port_num; + dp->packet_bytes = packet_bytes; + dp->port_pid = INVALID_HANDLE_VALUE; + if (init_async_io(&dp->in, use_threads) == -1) + goto async_io_error1; + if (init_async_io(&dp->out, use_threads) == -1) + goto async_io_error2; - for (dp = driver_data; dp < driver_data+max_files; dp++) { - if (dp->port_num == PORT_FREE) { - dp->bytesInBuffer = 0; - dp->totalNeeded = packet_bytes; - dp->inBufSize = PORT_BUFSIZ; - dp->inbuf = DRV_BUF_ALLOC(dp->inBufSize); - if (dp->inbuf == NULL) { - erts_smp_mtx_unlock(&sys_driver_data_lock); - return NULL; - } - erts_smp_atomic_add_nob(&sys_misc_mem_sz, dp->inBufSize); - dp->outBufSize = 0; - dp->outbuf = NULL; - dp->port_num = port_num; - dp->packet_bytes = packet_bytes; - dp->port_pid = INVALID_HANDLE_VALUE; - if (init_async_io(&dp->in, use_threads) == -1) - break; - if (init_async_io(&dp->out, use_threads) == -1) - break; - erts_smp_mtx_unlock(&sys_driver_data_lock); - return dp; - } - } + return dp; - /* - * Error or no free driver data. - */ +async_io_error2: + release_async_io(&dp->in, dp->port_num); +async_io_error1: + release_async_io(&dp->out, dp->port_num); - if (dp < driver_data+max_files) { - release_async_io(&dp->in, dp->port_num); - release_async_io(&dp->out, dp->port_num); - } - erts_smp_mtx_unlock(&sys_driver_data_lock); +buf_alloc_error: + driver_free(dp); return NULL; } static void release_driver_data(DriverData* dp) { - erts_smp_mtx_lock(&sys_driver_data_lock); - #ifdef ERTS_SMP #ifdef USE_CANCELIOEX if (fpCancelIoEx != NULL) { @@ -721,8 +698,7 @@ release_driver_data(DriverData* dp) * the exit thread. */ - dp->port_num = PORT_FREE; - erts_smp_mtx_unlock(&sys_driver_data_lock); + driver_free(dp); } #ifdef ERTS_SMP @@ -817,7 +793,6 @@ threaded_handle_closer(LPVOID param) static ErlDrvData set_driver_data(DriverData* dp, HANDLE ifd, HANDLE ofd, int read_write, int report_exit) { - int index = dp - driver_data; int result; dp->in.fd = ifd; @@ -836,13 +811,12 @@ set_driver_data(DriverData* dp, HANDLE ifd, HANDLE ofd, int read_write, int repo ERL_DRV_WRITE|ERL_DRV_USE, 1); ASSERT(result != -1); } - return (ErlDrvData)index; + return (ErlDrvData) dp; } static ErlDrvData reuse_driver_data(DriverData *dp, HANDLE ifd, HANDLE ofd, int read_write, ErlDrvPort port_num) { - int index = dp - driver_data; int result; dp->port_num = port_num; @@ -861,7 +835,7 @@ reuse_driver_data(DriverData *dp, HANDLE ifd, HANDLE ofd, int read_write, ErlDrv ERL_DRV_WRITE|ERL_DRV_USE, 1); ASSERT(result != -1); } - return (ErlDrvData)index; + return (ErlDrvData) dp; } /* @@ -1118,12 +1092,6 @@ spawn_init(void) ((module != NULL) ? GetProcAddress(module,"CancelIoEx") : NULL); DEBUGF(("fpCancelIoEx = %p\r\n", fpCancelIoEx)); #endif - driver_data = (struct driver_data *) - erts_alloc(ERTS_ALC_T_DRV_TAB, max_files * sizeof(struct driver_data)); - erts_smp_atomic_add_nob(&sys_misc_mem_sz, - max_files*sizeof(struct driver_data)); - for (i = 0; i < max_files; i++) - driver_data[i].port_num = PORT_FREE; return 0; } @@ -2187,12 +2155,10 @@ fd_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts) return ERL_DRV_ERROR_GENERAL; if (!create_file_thread(&dp->in, DO_READ)) { - dp->port_num = PORT_FREE; return ERL_DRV_ERROR_GENERAL; } if (!create_file_thread(&dp->out, DO_WRITE)) { - dp->port_num = PORT_FREE; return ERL_DRV_ERROR_GENERAL; } @@ -2212,10 +2178,9 @@ fd_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts) } } -static void fd_stop(ErlDrvData d) +static void fd_stop(ErlDrvData data) { - int fd = (int)d; - DriverData* dp = driver_data+fd; + DriverData * dp = (DriverData *) data; /* * There's no way we can terminate an fd port in a consistent way. * Instead we let it live until it's opened again (which it is, @@ -2276,16 +2241,10 @@ vanilla_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts) } static void -stop(ErlDrvData index) -{ - common_stop((int)index); -} - -static void common_stop(int index) +stop(ErlDrvData data) { - DriverData* dp = driver_data+index; - - DEBUGF(("common_stop(%d)\n", index)); + DriverData *dp = (DriverData *) data; + DEBUGF(("stop(%p)\n", dp)); if (dp->in.ov.hEvent != NULL) { (void) driver_select(dp->port_num, @@ -2307,7 +2266,6 @@ static void common_stop(int index) */ HANDLE thread; DWORD tid; - dp->port_num = PORT_EXITING; thread = (HANDLE *) _beginthreadex(NULL, 0, threaded_exiter, dp, 0, &tid); CloseHandle(thread); } @@ -2432,22 +2390,17 @@ threaded_exiter(LPVOID param) static void output(ErlDrvData drv_data, char* buf, ErlDrvSizeT len) -/* long drv_data; /* The slot to use in the driver data table. +/* ErlDrvData drv_data; /* The slot to use in the driver data table. * For Windows NT, this is *NOT* a file handle. * The handle is found in the driver data. */ /* char *buf; /* Pointer to data to write to the port program. */ /* ErlDrvSizeT len; /* Number of bytes to write. */ { - DriverData* dp; + DriverData* dp = (DriverData *) drv_data; int pb; /* The header size for this port. */ - int port_num; /* The actual port number (for diagnostics). */ char* current; - dp = driver_data + (int)drv_data; - if ((port_num = dp->port_num) == -1) - return ; /*-1;*/ - pb = dp->packet_bytes; if ((pb+len) == 0) @@ -2458,7 +2411,7 @@ output(ErlDrvData drv_data, char* buf, ErlDrvSizeT len) */ if ((pb == 2 && len > 65535) || (pb == 1 && len > 255)) { - driver_failure_posix(port_num, EINVAL); + driver_failure_posix(dp->port_num, EINVAL); return ; /* -1; */ } @@ -2472,7 +2425,7 @@ output(ErlDrvData drv_data, char* buf, ErlDrvSizeT len) ASSERT(!dp->outbuf); dp->outbuf = DRV_BUF_ALLOC(pb+len); if (!dp->outbuf) { - driver_failure_posix(port_num, ENOMEM); + driver_failure_posix(dp->port_num, ENOMEM); return ; /* -1; */ } @@ -2502,7 +2455,7 @@ output(ErlDrvData drv_data, char* buf, ErlDrvSizeT len) memcpy(current, buf, len); if (!async_write_file(&dp->out, dp->outbuf, pb+len)) { - set_busy_port(port_num, 1); + set_busy_port(dp->port_num, 1); } else { dp->out.ov.Offset += pb+len; /* For vanilla driver. */ /* XXX OffsetHigh should be changed too. */ @@ -2537,10 +2490,9 @@ ready_input(ErlDrvData drv_data, ErlDrvEvent ready_event) { int error = 0; /* The error code (assume initially no errors). */ DWORD bytesRead; /* Number of bytes read. */ - DriverData* dp; + DriverData* dp = (DriverData *) drv_data; int pb; - dp = driver_data+(int)drv_data; pb = dp->packet_bytes; #ifdef ERTS_SMP if(dp->in.thread == (HANDLE) -1) { @@ -2708,7 +2660,7 @@ static void ready_output(ErlDrvData drv_data, ErlDrvEvent ready_event) { DWORD bytesWritten; - DriverData* dp = driver_data + (int)drv_data; + DriverData *dp = (DriverData *) drv_data; int error; #ifdef ERTS_SMP @@ -2716,7 +2668,7 @@ ready_output(ErlDrvData drv_data, ErlDrvEvent ready_event) dp->out.async_io_active = 0; } #endif - DEBUGF(("ready_output(%d, 0x%x)\n", drv_data, ready_event)); + DEBUGF(("ready_output(%p, 0x%x)\n", drv_data, ready_event)); set_busy_port(dp->port_num, 0); if (!(dp->outbuf)) { /* Happens because event sometimes get signalled during a successful @@ -2771,7 +2723,7 @@ sys_init_io(void) can change our view of the number of open files possible. We estimate the number to twice the amount of ports. We really dont know on windows, do we? */ - max_files = 2*erts_max_ports; + max_files = 2*erts_ptab_max(&erts_port); } #ifdef ERTS_SMP @@ -3225,9 +3177,6 @@ void erl_sys_init(void) noinherit_std_handle(STD_INPUT_HANDLE); noinherit_std_handle(STD_ERROR_HANDLE); - - erts_smp_mtx_init(&sys_driver_data_lock, "sys_driver_data_lock"); - #ifdef ERTS_SMP erts_smp_tsd_key_create(&win32_errstr_key); InitializeCriticalSection(&htbc_lock); -- cgit v1.2.3