aboutsummaryrefslogtreecommitdiffstats
path: root/erts
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-08-15 01:28:55 +0200
committerRickard Green <[email protected]>2012-12-03 21:18:06 +0100
commit50cb7c24f061fd3d7df5970d8202f47c470a4047 (patch)
tree8d352994515c3217052cc5b64b78b597028ca75c /erts
parentb434a3ab242dde66e23a72122474854f51a61eff (diff)
downloadotp-50cb7c24f061fd3d7df5970d8202f47c470a4047.tar.gz
otp-50cb7c24f061fd3d7df5970d8202f47c470a4047.tar.bz2
otp-50cb7c24f061fd3d7df5970d8202f47c470a4047.zip
Use ptab functionality also for ports
Diffstat (limited to 'erts')
-rw-r--r--erts/emulator/beam/atom.names1
-rw-r--r--erts/emulator/beam/beam_emu.c5
-rw-r--r--erts/emulator/beam/bif.c100
-rw-r--r--erts/emulator/beam/break.c9
-rw-r--r--erts/emulator/beam/dist.c25
-rw-r--r--erts/emulator/beam/dist.h2
-rw-r--r--erts/emulator/beam/erl_alloc.c5
-rw-r--r--erts/emulator/beam/erl_alloc.h2
-rw-r--r--erts/emulator/beam/erl_alloc.types6
-rw-r--r--erts/emulator/beam/erl_async.c9
-rw-r--r--erts/emulator/beam/erl_bif_ddll.c45
-rw-r--r--erts/emulator/beam/erl_bif_info.c30
-rw-r--r--erts/emulator/beam/erl_bif_port.c78
-rw-r--r--erts/emulator/beam/erl_bif_trace.c59
-rw-r--r--erts/emulator/beam/erl_db_util.c11
-rw-r--r--erts/emulator/beam/erl_driver.h2
-rw-r--r--erts/emulator/beam/erl_lock_check.c17
-rw-r--r--erts/emulator/beam/erl_lock_check.h1
-rw-r--r--erts/emulator/beam/erl_monitors.c7
-rw-r--r--erts/emulator/beam/erl_node_container_utils.h11
-rw-r--r--erts/emulator/beam/erl_node_tables.c36
-rw-r--r--erts/emulator/beam/erl_node_tables.h3
-rw-r--r--erts/emulator/beam/erl_port.h421
-rw-r--r--erts/emulator/beam/erl_port_task.c100
-rw-r--r--erts/emulator/beam/erl_process.c61
-rw-r--r--erts/emulator/beam/erl_process.h12
-rw-r--r--erts/emulator/beam/erl_ptab.c20
-rw-r--r--erts/emulator/beam/erl_ptab.h4
-rw-r--r--erts/emulator/beam/erl_sys_driver.h1
-rw-r--r--erts/emulator/beam/erl_trace.c8
-rw-r--r--erts/emulator/beam/global.h13
-rw-r--r--erts/emulator/beam/io.c876
-rw-r--r--erts/emulator/beam/register.c24
-rw-r--r--erts/emulator/beam/register.h19
-rw-r--r--erts/emulator/hipe/hipe_bif_list.m41
-rw-r--r--erts/emulator/sys/common/erl_check_io.c15
-rw-r--r--erts/emulator/sys/unix/sys.c41
-rw-r--r--erts/emulator/sys/vxworks/sys.c22
-rwxr-xr-xerts/emulator/sys/win32/sys.c143
39 files changed, 1104 insertions, 1141 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names
index 201719507f..d6d8633f6f 100644
--- a/erts/emulator/beam/atom.names
+++ b/erts/emulator/beam/atom.names
@@ -419,6 +419,7 @@ atom pid
atom port
atom ports
atom port_count
+atom port_limit
atom print
atom priority
atom private
diff --git a/erts/emulator/beam/beam_emu.c b/erts/emulator/beam/beam_emu.c
index 3fb2fbd01c..d8ccbacc26 100644
--- a/erts/emulator/beam/beam_emu.c
+++ b/erts/emulator/beam/beam_emu.c
@@ -1660,6 +1660,7 @@ void process_main(void)
reg[0] = r(0);
result = erl_send(c_p, r(0), x(1));
PreFetch(0, next);
+ ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
ERTS_SMP_REQ_PROC_MAIN_LOCK(c_p);
PROCESS_MAIN_CHK_LOCKS(c_p);
if (c_p->mbuf || MSO(c_p).overhead >= BIN_VHEAP_SZ(c_p)) {
@@ -2636,6 +2637,7 @@ void process_main(void)
reg[0] = r(0);
result = (*bf)(c_p, reg, I);
ASSERT(!ERTS_PROC_IS_EXITING(c_p) || is_non_value(result));
+ ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
ERTS_HOLE_CHECK(c_p);
ERTS_SMP_REQ_PROC_MAIN_LOCK(c_p);
PROCESS_MAIN_CHK_LOCKS(c_p);
@@ -3370,7 +3372,6 @@ void process_main(void)
PROCESS_MAIN_CHK_LOCKS(c_p);
bif_nif_arity = I[-1];
ERTS_SMP_UNREQ_PROC_MAIN_LOCK(c_p);
- ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
ASSERT(!ERTS_PROC_IS_EXITING(c_p));
{
@@ -3415,7 +3416,6 @@ void process_main(void)
bif_nif_arity = I[-1];
ASSERT(bif_nif_arity <= 3);
ERTS_SMP_UNREQ_PROC_MAIN_LOCK(c_p);
- ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
reg[0] = r(0);
{
Eterm (*bf)(Process*, Eterm*, BeamInstr*) = vbf;
@@ -4646,7 +4646,6 @@ void process_main(void)
ERTS_SMP_UNREQ_PROC_MAIN_LOCK(c_p);
flags = erts_call_trace(c_p, ep->code, ep->match_prog_set, reg,
0, &ERTS_TRACER_PROC(c_p));
- ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
ERTS_SMP_REQ_PROC_MAIN_LOCK(c_p);
PROCESS_MAIN_CHK_LOCKS(c_p);
ASSERT(!ERTS_PROC_IS_EXITING(c_p));
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index 84513c1b0e..5ea441ad81 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -37,6 +37,8 @@
#include "erl_db_util.h"
#include "register.h"
#include "erl_thr_progress.h"
+#define ERTS_PTAB_WANT_BIF_IMPL__
+#include "erl_ptab.h"
static Export* flush_monitor_message_trap = NULL;
static Export* set_cpu_topology_trap = NULL;
@@ -155,7 +157,10 @@ BIF_RETTYPE link_1(BIF_ALIST_1)
}
if (is_internal_port(BIF_ARG_1)) {
- Port *pt = erts_id2port(BIF_ARG_1, BIF_P, ERTS_PROC_LOCK_MAIN);
+ Port *pt = erts_id2port_sflgs(BIF_ARG_1,
+ BIF_P,
+ ERTS_PROC_LOCK_MAIN,
+ ERTS_PORT_SFLGS_INVALID_LOOKUP);
if (!pt) {
goto res_no_proc;
}
@@ -167,7 +172,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1)
/* else: already linked */
erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK);
- erts_smp_port_unlock(pt);
+ erts_port_release(pt);
BIF_RET(am_true);
}
else if (is_external_port(BIF_ARG_1)
@@ -948,7 +953,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1)
#ifdef ERTS_SMP
if (ERTS_PROC_PENDING_EXIT(BIF_P)) {
if (pt)
- erts_smp_port_unlock(pt);
+ erts_port_release(pt);
goto handle_pending_exit;
}
#endif
@@ -959,7 +964,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1)
if (pt) {
rl = erts_remove_link(&ERTS_P_LINKS(pt), BIF_P->common.id);
- erts_smp_port_unlock(pt);
+ erts_port_release(pt);
if (rl)
erts_destroy_link(rl);
}
@@ -1341,7 +1346,7 @@ BIF_RETTYPE exit_2(BIF_ALIST_2)
if (is_internal_port(BIF_ARG_1)) {
Port *prt;
erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
- prt = erts_id2port(BIF_ARG_1, NULL, 0);
+ prt = erts_id2port(BIF_ARG_1);
if (prt) {
erts_do_exit_port(prt, BIF_P->common.id, BIF_ARG_2);
erts_port_release(prt);
@@ -1894,7 +1899,10 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) {
if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
profile_runnable_proc(p, am_inactive);
}
- pt = erts_id2port(to, p, ERTS_PROC_LOCK_MAIN);
+ pt = erts_id2port_sflgs(to,
+ p,
+ ERTS_PROC_LOCK_MAIN,
+ ERTS_PORT_SFLGS_INVALID_LOOKUP);
port_common:
ERTS_SMP_LC_ASSERT(!pt || erts_lc_is_port_locked(pt));
@@ -1908,7 +1916,7 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) {
profile_runnable_port(pt, am_active);
}
- state = erts_smp_atomic32_read_nob(&pt->state);
+ state = erts_atomic32_read_nob(&pt->state);
/* XXX let port_command handle the busy stuff !!! */
if (state & ERTS_PORT_SFLG_PORT_BUSY) {
if (suspend) {
@@ -3510,75 +3518,25 @@ BIF_RETTYPE garbage_collect_message_area_0(BIF_ALIST_0)
#endif
}
+
/**********************************************************************/
-/* Return a list of active ports */
+/*
+ * The erlang:processes/0 BIF.
+ */
-BIF_RETTYPE ports_0(BIF_ALIST_0)
+BIF_RETTYPE processes_0(BIF_ALIST_0)
{
- Eterm res = NIL;
- Eterm* port_buf = erts_alloc(ERTS_ALC_T_TMP,
- sizeof(Eterm)*erts_max_ports);
- Eterm* pp = port_buf;
- Eterm* dead_ports;
- int alive, dead;
- Uint32 next_ss;
- int i;
-
- /* To get a consistent snapshot...
- * We add alive ports from start of the buffer
- * while dying ports are added from the other end by the killing threads.
- */
-
- erts_smp_mtx_lock(&ports_snapshot_mtx); /* One snapshot at a time */
-
- erts_smp_atomic_set_nob(&erts_dead_ports_ptr,
- (erts_aint_t) (port_buf + erts_max_ports));
-
- next_ss = erts_smp_atomic32_inc_read_relb(&erts_ports_snapshot);
-
- for (i = erts_max_ports-1; i >= 0; i--) {
- Port* prt = &erts_port[i];
- erts_aint32_t state = erts_smp_atomic32_read_acqb(&prt->state);
- if (!(state & ERTS_PORT_SFLGS_DEAD)) {
- erts_smp_port_minor_lock(prt);
- state = erts_smp_atomic32_read_nob(&prt->state);
- if (!(state & ERTS_PORT_SFLGS_DEAD) && prt->snapshot != next_ss) {
- ASSERT(prt->snapshot == next_ss - 1);
- *pp++ = prt->common.id;
- prt->snapshot = next_ss; /* Consumed by this snapshot */
- }
- erts_smp_port_minor_unlock(prt);
- }
- }
-
- dead_ports = (Eterm*)erts_smp_atomic_xchg_nob(&erts_dead_ports_ptr,
- (erts_aint_t) NULL);
- erts_smp_mtx_unlock(&ports_snapshot_mtx);
-
- ASSERT(pp <= dead_ports);
-
- alive = pp - port_buf;
- dead = port_buf + erts_max_ports - dead_ports;
-
- ASSERT((alive+dead) <= erts_max_ports);
-
- if (alive+dead > 0) {
- erts_aint_t i;
- Eterm *hp = HAlloc(BIF_P, (alive+dead)*2);
-
- for (i = 0; i < alive; i++) {
- res = CONS(hp, port_buf[i], res);
- hp += 2;
- }
- for (i = 0; i < dead; i++) {
- res = CONS(hp, dead_ports[i], res);
- hp += 2;
- }
- }
+ return erts_ptab_list(BIF_P, &erts_proc);
+}
- erts_free(ERTS_ALC_T_TMP, port_buf);
+/**********************************************************************/
+/*
+ * The erlang:ports/0 BIF.
+ */
- BIF_RET(res);
+BIF_RETTYPE ports_0(BIF_ALIST_0)
+{
+ return erts_ptab_list(BIF_P, &erts_port);
}
/**********************************************************************/
diff --git a/erts/emulator/beam/break.c b/erts/emulator/beam/break.c
index fba734623a..ad05316b39 100644
--- a/erts/emulator/beam/break.c
+++ b/erts/emulator/beam/break.c
@@ -61,9 +61,12 @@ extern char* erts_system_version[];
static void
port_info(int to, void *to_arg)
{
- int i;
- for (i = 0; i < erts_max_ports; i++)
- print_port_info(to, to_arg, i);
+ int i, max = erts_ptab_max(&erts_port);
+ for (i = 0; i < max; i++) {
+ Port *p = erts_pix2port(i);
+ if (p)
+ print_port_info(p, to, to_arg);
+ }
}
void
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 7aef26aa21..13715034cb 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -144,7 +144,7 @@ create_cache(DistEntry *dep)
ERTS_SMP_LC_ASSERT(
is_internal_port(dep->cid)
- && erts_lc_is_port_locked(&erts_port[internal_port_index(dep->cid)]));
+ && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid)));
ASSERT(!dep->cache);
dep->cache = cp = (ErtsAtomCache*) erts_alloc(ERTS_ALC_T_DCACHE,
@@ -421,9 +421,9 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
ASSERT(is_internal_port(prt_id));
erts_smp_rwmtx_rwunlock(&erts_dist_table_rwmtx);
- prt = erts_id2port(prt_id, NULL, 0);
+ prt = erts_id2port(prt_id);
if (prt) {
- ASSERT(erts_smp_atomic32_read_nob(&prt->state)
+ ASSERT(erts_atomic32_read_nob(&prt->state)
& ERTS_PORT_SFLG_DISTRIBUTION);
ASSERT(prt->dist_entry);
/* will call do_net_exists !!! */
@@ -455,7 +455,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
erts_smp_de_rwlock(dep);
ERTS_SMP_LC_ASSERT(is_internal_port(dep->cid)
- && erts_lc_is_port_locked(&erts_port[internal_port_index(dep->cid)]));
+ && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid)));
if (erts_port_task_is_scheduled(&dep->dist_cmd))
erts_port_task_abort(dep->cid, &dep->dist_cmd);
@@ -1957,7 +1957,7 @@ erts_dist_command(Port *prt, int reds_limit)
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
- state = erts_smp_atomic32_read_nob(&prt->state);
+ state = erts_atomic32_read_nob(&prt->state);
if (reds > reds_limit)
goto preempted;
@@ -1978,7 +1978,7 @@ erts_dist_command(Port *prt, int reds_limit)
obufsize += size_obuf(fob);
foq.first = foq.first->next;
free_dist_obuf(fob);
- state = erts_smp_atomic32_read_nob(&prt->state);
+ state = erts_atomic32_read_nob(&prt->state);
preempt = reds > reds_limit || (state & ERTS_PORT_SFLGS_DEAD);
if (state & ERTS_PORT_SFLG_PORT_BUSY)
break;
@@ -2060,7 +2060,7 @@ erts_dist_command(Port *prt, int reds_limit)
obufsize += size_obuf(fob);
oq.first = oq.first->next;
free_dist_obuf(fob);
- state = erts_smp_atomic32_read_nob(&prt->state);
+ state = erts_atomic32_read_nob(&prt->state);
preempt = reds > reds_limit || (state & ERTS_PORT_SFLGS_DEAD);
if ((state & ERTS_PORT_SFLG_PORT_BUSY) && oq.first && !preempt)
goto finalize_only;
@@ -2580,10 +2580,13 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
else if (!dep)
goto system_limit; /* Should never happen!!! */
- pp = erts_id2port(BIF_ARG_2, BIF_P, ERTS_PROC_LOCK_MAIN);
+ pp = erts_id2port_sflgs(BIF_ARG_2,
+ BIF_P,
+ ERTS_PROC_LOCK_MAIN,
+ ERTS_PORT_SFLGS_INVALID_LOOKUP);
erts_smp_de_rwlock(dep);
- if (!pp || (erts_smp_atomic32_read_nob(&pp->state)
+ if (!pp || (erts_atomic32_read_nob(&pp->state)
& ERTS_PORT_SFLG_EXITING))
goto badarg;
@@ -2613,7 +2616,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
if (pp->dist_entry || is_not_nil(dep->cid))
goto badarg;
- erts_smp_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
+ erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
pp->dist_entry = dep;
@@ -2658,7 +2661,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
}
if (pp)
- erts_smp_port_unlock(pp);
+ erts_port_release(pp);
return ret;
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index 98c069da8a..7de8786c4a 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -187,7 +187,7 @@ void erts_schedule_dist_command(Port *prt, DistEntry *dist_entry)
if (prt) {
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- ASSERT((erts_smp_atomic32_read_nob(&prt->state)
+ ASSERT((erts_atomic32_read_nob(&prt->state)
& ERTS_PORT_SFLGS_DEAD) == 0);
ASSERT(prt->dist_entry);
diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c
index d0c0a13400..e8a9c11034 100644
--- a/erts/emulator/beam/erl_alloc.c
+++ b/erts/emulator/beam/erl_alloc.c
@@ -2273,6 +2273,7 @@ erts_allocated_areas(int *print_to_p, void *print_to_arg, void *proc)
int i, length;
Uint reserved_atom_space, atom_space;
int max_processes = erts_ptab_max(&erts_proc);
+ int max_ports = erts_ptab_max(&erts_port);
if (proc) {
ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN
@@ -2303,8 +2304,8 @@ erts_allocated_areas(int *print_to_p, void *print_to_arg, void *proc)
values[i].arity = 2;
values[i].name = "static";
- values[i].ui[0] =
- erts_max_ports*sizeof(Port) /* Port table */
+ values[i].ui[0] =
+ max_ports*sizeof(Port) /* Port table */
+ erts_timer_wheel_memory_size() /* Timer wheel */
#ifdef SYS_TMP_BUF_SIZE
+ SYS_TMP_BUF_SIZE /* tmp_buf in sys on vxworks & ose */
diff --git a/erts/emulator/beam/erl_alloc.h b/erts/emulator/beam/erl_alloc.h
index e475f9d8a2..ba5ec9c367 100644
--- a/erts/emulator/beam/erl_alloc.h
+++ b/erts/emulator/beam/erl_alloc.h
@@ -267,6 +267,8 @@ typedef void (*erts_alloc_verify_func_t)(Allctr_t *);
erts_alloc_verify_func_t
erts_alloc_get_verify_unused_temp_alloc(Allctr_t **allctr);
+#define ERTS_ALC_DATA_ALIGN_SIZE(SZ) \
+ (((((SZ) - 1) / 8) + 1) * 8)
#define ERTS_ALC_CACHE_LINE_ALIGN_SIZE(SZ) \
(((((SZ) - 1) / ERTS_CACHE_LINE_SIZE) + 1) * ERTS_CACHE_LINE_SIZE)
diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types
index 4ad0c41b50..92ba15214e 100644
--- a/erts/emulator/beam/erl_alloc.types
+++ b/erts/emulator/beam/erl_alloc.types
@@ -146,6 +146,7 @@ class SYSTEM system_data
type SBMBC SBMBC SYSTEM small_block_mbc
type PROC FIXED_SIZE PROCESSES proc
+type PORT DRIVER SYSTEM port
type ATOM LONG_LIVED ATOM atom_entry
type MODULE LONG_LIVED CODE module_entry
type REG_PROC STANDARD PROCESSES reg_proc
@@ -188,7 +189,7 @@ type PORT_TABLE LONG_LIVED SYSTEM port_tab
type TIMER_WHEEL LONG_LIVED SYSTEM timer_wheel
type DRV DRIVER SYSTEM drv_internal
type DRV_BINARY BINARY BINARIES drv_binary
-type DRIVER STANDARD SYSTEM driver
+type DRIVER DRIVER SYSTEM driver
type NIF DRIVER SYSTEM nif_internal
type BINARY BINARY BINARIES binary
type NBIF_TABLE SYSTEM SYSTEM nbif_tab
@@ -196,7 +197,6 @@ type ARG_REG STANDARD PROCESSES arg_reg
type PROC_DICT STANDARD PROCESSES proc_dict
type CALLS_BUF STANDARD PROCESSES calls_buf
type BPD STANDARD SYSTEM bpd
-type PORT_NAME STANDARD SYSTEM port_name
type LINEBUF STANDARD SYSTEM line_buf
type IOQ STANDARD SYSTEM io_queue
type BITS_BUF STANDARD SYSTEM bits_buf
@@ -236,7 +236,7 @@ type PORT_TASK SHORT_LIVED SYSTEM port_task
type PORT_TASKQ SHORT_LIVED SYSTEM port_task_queue
type MISC_OP_LIST SHORT_LIVED SYSTEM misc_op_list
type PORT_NAMES SHORT_LIVED SYSTEM port_names
-type PORT_DATA_LOCK STANDARD SYSTEM port_data_lock
+type PORT_DATA_LOCK DRIVER SYSTEM port_data_lock
type NODES_MON STANDARD PROCESSES nodes_monitor
type PTAB_LIST_DEL SHORT_LIVED PROCESSES ptab_list_deleted_el
type PTAB_LIST_CNKI SHORT_LIVED PROCESSES ptab_list_chunk_info
diff --git a/erts/emulator/beam/erl_async.c b/erts/emulator/beam/erl_async.c
index 8d3bea82c1..d8adaedc91 100644
--- a/erts/emulator/beam/erl_async.c
+++ b/erts/emulator/beam/erl_async.c
@@ -375,10 +375,15 @@ static ERTS_INLINE ErtsAsync *async_get(ErtsThrQ_t *q,
static ERTS_INLINE void call_async_ready(ErtsAsync *a)
{
+#if ERTS_USE_ASYNC_READY_Q
Port *p = erts_id2port_sflgs(a->port,
NULL,
0,
ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
+#else
+ Port *p = erts_thr_id2port_sflgs(a->port,
+ ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
+#endif
if (!p) {
if (a->async_free)
a->async_free(a->async_data);
@@ -388,7 +393,11 @@ static ERTS_INLINE void call_async_ready(ErtsAsync *a)
if (a->async_free)
a->async_free(a->async_data);
}
+#if ERTS_USE_ASYNC_READY_Q
erts_port_release(p);
+#else
+ erts_thr_port_release(p);
+#endif
}
if (a->hndl)
erts_ddll_dereference_driver(a->hndl);
diff --git a/erts/emulator/beam/erl_bif_ddll.c b/erts/emulator/beam/erl_bif_ddll.c
index a120a3e5f2..e18b1f0159 100644
--- a/erts/emulator/beam/erl_bif_ddll.c
+++ b/erts/emulator/beam/erl_bif_ddll.c
@@ -117,48 +117,27 @@ static void ddll_no_more_references(void *vdh);
static void
kill_ports_driver_unloaded(DE_Handle *dh)
{
- int ix;
+ int ix, max = erts_ptab_max(&erts_port);
- for (ix = 0; ix < erts_max_ports; ix++) {
- Port* prt = &erts_port[ix];
+ for (ix = 0; ix < max; ix++) {
erts_aint32_t state;
- state = erts_smp_atomic32_read_acqb(&prt->state);
- if (state & FREE_PORT_FLAGS)
+ Port* prt = erts_pix2port(ix);
+ if (!prt)
continue;
- erts_smp_port_minor_lock(prt);
+ ERTS_SMP_DATA_DEPENDENCY_READ_MEMORY_BARRIER;
- if (prt->drv_ptr->handle != dh) {
- erts_smp_port_minor_unlock(prt);
+ state = erts_atomic32_read_nob(&prt->state);
+ if (state & FREE_PORT_FLAGS)
continue;
- }
- erts_smp_atomic32_inc_nob(&prt->common.refc);
- erts_smp_port_minor_unlock(prt);
-
- state = erts_smp_atomic32_read_acqb(&prt->state);
- if (!(state & FREE_PORT_FLAGS)) {
-#if DDLL_SMP
- if (state & ERTS_PORT_SFLG_INITIALIZING) {
- /* Extremely rare busy wait */
- do {
- ERTS_SPIN_BODY;
- state = erts_smp_atomic32_read_acqb(&prt->state);
- } while (state & ERTS_PORT_SFLG_INITIALIZING);
- }
+ erts_smp_port_lock(prt);
- erts_smp_mtx_lock(prt->lock);
+ state = erts_atomic32_read_nob(&prt->state);
+ if (!(state & ERTS_PORT_SFLGS_DEAD) && prt->drv_ptr->handle == dh)
+ driver_failure_atom((ErlDrvPort) prt, "driver_unloaded");
- if (prt->drv_ptr->handle == dh) {
- state = erts_smp_atomic32_read_acqb(&prt->state);
- if (!(state & ERTS_PORT_SFLGS_DEAD))
- driver_failure_atom(ix, "driver_unloaded");
- }
-#else
- driver_failure_atom(ix, "driver_unloaded");
-#endif
- erts_port_release(prt);
- }
+ erts_port_release(prt);
}
}
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 847a0c33e9..0586a89041 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -2189,6 +2189,10 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1)
BIF_RET(make_small(erts_ptab_count(&erts_proc)));
} else if (BIF_ARG_1 == am_process_limit) {
BIF_RET(make_small(erts_ptab_max(&erts_proc)));
+ } else if (BIF_ARG_1 == am_port_count) {
+ BIF_RET(make_small(erts_ptab_count(&erts_port)));
+ } else if (BIF_ARG_1 == am_port_limit) {
+ BIF_RET(make_small(erts_ptab_max(&erts_port)));
} else if (BIF_ARG_1 == am_info
|| BIF_ARG_1 == am_procs
|| BIF_ARG_1 == am_loaded
@@ -2840,7 +2844,10 @@ static BIF_RETTYPE port_info(Process* p, Eterm portid, Eterm item)
int count;
if (is_internal_port(portid))
- prt = erts_id2port(portid, p, ERTS_PROC_LOCK_MAIN);
+ prt = erts_id2port_sflgs(portid,
+ p,
+ ERTS_PROC_LOCK_MAIN,
+ ERTS_PORT_SFLGS_INVALID_LOOKUP);
else if (is_atom(portid))
erts_whereis_name(p, ERTS_PROC_LOCK_MAIN,
portid, NULL, 0, 0, &prt);
@@ -2975,7 +2982,7 @@ static BIF_RETTYPE port_info(Process* p, Eterm portid, Eterm item)
#ifndef ERTS_SMP
res = am_false;
#else
- if (erts_smp_atomic32_read_nob(&prt->state)
+ if (erts_atomic32_read_nob(&prt->state)
& ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) {
DECL_AM(port_level);
ASSERT(prt->drv_ptr->flags
@@ -2999,7 +3006,7 @@ static BIF_RETTYPE port_info(Process* p, Eterm portid, Eterm item)
done:
- erts_smp_port_unlock(prt);
+ erts_port_release(prt);
return ret;
}
@@ -3357,9 +3364,8 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1)
Eterm res;
if (ERTS_IS_ATOM_STR("next_pid", BIF_ARG_1))
res = erts_ptab_test_next_id(&erts_proc, 0, 0);
- else {
- res = erts_test_next_port(0, 0);
- }
+ else
+ res = erts_ptab_test_next_id(&erts_port, 0, 0);
if (res < 0)
BIF_RET(am_false);
BIF_RET(erts_make_integer(res, BIF_P));
@@ -3466,11 +3472,14 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1)
}
else if(is_internal_port(tp[2])) {
Eterm res;
- Port *p = erts_id2port(tp[2], BIF_P, ERTS_PROC_LOCK_MAIN);
+ Port *p = erts_id2port_sflgs(tp[2],
+ BIF_P,
+ ERTS_PROC_LOCK_MAIN,
+ ERTS_PORT_SFLGS_INVALID_LOOKUP);
if(!p)
BIF_RET(am_undefined);
res = make_link_list(BIF_P, ERTS_P_LINKS(p), NIL);
- erts_smp_port_unlock(p);
+ erts_port_release(p);
BIF_RET(res);
}
else if(is_node_name_atom(tp[2])) {
@@ -3714,9 +3723,8 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2)
if (ERTS_IS_ATOM_STR("next_pid", BIF_ARG_1))
res = erts_ptab_test_next_id(&erts_proc, 1, next);
- else {
- res = erts_test_next_port(1, next);
- }
+ else
+ res = erts_ptab_test_next_id(&erts_port, 1, next);
if (res < 0)
BIF_RET(am_false);
BIF_RET(erts_make_integer(res, BIF_P));
diff --git a/erts/emulator/beam/erl_bif_port.c b/erts/emulator/beam/erl_bif_port.c
index 2b7760f556..31151dc9f5 100644
--- a/erts/emulator/beam/erl_bif_port.c
+++ b/erts/emulator/beam/erl_bif_port.c
@@ -42,7 +42,7 @@
#include "erl_bits.h"
#include "dtrace-wrapper.h"
-static int open_port(Process* p, Eterm name, Eterm settings, int *err_nump);
+static Port *open_port(Process* p, Eterm name, Eterm settings, int *err_typep, int *err_nump);
static byte* convert_environment(Process* p, Eterm env);
static char **convert_args(Eterm);
static void free_args(char **);
@@ -54,16 +54,17 @@ port_call(Process* p, Eterm arg1, Eterm arg2, Eterm arg3);
BIF_RETTYPE open_port_2(BIF_ALIST_2)
{
- int port_num;
- Eterm port_val;
+ Port *port;
+ Eterm port_id;
char *str;
- int err_num;
+ int err_type, err_num;
- if ((port_num = open_port(BIF_P, BIF_ARG_1, BIF_ARG_2, &err_num)) < 0) {
- if (port_num == -3) {
+ port = open_port(BIF_P, BIF_ARG_1, BIF_ARG_2, &err_type, &err_num);
+ if (!port) {
+ if (err_type == -3) {
ASSERT(err_num == BADARG || err_num == SYSTEM_LIMIT);
BIF_ERROR(BIF_P, err_num);
- } else if (port_num == -2) {
+ } else if (err_type == -2) {
str = erl_errno_id(err_num);
} else {
str = "einval";
@@ -74,15 +75,15 @@ BIF_RETTYPE open_port_2(BIF_ALIST_2)
erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_LINK);
- port_val = erts_port[port_num].common.id;
- erts_add_link(&ERTS_P_LINKS(&erts_port[port_num]), LINK_PID, BIF_P->common.id);
- erts_add_link(&ERTS_P_LINKS(BIF_P), LINK_PID, port_val);
+ port_id = port->common.id;
+ erts_add_link(&ERTS_P_LINKS(port), LINK_PID, BIF_P->common.id);
+ erts_add_link(&ERTS_P_LINKS(BIF_P), LINK_PID, port_id);
erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK);
- erts_port_release(&erts_port[port_num]);
+ erts_port_release(port);
- BIF_RET(port_val);
+ BIF_RET(port_id);
}
/****************************************************************************
@@ -112,7 +113,10 @@ id_or_name2port(Process *c_p, Eterm id)
{
Port *port;
if (is_not_atom(id))
- port = erts_id2port(id, c_p, ERTS_PROC_LOCK_MAIN);
+ port = erts_id2port_sflgs(id,
+ c_p,
+ ERTS_PROC_LOCK_MAIN,
+ ERTS_PORT_SFLGS_INVALID_LOOKUP);
else
erts_whereis_name(c_p, ERTS_PROC_LOCK_MAIN, id, NULL, 0, 0, &port);
return port;
@@ -164,7 +168,7 @@ do_port_command(Process *BIF_P, Eterm arg1, Eterm arg2, Eterm arg3,
ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_NOTSUP);
}
else if (!(flags & ERTS_PORT_COMMAND_FLAG_FORCE)
- && (erts_smp_atomic32_read_nob(&p->state) & ERTS_PORT_SFLG_PORT_BUSY)) {
+ && (erts_atomic32_read_nob(&p->state) & ERTS_PORT_SFLG_PORT_BUSY)) {
if (flags & ERTS_PORT_COMMAND_FLAG_NOSUSPEND) {
ERTS_BIF_PREP_RET(res, am_false);
}
@@ -538,7 +542,7 @@ BIF_RETTYPE port_connect_2(BIF_ALIST_2)
rp = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN,
pid, ERTS_PROC_LOCK_LINK);
if (!rp) {
- erts_smp_port_unlock(prt);
+ erts_port_release(prt);
ERTS_SMP_ASSERT_IS_NOT_EXITING(BIF_P);
goto error;
}
@@ -549,7 +553,7 @@ BIF_RETTYPE port_connect_2(BIF_ALIST_2)
erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
prt->connected = pid; /* internal pid */
- erts_smp_port_unlock(prt);
+ erts_port_release(prt);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_connect)) {
DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE);
@@ -591,7 +595,7 @@ BIF_RETTYPE port_set_data_2(BIF_ALIST_2)
hp = bp->mem;
prt->data = copy_struct(data, size, &hp, &bp->off_heap);
}
- erts_smp_port_unlock(prt);
+ erts_port_release(prt);
BIF_RET(am_true);
}
@@ -612,7 +616,7 @@ BIF_RETTYPE port_get_data_1(BIF_ALIST_1)
Eterm* hp = HAlloc(BIF_P, prt->bp->used_size);
res = copy_struct(prt->data, prt->bp->used_size, &hp, &MSO(BIF_P));
}
- erts_smp_port_unlock(prt);
+ erts_port_release(prt);
BIF_RET(res);
}
@@ -625,11 +629,10 @@ BIF_RETTYPE port_get_data_1(BIF_ALIST_1)
* either BADARG or SYSTEM_LIMIT).
*/
-static int
-open_port(Process* p, Eterm name, Eterm settings, int *err_nump)
+static Port *
+open_port(Process* p, Eterm name, Eterm settings, int *err_typep, int *err_nump)
{
-#define OPEN_PORT_ERROR(VAL) do { port_num = (VAL); goto do_return; } while (0)
- int i, port_num;
+ int i;
Eterm option;
Uint arity;
Eterm* tp;
@@ -641,6 +644,7 @@ open_port(Process* p, Eterm name, Eterm settings, int *err_nump)
Eterm edir = NIL;
byte dir[MAXPATHLEN];
erts_aint32_t sflgs = 0;
+ Port *port;
/* These are the defaults */
opts.packet_bytes = 0;
@@ -923,38 +927,40 @@ open_port(Process* p, Eterm name, Eterm settings, int *err_nump)
erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN);
- port_num = erts_open_driver(driver, p->common.id, name_buf, &opts, err_nump);
+ port = erts_open_driver(driver, p->common.id, name_buf, &opts, err_typep, err_nump);
#ifdef USE_VM_PROBES
- if (port_num >= 0 && DTRACE_ENABLED(port_open)) {
+ if (port && DTRACE_ENABLED(port_open)) {
DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE);
DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);
dtrace_proc_str(p, process_str);
- erts_snprintf(port_str, sizeof(port_str), "%T", erts_port[port_num].id);
+ erts_snprintf(port_str, sizeof(port_str), "%T", port->common.id);
DTRACE3(port_open, process_str, name_buf, port_str);
}
#endif
erts_smp_proc_lock(p, ERTS_PROC_LOCK_MAIN);
- if (port_num < 0) {
- DEBUGF(("open_driver returned %d(%d)\n", port_num, *err_nump));
+ if (!port) {
+ DEBUGF(("open_driver returned (%d:%d)\n",
+ err_typep ? *err_typep : 4711,
+ err_nump ? *err_nump : 4711));
if (IS_TRACED_FL(p, F_TRACE_SCHED_PROCS)) {
trace_virtual_sched(p, am_in);
}
- OPEN_PORT_ERROR(port_num);
+ goto do_return;
}
if (IS_TRACED_FL(p, F_TRACE_SCHED_PROCS)) {
trace_virtual_sched(p, am_in);
}
- if (linebuf && erts_port[port_num].linebuf == NULL){
- erts_port[port_num].linebuf = allocate_linebuf(linebuf);
+ if (linebuf && port->linebuf == NULL){
+ port->linebuf = allocate_linebuf(linebuf);
sflgs |= ERTS_PORT_SFLG_LINEBUF_IO;
}
if (sflgs)
- erts_smp_atomic32_read_bor_relb(&erts_port[port_num].state, sflgs);
+ erts_atomic32_read_bor_relb(&port->state, sflgs);
do_return:
if (name_buf)
@@ -965,13 +971,15 @@ open_port(Process* p, Eterm name, Eterm settings, int *err_nump)
if (opts.wd && opts.wd != ((char *)dir)) {
erts_free(ERTS_ALC_T_TMP, (void *) opts.wd);
}
- return port_num;
+ return port;
badarg:
- *err_nump = BADARG;
- OPEN_PORT_ERROR(-3);
+ if (err_typep)
+ *err_typep = -3;
+ if (err_nump)
+ *err_nump = BADARG;
+ port = NULL;
goto do_return;
-#undef OPEN_PORT_ERROR
}
/* Arguments can be given i unicode and as raw binaries, convert filename is used to convert */
diff --git a/erts/emulator/beam/erl_bif_trace.c b/erts/emulator/beam/erl_bif_trace.c
index d94b05fc25..60d5d039ed 100644
--- a/erts/emulator/beam/erl_bif_trace.c
+++ b/erts/emulator/beam/erl_bif_trace.c
@@ -151,14 +151,12 @@ trace_pattern(Process* p, Eterm MFA, Eterm Pattern, Eterm flaglist)
}
} else if (is_internal_port(meta_tracer_pid)) {
Port *meta_tracer_port;
- meta_tracer_proc = NULL;
- if (internal_port_index(meta_tracer_pid) >= erts_max_ports)
+ meta_tracer_proc = NULL;
+ meta_tracer_port = (erts_port_lookup(
+ meta_tracer_pid,
+ ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP));
+ if (!meta_tracer_port)
goto error;
- meta_tracer_port =
- &erts_port[internal_port_index(meta_tracer_pid)];
- if (INVALID_TRACER_PORT(meta_tracer_port, meta_tracer_pid)) {
- goto error;
- }
} else {
goto error;
}
@@ -479,14 +477,14 @@ Eterm trace_3(BIF_ALIST_3)
? ERTS_PROC_LOCKS_ALL_MINOR
: ERTS_PROC_LOCKS_ALL));
} else if (is_internal_port(tracer)) {
- Port *tracer_port = erts_id2port(tracer, p, ERTS_PROC_LOCK_MAIN);
- if (!erts_is_valid_tracer_port(tracer)) {
- if (tracer_port)
- erts_smp_port_unlock(tracer_port);
+ Port *tracer_port = erts_id2port_sflgs(tracer,
+ p,
+ ERTS_PROC_LOCK_MAIN,
+ ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP);
+ if (!tracer_port)
goto error;
- }
ERTS_TRACE_FLAGS(tracer_port) |= F_TRACER;
- erts_smp_port_unlock(tracer_port);
+ erts_port_release(tracer_port);
} else
goto error;
@@ -519,12 +517,15 @@ Eterm trace_3(BIF_ALIST_3)
if (pid_spec == tracer)
goto error;
- tracee_port = erts_id2port(pid_spec, p, ERTS_PROC_LOCK_MAIN);
+ tracee_port = erts_id2port_sflgs(pid_spec,
+ p,
+ ERTS_PROC_LOCK_MAIN,
+ ERTS_PORT_SFLGS_INVALID_LOOKUP);
if (!tracee_port)
goto error;
if (tracer != NIL && port_already_traced(p, tracee_port, tracer)) {
- erts_smp_port_unlock(tracee_port);
+ erts_port_release(tracee_port);
goto already_traced;
}
@@ -538,7 +539,7 @@ Eterm trace_3(BIF_ALIST_3)
else if (tracer != NIL)
ERTS_TRACER_PROC(tracee_port) = tracer;
- erts_smp_port_unlock(tracee_port);
+ erts_port_release(tracee_port);
matches = 1;
} else if (is_pid(pid_spec)) {
@@ -677,15 +678,21 @@ Eterm trace_3(BIF_ALIST_3)
}
}
if (ports || mods) {
+ int max = erts_ptab_max(&erts_port);
/* tracing of ports */
- for (i = 0; i < erts_max_ports; i++) {
- Port *tracee_port = &erts_port[i];
+ for (i = 0; i < max; i++) {
erts_aint32_t state;
- state = erts_smp_atomic32_read_nob(&tracee_port->state);
- if (state & ERTS_PORT_SFLGS_DEAD) continue;
+ Port *tracee_port = erts_pix2port(i);
+ if (!tracee_port)
+ continue;
+ state = erts_atomic32_read_nob(&tracee_port->state);
+ if (state & ERTS_PORT_SFLGS_DEAD)
+ continue;
if (tracer != NIL) {
- if (tracee_port->common.id == tracer) continue;
- if (port_already_traced(NULL, tracee_port, tracer)) continue;
+ if (tracee_port->common.id == tracer)
+ continue;
+ if (port_already_traced(NULL, tracee_port, tracer))
+ continue;
}
if (on) ERTS_TRACE_FLAGS(tracee_port) |= mask;
@@ -2147,9 +2154,11 @@ BIF_RETTYPE system_profile_2(BIF_ALIST_2)
if (!profiler_p)
goto error;
} else if (is_internal_port(profiler)) {
- if (internal_port_index(profiler) >= erts_max_ports) goto error;
- profiler_port = &erts_port[internal_port_index(profiler)];
- if (INVALID_TRACER_PORT(profiler_port, profiler)) goto error;
+ profiler_port = (erts_port_lookup(
+ profiler,
+ ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP));
+ if (!profiler_port)
+ goto error;
} else {
goto error;
}
diff --git a/erts/emulator/beam/erl_db_util.c b/erts/emulator/beam/erl_db_util.c
index d89190e3c1..67c2d0eb7f 100644
--- a/erts/emulator/beam/erl_db_util.c
+++ b/erts/emulator/beam/erl_db_util.c
@@ -182,12 +182,13 @@ set_match_trace(Process *tracee_p, Eterm fail_term, Eterm tracer,
}
} else if (is_internal_port(tracer)) {
Port *tracer_port =
- erts_id2port(tracer, tracee_p, ERTS_PROC_LOCKS_ALL);
+ erts_id2port_sflgs(tracer,
+ tracee_p,
+ ERTS_PROC_LOCKS_ALL,
+ ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP);
if (tracer_port) {
- if (! INVALID_TRACER_PORT(tracer_port, tracer)) {
- ret = set_tracee_flags(tracee_p, tracer, d_flags, e_flags);
- }
- erts_smp_port_unlock(tracer_port);
+ ret = set_tracee_flags(tracee_p, tracer, d_flags, e_flags);
+ erts_port_release(tracer_port);
}
} else {
ASSERT(is_nil(tracer));
diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h
index 1ae9a211d7..1dc43419fe 100644
--- a/erts/emulator/beam/erl_driver.h
+++ b/erts/emulator/beam/erl_driver.h
@@ -210,8 +210,8 @@ typedef struct erl_drv_binary {
typedef struct _erl_drv_data* ErlDrvData; /* Data to be used by the driver itself. */
#ifndef ERL_SYS_DRV
typedef struct _erl_drv_event* ErlDrvEvent; /* An event to be selected on. */
-typedef struct _erl_drv_port* ErlDrvPort; /* A port descriptor. */
#endif
+typedef struct _erl_drv_port* ErlDrvPort; /* A port descriptor. */
typedef struct _erl_drv_port* ErlDrvThreadData; /* Thread data. */
#if !defined(__WIN32__) && !defined(_WIN32) && !defined(_WIN32_) && !defined(USE_SELECT)
diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c
index 1db7b27412..06203a706d 100644
--- a/erts/emulator/beam/erl_lock_check.c
+++ b/erts/emulator/beam/erl_lock_check.c
@@ -112,9 +112,6 @@ static erts_lc_lock_order_t erts_lock_order[] = {
#if defined(ENABLE_CHILD_WAITER_THREAD) || defined(ERTS_SMP)
{ "child_status", NULL },
#endif
-#ifdef __WIN32__
- { "sys_driver_data_lock", NULL },
-#endif
{ "drv_ev_state_grow", NULL, },
{ "drv_ev_state", "address" },
{ "safe_hash", "address" },
@@ -159,7 +156,7 @@ static erts_lc_lock_order_t erts_lock_order[] = {
{ "port_taskq_pre_alloc_lock", "address" },
{ "proclist_pre_alloc_lock", "address" },
{ "port_tasks_lock", NULL },
- { "get_free_port", NULL },
+ { "port_table", NULL },
{ "port_state", "address" },
{ "xports_list_pre_alloc_lock", "address" },
{ "inet_buffer_stack_lock", NULL },
@@ -248,6 +245,7 @@ typedef struct {
typedef struct erts_lc_locked_locks_t_ erts_lc_locked_locks_t;
struct erts_lc_locked_locks_t_ {
char *thread_name;
+ int emu_thread;
erts_tid_t tid;
erts_lc_locked_locks_t *next;
erts_lc_locked_locks_t *prev;
@@ -365,6 +363,7 @@ create_locked_locks(char *thread_name)
if (!l_lcks->thread_name)
lc_abort();
+ l_lcks->emu_thread = 0;
l_lcks->tid = erts_thr_self();
l_lcks->required.first = NULL;
l_lcks->required.last = NULL;
@@ -672,7 +671,7 @@ erts_lc_set_thread_name(char *thread_name)
{
erts_lc_locked_locks_t *l_lcks = get_my_locked_locks();
if (!l_lcks)
- (void) create_locked_locks(thread_name);
+ l_lcks = create_locked_locks(thread_name);
else {
ASSERT(l_lcks->thread_name);
free((void *) l_lcks->thread_name);
@@ -680,6 +679,14 @@ erts_lc_set_thread_name(char *thread_name)
if (!l_lcks->thread_name)
lc_abort();
}
+ l_lcks->emu_thread = 1;
+}
+
+int
+erts_lc_is_emu_thr(void)
+{
+ erts_lc_locked_locks_t *l_lcks = get_my_locked_locks();
+ return l_lcks->emu_thread;
}
int
diff --git a/erts/emulator/beam/erl_lock_check.h b/erts/emulator/beam/erl_lock_check.h
index b67f36fa06..e76bcf556e 100644
--- a/erts/emulator/beam/erl_lock_check.h
+++ b/erts/emulator/beam/erl_lock_check.h
@@ -102,6 +102,7 @@ void erts_lc_unrequire_lock_flg(erts_lc_lock_t *lck, Uint16 op_flags);
void erts_lc_require_lock(erts_lc_lock_t *lck);
void erts_lc_unrequire_lock(erts_lc_lock_t *lck);
+int erts_lc_is_emu_thr(void);
#define ERTS_LC_ASSERT(A) \
((void) ((A) ? 1 : erts_lc_assert_failed(__FILE__, __LINE__, #A)))
diff --git a/erts/emulator/beam/erl_monitors.c b/erts/emulator/beam/erl_monitors.c
index 450aad975d..63175c44d6 100644
--- a/erts/emulator/beam/erl_monitors.c
+++ b/erts/emulator/beam/erl_monitors.c
@@ -985,12 +985,15 @@ Eterm erts_debug_dump_links_1(BIF_ALIST_1)
Process *rp;
DistEntry *dep;
if (is_internal_port(pid)) {
- Port *rport = erts_id2port(pid, p, ERTS_PROC_LOCK_MAIN);
+ Port *rport = erts_id2port_sflgs(pid,
+ p,
+ ERTS_PROC_LOCK_MAIN,
+ ERTS_PORT_SFLGS_INVALID_LOOKUP);
if (rport) {
erts_printf("Dumping port links----------------------\n");
erts_dump_links(ERTS_P_LINKS(rport), 0);
erts_printf("Links dumped----------------------------\n");
- erts_smp_port_unlock(rport);
+ erts_port_release(rport);
BIF_RET(am_true);
} else {
BIF_ERROR(p,BADARG);
diff --git a/erts/emulator/beam/erl_node_container_utils.h b/erts/emulator/beam/erl_node_container_utils.h
index 62fb277eef..7796cbfc2b 100644
--- a/erts/emulator/beam/erl_node_container_utils.h
+++ b/erts/emulator/beam/erl_node_container_utils.h
@@ -197,8 +197,10 @@ extern ErtsPTab erts_proc;
* Ports *
\* */
-#define internal_port_index(x) (internal_port_data((x)) \
- & erts_port_tab_index_mask)
+extern ErtsPTab erts_port;
+
+#define internal_port_index(x) erts_ptab_data2ix(&erts_port, \
+ internal_port_data((x)))
#define internal_port_node_name(x) (internal_port_node((x))->sysname)
#define external_port_node_name(x) (external_port_node((x))->sysname)
@@ -237,7 +239,7 @@ extern ErtsPTab erts_proc;
#define is_not_port(x) (!is_port(x))
/* Highest port-ID part in a term of type Port
- Not necessarily the same as the variable erts_max_ports
+ Not necessarily the same as current maximum port table size
which defines the maximum number of simultaneous Ports
in the Erlang node. ERTS_MAX_PORTS is a hard upper limit.
*/
@@ -249,6 +251,9 @@ extern ErtsPTab erts_proc;
#define ERTS_R9_PORTS_BITS (_PORT_R9_NUM_SIZE)
#define ERTS_PORTS_BITS (_PORT_NUM_SIZE)
+
+#define ERTS_INVALID_PORT make_internal_port(ERTS_MAX_PORT_DATA)
+
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\
* Refs *
\* */
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index 3b400ce0de..67f413ef28 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -1274,7 +1274,7 @@ setup_reference_table(void)
ErlHeapFragment *hfp;
DistEntry *dep;
HashInfo hi;
- int i, max_processes;
+ int i, max;
DeclareTmpHeapNoproc(heap,3);
inserted_bins = NULL;
@@ -1309,9 +1309,9 @@ setup_reference_table(void)
#endif
UnUseTmpHeapNoproc(3);
- max_processes = erts_ptab_max(&erts_proc);
+ max = erts_ptab_max(&erts_proc);
/* Insert all processes */
- for (i = 0; i < max_processes; i++) {
+ for (i = 0; i < max; i++) {
Process *proc = erts_pix2proc(i);
if (proc) {
ErlMessage *msg;
@@ -1383,25 +1383,33 @@ setup_reference_table(void)
#endif
/* Insert all ports */
- for (i = 0; i < erts_max_ports; i++) {
- erts_aint32_t state = erts_smp_atomic32_read_nob(&erts_port[i].state);
+ max = erts_ptab_max(&erts_port);
+ for (i = 0; i < max; i++) {
+ erts_aint32_t state;
+ Port *prt;
+
+ prt = erts_pix2port(i);
+ if (!prt)
+ continue;
+
+ state = erts_atomic32_read_nob(&prt->state);
if (state & ERTS_PORT_SFLGS_DEAD)
continue;
/* Insert links */
- if (ERTS_P_LINKS(&erts_port[i]))
- insert_links(ERTS_P_LINKS(&erts_port[i]), erts_port[i].common.id);
+ if (ERTS_P_LINKS(prt))
+ insert_links(ERTS_P_LINKS(prt), prt->common.id);
/* Insert monitors */
- if (ERTS_P_MONITORS(&erts_port[i]))
- insert_monitors(ERTS_P_MONITORS(&erts_port[i]), erts_port[i].common.id);
+ if (ERTS_P_MONITORS(prt))
+ insert_monitors(ERTS_P_MONITORS(prt), prt->common.id);
/* Insert port data */
- for(hfp = erts_port[i].bp; hfp; hfp = hfp->next)
- insert_offheap(&(hfp->off_heap), HEAP_REF, erts_port[i].common.id);
+ for(hfp = prt->bp; hfp; hfp = hfp->next)
+ insert_offheap(&(hfp->off_heap), HEAP_REF, prt->common.id);
/* Insert controller */
- if (erts_port[i].dist_entry)
- insert_dist_entry(erts_port[i].dist_entry,
+ if (prt->dist_entry)
+ insert_dist_entry(prt->dist_entry,
CTRL_REF,
- erts_port[i].common.id,
+ prt->common.id,
0);
}
diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h
index 4a015bdef9..3a74888522 100644
--- a/erts/emulator/beam/erl_node_tables.h
+++ b/erts/emulator/beam/erl_node_tables.h
@@ -100,7 +100,6 @@ typedef struct {
*/
struct erl_link;
-struct port;
typedef struct dist_entry_ {
HashBucket hash_bucket; /* Hash bucket */
@@ -141,7 +140,7 @@ typedef struct dist_entry_ {
erts_smp_atomic_t dist_cmd_scheduled;
ErtsPortTaskHandle dist_cmd;
- Uint (*send)(struct port *prt, ErtsDistOutputBuf *obuf);
+ Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
struct cache* cache; /* The atom cache */
} DistEntry;
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h
index 1a5bc636e2..fb34a6da2d 100644
--- a/erts/emulator/beam/erl_port.h
+++ b/erts/emulator/beam/erl_port.h
@@ -17,15 +17,21 @@
* %CopyrightEnd%
*/
-#ifndef ERL_PORT_H__
+#ifndef ERL_PORT_TYPE__
+#define ERL_PORT_TYPE__
+typedef struct _erl_drv_port Port;
+#endif
+
+#if !defined(ERL_PORT_H__) && !defined(ERL_PORT_GET_PORT_TYPE_ONLY__)
#define ERL_PORT_H__
-typedef struct port Port;
#include "erl_port_task.h"
#include "erl_ptab.h"
+#include "erl_thr_progress.h"
typedef struct erts_driver_t_ erts_driver_t;
+#define ERTS_INVALID_ERL_DRV_PORT ((ErlDrvPort) (SWord) -1)
#define SMALL_IO_QUEUE 5 /* Number of fixed elements */
typedef struct {
@@ -90,16 +96,18 @@ typedef struct ErtsXPortsList_ ErtsXPortsList;
*
*/
-struct port {
+struct _erl_drv_port {
ErtsPTabElementCommon common; /* *Need* to be first in struct */
ErtsPortTaskSched sched;
ErtsPortTaskHandle timeout_task;
#ifdef ERTS_SMP
- erts_smp_mtx_t *lock;
+ erts_mtx_t *lock;
ErtsXPortsList *xports;
erts_smp_atomic_t run_queue;
- erts_smp_spinlock_t state_lck; /* protects: id, status, snapshot */
+#else
+ erts_atomic32_t refc;
+ int cleanup;
#endif
Eterm connected; /* A connected process */
Eterm caller; /* Current caller. */
@@ -116,15 +124,20 @@ struct port {
ErtsProcList *suspended; /* List of suspended processes. */
LineBuf *linebuf; /* Buffer to hold data not ready for
process to get (line oriented I/O)*/
- erts_smp_atomic32_t state; /* Status and type flags */
+ erts_atomic32_t state; /* Status and type flags */
int control_flags; /* Flags for port_control() */
- erts_aint32_t snapshot; /* Next snapshot that port should be part of */
ErlDrvPDL port_data_lock;
ErtsPrtSD *psd; /* Port specific data */
};
+struct erl_drv_port_data_lock {
+ erts_mtx_t mtx;
+ erts_atomic_t refc;
+ Port *prt;
+};
+
ERTS_GLB_INLINE ErtsRunQueue *erts_port_runq(Port *prt);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
@@ -183,35 +196,6 @@ erts_prtsd_set(Port *prt, int ix, void *data)
#endif
-/* arrays that get malloced at startup */
-extern Port* erts_port;
-
-extern Uint erts_max_ports;
-extern Uint erts_port_tab_index_mask;
-extern erts_smp_atomic32_t erts_ports_snapshot;
-extern erts_smp_atomic_t erts_dead_ports_ptr;
-
-ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt);
-
-#if ERTS_GLB_INLINE_INCL_FUNC_DEF
-
-ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt)
-{
- ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&prt->state_lck));
- if (prt->snapshot != erts_smp_atomic32_read_acqb(&erts_ports_snapshot)) {
- /* Dead ports are added from the end of the snapshot buffer */
- Eterm* tombstone;
- tombstone = (Eterm*) erts_smp_atomic_add_read_nob(&erts_dead_ports_ptr,
- -(erts_aint_t)sizeof(Eterm));
- ASSERT(tombstone+1 != NULL);
- ASSERT(prt->snapshot == erts_smp_atomic32_read_nob(&erts_ports_snapshot) - 1);
- *tombstone = prt->common.id;
- }
- /*else no ongoing snapshot or port was already included or created after snapshot */
-}
-
-#endif
-
extern erts_smp_atomic_t erts_bytes_out; /* no bytes written out */
extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */
@@ -263,8 +247,11 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */
| ERTS_PORT_SFLG_PORT_BUSY \
| ERTS_PORT_SFLG_DISTRIBUTION)
-
+void print_port_info(Port *, int, void *);
+void erts_port_free(Port *);
+#ifndef ERTS_SMP
void erts_port_cleanup(Port *);
+#endif
void erts_fire_port_monitor(Port *prt, Eterm ref);
#ifdef ERTS_SMP
void erts_smp_xports_unlock(Port *);
@@ -274,8 +261,9 @@ void erts_smp_xports_unlock(Port *);
int erts_lc_is_port_locked(Port *);
#endif
-ERTS_GLB_INLINE void erts_smp_port_minor_lock(Port*);
-ERTS_GLB_INLINE void erts_smp_port_minor_unlock(Port*);
+ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt);
+ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt);
+ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc);
ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt);
ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt);
@@ -283,64 +271,71 @@ ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
-ERTS_GLB_INLINE void
-erts_smp_port_minor_lock(Port* prt)
+ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt)
{
#ifdef ERTS_SMP
- erts_smp_spin_lock(&prt->state_lck);
+ erts_ptab_inc_refc(&prt->common);
+#else
+ erts_atomic32_inc_nob(&prt->refc);
#endif
}
-ERTS_GLB_INLINE void
-erts_smp_port_minor_unlock(Port *prt)
+ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt)
{
#ifdef ERTS_SMP
- erts_smp_spin_unlock(&prt->state_lck);
+ int referred = erts_ptab_dec_test_refc(&prt->common);
+ if (!referred)
+ erts_port_free(prt);
+#else
+ int refc = erts_atomic32_dec_read_nob(&prt->refc);
+ if (refc == 0)
+ erts_port_free(prt);
#endif
}
+ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc)
+{
+#ifdef ERTS_SMP
+ int referred = erts_ptab_add_test_refc(&prt->common, add_refc);
+ if (!referred)
+ erts_port_free(prt);
+#else
+ int refc = erts_atomic32_add_read_nob(&prt->refc, add_refc);
+ if (refc == 0)
+ erts_port_free(prt);
+#endif
+}
ERTS_GLB_INLINE int
erts_smp_port_trylock(Port *prt)
{
- int res;
-
- ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) > 0);
- erts_smp_atomic32_inc_nob(&prt->common.refc);
-
#ifdef ERTS_SMP
- res = erts_smp_mtx_trylock(prt->lock);
- if (res == EBUSY) {
- erts_smp_atomic32_dec_nob(&prt->common.refc);
- }
+ /* *Need* to be a managed thread */
+ ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+ return erts_mtx_trylock(prt->lock);
#else
- res = 0;
+ return 0;
#endif
-
- return res;
}
ERTS_GLB_INLINE void
erts_smp_port_lock(Port *prt)
{
- ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) > 0);
- erts_smp_atomic32_inc_nob(&prt->common.refc);
#ifdef ERTS_SMP
- erts_smp_mtx_lock(prt->lock);
+ /* *Need* to be a managed thread */
+ ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+ erts_mtx_lock(prt->lock);
#endif
}
ERTS_GLB_INLINE void
erts_smp_port_unlock(Port *prt)
{
- erts_aint32_t refc;
#ifdef ERTS_SMP
- erts_smp_mtx_unlock(prt->lock);
+ /* *Need* to be a managed thread */
+ ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+ erts_mtx_unlock(prt->lock);
#endif
- refc = erts_smp_atomic32_dec_read_nob(&prt->common.refc);
- ASSERT(refc >= 0);
- if (refc == 0)
- erts_port_cleanup(prt);
}
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
@@ -348,7 +343,7 @@ erts_smp_port_unlock(Port *prt)
#define ERTS_INVALID_PORT_OPT(PP, ID, FLGS) \
(!(PP) \
- || (erts_smp_atomic32_read_nob(&(PP)->state) & (FLGS)) \
+ || (erts_atomic32_read_nob(&(PP)->state) & (FLGS)) \
|| (PP)->common.id != (ID))
/* port lookup */
@@ -365,125 +360,301 @@ erts_smp_port_unlock(Port *prt)
#define ERTS_PORT_SCHED_ID(P, ID) \
((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID)))
+extern const Port erts_invalid_port;
+#define ERTS_PORT_LOCK_BUSY ((Port *) &erts_invalid_port)
+
#ifdef ERTS_SMP
Port *erts_de2port(DistEntry *, Process *, ErtsProcLocks);
#endif
-#define erts_id2port(ID, P, PL) \
- erts_id2port_sflgs((ID), (P), (PL), ERTS_PORT_SFLGS_INVALID_LOOKUP)
-
-ERTS_GLB_INLINE Port*erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32);
+ERTS_GLB_INLINE Port *erts_pix2port(int);
+ERTS_GLB_INLINE Port *erts_port_lookup_raw(Eterm);
+ERTS_GLB_INLINE Port *erts_port_lookup(Eterm, Uint32);
+ERTS_GLB_INLINE Port*erts_id2port(Eterm id);
+ERTS_GLB_INLINE Port *erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32);
ERTS_GLB_INLINE void erts_port_release(Port *);
-ERTS_GLB_INLINE Port*erts_drvport2port(ErlDrvPort, erts_aint32_t *);
-ERTS_GLB_INLINE Port*erts_drvportid2port(Eterm);
-ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm id);
-ERTS_GLB_INLINE int erts_is_port_alive(Eterm id);
-ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm id);
+#ifdef ERTS_SMP
+ERTS_GLB_INLINE Port *erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs);
+ERTS_GLB_INLINE void erts_thr_port_release(Port *prt);
+#endif
+ERTS_GLB_INLINE Port *erts_thr_drvport2port_raw(ErlDrvPort);
+ERTS_GLB_INLINE Port *erts_drvport2port_raw(ErlDrvPort drvport);
+ERTS_GLB_INLINE Port *erts_drvport2port(ErlDrvPort, erts_aint32_t *);
+ERTS_GLB_INLINE Port *erts_drvportid2port(Eterm);
+ERTS_GLB_INLINE Eterm erts_drvport2id(ErlDrvPort);
+ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm);
+ERTS_GLB_INLINE int erts_is_port_alive(Eterm);
+ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+ERTS_GLB_INLINE Port *erts_pix2port(int ix)
+{
+ Port *prt;
+ ASSERT(0 <= ix && ix < erts_ptab_max(&erts_port));
+ prt = (Port *) erts_ptab_pix2intptr_nob(&erts_port, ix);
+ return prt == ERTS_PORT_LOCK_BUSY ? NULL : prt;
+}
+
+ERTS_GLB_INLINE Port *
+erts_port_lookup_raw(Eterm id)
+{
+ Port *prt;
+
+ ERTS_SMP_LC_ASSERT(erts_thr_progress_lc_is_delaying());
+
+ if (is_not_internal_port(id))
+ return NULL;
+
+ prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
+ internal_port_index(id));
+ return prt && prt->common.id == id ? prt : NULL;
+}
+
+ERTS_GLB_INLINE Port *
+erts_port_lookup(Eterm id, Uint32 invalid_sflgs)
+{
+ Port *prt = erts_port_lookup_raw(id);
+ erts_aint32_t state = erts_atomic32_read_nob(&prt->state);
+ return (state & invalid_sflgs) ? NULL : prt;
+}
+
+
ERTS_GLB_INLINE Port*
-erts_id2port_sflgs(Eterm id, Process *c_p, ErtsProcLocks c_p_locks, Uint32 sflgs)
+erts_id2port(Eterm id)
+{
+ erts_aint32_t state;
+ Port *prt;
+
+ /* Only allowed to be called from managed threads */
+ ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+
+ if (is_not_internal_port(id))
+ return NULL;
+
+ prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
+ internal_port_index(id));
+
+ if (!prt || prt->common.id != id)
+ return NULL;
+
+ erts_smp_port_lock(prt);
+ state = erts_atomic32_read_nob(&prt->state);
+ if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) {
+ erts_smp_port_unlock(prt);
+ return NULL;
+ }
+
+ return prt;
+}
+
+
+ERTS_GLB_INLINE Port*
+erts_id2port_sflgs(Eterm id,
+ Process *c_p, ErtsProcLocks c_p_locks,
+ Uint32 invalid_sflgs)
{
#ifdef ERTS_SMP
int no_proc_locks = !c_p || !c_p_locks;
#endif
+ erts_aint32_t state;
Port *prt;
+ /* Only allowed to be called from managed threads */
+ ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+
if (is_not_internal_port(id))
return NULL;
- prt = &erts_port[internal_port_index(id)];
+ prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
+ internal_port_index(id));
- erts_smp_port_minor_lock(prt);
- if (ERTS_INVALID_PORT_OPT(prt, id, sflgs)) {
- erts_smp_port_minor_unlock(prt);
- prt = NULL;
+ if (!prt || prt->common.id != id)
+ return NULL;
+
+#ifdef ERTS_SMP
+ if (no_proc_locks)
+ erts_smp_port_lock(prt);
+ else if (erts_smp_port_trylock(prt) == EBUSY) {
+ /* Unlock process locks, and acquire locks in lock order... */
+ erts_smp_proc_unlock(c_p, c_p_locks);
+ erts_smp_port_lock(prt);
+ erts_smp_proc_lock(c_p, c_p_locks);
+ }
+#endif
+ state = erts_atomic32_read_nob(&prt->state);
+ if (state & invalid_sflgs) {
+#ifdef ERTS_SMP
+ erts_smp_port_unlock(prt);
+#endif
+ return NULL;
}
- else {
- erts_smp_atomic32_inc_nob(&prt->common.refc);
- erts_smp_port_minor_unlock(prt);
+ return prt;
+}
+
+ERTS_GLB_INLINE void
+erts_port_release(Port *prt)
+{
+ /* Only allowed to be called from managed threads */
+ ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
#ifdef ERTS_SMP
- if (no_proc_locks)
- erts_smp_mtx_lock(prt->lock);
- else if (erts_smp_mtx_trylock(prt->lock) == EBUSY) {
- /* Unlock process locks, and acquire locks in lock order... */
- erts_smp_proc_unlock(c_p, c_p_locks);
- erts_smp_mtx_lock(prt->lock);
- erts_smp_proc_lock(c_p, c_p_locks);
+ erts_smp_port_unlock(prt);
+#else
+ if (prt->cleanup) {
+ prt->cleanup = 0;
+ erts_port_cleanup(prt);
+ }
+#endif
+}
+
+#ifdef ERTS_SMP
+
+/*
+ * erts_thr_id2port_sflgs() and erts_thr_port_release() can
+ * be used by unmanaged threads in the SMP case.
+ */
+ERTS_GLB_INLINE Port *
+erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs)
+{
+ Port *prt;
+ ErtsThrPrgrDelayHandle dhndl;
+
+ if (is_not_internal_port(id))
+ return NULL;
+
+ dhndl = erts_thr_progress_unmanaged_delay();
+
+ prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
+ internal_port_index(id));
+
+ if (!prt || prt->common.id != id) {
+ erts_thr_progress_unmanaged_continue(dhndl);
+ prt = NULL;
+ }
+ else {
+ erts_aint32_t state;
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
+ erts_port_inc_refc(prt);
+ erts_thr_progress_unmanaged_continue(dhndl);
}
- /* The id may not have changed... */
- ERTS_SMP_LC_ASSERT(prt->common.id == id);
- /* ... but state may have... */
- if (erts_smp_atomic32_read_nob(&prt->state) & sflgs) {
- erts_smp_port_unlock(prt); /* Also decrements common.refc... */
+ erts_mtx_lock(prt->lock);
+ state = erts_atomic32_read_nob(&prt->state);
+ if (state & invalid_sflgs) {
+ erts_mtx_unlock(prt->lock);
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_port_dec_refc(prt);
prt = NULL;
}
-#endif
-
}
return prt;
}
ERTS_GLB_INLINE void
-erts_port_release(Port *prt)
+erts_thr_port_release(Port *prt)
{
- erts_smp_port_unlock(prt);
+ erts_mtx_unlock(prt->lock);
+#ifdef ERTS_SMP
+ if (!erts_thr_progress_is_managed_thread())
+ erts_port_dec_refc(prt);
+#endif
+}
+
+#endif
+
+ERTS_GLB_INLINE Port*
+erts_thr_drvport2port_raw(ErlDrvPort drvport)
+{
+#if ERTS_ENABLE_LOCK_CHECK
+ int emu_thread = erts_lc_is_emu_thr();
+#endif
+ if (drvport == ERTS_INVALID_ERL_DRV_PORT)
+ return NULL;
+ else {
+ Port *prt = (Port *) drvport;
+#if ERTS_ENABLE_LOCK_CHECK
+ if (emu_thread) {
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(!prt->port_data_lock
+ || erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
+ }
+ else {
+ ERTS_LC_ASSERT(prt->port_data_lock);
+ ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
+ }
+#endif
+ return prt;
+ }
+}
+
+ERTS_GLB_INLINE Port*
+erts_drvport2port_raw(ErlDrvPort drvport)
+{
+ ERTS_LC_ASSERT(erts_lc_is_emu_thr());
+ if (drvport == ERTS_INVALID_ERL_DRV_PORT)
+ return NULL;
+ else {
+ Port *prt = (Port *) drvport;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ return prt;
+ }
}
ERTS_GLB_INLINE Port*
erts_drvport2port(ErlDrvPort drvport, erts_aint32_t *statep)
{
- int ix = (int) drvport;
+ Port *prt = erts_drvport2port_raw(drvport);
erts_aint32_t state;
- if (ix < 0 || erts_max_ports <= ix)
+ if (!prt)
return NULL;
- state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
+ state = erts_atomic32_read_nob(&prt->state);
if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
return NULL;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix]));
if (statep)
*statep = state;
- return &erts_port[ix];
+ return prt;
}
ERTS_GLB_INLINE Port*
erts_drvportid2port(Eterm id)
{
- int ix;
+ Port *prt;
erts_aint32_t state;
if (is_not_internal_port(id))
return NULL;
- ix = (int) internal_port_index(id);
- if (erts_max_ports <= ix)
+ prt = (Port *) erts_ptab_pix2intptr_nob(&erts_port,
+ internal_port_index(id));
+ if (!prt)
return NULL;
- state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
- if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ if (prt->common.id != id)
return NULL;
- if (erts_port[ix].common.id != id)
+ state = erts_atomic32_read_nob(&prt->state);
+ if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
return NULL;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix]));
- return &erts_port[ix];
+ return prt;
+}
+
+ERTS_GLB_INLINE Eterm
+erts_drvport2id(ErlDrvPort drvport)
+{
+ Port *prt = erts_drvport2port_raw(drvport);
+ if (!prt)
+ return am_undefined;
+ else
+ return prt->common.id;
}
ERTS_GLB_INLINE Uint32
erts_portid2status(Eterm id)
{
- if (is_not_internal_port(id))
+ Port *prt = erts_port_lookup_raw(id);
+ if (prt)
+ return (Uint32) erts_atomic32_read_acqb(&prt->state);
+ else
return ERTS_PORT_SFLG_INVALID;
- else {
- erts_aint32_t state;
- int ix = internal_port_index(id);
- if (erts_max_ports <= ix)
- return ERTS_PORT_SFLG_INVALID;
- state = erts_smp_atomic32_read_ddrb(&erts_port[ix].state);
- if (erts_port[ix].common.id != id)
- return ERTS_PORT_SFLG_INVALID;
- return state;
- }
}
ERTS_GLB_INLINE int
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index 6eb8c4869c..112d27c94e 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -51,12 +51,9 @@
#define ERTS_PORT_TASK_INVALID_PORT(P, ID) \
- ((erts_smp_atomic32_read_acqb(&(P)->state) & ERTS_PORT_SFLGS_DEAD) \
+ ((erts_atomic32_read_acqb(&(P)->state) & ERTS_PORT_SFLGS_DEAD) \
|| (P)->common.id != (ID))
-#define ERTS_PORT_IS_IN_RUNQ(RQ, P) \
- ((P)->sched.next || (P)->sched.prev || (RQ)->ports.start == (P))
-
#ifdef USE_VM_PROBES
#define DTRACE_DRIVER(PROBE_NAME, PP) \
if (DTRACE_ENABLED(driver_ready_input)) { \
@@ -76,7 +73,6 @@ erts_smp_atomic_t erts_port_task_outstanding_io_tasks;
struct ErtsPortTaskQueue_ {
ErtsPortTask *first;
ErtsPortTask *last;
- Port *port;
};
struct ErtsPortTask_ {
@@ -275,7 +271,6 @@ port_taskq_init(ErtsPortTaskQueue *ptqp, Port *pp)
if (ptqp) {
ptqp->first = NULL;
ptqp->last = NULL;
- ptqp->port = pp;
}
return ptqp;
}
@@ -429,7 +424,10 @@ erts_port_task_abort(Eterm id, ErtsPortTaskHandle *pthp)
ErtsPortTask *ptp;
Port *pp;
- pp = &erts_port[internal_port_index(id)];
+ pp = erts_port_lookup_raw(id);
+ if (!pp)
+ return 1;
+
runq = erts_port_runq(pp);
if (!runq)
return 1;
@@ -512,7 +510,10 @@ erts_port_task_schedule(Eterm id,
ptp = port_task_alloc();
ASSERT(is_internal_port(id));
- pp = &erts_port[internal_port_index(id)];
+ pp = erts_port_lookup_raw(id);
+ if (!pp)
+ return -1;
+
runq = erts_port_runq(pp);
if (!runq || ERTS_PORT_TASK_INVALID_PORT(pp, id)) {
@@ -616,7 +617,7 @@ erts_port_task_free_port(Port *pp)
ErtsPortTaskQueue *ptqp;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
- ASSERT(!(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
+ ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
runq = erts_port_runq(pp);
ASSERT(runq);
ERTS_PT_CHK_PRES_PORTQ(runq, pp);
@@ -627,14 +628,10 @@ erts_port_task_free_port(Port *pp)
ErtsPortTask *ptp;
enqueue_free:
ptp = port_task_alloc();
- erts_smp_port_minor_lock(pp);
- erts_smp_atomic32_read_bset_relb(&pp->state,
- (ERTS_PORT_SFLG_CLOSING
- | ERTS_PORT_SFLG_FREE_SCHEDULED),
- ERTS_PORT_SFLG_FREE_SCHEDULED);
- erts_may_save_closed_port(pp);
- erts_smp_port_minor_unlock(pp);
- ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&pp->common.refc) > 1);
+ erts_atomic32_read_bset_relb(&pp->state,
+ (ERTS_PORT_SFLG_CLOSING
+ | ERTS_PORT_SFLG_FREE_SCHEDULED),
+ ERTS_PORT_SFLG_FREE_SCHEDULED);
ptp->type = ERTS_PORT_TASK_FREE;
ptp->event = (ErlDrvEvent) -1;
ptp->event_data = NULL;
@@ -651,20 +648,18 @@ erts_port_task_free_port(Port *pp)
goto enqueue_free;
}
ASSERT(!pp->sched.taskq);
- erts_smp_port_minor_lock(pp);
- erts_smp_atomic32_read_bset_relb(&pp->state,
- (ERTS_PORT_SFLG_CLOSING
- | ERTS_PORT_SFLG_FREE_SCHEDULED),
- ERTS_PORT_SFLG_FREE_SCHEDULED);
- erts_may_save_closed_port(pp);
- erts_smp_port_minor_unlock(pp);
- erts_smp_atomic32_dec_nob(&pp->common.refc); /* Not alive */
- ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&pp->common.refc) > 0); /* Lock */
+ erts_atomic32_read_bset_relb(&pp->state,
+ (ERTS_PORT_SFLG_CLOSING
+ | ERTS_PORT_SFLG_FREE_SCHEDULED),
+ ERTS_PORT_SFLG_FREE_SCHEDULED);
handle_remaining_tasks(runq, pp); /* May release runq lock */
ASSERT(!pp->sched.exe_taskq && (!ptqp || !ptqp->first));
pp->sched.taskq = NULL;
ERTS_PT_CHK_PRES_PORTQ(runq, pp);
erts_smp_runq_unlock(runq);
+#ifndef ERTS_SMP
+ pp->cleanup = 1;
+#endif
}
}
@@ -703,14 +698,21 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
goto done;
}
+ if (erts_smp_port_trylock(pp) == EBUSY) {
+ erts_smp_runq_unlock(runq);
+ erts_smp_port_lock(pp);
+ erts_smp_runq_lock(runq);
+ }
+
ASSERT(pp->sched.in_runq);
pp->sched.in_runq = 0;
+
if (!pp->sched.taskq) {
if (erts_system_profile_flags.runnable_ports)
profile_runnable_port(pp, am_inactive);
res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
!= (erts_aint_t) 0);
- goto done;
+ goto release_port_done;
}
*curr_port_pp = pp;
@@ -721,12 +723,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
ASSERT(!pp->sched.exe_taskq);
pp->sched.exe_taskq = ptqp;
-
- if (erts_smp_port_trylock(pp) == EBUSY) {
- erts_smp_runq_unlock(runq);
- erts_smp_port_lock(pp);
- erts_smp_runq_lock(runq);
- }
if (erts_sched_stat.enabled) {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
@@ -771,31 +767,31 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
erts_smp_runq_lock(runq);
erts_unblock_fpe(fpe_was_unmasked);
- ASSERT(erts_smp_atomic32_read_nob(&pp->state)
+ ASSERT(erts_atomic32_read_nob(&pp->state)
& ERTS_PORT_SFLG_FREE_SCHEDULED);
if (ptqp->first || (pp->sched.taskq && pp->sched.taskq->first))
handle_remaining_tasks(runq, pp);
ASSERT(!ptqp->first
&& (!pp->sched.taskq || !pp->sched.taskq->first));
- erts_smp_atomic32_dec_nob(&pp->common.refc); /* Not alive */
- ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&pp->common.refc) > 0); /* Lock */
port_task_free(ptp);
if (pp->sched.taskq)
port_taskq_free(pp->sched.taskq);
pp->sched.taskq = NULL;
-
+#ifndef ERTS_SMP
+ pp->cleanup = 1;
+#endif
goto tasks_done;
case ERTS_PORT_TASK_TIMEOUT:
reds += ERTS_PORT_REDS_TIMEOUT;
- if (!(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)) {
+ if (!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)) {
DTRACE_DRIVER(driver_timeout, pp);
(*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
}
break;
case ERTS_PORT_TASK_INPUT:
reds += ERTS_PORT_REDS_INPUT;
- ASSERT((erts_smp_atomic32_read_nob(&pp->state)
+ ASSERT((erts_atomic32_read_nob(&pp->state)
& ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_ready_input, pp);
/* NOTE some windows drivers use ->ready_input for input and output */
@@ -804,7 +800,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
break;
case ERTS_PORT_TASK_OUTPUT:
reds += ERTS_PORT_REDS_OUTPUT;
- ASSERT((erts_smp_atomic32_read_nob(&pp->state)
+ ASSERT((erts_atomic32_read_nob(&pp->state)
& ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_ready_output, pp);
(*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->event);
@@ -812,7 +808,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
break;
case ERTS_PORT_TASK_EVENT:
reds += ERTS_PORT_REDS_EVENT;
- ASSERT((erts_smp_atomic32_read_nob(&pp->state)
+ ASSERT((erts_atomic32_read_nob(&pp->state)
& ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_event, pp);
(*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, ptp->event, ptp->event_data);
@@ -828,7 +824,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
break;
}
- if ((erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_CLOSING)
+ if ((erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_CLOSING)
&& erts_is_port_ioq_empty(pp)) {
reds += ERTS_PORT_REDS_TERMINATE;
erts_terminate_port(pp);
@@ -879,7 +875,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
ErtsRunQueue *xrunq;
#endif
- ASSERT(!(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
+ ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
ASSERT(pp->sched.taskq->first);
#ifdef ERTS_SMP
@@ -922,23 +918,9 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
trace_sched_ports(pp, am_out);
}
-#ifndef ERTS_SMP
+
+release_port_done:
erts_port_release(pp);
-#else
- {
- erts_aint32_t refc;
- erts_smp_mtx_unlock(pp->lock);
- refc = erts_smp_atomic32_dec_read_nob(&pp->common.refc);
- ASSERT(refc >= 0);
- if (refc == 0) {
- erts_smp_runq_unlock(runq);
- erts_port_cleanup(pp); /* Might aquire runq lock */
- erts_smp_runq_lock(runq);
- res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
- != (erts_aint_t) 0);
- }
- }
-#endif
done:
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index f9ff3c4bd5..e6fe4ccdc1 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -43,7 +43,6 @@
#include "erl_thr_queue.h"
#include "erl_async.h"
#include "dtrace-wrapper.h"
-#define ERTS_PTAB_WANT_BIF_IMPL__
#include "erl_ptab.h"
#define ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED (2000*CONTEXT_REDS)
@@ -1482,37 +1481,32 @@ handle_reap_ports(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_REAP_PORTS);
awdp->esdp->run_queue->halt_in_progress = 1;
if (erts_smp_atomic32_dec_read_acqb(&erts_halt_progress) == 0) {
- int i;
+ int i, max = erts_ptab_max(&erts_port);
erts_smp_atomic32_set_nob(&erts_halt_progress, 1);
- for (i = 0; i < erts_max_ports; i++) {
- Port *prt = &erts_port[i];
- erts_aint32_t state = erts_smp_atomic32_read_acqb(&prt->state);
+ for (i = 0; i < max; i++) {
+ erts_aint32_t state;
+ Port *prt = erts_pix2port(i);
+ if (!prt)
+ continue;
+ state = erts_atomic32_read_acqb(&prt->state);
if (state & (ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP
| ERTS_PORT_SFLG_HALT))
continue;
- erts_smp_port_minor_lock(prt);
+
/* We need to set the halt flag - get the port lock */
-#ifdef ERTS_SMP
- erts_smp_atomic32_inc_nob(&prt->common.refc);
-#endif
- erts_smp_port_minor_unlock(prt);
-#ifdef ERTS_SMP
- erts_smp_mtx_lock(prt->lock);
-#endif
- state = erts_smp_atomic32_read_acqb(&prt->state);
- if (state & (ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP
- | ERTS_PORT_SFLG_HALT)) {
- erts_port_release(prt);
- continue;
- }
- state = erts_smp_atomic32_read_bor_relb(&prt->state,
+
+ erts_smp_port_lock(prt);
+
+ state = erts_atomic32_read_nob(&prt->state);
+ if (!(state & (ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP
+ | ERTS_PORT_SFLG_HALT))) {
+ state = erts_atomic32_read_bor_relb(&prt->state,
ERTS_PORT_SFLG_HALT);
- erts_smp_atomic32_inc_nob(&erts_halt_progress);
- if (state & (ERTS_PORT_SFLG_EXITING|ERTS_PORT_SFLG_CLOSING)) {
- erts_port_release(prt);
- continue;
+ erts_smp_atomic32_inc_nob(&erts_halt_progress);
+ if (!(state & (ERTS_PORT_SFLG_EXITING|ERTS_PORT_SFLG_CLOSING)))
+ erts_do_exit_port(prt, prt->common.id, am_killed);
}
- erts_do_exit_port(prt, prt->common.id, am_killed);
+
erts_port_release(prt);
}
if (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0) {
@@ -4304,6 +4298,11 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
erts_smp_atomic32_init_relb(&erts_halt_progress, -1);
erts_halt_code = 0;
+
+#if !defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
+ erts_lc_set_thread_name("scheduler 1");
+#endif
+
}
ErtsRunQueue *
@@ -8112,7 +8111,7 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext)
ASSERT(mon->type == MON_TARGET);
ASSERT(is_pid(mon->pid) || is_internal_port(mon->pid));
if (is_internal_port(mon->pid)) {
- Port *prt = erts_id2port(mon->pid, NULL, 0);
+ Port *prt = erts_id2port(mon->pid);
if (prt == NULL) {
goto done;
}
@@ -8199,7 +8198,7 @@ static void doit_exit_link(ErtsLink *lnk, void *vpcontext)
switch(lnk->type) {
case LINK_PID:
if(is_internal_port(item)) {
- Port *prt = erts_id2port(item, NULL, 0);
+ Port *prt = erts_id2port(item);
if (prt) {
rlnk = erts_remove_link(&ERTS_P_LINKS(prt), p->common.id);
if (rlnk)
@@ -8758,14 +8757,6 @@ stack_element_dump(int to, void *to_arg, Process* p, Eterm* sp, int yreg)
}
/*
- * The processes/0 BIF.
- */
-BIF_RETTYPE processes_0(BIF_ALIST_0)
-{
- return erts_ptab_list(BIF_P, &erts_proc);
-}
-
-/*
* A nice system halt closing all open port goes as follows:
* 1) This function schedules the aux work ERTS_SSI_AUX_WORK_REAP_PORTS
* on all schedulers, then schedules itself out.
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index 6f882c5e57..f5b7921820 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -42,6 +42,9 @@ typedef struct process Process;
#include "erl_process_lock.h" /* Only pull out important types... */
#undef ERTS_PROCESS_LOCK_ONLY_PROC_LOCK_TYPE__
+#define ERL_PORT_GET_PORT_TYPE_ONLY__
+#include "erl_port.h"
+#undef ERL_PORT_GET_PORT_TYPE_ONLY__
#include "erl_vm.h"
#include "erl_smp.h"
#include "erl_message.h"
@@ -66,7 +69,6 @@ typedef struct process Process;
#undef ERL_THR_PROGRESS_TSD_TYPE_ONLY
struct ErtsNodesMonitor_;
-struct port;
#define ERTS_MAX_NO_OF_SCHEDULERS 1024
@@ -378,8 +380,8 @@ struct ErtsRunQueue_ {
struct {
ErtsRunQueueInfo info;
- struct port *start;
- struct port *end;
+ Port *start;
+ Port *end;
} ports;
};
@@ -476,7 +478,7 @@ struct ErtsSchedulerData_ {
ErtsSchedulerSleepInfo *ssi;
Process *current_process;
Uint no; /* Scheduler number */
- struct port *current_port;
+ Port *current_port;
ErtsRunQueue *run_queue;
int virtual_reds;
int cpu_id; /* >= 0 when bound */
@@ -1219,7 +1221,7 @@ Eterm erts_sched_stat_term(Process *p, int total);
void erts_free_proc(Process *);
-void erts_suspend(Process*, ErtsProcLocks, struct port*);
+void erts_suspend(Process*, ErtsProcLocks, Port*);
void erts_resume(Process*, ErtsProcLocks);
int erts_resume_processes(ErtsProcList *);
diff --git a/erts/emulator/beam/erl_ptab.c b/erts/emulator/beam/erl_ptab.c
index 8195a350fb..8cab1de9da 100644
--- a/erts/emulator/beam/erl_ptab.c
+++ b/erts/emulator/beam/erl_ptab.c
@@ -660,11 +660,9 @@ erts_ptab_delete_element(ErtsPTab *ptab,
erts_ptab_rwunlock(ptab);
}
-#ifdef ERTS_SMP
erts_schedule_thr_prgr_later_op(ptab->r.o.release_element,
(void *) ptab_el,
&ptab_el->u.release);
-#endif
}
/*
@@ -1151,16 +1149,18 @@ ptab_list_bif_engine(Process *c_p, Eterm *res_accp, Binary *mbp)
conses = ptlbdp->pid_ix;
}
- hp = HAlloc(c_p, conses*2);
- ERTS_PTAB_LIST_DBG_SAVE_HEAP_ALLOC(ptlbdp, hp, conses*2);
+ if (conses) {
+ hp = HAlloc(c_p, conses*2);
+ ERTS_PTAB_LIST_DBG_SAVE_HEAP_ALLOC(ptlbdp, hp, conses*2);
- for (ix = ptlbdp->pid_ix - 1; ix >= min_ix; ix--) {
- ERTS_PTAB_LIST_ASSERT(erts_ptab_is_valid_id(ptlbdp->pid[ix]));
- res = CONS(hp, ptlbdp->pid[ix], res);
- hp += 2;
- }
+ for (ix = ptlbdp->pid_ix - 1; ix >= min_ix; ix--) {
+ ERTS_PTAB_LIST_ASSERT(erts_ptab_is_valid_id(ptlbdp->pid[ix]));
+ res = CONS(hp, ptlbdp->pid[ix], res);
+ hp += 2;
+ }
- ERTS_PTAB_LIST_DBG_VERIFY_HEAP_ALLOC_USED(ptlbdp, hp);
+ ERTS_PTAB_LIST_DBG_VERIFY_HEAP_ALLOC_USED(ptlbdp, hp);
+ }
ptlbdp->pid_ix = min_ix;
if (min_ix == 0)
diff --git a/erts/emulator/beam/erl_ptab.h b/erts/emulator/beam/erl_ptab.h
index b65db330e5..b2ae829d62 100644
--- a/erts/emulator/beam/erl_ptab.h
+++ b/erts/emulator/beam/erl_ptab.h
@@ -53,8 +53,6 @@ typedef struct {
Eterm id;
#ifdef ERTS_SMP
erts_atomic32_t refc;
-#else
- erts_smp_atomic32_t refc; /* Temporary solution during dev; to be removed! */
#endif
Eterm tracer_proc;
Uint trace_flags;
@@ -73,9 +71,7 @@ typedef struct {
} alive;
/* --- While being released --- */
-#ifdef ERTS_SMP
ErtsThrPrgrLaterOp release;
-#endif
} u;
} ErtsPTabElementCommon;
diff --git a/erts/emulator/beam/erl_sys_driver.h b/erts/emulator/beam/erl_sys_driver.h
index d429d0ce96..b991a2840c 100644
--- a/erts/emulator/beam/erl_sys_driver.h
+++ b/erts/emulator/beam/erl_sys_driver.h
@@ -31,7 +31,6 @@
#define ERL_SYS_DRV
typedef long ErlDrvEvent; /* An event to be selected on. */
-typedef long ErlDrvPort; /* A port descriptor. */
/* typedef struct _SysDriverOpts SysDriverOpts; defined in sys.h */
diff --git a/erts/emulator/beam/erl_trace.c b/erts/emulator/beam/erl_trace.c
index dd2a121165..991e573c14 100644
--- a/erts/emulator/beam/erl_trace.c
+++ b/erts/emulator/beam/erl_trace.c
@@ -3442,10 +3442,8 @@ sys_msg_dispatcher_func(void *unused)
goto queue_proc_msg;
}
else if (is_internal_port(receiver)) {
- port = erts_id2port_sflgs(receiver,
- NULL,
- 0,
- ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP);
+ port = erts_thr_id2port_sflgs(receiver,
+ ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP);
if (!port)
goto failure;
else {
@@ -3459,7 +3457,7 @@ sys_msg_dispatcher_func(void *unused)
#ifdef DEBUG_PRINTOUTS
erts_fprintf(stderr, "delivered\n");
#endif
- erts_port_release(port);
+ erts_thr_port_release(port);
if (smqp->bp)
free_message_buffer(smqp->bp);
}
diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h
index d93ac723a4..ae31ab54b5 100644
--- a/erts/emulator/beam/global.h
+++ b/erts/emulator/beam/global.h
@@ -884,11 +884,6 @@ int erts_global_garbage_collect(Process*, int, Eterm*, int);
/* io.c */
-struct erl_drv_port_data_lock {
- erts_mtx_t mtx;
- erts_atomic_t refc;
-};
-
typedef struct {
char *name;
char *driver_name;
@@ -901,22 +896,18 @@ typedef struct {
int erts_add_driver_entry(ErlDrvEntry *drv, DE_Handle *handle, int driver_list_locked);
void erts_destroy_driver(erts_driver_t *drv);
void erts_wake_process_later(Port*, Process*);
-int erts_open_driver(erts_driver_t*, Eterm, char*, SysDriverOpts*, int *);
+Port *erts_open_driver(erts_driver_t*, Eterm, char*, SysDriverOpts*, int *, int *);
int erts_is_port_ioq_empty(Port *);
void erts_terminate_port(Port *);
-void close_port(Eterm);
void init_io(void);
-void cleanup_io(void);
void erts_do_exit_port(Port *, Eterm, Eterm);
void erts_port_command(Process *, Eterm, Port *, Eterm);
Eterm erts_port_control(Process*, Port*, Uint, Eterm);
int erts_write_to_port(Eterm caller_id, Port *p, Eterm list);
-void print_port_info(int, void *, int);
void erts_raw_port_command(Port*, byte*, Uint);
-void driver_report_exit(int, int);
+void driver_report_exit(ErlDrvPort, int);
LineBuf* allocate_linebuf(int);
int async_ready(Port *, void*);
-Sint erts_test_next_port(int, Uint);
ErtsPortNames *erts_get_port_names(Eterm);
void erts_free_port_names(ErtsPortNames *);
Uint erts_port_ioq_size(Port *pp);
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index b772783f64..4dcec356a9 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -57,15 +57,14 @@ static erts_smp_tsd_key_t driver_list_lock_status_key; /*stop recursive locks wh
static erts_smp_tsd_key_t driver_list_last_error_key; /* Save last DDLL error on a
per thread basis (for BC interfaces) */
-Port* erts_port; /* The port table */
+ErtsPTab erts_port erts_align_attribute(ERTS_CACHE_LINE_SIZE); /* The port table */
erts_smp_atomic_t erts_bytes_out; /* No bytes sent out of the system */
erts_smp_atomic_t erts_bytes_in; /* No bytes gotten into the system */
-Uint erts_max_ports;
-Uint erts_port_tab_index_mask;
-
const ErlDrvTermData driver_term_nil = (ErlDrvTermData)NIL;
+const Port erts_invalid_port = {{ERTS_INVALID_PORT}};
+
erts_driver_t vanilla_driver;
erts_driver_t spawn_driver;
erts_driver_t fd_driver;
@@ -89,36 +88,12 @@ static void driver_monitor_unlock_pdl(Port *p);
static ERTS_INLINE ErlIOQueue*
drvport2ioq(ErlDrvPort drvport)
{
- int ix = (int) drvport;
- erts_aint32_t state;
-
- if (ix < 0 || erts_max_ports <= ix)
- return NULL;
-
- state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
-
-#ifdef ERTS_ENABLE_LOCK_CHECK
-
- if (erts_get_scheduler_data()) {
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix]));
- ERTS_LC_ASSERT(!erts_port[ix].port_data_lock
- || erts_lc_mtx_is_locked(
- &erts_port[ix].port_data_lock->mtx));
- }
- else {
- ERTS_LC_ASSERT((state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
- || erts_port[ix].port_data_lock);
- ERTS_LC_ASSERT(!erts_port[ix].port_data_lock
- || erts_lc_mtx_is_locked(
- &erts_port[ix].port_data_lock->mtx));
- }
-
-#endif
-
+ Port *prt = erts_thr_drvport2port_raw(drvport);
+ erts_aint32_t state = erts_atomic32_read_nob(&prt->state);
if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
return NULL;
else
- return &erts_port[ix].ioq;
+ return &prt->ioq;
}
static ERTS_INLINE int
@@ -196,27 +171,13 @@ typedef struct line_buf_context {
dtrace_port_str((PORT), port_str);
#endif
-/* The 'number' field in a port now has two parts: the lowest bits
- contain the index in the port table, and the higher bits are a counter
- which is incremented each time we look for a free port and start from
- the beginning of the table. erts_max_ports is the number of file descriptors,
- rounded up to a power of 2.
- To get the index from a port, use the macro 'internal_port_index';
- 'port_number' returns the whole number field.
-*/
-
-static erts_smp_spinlock_t get_free_port_lck;
-static Uint last_port_num;
-static Uint port_num_mask;
-erts_smp_atomic32_t erts_ports_snapshot; /* Identifies the _next_ snapshot (not the ongoing) */
-
-
static ERTS_INLINE void
kill_port(Port *pp)
{
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ erts_ptab_delete_element(&erts_port, &pp->common); /* Time of death */
erts_port_task_free_port(pp);
- ASSERT(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD);
+ /* In non-smp case the port structure may have been deallocated now */
}
#ifdef ERTS_SMP
@@ -227,149 +188,194 @@ erts_lc_is_port_locked(Port *prt)
{
if (!prt)
return 0;
+ ERTS_SMP_LC_ASSERT(prt->lock);
return erts_smp_lc_mtx_is_locked(prt->lock);
}
#endif
#endif /* #ifdef ERTS_SMP */
-static int
-get_free_port(void)
-{
- Uint num;
- Uint tries = erts_max_ports;
- Port* port;
-
- erts_smp_spin_lock(&get_free_port_lck);
- num = last_port_num + 1;
- for (;; ++num) {
- erts_aint32_t act;
-
- port = &erts_port[num & erts_port_tab_index_mask];
-
- act = erts_smp_atomic32_read_nob(&port->state);
-
- while (act & ERTS_PORT_SFLG_FREE) {
- erts_aint32_t exp = act;
- act = erts_smp_atomic32_cmpxchg_relb(&port->state,
- ERTS_PORT_SFLG_INITIALIZING,
- exp);
- if (act == exp) {
- last_port_num = num;
- erts_smp_spin_unlock(&get_free_port_lck);
- ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&port->common.refc) == 0);
- erts_smp_atomic32_set_nob(&port->common.refc, 2); /* Port alive + lock */
- return num & port_num_mask;
- }
- }
+static void initq(Port* prt);
- if (--tries == 0) {
- erts_smp_spin_unlock(&get_free_port_lck);
- return -1;
- }
- }
+static void insert_port_struct(void *vprt, Eterm data)
+{
+ Port *prt = (Port *) vprt;
+ Eterm id = make_internal_port(data);
+#ifdef ERTS_SMP
+ ASSERT(prt->drv_ptr && prt->lock);
+ /*
+ * We are breaking lock order in the port specific locking
+ * case. This is, however, safe since the lock has not been
+ * published, yet.
+ */
+ if (!prt->drv_ptr->lock)
+ erts_mtx_init_locked_x(prt->lock, "port_lock", id);
+#endif
+ prt->common.id = id;
+ erts_atomic32_init_relb(&prt->state, ERTS_PORT_SFLG_INITIALIZING);
}
-/*
- * erts_test_next_port() is only used for testing.
- */
-Sint
-erts_test_next_port(int set, Uint next)
+static Port *create_port(char *name,
+ erts_driver_t *driver,
+ erts_mtx_t *driver_lock,
+ Eterm pid,
+ int *enop)
{
- Uint i, num;
- Sint res = -1;
+ Port *prt;
+ char *p;
+ size_t port_size, size;
+ erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED;
+#ifdef DEBUG
+ /* Make sure the debug flags survives until port is freed */
+ state |= ERTS_PORT_SFLG_PORT_DEBUG;
+#endif
- erts_smp_spin_lock(&get_free_port_lck);
- if (set) {
- last_port_num = (next - 1) & port_num_mask;
+#ifdef ERTS_SMP
+ if (!driver_lock) {
+ /* Align size for mutex following port struct */
+ port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
+ size += sizeof(erts_mtx_t);
}
- num = last_port_num + 1;
-
- for (i=0; i < erts_max_ports && res<0; ++i, ++num) {
-
- Port* port = &erts_port[num & erts_port_tab_index_mask];
+ else
+#endif
+ port_size = size = sizeof(Port);
- erts_aint32_t state = erts_smp_atomic32_read_nob(&port->state);
+ size += sys_strlen(name) + 1;
- if (state & ERTS_PORT_SFLG_FREE) {
- last_port_num = num - 1;
- res = num & port_num_mask;
- }
+ p = erts_alloc_fnf(ERTS_ALC_T_PORT, size);
+ if (!p) {
+ if (enop)
+ *enop = ENOMEM;
+ return NULL;
}
- erts_smp_spin_unlock(&get_free_port_lck);
- return res;
-}
-static void port_cleanup(Port *prt);
+ prt = (Port *) p;
+ p += port_size;
#ifdef ERTS_SMP
+ if (driver_lock) {
+ prt->lock = driver_lock;
+ erts_mtx_lock(driver_lock);
+ }
+ else {
+ prt->lock = (erts_mtx_t *) p;
+ p += sizeof(erts_mtx_t);
+ state |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
+ }
+ erts_smp_atomic_set_nob(&prt->run_queue,
+ (erts_aint_t) erts_get_runq_current(NULL));
+ prt->xports = NULL;
+#else
+ erts_atomic32_init_nob(&prt->refc, 1);
+ prt->cleanup = 0;
+#endif
+
+ prt->name = p;
+ sys_strcpy(p, name);
+ prt->drv_ptr = driver;
+ ERTS_P_LINKS(prt) = NULL;
+ ERTS_P_MONITORS(prt) = NULL;
+ prt->linebuf = NULL;
+ prt->bp = NULL;
+ prt->suspended = NULL;
+ prt->data = am_undefined;
+ prt->port_data_lock = NULL;
+ prt->control_flags = 0;
+ prt->bytes_in = 0;
+ prt->bytes_out = 0;
+ prt->dist_entry = NULL;
+ prt->connected = pid;
+ prt->common.u.alive.reg = NULL;
+#ifdef ERTS_SMP
+ prt->common.u.alive.ptimer = NULL;
+#else
+ sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer));
+#endif
+ erts_port_task_handle_init(&prt->timeout_task);
+ erts_port_task_init_sched(&prt->sched);
+ prt->psd = NULL;
+ prt->drv_data = (SWord) 0;
-static void
-sched_port_cleanup(void *vprt)
-{
- Port *prt = (Port *) vprt;
- erts_smp_mtx_lock(prt->lock);
- port_cleanup(prt);
-}
+ /* Set default tracing */
+ erts_get_default_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER_PROC(prt));
+
+ ASSERT(((char *) prt) == ((char *) &prt->common));
+ if (!erts_ptab_new_element(&erts_port,
+ &prt->common,
+ (void *) prt,
+ insert_port_struct)) {
+#ifdef ERTS_SMP
+ if (driver_lock)
+ erts_mtx_unlock(driver_lock);
#endif
+ if (enop)
+ *enop = 0;
+ return NULL;
+ }
+
+ ASSERT(prt == (Port *) (erts_ptab_pix2intptr_nob(
+ &erts_port,
+ internal_port_index(prt->common.id))));
+ initq(prt);
+
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
+ erts_atomic32_set_relb(&prt->state, state);
+ return prt;
+}
+
+#ifndef ERTS_SMP
void
erts_port_cleanup(Port *prt)
{
-#ifdef ERTS_SMP
- if (erts_smp_mtx_trylock(prt->lock) == EBUSY)
- erts_schedule_misc_op(sched_port_cleanup, (void *) prt);
- else
-#endif
- port_cleanup(prt);
+ if (prt->drv_ptr && prt->drv_ptr->handle)
+ erts_ddll_dereference_driver(prt->drv_ptr->handle);
+ prt->drv_ptr = NULL;
+ erts_port_dec_refc(prt);
}
+#endif
void
-port_cleanup(Port *prt)
+erts_port_free(Port *prt)
{
-#ifdef ERTS_SMP
- erts_smp_mtx_t *mtx;
-#endif
#if defined(ERTS_SMP) || defined(ERTS_ENABLE_LOCK_CHECK)
- erts_aint32_t state = erts_smp_atomic32_read_nob(&prt->state);
+ erts_aint32_t state = erts_atomic32_read_nob(&prt->state);
#endif
- erts_driver_t *driver;
-
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- driver = prt->drv_ptr;
- prt->drv_ptr = NULL;
- ASSERT(driver);
-
- ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_FREE_SCHEDULED);
- ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG);
+ ERTS_LC_ASSERT(state & (ERTS_PORT_SFLG_FREE_SCHEDULED
+ | ERTS_PORT_SFLG_INITIALIZING));
+ ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG);
ERTS_LC_ASSERT(!(state & ERTS_PORT_SFLG_FREE));
- ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) == 0);
-
#ifdef ERTS_SMP
- mtx = prt->lock;
- ASSERT(mtx);
-
- prt->lock = NULL;
-
- erts_smp_mtx_unlock(mtx);
+ ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->common.refc) == 0);
+#else
+ ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->refc) == 0);
#endif
- erts_smp_atomic32_set_relb(&prt->state, ERTS_PORT_SFLG_FREE);
-
#ifdef ERTS_SMP
- if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) {
- erts_smp_mtx_destroy(mtx);
- erts_free(ERTS_ALC_T_PORT_LOCK, mtx);
- }
-#endif
+ ASSERT(prt->lock);
+ if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK)
+ erts_mtx_destroy(prt->lock);
- if (driver->handle)
- erts_ddll_dereference_driver(driver->handle);
+ /*
+ * We cannot dereference a driver using driver
+ * locking until here in smp case. Otherwise,
+ * the driver lock may still be in use by others.
+ *
+ * In the non-smp case we cannot do it here since
+ * this function may be called by non-scheduler
+ * threads. This is done in erts_port_cleanup()
+ * in the non-smp case.
+ */
+ if (prt->drv_ptr->handle)
+ erts_ddll_dereference_driver(prt->drv_ptr->handle);
+#endif
+ erts_atomic32_set_nob(&prt->state, ERTS_PORT_SFLG_FREE);
+ erts_free(ERTS_ALC_T_PORT, prt);
}
-
/*
** Initialize v_start to point to the small fixed vector.
** Once (reallocated) we never reset the pointer to the small vector
@@ -416,73 +422,7 @@ static void stopq(Port* prt)
if (prt->port_data_lock) {
driver_pdl_unlock(prt->port_data_lock);
driver_pdl_dec_refc(prt->port_data_lock);
- prt->port_data_lock = NULL;
- }
-}
-
-
-
-static void
-setup_port(Port* prt, Eterm pid, erts_driver_t *driver,
- ErlDrvData drv_data, char *name, erts_aint32_t xstate)
-{
- ErtsRunQueue *runq = erts_get_runq_current(NULL);
- char *new_name, *old_name;
-#ifdef DEBUG
- /* Make sure the debug flags survives until port is freed */
- xstate |= ERTS_PORT_SFLG_PORT_DEBUG;
-#endif
- ASSERT(runq);
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
-
-
- new_name = (char*) erts_alloc(ERTS_ALC_T_PORT_NAME, sys_strlen(name)+1);
- sys_strcpy(new_name, name);
- erts_smp_runq_lock(runq);
- prt->snapshot = erts_smp_atomic32_read_nob(&erts_ports_snapshot);
- old_name = prt->name;
- prt->name = new_name;
-#ifdef ERTS_SMP
- erts_smp_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq);
-#endif
- ASSERT(!prt->drv_ptr);
- prt->drv_ptr = driver;
- erts_smp_atomic32_set_relb(&prt->state,
- ERTS_PORT_SFLG_CONNECTED | xstate);
- erts_smp_runq_unlock(runq);
-#ifdef ERTS_SMP
- ASSERT(!prt->xports);
-#endif
- if (old_name) {
- erts_free(ERTS_ALC_T_PORT_NAME, (void *) old_name);
}
-
- prt->control_flags = 0;
- prt->connected = pid;
- prt->drv_data = (SWord) drv_data;
- prt->bytes_in = 0;
- prt->bytes_out = 0;
- prt->dist_entry = NULL;
- prt->common.u.alive.reg = NULL;
-#ifdef ERTS_SMP
- prt->common.u.alive.ptimer = NULL;
-#else
- sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer));
-#endif
- erts_port_task_handle_init(&prt->timeout_task);
- prt->suspended = NULL;
- sys_strcpy(prt->name, name);
- ERTS_P_LINKS(prt) = NULL;
- ERTS_P_MONITORS(prt) = NULL;
- prt->linebuf = NULL;
- prt->bp = NULL;
- prt->data = am_undefined;
- /* Set default tracing */
- erts_get_default_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER_PROC(prt));
-
- prt->psd = NULL;
-
- initq(prt);
}
void
@@ -493,7 +433,7 @@ erts_wake_process_later(Port *prt, Process *process)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (erts_smp_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD)
+ if (erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD)
return;
for (p = &(prt->suspended); *p != NULL; p = &((*p)->next))
@@ -513,36 +453,34 @@ erts_wake_process_later(Port *prt, Process *process)
(*error_number_ptr must contain either BADARG or SYSTEM_LIMIT).
The driver start function must obey the same conventions.
*/
-int
+Port *
erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
Eterm pid, /* Current process. */
char* name, /* Driver name. */
SysDriverOpts* opts, /* Options. */
- int *error_number_ptr) /* errno in case -2 is returned */
+ int *error_type_ptr, /* error type */
+ int *error_number_ptr) /* errno in case of error type -2 */
{
- int port_num;
- int port_ix;
+
+#undef ERTS_OPEN_DRIVER_RET
+#define ERTS_OPEN_DRIVER_RET(Prt, EType, ENo) \
+ do { \
+ if (error_type_ptr) \
+ *error_type_ptr = (EType); \
+ if (error_number_ptr) \
+ *error_number_ptr = (ENo); \
+ return (Prt); \
+ } while (0)
+
ErlDrvData drv_data = 0;
- erts_aint32_t xstate = 0;
Port *port;
int fpe_was_unmasked;
-
- if (error_number_ptr)
- *error_number_ptr = 0;
+ int error_type, error_number;
+ int port_errno = 0;
+ erts_mtx_t *driver_lock = NULL;
ERTS_SMP_CHK_NO_PROC_LOCKS;
- if ((port_num = get_free_port()) < 0) {
- if (error_number_ptr) {
- *error_number_ptr = SYSTEM_LIMIT;
- }
- return -3;
- }
-
- port_ix = port_num & erts_port_tab_index_mask;
- port = &erts_port[port_ix];
- port->common.id = make_internal_port(port_num);
-
erts_smp_mtx_lock(&erts_driver_list_lock);
if (!driver) {
for (driver = driver_list; driver; driver = driver->next) {
@@ -551,9 +489,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
}
if (!driver) {
erts_smp_mtx_unlock(&erts_driver_list_lock);
- if (error_number_ptr)
- *error_number_ptr = BADARG;
- return -3;
+ ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG);
}
}
if (driver == &spawn_driver) {
@@ -598,32 +534,11 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
if (driver == NULL || (driver != &spawn_driver && opts->exit_status)) {
erts_smp_mtx_unlock(&erts_driver_list_lock);
- if (error_number_ptr) {
- *error_number_ptr = BADARG;
- }
- /* Need to mark the port as free again */
- ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&port->common.refc) == 2);
- erts_smp_atomic32_set_nob(&port->common.refc, 0);
- erts_smp_atomic32_set_relb(&port->state, ERTS_PORT_SFLG_FREE);
- return -3;
+ ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG);
}
- /*
- * We'll set up the port before calling the start function,
- * to allow message sending and setting timers in the start function.
- */
-
#ifdef ERTS_SMP
- ASSERT(!port->lock);
- port->lock = driver->lock;
- if (!port->lock) {
- port->lock = erts_alloc(ERTS_ALC_T_PORT_LOCK,
- sizeof(erts_smp_mtx_t));
- erts_smp_mtx_init_x(port->lock,
- "port_lock",
- port->common.id);
- xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
- }
+ driver_lock = driver->lock;
#endif
if (driver->handle != NULL) {
@@ -632,18 +547,32 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
}
erts_smp_mtx_unlock(&erts_driver_list_lock);
-#ifdef ERTS_SMP
- erts_smp_mtx_lock(port->lock);
-#endif
+ /*
+ * We'll set up the port before calling the start function,
+ * to allow message sending and setting timers in the start function.
+ */
- setup_port(port, pid, driver, drv_data, name, xstate);
+ port = create_port(name, driver, driver_lock, pid, &port_errno);
+ if (!port) {
+ if (driver->handle) {
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+ erts_ddll_decrement_port_count(driver->handle);
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ erts_ddll_dereference_driver(driver->handle);
+ }
+ if (port_errno)
+ ERTS_OPEN_DRIVER_RET(NULL, -2, port_errno);
+ else
+ ERTS_OPEN_DRIVER_RET(NULL, -3, SYSTEM_LIMIT);
+ }
if (IS_TRACED_FL(port, F_TRACE_PORTS)) {
trace_port_open(port,
pid,
am_atom_put(port->name, strlen(port->name)));
}
-
+
+ error_number = error_type = 0;
if (driver->start) {
if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(port, am_in, am_start);
@@ -656,15 +585,28 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
}
#endif
fpe_was_unmasked = erts_block_fpe();
- drv_data = (*driver->start)((ErlDrvPort)(port_ix),
- name, opts);
+ drv_data = (*driver->start)((ErlDrvPort) port, name, opts);
+ if (((SWord) drv_data) == -1)
+ error_type = -1;
+ else if (((SWord) drv_data) == -2) {
+ /*
+ * We need to save errno quickly after the
+ * call to the 'start' callback before
+ * something else modify it.
+ */
+ error_type = -2;
+ error_number = errno;
+ }
+ else if (((SWord) drv_data) == -3) {
+ error_type = -3;
+ error_number = BADARG;
+ }
+
erts_unblock_fpe(fpe_was_unmasked);
port->caller = NIL;
if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) {
trace_sched_ports_where(port, am_out, am_start);
}
- if (error_number_ptr && ((SWord) drv_data) == (SWord) -2)
- *error_number_ptr = errno;
#ifdef ERTS_SMP
if (port->xports)
erts_smp_xports_unlock(port);
@@ -672,15 +614,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
#endif
}
- if (((SWord)drv_data) == -1 ||
- ((SWord)drv_data) == -2 ||
- ((SWord)drv_data) == -3) {
- int res = (int) ((SWord) drv_data);
-
- if (res == -3 && error_number_ptr) {
- *error_number_ptr = BADARG;
- }
-
+ if (error_type) {
/*
* Must clean up the port.
*/
@@ -690,7 +624,6 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
erts_cancel_timer(&(port->common.u.alive.tm));
#endif
stopq(port);
- kill_port(port);
if (port->linebuf != NULL) {
erts_free(ERTS_ALC_T_LINEBUF,
(void *) port->linebuf);
@@ -701,11 +634,14 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
erts_ddll_decrement_port_count(driver->handle);
erts_smp_mtx_unlock(&erts_driver_list_lock);
}
+ kill_port(port);
erts_port_release(port);
- return res;
+ ERTS_OPEN_DRIVER_RET(NULL, error_type, error_number);
}
- port->drv_data = (SWord) drv_data;
- return port_ix;
+ port->drv_data = (UWord) drv_data;
+ ERTS_OPEN_DRIVER_RET(port, 0, 0);
+
+#undef ERTS_OPEN_DRIVER_RET
}
#ifdef ERTS_SMP
@@ -734,15 +670,21 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */
Port* port;
erts_driver_t *driver;
Process *rp;
- int port_num;
- Eterm port_id;
- erts_aint32_t xstate = 0;
+ erts_mtx_t *driver_lock = NULL;
ERTS_SMP_CHK_NO_PROC_LOCKS;
+ /* Need to be called from a scheduler thread */
+ if (!erts_get_scheduler_id())
+ return ERTS_INVALID_ERL_DRV_PORT;
+
creator_port = erts_drvport2port(creator_port_ix, NULL);
if (!creator_port)
- return (ErlDrvTermData) -1;
+ return ERTS_INVALID_ERL_DRV_PORT;
+
+ rp = erts_proc_lookup(pid);
+ if (!rp)
+ return ERTS_INVALID_ERL_DRV_PORT;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(creator_port));
@@ -750,55 +692,61 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */
erts_smp_mtx_lock(&erts_driver_list_lock);
if (!erts_ddll_driver_ok(driver->handle)) {
erts_smp_mtx_unlock(&erts_driver_list_lock);
- return (ErlDrvTermData) -1;
+ return ERTS_INVALID_ERL_DRV_PORT;
}
- rp = erts_pid2proc(NULL, 0, pid, ERTS_PROC_LOCK_LINK);
- if (!rp) {
- erts_smp_mtx_unlock(&erts_driver_list_lock);
- return (ErlDrvTermData) -1; /* pid does not exist */
+ if (driver->handle != NULL) {
+ erts_ddll_increment_port_count(driver->handle);
+ erts_ddll_reference_referenced_driver(driver->handle);
+ }
+
+#ifdef ERTS_SMP
+ driver_lock = driver->lock;
+#endif
+
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+
+ port = create_port(name, driver, driver_lock, pid, NULL);
+ if (!port) {
+ if (driver->handle) {
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+ erts_ddll_decrement_port_count(driver->handle);
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ erts_ddll_dereference_driver(driver->handle);
+ }
+ return ERTS_INVALID_ERL_DRV_PORT;
}
- if ((port_num = get_free_port()) < 0) {
- errno = SYSTEM_LIMIT;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(port));
+
+ erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
+ if (ERTS_PROC_IS_EXITING(rp)) {
erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- erts_smp_mtx_unlock(&erts_driver_list_lock);
- return (ErlDrvTermData) -1;
+ if (driver->handle) {
+ erts_smp_mtx_lock(&erts_driver_list_lock);
+ erts_ddll_decrement_port_count(driver->handle);
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ }
+ kill_port(port);
+ erts_port_release(port);
+ return ERTS_INVALID_ERL_DRV_PORT;
}
- port_id = make_internal_port(port_num);
- port = &erts_port[port_num & erts_port_tab_index_mask];
+ erts_add_link(&ERTS_P_LINKS(port), LINK_PID, pid);
+ erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, port->common.id);
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
#ifdef ERTS_SMP
- ASSERT(!port->lock);
- port->lock = driver->lock;
- if (!port->lock) {
+ if (!driver_lock) {
ErtsXPortsList *xplp = xports_list_alloc();
xplp->port = port;
xplp->next = creator_port->xports;
creator_port->xports = xplp;
- port->lock = erts_alloc(ERTS_ALC_T_PORT_LOCK,
- sizeof(erts_smp_mtx_t));
- erts_smp_mtx_init_locked_x(port->lock, "port_lock", port_id);
- xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK;
}
-
#endif
- if (driver->handle != NULL) {
- erts_ddll_increment_port_count(driver->handle);
- erts_ddll_reference_referenced_driver(driver->handle);
- }
- erts_smp_mtx_unlock(&erts_driver_list_lock);
+ port->drv_data = (UWord) drv_data;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(port));
-
- setup_port(port, pid, driver, drv_data, name, xstate);
- port->common.id = port_id;
-
- erts_add_link(&ERTS_P_LINKS(port), LINK_PID, pid);
- erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, port_id);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- return port_num & erts_port_tab_index_mask;
+ return (ErlDrvPort) port;
}
#ifdef ERTS_SMP
@@ -1282,15 +1230,22 @@ int erts_write_to_port(Eterm caller_id, Port *p, Eterm list)
}
}
+#ifdef ERTS_SMP
+static void
+release_port(void *vport)
+{
+ erts_port_dec_refc((Port *) vport);
+}
+#endif
+
/* initialize the port array */
void init_io(void)
{
- int i;
ErlDrvEntry** dp;
char maxports[21]; /* enough for any 64-bit integer */
size_t maxportssize = sizeof(maxports);
Uint ports_bits = ERTS_PORTS_BITS;
- Sint port_extra_shift;
+ int port_tab_size;
#ifdef ERTS_SMP
init_xports_list_alloc();
@@ -1299,66 +1254,39 @@ void init_io(void)
pdl_init();
if (erts_sys_getenv("ERL_MAX_PORTS", maxports, &maxportssize) == 0)
- erts_max_ports = atoi(maxports);
+ port_tab_size = atoi(maxports);
else
- erts_max_ports = sys_max_files();
+ port_tab_size = sys_max_files();
- if (erts_max_ports > ERTS_MAX_PORTS)
- erts_max_ports = ERTS_MAX_PORTS;
- if (erts_max_ports < 1024)
- erts_max_ports = 1024;
+ if (port_tab_size > ERTS_MAX_PORTS)
+ port_tab_size = ERTS_MAX_PORTS;
+ if (port_tab_size < 1024)
+ port_tab_size = 1024;
if (erts_use_r9_pids_ports) {
ports_bits = ERTS_R9_PORTS_BITS;
- if (erts_max_ports > ERTS_MAX_R9_PORTS)
- erts_max_ports = ERTS_MAX_R9_PORTS;
+ if (port_tab_size > ERTS_MAX_R9_PORTS)
+ port_tab_size = ERTS_MAX_R9_PORTS;
}
- port_extra_shift = erts_fit_in_bits_int32(erts_max_ports - 1);
- port_num_mask = (1 << ports_bits) - 1;
-
- erts_port_tab_index_mask = ~(~((Uint) 0) << port_extra_shift);
- erts_max_ports = 1 << port_extra_shift;
-
erts_smp_mtx_init(&erts_driver_list_lock,"driver_list");
driver_list = NULL;
erts_smp_tsd_key_create(&driver_list_lock_status_key);
erts_smp_tsd_key_create(&driver_list_last_error_key);
- if (erts_max_ports * sizeof(Port) <= erts_max_ports) {
- /* More memory needed than the whole address space. */
- erts_alloc_enomem(ERTS_ALC_T_PORT_TABLE, ~((Uint) 0));
- }
-
- erts_port = (Port *) erts_alloc(ERTS_ALC_T_PORT_TABLE,
- erts_max_ports * sizeof(Port));
-
- erts_smp_atomic_init_nob(&erts_bytes_out, 0);
- erts_smp_atomic_init_nob(&erts_bytes_in, 0);
-
- for (i = 0; i < erts_max_ports; i++) {
- erts_port_task_init_sched(&erts_port[i].sched);
- erts_smp_atomic32_init_nob(&erts_port[i].common.refc, 0);
+ erts_ptab_init_table(&erts_port,
+ ERTS_ALC_T_PORT_TABLE,
#ifdef ERTS_SMP
- erts_port[i].lock = NULL;
- erts_port[i].xports = NULL;
- erts_smp_spinlock_init_x(&erts_port[i].state_lck, "port_state", make_small(i));
+ release_port,
+#else
+ NULL,
#endif
- ERTS_TRACER_PROC(&erts_port[i]) = NIL;
- ERTS_TRACE_FLAGS(&erts_port[i]) = 0;
-
- erts_port[i].drv_ptr = NULL;
- erts_smp_atomic32_init_nob(&erts_port[i].state, ERTS_PORT_SFLG_FREE);
- erts_port[i].name = NULL;
- ERTS_P_LINKS(&erts_port[i]) = NULL;
- ERTS_P_MONITORS(&erts_port[i]) = NULL;
- erts_port[i].linebuf = NULL;
- erts_port[i].port_data_lock = NULL;
- }
+ (ErtsPTabElementCommon *) &erts_invalid_port.common,
+ port_tab_size,
+ "port_table");
- erts_smp_atomic32_init_nob(&erts_ports_snapshot, (erts_aint32_t) 0);
- last_port_num = 0;
- erts_smp_spinlock_init(&get_free_port_lck, "get_free_port");
+ erts_smp_atomic_init_nob(&erts_bytes_out, 0);
+ erts_smp_atomic_init_nob(&erts_bytes_in, 0);
sys_init_io();
@@ -1769,7 +1697,7 @@ deliver_vec_message(Port* prt, /* Port */
if (!rp)
return;
- state = erts_smp_atomic32_read_nob(&prt->state);
+ state = erts_atomic32_read_nob(&prt->state);
/*
* Calculate the exact number of heap words needed.
*/
@@ -1907,7 +1835,7 @@ static void flush_port(Port *p)
ASSERT(!p->xports);
#endif
}
- if ((erts_smp_atomic32_read_nob(&p->state) & ERTS_PORT_SFLGS_DEAD) == 0
+ if ((erts_atomic32_read_nob(&p->state) & ERTS_PORT_SFLGS_DEAD) == 0
&& is_port_ioq_empty(p)) {
terminate_port(p);
}
@@ -1929,8 +1857,8 @@ terminate_port(Port *prt)
ASSERT(!ERTS_P_MONITORS(prt));
/* state may be altered by kill_port() below */
- state = erts_smp_atomic32_read_band_nob(&prt->state,
- ~ERTS_PORT_SFLG_SEND_CLOSED);
+ state = erts_atomic32_read_band_nob(&prt->state,
+ ~ERTS_PORT_SFLG_SEND_CLOSED);
if (state & ERTS_PORT_SFLG_SEND_CLOSED) {
send_closed_port_id = prt->common.id;
connected_id = prt->connected;
@@ -1981,6 +1909,8 @@ terminate_port(Port *prt)
if (prt->psd)
erts_free(ERTS_ALC_T_PRTSD, prt->psd);
+ ASSERT(prt->dist_entry == NULL);
+
kill_port(prt);
/*
@@ -1989,13 +1919,11 @@ terminate_port(Port *prt)
*/
if ((state & ERTS_PORT_SFLG_HALT)
&& (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0)) {
- erts_smp_port_unlock(prt); /* We will exit and never return */
+ erts_port_release(prt); /* We will exit and never return */
erl_exit_flush_async(erts_halt_code, "");
}
if (is_internal_port(send_closed_port_id))
deliver_result(send_closed_port_id, connected_id, am_closed);
-
- ASSERT(prt->dist_entry == NULL);
}
void
@@ -2130,7 +2058,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
}
#endif
- state = erts_smp_atomic32_read_nob(&p->state);
+ state = erts_atomic32_read_nob(&p->state);
if ((state & (ERTS_PORT_SFLGS_DEAD
| ERTS_PORT_SFLG_EXITING
| ERTS_PORT_SFLG_IMMORTAL))
@@ -2149,12 +2077,12 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
* Setting the port to not busy here, frees the list of pending
* processes and makes them runnable.
*/
- set_busy_port((ErlDrvPort)internal_port_index(p->common.id), 0);
+ set_busy_port((ErlDrvPort) p, 0);
if (p->common.u.alive.reg != NULL)
(void) erts_unregister_name(NULL, 0, p, p->common.u.alive.reg->name);
- state = erts_smp_atomic32_read_bor_relb(&p->state, ERTS_PORT_SFLG_EXITING);
+ state = erts_atomic32_read_bor_relb(&p->state, ERTS_PORT_SFLG_EXITING);
{
SweepContext sc = {p->common.id, rreason};
@@ -2174,16 +2102,16 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
erts_do_net_exits(p->dist_entry, rreason);
erts_deref_dist_entry(p->dist_entry);
p->dist_entry = NULL;
- erts_smp_atomic32_read_band_relb(&p->state,
- ~ERTS_PORT_SFLG_DISTRIBUTION);
+ erts_atomic32_read_band_relb(&p->state,
+ ~ERTS_PORT_SFLG_DISTRIBUTION);
}
if ((reason != am_kill) && !is_port_ioq_empty(p)) {
/* must turn exiting flag off */
- erts_smp_atomic32_read_bset_relb(&p->state,
- (ERTS_PORT_SFLG_EXITING
- | ERTS_PORT_SFLG_CLOSING),
- ERTS_PORT_SFLG_CLOSING);
+ erts_atomic32_read_bset_relb(&p->state,
+ (ERTS_PORT_SFLG_EXITING
+ | ERTS_PORT_SFLG_CLOSING),
+ ERTS_PORT_SFLG_CLOSING);
flush_port(p);
}
else {
@@ -2232,8 +2160,8 @@ void erts_port_command(Process *proc,
if ((pid = port->connected) == tp[1]) {
/* PID must be connected */
if (tp[2] == am_close) {
- erts_smp_atomic32_read_bor_relb(&port->state,
- ERTS_PORT_SFLG_SEND_CLOSED);
+ erts_atomic32_read_bor_relb(&port->state,
+ ERTS_PORT_SFLG_SEND_CLOSED);
erts_do_exit_port(port, pid, am_normal);
#ifdef USE_VM_PROBES
@@ -2456,16 +2384,15 @@ static void prt_one_lnk(ErtsLink *lnk, void *vprtd)
}
void
-print_port_info(int to, void *arg, int i)
+print_port_info(Port *p, int to, void *arg)
{
- Port* p = &erts_port[i];
- erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state);
+ erts_aint32_t state = erts_atomic32_read_nob(&p->state);
if (state & ERTS_PORT_SFLGS_DEAD)
return;
erts_print(to, arg, "=port:%T\n", p->common.id);
- erts_print(to, arg, "Slot: %d\n", i);
+ erts_print(to, arg, "Slot: %d\n", internal_port_index(p->common.id));
if (state & ERTS_PORT_SFLG_CONNECTED) {
erts_print(to, arg, "Connected: %T", p->connected);
erts_print(to, arg, "\n");
@@ -2505,43 +2432,46 @@ print_port_info(int to, void *arg, int i)
void
set_busy_port(ErlDrvPort port_num, int on)
{
+ Port *prt;
#ifdef USE_VM_PROBES
DTRACE_CHARBUF(port_str, 16);
#endif
ERTS_SMP_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num]));
+ prt = erts_drvport2port_raw(port_num);
+ if (!prt)
+ return;
if (on) {
- erts_smp_atomic32_read_bor_relb(&erts_port[port_num].state,
- ERTS_PORT_SFLG_PORT_BUSY);
+ erts_atomic32_read_bor_relb(&prt->state,
+ ERTS_PORT_SFLG_PORT_BUSY);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_busy)) {
erts_snprintf(port_str, sizeof(port_str),
- "%T", erts_port[port_num].id);
+ "%T", prt->id);
DTRACE1(port_busy, port_str);
}
#endif
} else {
- ErtsProcList* plp = erts_port[port_num].suspended;
- erts_smp_atomic32_read_band_relb(&erts_port[port_num].state,
- ~ERTS_PORT_SFLG_PORT_BUSY);
- erts_port[port_num].suspended = NULL;
+ ErtsProcList* plp = prt->suspended;
+ erts_atomic32_read_band_relb(&prt->state,
+ ~ERTS_PORT_SFLG_PORT_BUSY);
+ prt->suspended = NULL;
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(port_not_busy)) {
erts_snprintf(port_str, sizeof(port_str),
- "%T", erts_port[port_num].id);
+ "%T", prt->id);
DTRACE1(port_not_busy, port_str);
}
#endif
- if (erts_port[port_num].dist_entry) {
+ if (prt->dist_entry) {
/*
* Processes suspended on distribution ports are
* normally queued on the dist entry.
*/
- erts_dist_port_not_busy(&erts_port[port_num]);
+ erts_dist_port_not_busy(prt);
}
/*
@@ -2587,10 +2517,9 @@ set_busy_port(ErlDrvPort port_num, int on)
void set_port_control_flags(ErlDrvPort port_num, int flags)
{
-
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num]));
-
- erts_port[port_num].control_flags = flags;
+ Port *prt = erts_drvport2port_raw(port_num);
+ if (prt)
+ prt->control_flags = flags;
}
int get_port_flags(ErlDrvPort ix)
@@ -2649,7 +2578,7 @@ int async_ready(Port *p, void* data)
ERTS_SMP_CHK_NO_PROC_LOCKS;
if (p) {
- erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state);
+ erts_aint32_t state = erts_atomic32_read_nob(&p->state);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
ASSERT(!(state & ERTS_PORT_SFLGS_DEAD));
if (p->drv_ptr->ready_async != NULL) {
@@ -2697,7 +2626,7 @@ erts_stale_drv_select(Eterm port,
int deselect)
{
char *type;
- ErlDrvPort drv_port = internal_port_index(port);
+ ErlDrvPort drv_port = (ErlDrvPort) erts_port_lookup_raw(port);
ErtsPortNames *pnp = erts_get_port_names(port);
erts_dsprintf_buf_t *dsbufp;
@@ -2737,16 +2666,16 @@ erts_stale_drv_select(Eterm port,
ErtsPortNames *
erts_get_port_names(Eterm id)
{
+ Port *prt = erts_port_lookup_raw(id);
ErtsPortNames *pnp;
ASSERT(is_nil(id) || is_internal_port(id));
-
- if (is_not_internal_port(id)) {
+
+ if (!prt) {
pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, sizeof(ErtsPortNames));
pnp->name = NULL;
pnp->driver_name = NULL;
}
else {
- Port* prt = &erts_port[internal_port_index(id)];
int do_realloc = 1;
int len = -1;
size_t pnp_len = sizeof(ErtsPortNames);
@@ -2762,17 +2691,10 @@ erts_get_port_names(Eterm id)
pnp_len = sizeof(ErtsPortNames) + len;
pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, pnp_len);
}
- erts_smp_port_minor_lock(prt);
- if (id != prt->common.id) {
- len = nlen = 0;
- name = driver_name = NULL;
- }
- else {
- name = prt->name;
- len = nlen = name ? sys_strlen(name) + 1 : 0;
- driver_name = (prt->drv_ptr ? prt->drv_ptr->name : NULL);
- len += driver_name ? sys_strlen(driver_name) + 1 : 0;
- }
+ name = prt->name;
+ len = nlen = name ? sys_strlen(name) + 1 : 0;
+ driver_name = (prt->drv_ptr ? prt->drv_ptr->name : NULL);
+ len += driver_name ? sys_strlen(driver_name) + 1 : 0;
if (len <= pnp_len - sizeof(ErtsPortNames)) {
if (!name)
pnp->name = NULL;
@@ -2790,7 +2712,6 @@ erts_get_port_names(Eterm id)
}
do_realloc = 0;
}
- erts_smp_port_minor_unlock(prt);
} while (do_realloc);
}
return pnp;
@@ -2827,7 +2748,7 @@ ErlDrvTermData driver_mk_term_nil(void)
return driver_term_nil;
}
-void driver_report_exit(int ix, int status)
+void driver_report_exit(ErlDrvPort ix, int status)
{
Port* prt = erts_drvport2port(ix, NULL);
Eterm* hp;
@@ -2868,27 +2789,6 @@ void driver_report_exit(int ix, int status)
erts_smp_proc_dec_refc(rp);
}
-
-static ERTS_INLINE int
-deliver_term_check_port(ErlDrvPort drvport)
-{
- int res;
- int ix = (int) drvport;
- if (ix < 0 || erts_max_ports <= ix)
- res = -1; /* invalid */
- else {
- Port* prt = &erts_port[ix];
- erts_aint32_t state = erts_smp_atomic32_read_nob(&prt->state);
- if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP))
- res = 1; /* ok */
- else if (state & ERTS_PORT_SFLG_CLOSING)
- res = 0; /* closing */
- else
- res = -1; /* invalid (dead) */
- }
- return res;
-}
-
#define ERTS_B2T_STATES_DEF_STATES_SZ 5
#define ERTS_B2T_STATES_DEF_STATES_INC 100
@@ -2976,10 +2876,7 @@ cleanup_b2t_states(struct b2t_states__ *b2tsp)
*/
static int
-driver_deliver_term(ErlDrvPort port,
- Eterm to,
- ErlDrvTermData* data,
- int len)
+driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
{
#define ERTS_DDT_FAIL do { res = -1; goto done; } while (0)
Uint need = 0;
@@ -3181,11 +3078,8 @@ driver_deliver_term(ErlDrvPort port,
b2t.ix = 0;
/*
- * The term is OK. Go ahead and validate the port and process.
+ * The term is OK. Go ahead and validate the process.
*/
- res = deliver_term_check_port(port);
- if (res <= 0)
- goto done;
/*
* Increase refc on proc if done from a non-scheduler thread.
@@ -3449,25 +3343,29 @@ driver_deliver_term(ErlDrvPort port,
#undef ERTS_DDT_FAIL
}
-
int
-driver_output_term(ErlDrvPort ix, ErlDrvTermData* data, int len)
+driver_output_term(ErlDrvPort drvport, ErlDrvTermData* data, int len)
{
- Port* prt = erts_drvport2port(ix, NULL);
-
+ erts_aint32_t state;
+ Port* prt = erts_drvport2port(drvport, &state);
+ if (!prt)
+ return -1; /* invalid (dead) */
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
-
- if (prt == NULL)
- return -1;
- return driver_deliver_term(ix, prt->connected, data, len);
+ if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP))
+ return driver_deliver_term(prt->connected, data, len);
+ else if (state & ERTS_PORT_SFLG_CLOSING)
+ return 0; /* closing */
+ else
+ return -1; /* invalid (dead) */
}
int
driver_send_term(ErlDrvPort ix, ErlDrvTermData to, ErlDrvTermData* data, int len)
{
- return driver_deliver_term(ix, to, data, len);
+ /* driver_send_term() assume port is ok... */
+ return driver_deliver_term(to, data, len);
}
@@ -3817,6 +3715,7 @@ static ERTS_INLINE void pdl_destroy(ErlDrvPDL pdl)
{
ERTS_LC_ASSERT(driver_pdl_get_refc(pdl) == 0);
erts_mtx_destroy(&pdl->mtx);
+ erts_port_dec_refc(pdl->prt);
erts_free(ERTS_ALC_T_PORT_DATA_LOCK, pdl);
}
@@ -3861,6 +3760,8 @@ driver_pdl_create(ErlDrvPort dp)
sizeof(struct erl_drv_port_data_lock));
erts_mtx_init(&pdl->mtx, "port_data_lock");
pdl_init_refc(pdl);
+ erts_port_inc_refc(pp);
+ pdl->prt = pp;
pp->port_data_lock = pdl;
#ifdef HARDDEBUG
erts_fprintf(stderr, "driver_pdl_create(%T) -> 0x%08X\r\n",pp->common.id,(unsigned) pdl);
@@ -4317,7 +4218,7 @@ int driver_set_timer(ErlDrvPort ix, unsigned long t)
if (prt == NULL)
return -1;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+
if (prt->drv_ptr->timeout == NULL)
return -1;
drv_cancel_timer(prt);
@@ -4424,7 +4325,7 @@ static int do_driver_monitor_process(Port *prt,
/*
* This can be called from a non scheduler thread iff a port_data_lock exists
*/
-int driver_monitor_process(ErlDrvPort port,
+int driver_monitor_process(ErlDrvPort drvport,
ErlDrvTermData process,
ErlDrvMonitor *monitor)
{
@@ -4434,15 +4335,12 @@ int driver_monitor_process(ErlDrvPort port,
#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
- int ix = (int) port;
- if (ix < 0 || erts_max_ports <= ix) {
- return -1;
- }
- prt = &erts_port[ix];
+
+ prt = erts_thr_drvport2port_raw(drvport);
DRV_MONITOR_LOCK_PDL(prt);
- state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
+ state = erts_atomic32_read_nob(&prt->state);
if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
DRV_MONITOR_UNLOCK_PDL(prt);
@@ -4509,7 +4407,7 @@ static int do_driver_demonitor_process(Port *prt, Eterm *buf,
return 0;
}
-int driver_demonitor_process(ErlDrvPort port,
+int driver_demonitor_process(ErlDrvPort drvport,
const ErlDrvMonitor *monitor)
{
Port *prt;
@@ -4518,15 +4416,12 @@ int driver_demonitor_process(ErlDrvPort port,
#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
- int ix = (int) port;
- if (ix < 0 || erts_max_ports <= ix) {
- return -1;
- }
- prt = &erts_port[ix];
+
+ prt = erts_thr_drvport2port_raw(drvport);
DRV_MONITOR_LOCK_PDL(prt);
- state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
+ state = erts_atomic32_read_nob(&prt->state);
if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
DRV_MONITOR_UNLOCK_PDL(prt);
@@ -4575,7 +4470,7 @@ static ErlDrvTermData do_driver_get_monitored_process(Port *prt, Eterm *buf,
}
-ErlDrvTermData driver_get_monitored_process(ErlDrvPort port,
+ErlDrvTermData driver_get_monitored_process(ErlDrvPort drvport,
const ErlDrvMonitor *monitor)
{
Port *prt;
@@ -4584,15 +4479,12 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort port,
#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK))
ErtsSchedulerData *sched = erts_get_scheduler_data();
#endif
- int ix = (int) port;
- if (ix < 0 || erts_max_ports <= ix) {
- return driver_term_nil;
- }
- prt = &erts_port[ix];
+
+ prt = erts_thr_drvport2port_raw(drvport);
DRV_MONITOR_LOCK_PDL(prt);
- state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
+ state = erts_atomic32_read_nob(&prt->state);
if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
DRV_MONITOR_UNLOCK_PDL(prt);
return driver_term_nil;
@@ -4711,8 +4603,6 @@ int driver_exit(ErlDrvPort ix, int err)
if (prt == NULL)
return -1;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
-
rp = erts_pid2proc(NULL, 0, prt->connected, ERTS_PROC_LOCK_LINK);
if (rp) {
rlnk = erts_remove_link(&ERTS_P_LINKS(rp),prt->common.id);
@@ -5007,7 +4897,7 @@ no_event_callback(ErlDrvData drv_data, ErlDrvEvent event, ErlDrvEventData event_
{
Port *prt = get_current_port();
report_missing_drv_callback(prt, "Event", "event()");
- driver_event((ErlDrvPort) internal_port_index(prt->common.id), event, NULL);
+ driver_event((ErlDrvPort) prt, event, NULL);
}
static void
@@ -5015,7 +4905,7 @@ no_ready_input_callback(ErlDrvData drv_data, ErlDrvEvent event)
{
Port *prt = get_current_port();
report_missing_drv_callback(prt, "Input", "ready_input()");
- driver_select((ErlDrvPort) internal_port_index(prt->common.id), event,
+ driver_select((ErlDrvPort) prt, event,
(ERL_DRV_READ | ERL_DRV_USE_NO_CALLBACK), 0);
}
@@ -5024,7 +4914,7 @@ no_ready_output_callback(ErlDrvData drv_data, ErlDrvEvent event)
{
Port *prt = get_current_port();
report_missing_drv_callback(prt, "Output", "ready_output()");
- driver_select((ErlDrvPort) internal_port_index(prt->common.id), event,
+ driver_select((ErlDrvPort) prt, event,
(ERL_DRV_WRITE | ERL_DRV_USE_NO_CALLBACK), 0);
}
@@ -5059,13 +4949,13 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
drv->lock = NULL;
else {
drv->lock = erts_alloc(ERTS_ALC_T_DRIVER_LOCK,
- sizeof(erts_smp_mtx_t));
- erts_smp_mtx_init_x(drv->lock,
- "driver_lock",
+ sizeof(erts_mtx_t));
+ erts_mtx_init_x(drv->lock,
+ "driver_lock",
#if defined(ERTS_ENABLE_LOCK_CHECK) || defined(ERTS_ENABLE_LOCK_COUNT)
- am_atom_put(drv->name, sys_strlen(drv->name))
+ am_atom_put(drv->name, sys_strlen(drv->name))
#else
- NIL
+ NIL
#endif
);
}
diff --git a/erts/emulator/beam/register.c b/erts/emulator/beam/register.c
index feae745749..757e2800e6 100644
--- a/erts/emulator/beam/register.c
+++ b/erts/emulator/beam/register.c
@@ -182,7 +182,7 @@ int erts_register_name(Process *c_p, Eterm name, Eterm id)
return res;
erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN);
if (is_internal_port(id)) {
- port = erts_id2port(id, NULL, 0);
+ port = erts_id2port(id);
if (!port)
goto done;
}
@@ -241,7 +241,7 @@ int erts_register_name(Process *c_p, Eterm name, Eterm id)
done:
reg_write_unlock();
if (port)
- erts_smp_port_unlock(port);
+ erts_port_release(port);
if (c_p != proc) {
if (proc)
erts_smp_proc_unlock(proc, ERTS_PROC_LOCK_MAIN);
@@ -397,16 +397,14 @@ erts_whereis_name(Process *c_p,
if (!rp || !rp->pt)
*port = NULL;
else {
-#ifndef ERTS_SMP
- erts_smp_atomic32_inc_nob(&rp->pt->common.refc);
-#else
+#ifdef ERTS_SMP
if (pending_port == rp->pt)
pending_port = NULL;
else {
if (pending_port) {
/* Ahh! Registered port changed while reg lock
was unlocked... */
- erts_smp_port_unlock(pending_port);
+ erts_port_release(pending_port);
pending_port = NULL;
}
@@ -418,7 +416,7 @@ erts_whereis_name(Process *c_p,
current_c_p_locks = 0;
}
reg_read_unlock();
- pending_port = erts_id2port(id, NULL, 0);
+ pending_port = erts_id2port(id);
goto restart;
}
}
@@ -432,7 +430,7 @@ erts_whereis_name(Process *c_p,
if (c_p && !current_c_p_locks)
erts_smp_proc_lock(c_p, c_p_locks);
if (pending_port)
- erts_smp_port_unlock(pending_port);
+ erts_port_release(pending_port);
#endif
reg_read_unlock();
@@ -506,12 +504,10 @@ int erts_unregister_name(Process *c_p,
if ((rp = (RegProc*) hash_get(&process_reg, (void*) &r)) != NULL) {
if (rp->pt) {
if (port != rp->pt) {
-#ifndef ERTS_SMP
- erts_smp_atomic32_inc_nob(&rp->pt->common.refc);
-#else
+#ifdef ERTS_SMP
if (port) {
ASSERT(port != c_prt);
- erts_smp_port_unlock(port);
+ erts_port_release(port);
port = NULL;
}
@@ -523,7 +519,7 @@ int erts_unregister_name(Process *c_p,
current_c_p_locks = 0;
}
reg_write_unlock();
- port = erts_id2port(id, NULL, 0);
+ port = erts_id2port(id);
goto restart;
}
#endif
@@ -569,7 +565,7 @@ int erts_unregister_name(Process *c_p,
reg_write_unlock();
if (c_prt != port) {
if (port) {
- erts_smp_port_unlock(port);
+ erts_port_release(port);
}
if (c_prt) {
erts_smp_port_lock(c_prt);
diff --git a/erts/emulator/beam/register.h b/erts/emulator/beam/register.h
index 38e8cfbf28..7170463375 100644
--- a/erts/emulator/beam/register.h
+++ b/erts/emulator/beam/register.h
@@ -24,26 +24,19 @@
#ifndef __REGPROC_H__
#define __REGPROC_H__
-#ifndef __SYS_H__
#include "sys.h"
-#endif
-
-#ifndef __HASH_H__
#include "hash.h"
-#endif
-
-#ifndef __PROCESS_H__
#include "erl_process.h"
-#endif
-
-struct port;
+#define ERL_PORT_GET_PORT_TYPE_ONLY__
+#include "erl_port.h"
+#undef ERL_PORT_GET_PORT_TYPE_ONLY__
typedef struct reg_proc
{
HashBucket bucket; /* MUST BE LOCATED AT TOP OF STRUCT!!! */
Process *p; /* The process registered (only one of this and
'pt' is non-NULL */
- struct port *pt; /* The port registered */
+ Port *pt; /* The port registered */
Eterm name; /* Atom name */
} RegProc;
@@ -55,12 +48,12 @@ int erts_register_name(Process *, Eterm, Eterm);
Eterm erts_whereis_name_to_id(Process *, Eterm);
void erts_whereis_name(Process *, ErtsProcLocks,
Eterm, Process**, ErtsProcLocks, int,
- struct port**);
+ Port**);
Process *erts_whereis_process(Process *,
ErtsProcLocks,
Eterm,
ErtsProcLocks,
int);
-int erts_unregister_name(Process *, ErtsProcLocks, struct port *, Eterm);
+int erts_unregister_name(Process *, ErtsProcLocks, Port *, Eterm);
#endif
diff --git a/erts/emulator/hipe/hipe_bif_list.m4 b/erts/emulator/hipe/hipe_bif_list.m4
index 942fa0c5cb..8a430bf561 100644
--- a/erts/emulator/hipe/hipe_bif_list.m4
+++ b/erts/emulator/hipe/hipe_bif_list.m4
@@ -145,6 +145,7 @@
* Zero-arity BIFs that can fail.
*/
standard_bif_interface_0(nbif_processes_0, processes_0)
+standard_bif_interface_0(nbif_ports_0, ports_0)
/*
* BIFs and primops that may do a GC (change heap limit and walk the native stack).
diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c
index 5d814e0164..9ef861bfe4 100644
--- a/erts/emulator/sys/common/erl_check_io.c
+++ b/erts/emulator/sys/common/erl_check_io.c
@@ -201,17 +201,6 @@ static void event_large_fd_error(ErlDrvPort, ErtsSysFdType, ErlDrvEventData);
#endif
static void steal_pending_stop_select(erts_dsprintf_buf_t*, ErlDrvPort,
ErtsDrvEventState*, int mode, int on);
-static ERTS_INLINE Eterm
-drvport2id(ErlDrvPort dp)
-{
- Port *pp = erts_drvport2port(dp, NULL);
- if (pp)
- return pp->common.id;
- else {
- ASSERT(0);
- return am_undefined;
- }
-}
#ifdef ERTS_SMP
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(removed_fd, struct removed_fd, 64, ERTS_ALC_T_FD_LIST)
@@ -491,7 +480,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
int on)
{
void (*stop_select_fn)(ErlDrvEvent, void*) = NULL;
- Eterm id = drvport2id(ix);
+ Eterm id = erts_drvport2id(ix);
ErtsSysFdType fd = (ErtsSysFdType) e;
ErtsPollEvents ctl_events = (ErtsPollEvents) 0;
ErtsPollEvents new_events, old_events;
@@ -718,7 +707,7 @@ ERTS_CIO_EXPORT(driver_event)(ErlDrvPort ix,
ErtsPollEvents events;
ErtsPollEvents add_events;
ErtsPollEvents remove_events;
- Eterm id = drvport2id(ix);
+ Eterm id = erts_drvport2id(ix);
ErtsDrvEventState *state;
int do_wake = 0;
int ret;
diff --git a/erts/emulator/sys/unix/sys.c b/erts/emulator/sys/unix/sys.c
index 57ffb9f151..ea98af26bb 100644
--- a/erts/emulator/sys/unix/sys.c
+++ b/erts/emulator/sys/unix/sys.c
@@ -1023,7 +1023,8 @@ void fini_getenv_state(GETENV_STATE *state)
/* This data is shared by these drivers - initialized by spawn_init() */
static struct driver_data {
- int port_num, ofd, packet_bytes;
+ ErlDrvPort port_num;
+ int ofd, packet_bytes;
ErtsSysReportExit *report_exit;
int pid;
int alive;
@@ -1137,7 +1138,7 @@ static RETSIGTYPE onchld(int signum)
#endif
}
-static int set_driver_data(int port_num,
+static int set_driver_data(ErlDrvPort port_num,
int ifd,
int ofd,
int packet_bytes,
@@ -1153,7 +1154,7 @@ static int set_driver_data(int port_num,
report_exit = erts_alloc(ERTS_ALC_T_PRT_REP_EXIT,
sizeof(ErtsSysReportExit));
report_exit->next = report_exit_list;
- report_exit->port = erts_port[port_num].common.id;
+ report_exit->port = erts_drvport2id(port_num);
report_exit->pid = pid;
report_exit->ifd = read_write & DO_READ ? ifd : -1;
report_exit->ofd = read_write & DO_WRITE ? ofd : -1;
@@ -1234,7 +1235,7 @@ static void close_pipes(int ifd[2], int ofd[2], int read_write)
}
}
-static void init_fd_data(int fd, int prt)
+static void init_fd_data(int fd, ErlDrvPort port_num)
{
fd_data[fd].buf = NULL;
fd_data[fd].cpos = NULL;
@@ -1922,7 +1923,7 @@ static void clear_fd_data(int fd)
fd_data[fd].psz = 0;
}
-static void nbio_stop_fd(int prt, int fd)
+static void nbio_stop_fd(ErlDrvPort prt, int fd)
{
driver_select(prt,fd,DO_READ|DO_WRITE,0);
clear_fd_data(fd);
@@ -1970,7 +1971,8 @@ static ErlDrvData vanilla_start(ErlDrvPort port_num, char* name,
static void stop(ErlDrvData fd)
{
- int prt, ofd;
+ ErlDrvPort prt;
+ int ofd;
prt = driver_data[(int)(long)fd].port_num;
nbio_stop_fd(prt, (int)(long)fd);
@@ -1983,7 +1985,7 @@ static void stop(ErlDrvData fd)
CHLD_STAT_LOCK;
- /* Mark as unused. Maybe resetting the 'port_num' slot is better? */
+ /* Mark as unused. */
driver_data[(int)(long)fd].pid = -1;
CHLD_STAT_UNLOCK;
@@ -1999,7 +2001,7 @@ static void stop(ErlDrvData fd)
static void outputv(ErlDrvData e, ErlIOVec* ev)
{
int fd = (int)(long)e;
- int ix = driver_data[fd].port_num;
+ ErlDrvPort ix = driver_data[fd].port_num;
int pb = driver_data[fd].packet_bytes;
int ofd = driver_data[fd].ofd;
ssize_t n;
@@ -2049,7 +2051,7 @@ static void outputv(ErlDrvData e, ErlIOVec* ev)
static void output(ErlDrvData e, char* buf, ErlDrvSizeT len)
{
int fd = (int)(long)e;
- int ix = driver_data[fd].port_num;
+ ErlDrvPort ix = driver_data[fd].port_num;
int pb = driver_data[fd].packet_bytes;
int ofd = driver_data[fd].ofd;
ssize_t n;
@@ -2100,7 +2102,7 @@ static void output(ErlDrvData e, char* buf, ErlDrvSizeT len)
return; /* 0; */
}
-static int port_inp_failure(int port_num, int ready_fd, int res)
+static int port_inp_failure(ErlDrvPort port_num, int ready_fd, int res)
/* Result: 0 (eof) or -1 (error) */
{
int err = errno;
@@ -2150,7 +2152,7 @@ static int port_inp_failure(int port_num, int ready_fd, int res)
static void ready_input(ErlDrvData e, ErlDrvEvent ready_fd)
{
int fd = (int)(long)e;
- int port_num;
+ ErlDrvPort port_num;
int packet_bytes;
int res;
Uint h;
@@ -2273,7 +2275,7 @@ static void ready_input(ErlDrvData e, ErlDrvEvent ready_fd)
static void ready_output(ErlDrvData e, ErlDrvEvent ready_fd)
{
int fd = (int)(long)e;
- int ix = driver_data[fd].port_num;
+ ErlDrvPort ix = driver_data[fd].port_num;
int n;
struct iovec* iv;
int vsize;
@@ -2558,19 +2560,20 @@ report_exit_status(ErtsSysReportExit *rep, int status)
Port *pp;
#ifdef ERTS_SMP
CHLD_STAT_UNLOCK;
-#endif
+ pp = erts_thr_id2port_sflgs(rep->port,
+ ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
+ CHLD_STAT_LOCK;
+#else
pp = erts_id2port_sflgs(rep->port,
NULL,
0,
ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
-#ifdef ERTS_SMP
- CHLD_STAT_LOCK;
#endif
if (pp) {
if (rep->ifd >= 0) {
driver_data[rep->ifd].alive = 0;
driver_data[rep->ifd].status = status;
- (void) driver_select((ErlDrvPort) internal_port_index(pp->common.id),
+ (void) driver_select((ErlDrvPort) pp,
rep->ifd,
(ERL_DRV_READ|ERL_DRV_USE),
1);
@@ -2578,12 +2581,16 @@ report_exit_status(ErtsSysReportExit *rep, int status)
if (rep->ofd >= 0) {
driver_data[rep->ofd].alive = 0;
driver_data[rep->ofd].status = status;
- (void) driver_select((ErlDrvPort) internal_port_index(pp->common.id),
+ (void) driver_select((ErlDrvPort) pp,
rep->ofd,
(ERL_DRV_WRITE|ERL_DRV_USE),
1);
}
+#ifdef ERTS_SMP
+ erts_thr_port_release(pp);
+#else
erts_port_release(pp);
+#endif
}
erts_free(ERTS_ALC_T_PRT_REP_EXIT, rep);
}
diff --git a/erts/emulator/sys/vxworks/sys.c b/erts/emulator/sys/vxworks/sys.c
index 739b026fb1..205c123ffc 100644
--- a/erts/emulator/sys/vxworks/sys.c
+++ b/erts/emulator/sys/vxworks/sys.c
@@ -522,7 +522,8 @@ static Uint tmp_buf_size;
/* This data is shared by these drivers - initialized by spawn_init() */
static struct driver_data {
- int port_num, ofd, packet_bytes, report_exit;
+ ErlDrvPort port_num;
+ int ofd, packet_bytes, report_exit;
int exitcode, exit_reported; /* For returning of exit codes. */
} *driver_data; /* indexed by fd */
@@ -678,7 +679,7 @@ static int pre_set_driver_data(int ifd, int ofd,
** Set up the driver_data structure, it may have been initiated
** partly by the function above, but we dont care.
*/
-static int set_driver_data(int port_num, int ifd, int ofd,
+static int set_driver_data(ErlDrvPort port_num, int ifd, int ofd,
int packet_bytes, int read_write,
int report_exit)
{
@@ -908,7 +909,7 @@ static void close_pipes(int ifd[2], int ofd[2], int read_write)
}
}
-static void init_fd_data(int fd, int port_unused_argument)
+static void init_fd_data(int fd, ErlDrvPort port_unused_argument)
{
SET_NONBLOCKING(fd);
fd_data[fd].pending = NULL;
@@ -1062,7 +1063,7 @@ static void clear_fd_data(int fd)
fd_data[fd].cpos = NULL;
}
-static void nbio_stop_fd(int port_num, int fd)
+static void nbio_stop_fd(ErlDrvPort port_num, int fd)
{
Pend *p, *p1;
@@ -1132,7 +1133,8 @@ vanilla_start(ErlDrvPort port_num, char *name, SysDriverOpts* opts)
static void stop(ErlDrvData drv_data)
{
- int port_num, ofd;
+ ErlDrvPort port_num;
+ int ofd;
int fd = (int) drv_data;
port_num = driver_data[fd].port_num;
@@ -1146,7 +1148,7 @@ static void stop(ErlDrvData drv_data)
}
}
-static int sched_write(int port_num,int fd, char *buf, int len, int pb)
+static int sched_write(ErlDrvPort port_num,int fd, char *buf, int len, int pb)
{
Pend *p, *p2, *p3;
int p_bytes = len;
@@ -1189,7 +1191,8 @@ static int sched_write(int port_num,int fd, char *buf, int len, int pb)
/* Fd is the value returned as drv_data by the start func */
static void output(ErlDrvData drv_data, char *buf, ErlDrvSizeT len)
{
- int buf_done, port_num, wval, pb, ofd;
+ ErlDrvPort port_num;
+ int buf_done, wval, pb, ofd;
byte lb[4];
struct iovec iv[2];
int fd = (int) drv_data;
@@ -1271,7 +1274,7 @@ static int ensure_header(int fd,char *buf,int packet_size, int sofar)
return(res);
}
-static int port_inp_failure(int port_num, int ready_fd, int res)
+static int port_inp_failure(ErlDrvPort port_num, int ready_fd, int res)
{
(void) driver_select(port_num, ready_fd, ERL_DRV_READ|ERL_DRV_WRITE, 0);
clear_fd_data(ready_fd);
@@ -1302,7 +1305,8 @@ static int port_inp_failure(int port_num, int ready_fd, int res)
static void ready_input(ErlDrvData drv_data, ErlDrvEvent drv_event)
{
- int port_num, packet_bytes, res;
+ ErlDrvPort port_num;
+ int packet_bytes, res;
Uint h = 0;
char *buf;
int fd = (int) drv_data;
diff --git a/erts/emulator/sys/win32/sys.c b/erts/emulator/sys/win32/sys.c
index b106f0932d..c4861b92a2 100755
--- a/erts/emulator/sys/win32/sys.c
+++ b/erts/emulator/sys/win32/sys.c
@@ -87,9 +87,6 @@ static erts_smp_tsd_key_t win32_errstr_key;
static erts_smp_atomic_t pipe_creation_counter;
-static erts_smp_mtx_t sys_driver_data_lock;
-
-
/* Results from application_type(_w) is one of */
#define APPL_NONE 0
#define APPL_DOS 1
@@ -97,7 +94,6 @@ static erts_smp_mtx_t sys_driver_data_lock;
#define APPL_WIN32 3
static int driver_write(long, HANDLE, byte*, int);
-static void common_stop(int);
static int create_file_thread(struct async_io* aio, int mode);
#ifdef ERTS_SMP
static void close_active_handle(ErlDrvPort, HANDLE handle);
@@ -115,9 +111,6 @@ BOOL WINAPI ctrl_handler(DWORD dwCtrlType);
#define PORT_BUFSIZ 4096
-#define PORT_FREE (-1)
-#define PORT_EXITING (-2)
-
#define DRV_BUF_ALLOC(SZ) \
erts_alloc_fnf(ERTS_ALC_T_DRV_DATA_BUF, (SZ))
#define DRV_BUF_REALLOC(P, SZ) \
@@ -454,7 +447,7 @@ typedef struct driver_data {
byte *inbuf; /* Buffer to use for overlapped read. */
int outBufSize; /* Size of output buffer. */
byte *outbuf; /* Buffer to use for overlapped write. */
- ErlDrvPort port_num; /* The port number. */
+ ErlDrvPort port_num; /* The port handle. */
int packet_bytes; /* 0: continous stream, 1, 2, or 4: the number
* of bytes in the packet header.
*/
@@ -464,8 +457,6 @@ typedef struct driver_data {
int report_exit; /* Do report exit status for the port */
} DriverData;
-static DriverData* driver_data; /* Pointer to array of driver data. */
-
/* Driver interfaces */
static ErlDrvData spawn_start(ErlDrvPort, char*, SysDriverOpts*);
static ErlDrvData fd_start(ErlDrvPort, char*, SysDriverOpts*);
@@ -577,67 +568,53 @@ struct erl_drv_entry vanilla_driver_entry = {
*/
static DriverData*
-new_driver_data(int port_num, int packet_bytes, int wait_objs_required, int use_threads)
+new_driver_data(ErlDrvPort port_num, int packet_bytes, int wait_objs_required, int use_threads)
{
DriverData* dp;
-
- erts_smp_mtx_lock(&sys_driver_data_lock);
- DEBUGF(("new_driver_data(port_num %d, pb %d)\n",
- port_num, packet_bytes));
+ DEBUGF(("new_driver_data(%p, pb %d)\n", port_num, packet_bytes));
+ dp = driver_alloc(sizeof(DriverData));
+ if (!dp)
+ return NULL;
/*
* We used to test first at all that there is enough room in the
* array used by WaitForMultipleObjects(), but that is not necessary
* any more, since driver_select() can't fail.
*/
- /*
- * Search for a free slot.
- */
+ dp->bytesInBuffer = 0;
+ dp->totalNeeded = packet_bytes;
+ dp->inBufSize = PORT_BUFSIZ;
+ dp->inbuf = DRV_BUF_ALLOC(dp->inBufSize);
+ if (dp->inbuf == NULL)
+ goto buf_alloc_error;
+ erts_smp_atomic_add_nob(&sys_misc_mem_sz, dp->inBufSize);
+ dp->outBufSize = 0;
+ dp->outbuf = NULL;
+ dp->port_num = port_num;
+ dp->packet_bytes = packet_bytes;
+ dp->port_pid = INVALID_HANDLE_VALUE;
+ if (init_async_io(&dp->in, use_threads) == -1)
+ goto async_io_error1;
+ if (init_async_io(&dp->out, use_threads) == -1)
+ goto async_io_error2;
- for (dp = driver_data; dp < driver_data+max_files; dp++) {
- if (dp->port_num == PORT_FREE) {
- dp->bytesInBuffer = 0;
- dp->totalNeeded = packet_bytes;
- dp->inBufSize = PORT_BUFSIZ;
- dp->inbuf = DRV_BUF_ALLOC(dp->inBufSize);
- if (dp->inbuf == NULL) {
- erts_smp_mtx_unlock(&sys_driver_data_lock);
- return NULL;
- }
- erts_smp_atomic_add_nob(&sys_misc_mem_sz, dp->inBufSize);
- dp->outBufSize = 0;
- dp->outbuf = NULL;
- dp->port_num = port_num;
- dp->packet_bytes = packet_bytes;
- dp->port_pid = INVALID_HANDLE_VALUE;
- if (init_async_io(&dp->in, use_threads) == -1)
- break;
- if (init_async_io(&dp->out, use_threads) == -1)
- break;
- erts_smp_mtx_unlock(&sys_driver_data_lock);
- return dp;
- }
- }
+ return dp;
- /*
- * Error or no free driver data.
- */
+async_io_error2:
+ release_async_io(&dp->in, dp->port_num);
+async_io_error1:
+ release_async_io(&dp->out, dp->port_num);
- if (dp < driver_data+max_files) {
- release_async_io(&dp->in, dp->port_num);
- release_async_io(&dp->out, dp->port_num);
- }
- erts_smp_mtx_unlock(&sys_driver_data_lock);
+buf_alloc_error:
+ driver_free(dp);
return NULL;
}
static void
release_driver_data(DriverData* dp)
{
- erts_smp_mtx_lock(&sys_driver_data_lock);
-
#ifdef ERTS_SMP
#ifdef USE_CANCELIOEX
if (fpCancelIoEx != NULL) {
@@ -721,8 +698,7 @@ release_driver_data(DriverData* dp)
* the exit thread.
*/
- dp->port_num = PORT_FREE;
- erts_smp_mtx_unlock(&sys_driver_data_lock);
+ driver_free(dp);
}
#ifdef ERTS_SMP
@@ -817,7 +793,6 @@ threaded_handle_closer(LPVOID param)
static ErlDrvData
set_driver_data(DriverData* dp, HANDLE ifd, HANDLE ofd, int read_write, int report_exit)
{
- int index = dp - driver_data;
int result;
dp->in.fd = ifd;
@@ -836,13 +811,12 @@ set_driver_data(DriverData* dp, HANDLE ifd, HANDLE ofd, int read_write, int repo
ERL_DRV_WRITE|ERL_DRV_USE, 1);
ASSERT(result != -1);
}
- return (ErlDrvData)index;
+ return (ErlDrvData) dp;
}
static ErlDrvData
reuse_driver_data(DriverData *dp, HANDLE ifd, HANDLE ofd, int read_write, ErlDrvPort port_num)
{
- int index = dp - driver_data;
int result;
dp->port_num = port_num;
@@ -861,7 +835,7 @@ reuse_driver_data(DriverData *dp, HANDLE ifd, HANDLE ofd, int read_write, ErlDrv
ERL_DRV_WRITE|ERL_DRV_USE, 1);
ASSERT(result != -1);
}
- return (ErlDrvData)index;
+ return (ErlDrvData) dp;
}
/*
@@ -1118,12 +1092,6 @@ spawn_init(void)
((module != NULL) ? GetProcAddress(module,"CancelIoEx") : NULL);
DEBUGF(("fpCancelIoEx = %p\r\n", fpCancelIoEx));
#endif
- driver_data = (struct driver_data *)
- erts_alloc(ERTS_ALC_T_DRV_TAB, max_files * sizeof(struct driver_data));
- erts_smp_atomic_add_nob(&sys_misc_mem_sz,
- max_files*sizeof(struct driver_data));
- for (i = 0; i < max_files; i++)
- driver_data[i].port_num = PORT_FREE;
return 0;
}
@@ -2187,12 +2155,10 @@ fd_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts)
return ERL_DRV_ERROR_GENERAL;
if (!create_file_thread(&dp->in, DO_READ)) {
- dp->port_num = PORT_FREE;
return ERL_DRV_ERROR_GENERAL;
}
if (!create_file_thread(&dp->out, DO_WRITE)) {
- dp->port_num = PORT_FREE;
return ERL_DRV_ERROR_GENERAL;
}
@@ -2212,10 +2178,9 @@ fd_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts)
}
}
-static void fd_stop(ErlDrvData d)
+static void fd_stop(ErlDrvData data)
{
- int fd = (int)d;
- DriverData* dp = driver_data+fd;
+ DriverData * dp = (DriverData *) data;
/*
* There's no way we can terminate an fd port in a consistent way.
* Instead we let it live until it's opened again (which it is,
@@ -2276,16 +2241,10 @@ vanilla_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts)
}
static void
-stop(ErlDrvData index)
-{
- common_stop((int)index);
-}
-
-static void common_stop(int index)
+stop(ErlDrvData data)
{
- DriverData* dp = driver_data+index;
-
- DEBUGF(("common_stop(%d)\n", index));
+ DriverData *dp = (DriverData *) data;
+ DEBUGF(("stop(%p)\n", dp));
if (dp->in.ov.hEvent != NULL) {
(void) driver_select(dp->port_num,
@@ -2307,7 +2266,6 @@ static void common_stop(int index)
*/
HANDLE thread;
DWORD tid;
- dp->port_num = PORT_EXITING;
thread = (HANDLE *) _beginthreadex(NULL, 0, threaded_exiter, dp, 0, &tid);
CloseHandle(thread);
}
@@ -2432,22 +2390,17 @@ threaded_exiter(LPVOID param)
static void
output(ErlDrvData drv_data, char* buf, ErlDrvSizeT len)
-/* long drv_data; /* The slot to use in the driver data table.
+/* ErlDrvData drv_data; /* The slot to use in the driver data table.
* For Windows NT, this is *NOT* a file handle.
* The handle is found in the driver data.
*/
/* char *buf; /* Pointer to data to write to the port program. */
/* ErlDrvSizeT len; /* Number of bytes to write. */
{
- DriverData* dp;
+ DriverData* dp = (DriverData *) drv_data;
int pb; /* The header size for this port. */
- int port_num; /* The actual port number (for diagnostics). */
char* current;
- dp = driver_data + (int)drv_data;
- if ((port_num = dp->port_num) == -1)
- return ; /*-1;*/
-
pb = dp->packet_bytes;
if ((pb+len) == 0)
@@ -2458,7 +2411,7 @@ output(ErlDrvData drv_data, char* buf, ErlDrvSizeT len)
*/
if ((pb == 2 && len > 65535) || (pb == 1 && len > 255)) {
- driver_failure_posix(port_num, EINVAL);
+ driver_failure_posix(dp->port_num, EINVAL);
return ; /* -1; */
}
@@ -2472,7 +2425,7 @@ output(ErlDrvData drv_data, char* buf, ErlDrvSizeT len)
ASSERT(!dp->outbuf);
dp->outbuf = DRV_BUF_ALLOC(pb+len);
if (!dp->outbuf) {
- driver_failure_posix(port_num, ENOMEM);
+ driver_failure_posix(dp->port_num, ENOMEM);
return ; /* -1; */
}
@@ -2502,7 +2455,7 @@ output(ErlDrvData drv_data, char* buf, ErlDrvSizeT len)
memcpy(current, buf, len);
if (!async_write_file(&dp->out, dp->outbuf, pb+len)) {
- set_busy_port(port_num, 1);
+ set_busy_port(dp->port_num, 1);
} else {
dp->out.ov.Offset += pb+len; /* For vanilla driver. */
/* XXX OffsetHigh should be changed too. */
@@ -2537,10 +2490,9 @@ ready_input(ErlDrvData drv_data, ErlDrvEvent ready_event)
{
int error = 0; /* The error code (assume initially no errors). */
DWORD bytesRead; /* Number of bytes read. */
- DriverData* dp;
+ DriverData* dp = (DriverData *) drv_data;
int pb;
- dp = driver_data+(int)drv_data;
pb = dp->packet_bytes;
#ifdef ERTS_SMP
if(dp->in.thread == (HANDLE) -1) {
@@ -2708,7 +2660,7 @@ static void
ready_output(ErlDrvData drv_data, ErlDrvEvent ready_event)
{
DWORD bytesWritten;
- DriverData* dp = driver_data + (int)drv_data;
+ DriverData *dp = (DriverData *) drv_data;
int error;
#ifdef ERTS_SMP
@@ -2716,7 +2668,7 @@ ready_output(ErlDrvData drv_data, ErlDrvEvent ready_event)
dp->out.async_io_active = 0;
}
#endif
- DEBUGF(("ready_output(%d, 0x%x)\n", drv_data, ready_event));
+ DEBUGF(("ready_output(%p, 0x%x)\n", drv_data, ready_event));
set_busy_port(dp->port_num, 0);
if (!(dp->outbuf)) {
/* Happens because event sometimes get signalled during a successful
@@ -2771,7 +2723,7 @@ sys_init_io(void)
can change our view of the number of open files possible.
We estimate the number to twice the amount of ports.
We really dont know on windows, do we? */
- max_files = 2*erts_max_ports;
+ max_files = 2*erts_ptab_max(&erts_port);
}
#ifdef ERTS_SMP
@@ -3225,9 +3177,6 @@ void erl_sys_init(void)
noinherit_std_handle(STD_INPUT_HANDLE);
noinherit_std_handle(STD_ERROR_HANDLE);
-
- erts_smp_mtx_init(&sys_driver_data_lock, "sys_driver_data_lock");
-
#ifdef ERTS_SMP
erts_smp_tsd_key_create(&win32_errstr_key);
InitializeCriticalSection(&htbc_lock);