aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_port.h
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_port.h')
-rw-r--r--erts/emulator/beam/erl_port.h421
1 files changed, 296 insertions, 125 deletions
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h
index 1a5bc636e2..fb34a6da2d 100644
--- a/erts/emulator/beam/erl_port.h
+++ b/erts/emulator/beam/erl_port.h
@@ -17,15 +17,21 @@
* %CopyrightEnd%
*/
-#ifndef ERL_PORT_H__
+#ifndef ERL_PORT_TYPE__
+#define ERL_PORT_TYPE__
+typedef struct _erl_drv_port Port;
+#endif
+
+#if !defined(ERL_PORT_H__) && !defined(ERL_PORT_GET_PORT_TYPE_ONLY__)
#define ERL_PORT_H__
-typedef struct port Port;
#include "erl_port_task.h"
#include "erl_ptab.h"
+#include "erl_thr_progress.h"
typedef struct erts_driver_t_ erts_driver_t;
+#define ERTS_INVALID_ERL_DRV_PORT ((ErlDrvPort) (SWord) -1)
#define SMALL_IO_QUEUE 5 /* Number of fixed elements */
typedef struct {
@@ -90,16 +96,18 @@ typedef struct ErtsXPortsList_ ErtsXPortsList;
*
*/
-struct port {
+struct _erl_drv_port {
ErtsPTabElementCommon common; /* *Need* to be first in struct */
ErtsPortTaskSched sched;
ErtsPortTaskHandle timeout_task;
#ifdef ERTS_SMP
- erts_smp_mtx_t *lock;
+ erts_mtx_t *lock;
ErtsXPortsList *xports;
erts_smp_atomic_t run_queue;
- erts_smp_spinlock_t state_lck; /* protects: id, status, snapshot */
+#else
+ erts_atomic32_t refc;
+ int cleanup;
#endif
Eterm connected; /* A connected process */
Eterm caller; /* Current caller. */
@@ -116,15 +124,20 @@ struct port {
ErtsProcList *suspended; /* List of suspended processes. */
LineBuf *linebuf; /* Buffer to hold data not ready for
process to get (line oriented I/O)*/
- erts_smp_atomic32_t state; /* Status and type flags */
+ erts_atomic32_t state; /* Status and type flags */
int control_flags; /* Flags for port_control() */
- erts_aint32_t snapshot; /* Next snapshot that port should be part of */
ErlDrvPDL port_data_lock;
ErtsPrtSD *psd; /* Port specific data */
};
+struct erl_drv_port_data_lock {
+ erts_mtx_t mtx;
+ erts_atomic_t refc;
+ Port *prt;
+};
+
ERTS_GLB_INLINE ErtsRunQueue *erts_port_runq(Port *prt);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
@@ -183,35 +196,6 @@ erts_prtsd_set(Port *prt, int ix, void *data)
#endif
-/* arrays that get malloced at startup */
-extern Port* erts_port;
-
-extern Uint erts_max_ports;
-extern Uint erts_port_tab_index_mask;
-extern erts_smp_atomic32_t erts_ports_snapshot;
-extern erts_smp_atomic_t erts_dead_ports_ptr;
-
-ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt);
-
-#if ERTS_GLB_INLINE_INCL_FUNC_DEF
-
-ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt)
-{
- ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&prt->state_lck));
- if (prt->snapshot != erts_smp_atomic32_read_acqb(&erts_ports_snapshot)) {
- /* Dead ports are added from the end of the snapshot buffer */
- Eterm* tombstone;
- tombstone = (Eterm*) erts_smp_atomic_add_read_nob(&erts_dead_ports_ptr,
- -(erts_aint_t)sizeof(Eterm));
- ASSERT(tombstone+1 != NULL);
- ASSERT(prt->snapshot == erts_smp_atomic32_read_nob(&erts_ports_snapshot) - 1);
- *tombstone = prt->common.id;
- }
- /*else no ongoing snapshot or port was already included or created after snapshot */
-}
-
-#endif
-
extern erts_smp_atomic_t erts_bytes_out; /* no bytes written out */
extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */
@@ -263,8 +247,11 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */
| ERTS_PORT_SFLG_PORT_BUSY \
| ERTS_PORT_SFLG_DISTRIBUTION)
-
+void print_port_info(Port *, int, void *);
+void erts_port_free(Port *);
+#ifndef ERTS_SMP
void erts_port_cleanup(Port *);
+#endif
void erts_fire_port_monitor(Port *prt, Eterm ref);
#ifdef ERTS_SMP
void erts_smp_xports_unlock(Port *);
@@ -274,8 +261,9 @@ void erts_smp_xports_unlock(Port *);
int erts_lc_is_port_locked(Port *);
#endif
-ERTS_GLB_INLINE void erts_smp_port_minor_lock(Port*);
-ERTS_GLB_INLINE void erts_smp_port_minor_unlock(Port*);
+ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt);
+ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt);
+ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc);
ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt);
ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt);
@@ -283,64 +271,71 @@ ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
-ERTS_GLB_INLINE void
-erts_smp_port_minor_lock(Port* prt)
+ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt)
{
#ifdef ERTS_SMP
- erts_smp_spin_lock(&prt->state_lck);
+ erts_ptab_inc_refc(&prt->common);
+#else
+ erts_atomic32_inc_nob(&prt->refc);
#endif
}
-ERTS_GLB_INLINE void
-erts_smp_port_minor_unlock(Port *prt)
+ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt)
{
#ifdef ERTS_SMP
- erts_smp_spin_unlock(&prt->state_lck);
+ int referred = erts_ptab_dec_test_refc(&prt->common);
+ if (!referred)
+ erts_port_free(prt);
+#else
+ int refc = erts_atomic32_dec_read_nob(&prt->refc);
+ if (refc == 0)
+ erts_port_free(prt);
#endif
}
+ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc)
+{
+#ifdef ERTS_SMP
+ int referred = erts_ptab_add_test_refc(&prt->common, add_refc);
+ if (!referred)
+ erts_port_free(prt);
+#else
+ int refc = erts_atomic32_add_read_nob(&prt->refc, add_refc);
+ if (refc == 0)
+ erts_port_free(prt);
+#endif
+}
ERTS_GLB_INLINE int
erts_smp_port_trylock(Port *prt)
{
- int res;
-
- ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) > 0);
- erts_smp_atomic32_inc_nob(&prt->common.refc);
-
#ifdef ERTS_SMP
- res = erts_smp_mtx_trylock(prt->lock);
- if (res == EBUSY) {
- erts_smp_atomic32_dec_nob(&prt->common.refc);
- }
+ /* *Need* to be a managed thread */
+ ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+ return erts_mtx_trylock(prt->lock);
#else
- res = 0;
+ return 0;
#endif
-
- return res;
}
ERTS_GLB_INLINE void
erts_smp_port_lock(Port *prt)
{
- ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) > 0);
- erts_smp_atomic32_inc_nob(&prt->common.refc);
#ifdef ERTS_SMP
- erts_smp_mtx_lock(prt->lock);
+ /* *Need* to be a managed thread */
+ ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+ erts_mtx_lock(prt->lock);
#endif
}
ERTS_GLB_INLINE void
erts_smp_port_unlock(Port *prt)
{
- erts_aint32_t refc;
#ifdef ERTS_SMP
- erts_smp_mtx_unlock(prt->lock);
+ /* *Need* to be a managed thread */
+ ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+ erts_mtx_unlock(prt->lock);
#endif
- refc = erts_smp_atomic32_dec_read_nob(&prt->common.refc);
- ASSERT(refc >= 0);
- if (refc == 0)
- erts_port_cleanup(prt);
}
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
@@ -348,7 +343,7 @@ erts_smp_port_unlock(Port *prt)
#define ERTS_INVALID_PORT_OPT(PP, ID, FLGS) \
(!(PP) \
- || (erts_smp_atomic32_read_nob(&(PP)->state) & (FLGS)) \
+ || (erts_atomic32_read_nob(&(PP)->state) & (FLGS)) \
|| (PP)->common.id != (ID))
/* port lookup */
@@ -365,125 +360,301 @@ erts_smp_port_unlock(Port *prt)
#define ERTS_PORT_SCHED_ID(P, ID) \
((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID)))
+extern const Port erts_invalid_port;
+#define ERTS_PORT_LOCK_BUSY ((Port *) &erts_invalid_port)
+
#ifdef ERTS_SMP
Port *erts_de2port(DistEntry *, Process *, ErtsProcLocks);
#endif
-#define erts_id2port(ID, P, PL) \
- erts_id2port_sflgs((ID), (P), (PL), ERTS_PORT_SFLGS_INVALID_LOOKUP)
-
-ERTS_GLB_INLINE Port*erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32);
+ERTS_GLB_INLINE Port *erts_pix2port(int);
+ERTS_GLB_INLINE Port *erts_port_lookup_raw(Eterm);
+ERTS_GLB_INLINE Port *erts_port_lookup(Eterm, Uint32);
+ERTS_GLB_INLINE Port*erts_id2port(Eterm id);
+ERTS_GLB_INLINE Port *erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32);
ERTS_GLB_INLINE void erts_port_release(Port *);
-ERTS_GLB_INLINE Port*erts_drvport2port(ErlDrvPort, erts_aint32_t *);
-ERTS_GLB_INLINE Port*erts_drvportid2port(Eterm);
-ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm id);
-ERTS_GLB_INLINE int erts_is_port_alive(Eterm id);
-ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm id);
+#ifdef ERTS_SMP
+ERTS_GLB_INLINE Port *erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs);
+ERTS_GLB_INLINE void erts_thr_port_release(Port *prt);
+#endif
+ERTS_GLB_INLINE Port *erts_thr_drvport2port_raw(ErlDrvPort);
+ERTS_GLB_INLINE Port *erts_drvport2port_raw(ErlDrvPort drvport);
+ERTS_GLB_INLINE Port *erts_drvport2port(ErlDrvPort, erts_aint32_t *);
+ERTS_GLB_INLINE Port *erts_drvportid2port(Eterm);
+ERTS_GLB_INLINE Eterm erts_drvport2id(ErlDrvPort);
+ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm);
+ERTS_GLB_INLINE int erts_is_port_alive(Eterm);
+ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+ERTS_GLB_INLINE Port *erts_pix2port(int ix)
+{
+ Port *prt;
+ ASSERT(0 <= ix && ix < erts_ptab_max(&erts_port));
+ prt = (Port *) erts_ptab_pix2intptr_nob(&erts_port, ix);
+ return prt == ERTS_PORT_LOCK_BUSY ? NULL : prt;
+}
+
+ERTS_GLB_INLINE Port *
+erts_port_lookup_raw(Eterm id)
+{
+ Port *prt;
+
+ ERTS_SMP_LC_ASSERT(erts_thr_progress_lc_is_delaying());
+
+ if (is_not_internal_port(id))
+ return NULL;
+
+ prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
+ internal_port_index(id));
+ return prt && prt->common.id == id ? prt : NULL;
+}
+
+ERTS_GLB_INLINE Port *
+erts_port_lookup(Eterm id, Uint32 invalid_sflgs)
+{
+ Port *prt = erts_port_lookup_raw(id);
+ erts_aint32_t state = erts_atomic32_read_nob(&prt->state);
+ return (state & invalid_sflgs) ? NULL : prt;
+}
+
+
ERTS_GLB_INLINE Port*
-erts_id2port_sflgs(Eterm id, Process *c_p, ErtsProcLocks c_p_locks, Uint32 sflgs)
+erts_id2port(Eterm id)
+{
+ erts_aint32_t state;
+ Port *prt;
+
+ /* Only allowed to be called from managed threads */
+ ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+
+ if (is_not_internal_port(id))
+ return NULL;
+
+ prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
+ internal_port_index(id));
+
+ if (!prt || prt->common.id != id)
+ return NULL;
+
+ erts_smp_port_lock(prt);
+ state = erts_atomic32_read_nob(&prt->state);
+ if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) {
+ erts_smp_port_unlock(prt);
+ return NULL;
+ }
+
+ return prt;
+}
+
+
+ERTS_GLB_INLINE Port*
+erts_id2port_sflgs(Eterm id,
+ Process *c_p, ErtsProcLocks c_p_locks,
+ Uint32 invalid_sflgs)
{
#ifdef ERTS_SMP
int no_proc_locks = !c_p || !c_p_locks;
#endif
+ erts_aint32_t state;
Port *prt;
+ /* Only allowed to be called from managed threads */
+ ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+
if (is_not_internal_port(id))
return NULL;
- prt = &erts_port[internal_port_index(id)];
+ prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
+ internal_port_index(id));
- erts_smp_port_minor_lock(prt);
- if (ERTS_INVALID_PORT_OPT(prt, id, sflgs)) {
- erts_smp_port_minor_unlock(prt);
- prt = NULL;
+ if (!prt || prt->common.id != id)
+ return NULL;
+
+#ifdef ERTS_SMP
+ if (no_proc_locks)
+ erts_smp_port_lock(prt);
+ else if (erts_smp_port_trylock(prt) == EBUSY) {
+ /* Unlock process locks, and acquire locks in lock order... */
+ erts_smp_proc_unlock(c_p, c_p_locks);
+ erts_smp_port_lock(prt);
+ erts_smp_proc_lock(c_p, c_p_locks);
+ }
+#endif
+ state = erts_atomic32_read_nob(&prt->state);
+ if (state & invalid_sflgs) {
+#ifdef ERTS_SMP
+ erts_smp_port_unlock(prt);
+#endif
+ return NULL;
}
- else {
- erts_smp_atomic32_inc_nob(&prt->common.refc);
- erts_smp_port_minor_unlock(prt);
+ return prt;
+}
+
+ERTS_GLB_INLINE void
+erts_port_release(Port *prt)
+{
+ /* Only allowed to be called from managed threads */
+ ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
#ifdef ERTS_SMP
- if (no_proc_locks)
- erts_smp_mtx_lock(prt->lock);
- else if (erts_smp_mtx_trylock(prt->lock) == EBUSY) {
- /* Unlock process locks, and acquire locks in lock order... */
- erts_smp_proc_unlock(c_p, c_p_locks);
- erts_smp_mtx_lock(prt->lock);
- erts_smp_proc_lock(c_p, c_p_locks);
+ erts_smp_port_unlock(prt);
+#else
+ if (prt->cleanup) {
+ prt->cleanup = 0;
+ erts_port_cleanup(prt);
+ }
+#endif
+}
+
+#ifdef ERTS_SMP
+
+/*
+ * erts_thr_id2port_sflgs() and erts_thr_port_release() can
+ * be used by unmanaged threads in the SMP case.
+ */
+ERTS_GLB_INLINE Port *
+erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs)
+{
+ Port *prt;
+ ErtsThrPrgrDelayHandle dhndl;
+
+ if (is_not_internal_port(id))
+ return NULL;
+
+ dhndl = erts_thr_progress_unmanaged_delay();
+
+ prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
+ internal_port_index(id));
+
+ if (!prt || prt->common.id != id) {
+ erts_thr_progress_unmanaged_continue(dhndl);
+ prt = NULL;
+ }
+ else {
+ erts_aint32_t state;
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
+ erts_port_inc_refc(prt);
+ erts_thr_progress_unmanaged_continue(dhndl);
}
- /* The id may not have changed... */
- ERTS_SMP_LC_ASSERT(prt->common.id == id);
- /* ... but state may have... */
- if (erts_smp_atomic32_read_nob(&prt->state) & sflgs) {
- erts_smp_port_unlock(prt); /* Also decrements common.refc... */
+ erts_mtx_lock(prt->lock);
+ state = erts_atomic32_read_nob(&prt->state);
+ if (state & invalid_sflgs) {
+ erts_mtx_unlock(prt->lock);
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_port_dec_refc(prt);
prt = NULL;
}
-#endif
-
}
return prt;
}
ERTS_GLB_INLINE void
-erts_port_release(Port *prt)
+erts_thr_port_release(Port *prt)
{
- erts_smp_port_unlock(prt);
+ erts_mtx_unlock(prt->lock);
+#ifdef ERTS_SMP
+ if (!erts_thr_progress_is_managed_thread())
+ erts_port_dec_refc(prt);
+#endif
+}
+
+#endif
+
+ERTS_GLB_INLINE Port*
+erts_thr_drvport2port_raw(ErlDrvPort drvport)
+{
+#if ERTS_ENABLE_LOCK_CHECK
+ int emu_thread = erts_lc_is_emu_thr();
+#endif
+ if (drvport == ERTS_INVALID_ERL_DRV_PORT)
+ return NULL;
+ else {
+ Port *prt = (Port *) drvport;
+#if ERTS_ENABLE_LOCK_CHECK
+ if (emu_thread) {
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(!prt->port_data_lock
+ || erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
+ }
+ else {
+ ERTS_LC_ASSERT(prt->port_data_lock);
+ ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
+ }
+#endif
+ return prt;
+ }
+}
+
+ERTS_GLB_INLINE Port*
+erts_drvport2port_raw(ErlDrvPort drvport)
+{
+ ERTS_LC_ASSERT(erts_lc_is_emu_thr());
+ if (drvport == ERTS_INVALID_ERL_DRV_PORT)
+ return NULL;
+ else {
+ Port *prt = (Port *) drvport;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ return prt;
+ }
}
ERTS_GLB_INLINE Port*
erts_drvport2port(ErlDrvPort drvport, erts_aint32_t *statep)
{
- int ix = (int) drvport;
+ Port *prt = erts_drvport2port_raw(drvport);
erts_aint32_t state;
- if (ix < 0 || erts_max_ports <= ix)
+ if (!prt)
return NULL;
- state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
+ state = erts_atomic32_read_nob(&prt->state);
if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
return NULL;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix]));
if (statep)
*statep = state;
- return &erts_port[ix];
+ return prt;
}
ERTS_GLB_INLINE Port*
erts_drvportid2port(Eterm id)
{
- int ix;
+ Port *prt;
erts_aint32_t state;
if (is_not_internal_port(id))
return NULL;
- ix = (int) internal_port_index(id);
- if (erts_max_ports <= ix)
+ prt = (Port *) erts_ptab_pix2intptr_nob(&erts_port,
+ internal_port_index(id));
+ if (!prt)
return NULL;
- state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
- if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ if (prt->common.id != id)
return NULL;
- if (erts_port[ix].common.id != id)
+ state = erts_atomic32_read_nob(&prt->state);
+ if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
return NULL;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix]));
- return &erts_port[ix];
+ return prt;
+}
+
+ERTS_GLB_INLINE Eterm
+erts_drvport2id(ErlDrvPort drvport)
+{
+ Port *prt = erts_drvport2port_raw(drvport);
+ if (!prt)
+ return am_undefined;
+ else
+ return prt->common.id;
}
ERTS_GLB_INLINE Uint32
erts_portid2status(Eterm id)
{
- if (is_not_internal_port(id))
+ Port *prt = erts_port_lookup_raw(id);
+ if (prt)
+ return (Uint32) erts_atomic32_read_acqb(&prt->state);
+ else
return ERTS_PORT_SFLG_INVALID;
- else {
- erts_aint32_t state;
- int ix = internal_port_index(id);
- if (erts_max_ports <= ix)
- return ERTS_PORT_SFLG_INVALID;
- state = erts_smp_atomic32_read_ddrb(&erts_port[ix].state);
- if (erts_port[ix].common.id != id)
- return ERTS_PORT_SFLG_INVALID;
- return state;
- }
}
ERTS_GLB_INLINE int