diff options
Diffstat (limited to 'erts/emulator/beam/erl_port.h')
-rw-r--r-- | erts/emulator/beam/erl_port.h | 421 |
1 files changed, 296 insertions, 125 deletions
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h index 1a5bc636e2..fb34a6da2d 100644 --- a/erts/emulator/beam/erl_port.h +++ b/erts/emulator/beam/erl_port.h @@ -17,15 +17,21 @@ * %CopyrightEnd% */ -#ifndef ERL_PORT_H__ +#ifndef ERL_PORT_TYPE__ +#define ERL_PORT_TYPE__ +typedef struct _erl_drv_port Port; +#endif + +#if !defined(ERL_PORT_H__) && !defined(ERL_PORT_GET_PORT_TYPE_ONLY__) #define ERL_PORT_H__ -typedef struct port Port; #include "erl_port_task.h" #include "erl_ptab.h" +#include "erl_thr_progress.h" typedef struct erts_driver_t_ erts_driver_t; +#define ERTS_INVALID_ERL_DRV_PORT ((ErlDrvPort) (SWord) -1) #define SMALL_IO_QUEUE 5 /* Number of fixed elements */ typedef struct { @@ -90,16 +96,18 @@ typedef struct ErtsXPortsList_ ErtsXPortsList; * */ -struct port { +struct _erl_drv_port { ErtsPTabElementCommon common; /* *Need* to be first in struct */ ErtsPortTaskSched sched; ErtsPortTaskHandle timeout_task; #ifdef ERTS_SMP - erts_smp_mtx_t *lock; + erts_mtx_t *lock; ErtsXPortsList *xports; erts_smp_atomic_t run_queue; - erts_smp_spinlock_t state_lck; /* protects: id, status, snapshot */ +#else + erts_atomic32_t refc; + int cleanup; #endif Eterm connected; /* A connected process */ Eterm caller; /* Current caller. */ @@ -116,15 +124,20 @@ struct port { ErtsProcList *suspended; /* List of suspended processes. */ LineBuf *linebuf; /* Buffer to hold data not ready for process to get (line oriented I/O)*/ - erts_smp_atomic32_t state; /* Status and type flags */ + erts_atomic32_t state; /* Status and type flags */ int control_flags; /* Flags for port_control() */ - erts_aint32_t snapshot; /* Next snapshot that port should be part of */ ErlDrvPDL port_data_lock; ErtsPrtSD *psd; /* Port specific data */ }; +struct erl_drv_port_data_lock { + erts_mtx_t mtx; + erts_atomic_t refc; + Port *prt; +}; + ERTS_GLB_INLINE ErtsRunQueue *erts_port_runq(Port *prt); #if ERTS_GLB_INLINE_INCL_FUNC_DEF @@ -183,35 +196,6 @@ erts_prtsd_set(Port *prt, int ix, void *data) #endif -/* arrays that get malloced at startup */ -extern Port* erts_port; - -extern Uint erts_max_ports; -extern Uint erts_port_tab_index_mask; -extern erts_smp_atomic32_t erts_ports_snapshot; -extern erts_smp_atomic_t erts_dead_ports_ptr; - -ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt); - -#if ERTS_GLB_INLINE_INCL_FUNC_DEF - -ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt) -{ - ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&prt->state_lck)); - if (prt->snapshot != erts_smp_atomic32_read_acqb(&erts_ports_snapshot)) { - /* Dead ports are added from the end of the snapshot buffer */ - Eterm* tombstone; - tombstone = (Eterm*) erts_smp_atomic_add_read_nob(&erts_dead_ports_ptr, - -(erts_aint_t)sizeof(Eterm)); - ASSERT(tombstone+1 != NULL); - ASSERT(prt->snapshot == erts_smp_atomic32_read_nob(&erts_ports_snapshot) - 1); - *tombstone = prt->common.id; - } - /*else no ongoing snapshot or port was already included or created after snapshot */ -} - -#endif - extern erts_smp_atomic_t erts_bytes_out; /* no bytes written out */ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ @@ -263,8 +247,11 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ | ERTS_PORT_SFLG_PORT_BUSY \ | ERTS_PORT_SFLG_DISTRIBUTION) - +void print_port_info(Port *, int, void *); +void erts_port_free(Port *); +#ifndef ERTS_SMP void erts_port_cleanup(Port *); +#endif void erts_fire_port_monitor(Port *prt, Eterm ref); #ifdef ERTS_SMP void erts_smp_xports_unlock(Port *); @@ -274,8 +261,9 @@ void erts_smp_xports_unlock(Port *); int erts_lc_is_port_locked(Port *); #endif -ERTS_GLB_INLINE void erts_smp_port_minor_lock(Port*); -ERTS_GLB_INLINE void erts_smp_port_minor_unlock(Port*); +ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt); +ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt); +ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc); ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt); ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt); @@ -283,64 +271,71 @@ ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt); #if ERTS_GLB_INLINE_INCL_FUNC_DEF -ERTS_GLB_INLINE void -erts_smp_port_minor_lock(Port* prt) +ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt) { #ifdef ERTS_SMP - erts_smp_spin_lock(&prt->state_lck); + erts_ptab_inc_refc(&prt->common); +#else + erts_atomic32_inc_nob(&prt->refc); #endif } -ERTS_GLB_INLINE void -erts_smp_port_minor_unlock(Port *prt) +ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt) { #ifdef ERTS_SMP - erts_smp_spin_unlock(&prt->state_lck); + int referred = erts_ptab_dec_test_refc(&prt->common); + if (!referred) + erts_port_free(prt); +#else + int refc = erts_atomic32_dec_read_nob(&prt->refc); + if (refc == 0) + erts_port_free(prt); #endif } +ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc) +{ +#ifdef ERTS_SMP + int referred = erts_ptab_add_test_refc(&prt->common, add_refc); + if (!referred) + erts_port_free(prt); +#else + int refc = erts_atomic32_add_read_nob(&prt->refc, add_refc); + if (refc == 0) + erts_port_free(prt); +#endif +} ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt) { - int res; - - ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) > 0); - erts_smp_atomic32_inc_nob(&prt->common.refc); - #ifdef ERTS_SMP - res = erts_smp_mtx_trylock(prt->lock); - if (res == EBUSY) { - erts_smp_atomic32_dec_nob(&prt->common.refc); - } + /* *Need* to be a managed thread */ + ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + return erts_mtx_trylock(prt->lock); #else - res = 0; + return 0; #endif - - return res; } ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt) { - ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) > 0); - erts_smp_atomic32_inc_nob(&prt->common.refc); #ifdef ERTS_SMP - erts_smp_mtx_lock(prt->lock); + /* *Need* to be a managed thread */ + ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + erts_mtx_lock(prt->lock); #endif } ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt) { - erts_aint32_t refc; #ifdef ERTS_SMP - erts_smp_mtx_unlock(prt->lock); + /* *Need* to be a managed thread */ + ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + erts_mtx_unlock(prt->lock); #endif - refc = erts_smp_atomic32_dec_read_nob(&prt->common.refc); - ASSERT(refc >= 0); - if (refc == 0) - erts_port_cleanup(prt); } #endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ @@ -348,7 +343,7 @@ erts_smp_port_unlock(Port *prt) #define ERTS_INVALID_PORT_OPT(PP, ID, FLGS) \ (!(PP) \ - || (erts_smp_atomic32_read_nob(&(PP)->state) & (FLGS)) \ + || (erts_atomic32_read_nob(&(PP)->state) & (FLGS)) \ || (PP)->common.id != (ID)) /* port lookup */ @@ -365,125 +360,301 @@ erts_smp_port_unlock(Port *prt) #define ERTS_PORT_SCHED_ID(P, ID) \ ((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID))) +extern const Port erts_invalid_port; +#define ERTS_PORT_LOCK_BUSY ((Port *) &erts_invalid_port) + #ifdef ERTS_SMP Port *erts_de2port(DistEntry *, Process *, ErtsProcLocks); #endif -#define erts_id2port(ID, P, PL) \ - erts_id2port_sflgs((ID), (P), (PL), ERTS_PORT_SFLGS_INVALID_LOOKUP) - -ERTS_GLB_INLINE Port*erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32); +ERTS_GLB_INLINE Port *erts_pix2port(int); +ERTS_GLB_INLINE Port *erts_port_lookup_raw(Eterm); +ERTS_GLB_INLINE Port *erts_port_lookup(Eterm, Uint32); +ERTS_GLB_INLINE Port*erts_id2port(Eterm id); +ERTS_GLB_INLINE Port *erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32); ERTS_GLB_INLINE void erts_port_release(Port *); -ERTS_GLB_INLINE Port*erts_drvport2port(ErlDrvPort, erts_aint32_t *); -ERTS_GLB_INLINE Port*erts_drvportid2port(Eterm); -ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm id); -ERTS_GLB_INLINE int erts_is_port_alive(Eterm id); -ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm id); +#ifdef ERTS_SMP +ERTS_GLB_INLINE Port *erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs); +ERTS_GLB_INLINE void erts_thr_port_release(Port *prt); +#endif +ERTS_GLB_INLINE Port *erts_thr_drvport2port_raw(ErlDrvPort); +ERTS_GLB_INLINE Port *erts_drvport2port_raw(ErlDrvPort drvport); +ERTS_GLB_INLINE Port *erts_drvport2port(ErlDrvPort, erts_aint32_t *); +ERTS_GLB_INLINE Port *erts_drvportid2port(Eterm); +ERTS_GLB_INLINE Eterm erts_drvport2id(ErlDrvPort); +ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm); +ERTS_GLB_INLINE int erts_is_port_alive(Eterm); +ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm); #if ERTS_GLB_INLINE_INCL_FUNC_DEF +ERTS_GLB_INLINE Port *erts_pix2port(int ix) +{ + Port *prt; + ASSERT(0 <= ix && ix < erts_ptab_max(&erts_port)); + prt = (Port *) erts_ptab_pix2intptr_nob(&erts_port, ix); + return prt == ERTS_PORT_LOCK_BUSY ? NULL : prt; +} + +ERTS_GLB_INLINE Port * +erts_port_lookup_raw(Eterm id) +{ + Port *prt; + + ERTS_SMP_LC_ASSERT(erts_thr_progress_lc_is_delaying()); + + if (is_not_internal_port(id)) + return NULL; + + prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port, + internal_port_index(id)); + return prt && prt->common.id == id ? prt : NULL; +} + +ERTS_GLB_INLINE Port * +erts_port_lookup(Eterm id, Uint32 invalid_sflgs) +{ + Port *prt = erts_port_lookup_raw(id); + erts_aint32_t state = erts_atomic32_read_nob(&prt->state); + return (state & invalid_sflgs) ? NULL : prt; +} + + ERTS_GLB_INLINE Port* -erts_id2port_sflgs(Eterm id, Process *c_p, ErtsProcLocks c_p_locks, Uint32 sflgs) +erts_id2port(Eterm id) +{ + erts_aint32_t state; + Port *prt; + + /* Only allowed to be called from managed threads */ + ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + + if (is_not_internal_port(id)) + return NULL; + + prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port, + internal_port_index(id)); + + if (!prt || prt->common.id != id) + return NULL; + + erts_smp_port_lock(prt); + state = erts_atomic32_read_nob(&prt->state); + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) { + erts_smp_port_unlock(prt); + return NULL; + } + + return prt; +} + + +ERTS_GLB_INLINE Port* +erts_id2port_sflgs(Eterm id, + Process *c_p, ErtsProcLocks c_p_locks, + Uint32 invalid_sflgs) { #ifdef ERTS_SMP int no_proc_locks = !c_p || !c_p_locks; #endif + erts_aint32_t state; Port *prt; + /* Only allowed to be called from managed threads */ + ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + if (is_not_internal_port(id)) return NULL; - prt = &erts_port[internal_port_index(id)]; + prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port, + internal_port_index(id)); - erts_smp_port_minor_lock(prt); - if (ERTS_INVALID_PORT_OPT(prt, id, sflgs)) { - erts_smp_port_minor_unlock(prt); - prt = NULL; + if (!prt || prt->common.id != id) + return NULL; + +#ifdef ERTS_SMP + if (no_proc_locks) + erts_smp_port_lock(prt); + else if (erts_smp_port_trylock(prt) == EBUSY) { + /* Unlock process locks, and acquire locks in lock order... */ + erts_smp_proc_unlock(c_p, c_p_locks); + erts_smp_port_lock(prt); + erts_smp_proc_lock(c_p, c_p_locks); + } +#endif + state = erts_atomic32_read_nob(&prt->state); + if (state & invalid_sflgs) { +#ifdef ERTS_SMP + erts_smp_port_unlock(prt); +#endif + return NULL; } - else { - erts_smp_atomic32_inc_nob(&prt->common.refc); - erts_smp_port_minor_unlock(prt); + return prt; +} + +ERTS_GLB_INLINE void +erts_port_release(Port *prt) +{ + /* Only allowed to be called from managed threads */ + ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); #ifdef ERTS_SMP - if (no_proc_locks) - erts_smp_mtx_lock(prt->lock); - else if (erts_smp_mtx_trylock(prt->lock) == EBUSY) { - /* Unlock process locks, and acquire locks in lock order... */ - erts_smp_proc_unlock(c_p, c_p_locks); - erts_smp_mtx_lock(prt->lock); - erts_smp_proc_lock(c_p, c_p_locks); + erts_smp_port_unlock(prt); +#else + if (prt->cleanup) { + prt->cleanup = 0; + erts_port_cleanup(prt); + } +#endif +} + +#ifdef ERTS_SMP + +/* + * erts_thr_id2port_sflgs() and erts_thr_port_release() can + * be used by unmanaged threads in the SMP case. + */ +ERTS_GLB_INLINE Port * +erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs) +{ + Port *prt; + ErtsThrPrgrDelayHandle dhndl; + + if (is_not_internal_port(id)) + return NULL; + + dhndl = erts_thr_progress_unmanaged_delay(); + + prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port, + internal_port_index(id)); + + if (!prt || prt->common.id != id) { + erts_thr_progress_unmanaged_continue(dhndl); + prt = NULL; + } + else { + erts_aint32_t state; + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) { + erts_port_inc_refc(prt); + erts_thr_progress_unmanaged_continue(dhndl); } - /* The id may not have changed... */ - ERTS_SMP_LC_ASSERT(prt->common.id == id); - /* ... but state may have... */ - if (erts_smp_atomic32_read_nob(&prt->state) & sflgs) { - erts_smp_port_unlock(prt); /* Also decrements common.refc... */ + erts_mtx_lock(prt->lock); + state = erts_atomic32_read_nob(&prt->state); + if (state & invalid_sflgs) { + erts_mtx_unlock(prt->lock); + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + erts_port_dec_refc(prt); prt = NULL; } -#endif - } return prt; } ERTS_GLB_INLINE void -erts_port_release(Port *prt) +erts_thr_port_release(Port *prt) { - erts_smp_port_unlock(prt); + erts_mtx_unlock(prt->lock); +#ifdef ERTS_SMP + if (!erts_thr_progress_is_managed_thread()) + erts_port_dec_refc(prt); +#endif +} + +#endif + +ERTS_GLB_INLINE Port* +erts_thr_drvport2port_raw(ErlDrvPort drvport) +{ +#if ERTS_ENABLE_LOCK_CHECK + int emu_thread = erts_lc_is_emu_thr(); +#endif + if (drvport == ERTS_INVALID_ERL_DRV_PORT) + return NULL; + else { + Port *prt = (Port *) drvport; +#if ERTS_ENABLE_LOCK_CHECK + if (emu_thread) { + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_LC_ASSERT(!prt->port_data_lock + || erts_lc_mtx_is_locked(&prt->port_data_lock->mtx)); + } + else { + ERTS_LC_ASSERT(prt->port_data_lock); + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&prt->port_data_lock->mtx)); + } +#endif + return prt; + } +} + +ERTS_GLB_INLINE Port* +erts_drvport2port_raw(ErlDrvPort drvport) +{ + ERTS_LC_ASSERT(erts_lc_is_emu_thr()); + if (drvport == ERTS_INVALID_ERL_DRV_PORT) + return NULL; + else { + Port *prt = (Port *) drvport; + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + return prt; + } } ERTS_GLB_INLINE Port* erts_drvport2port(ErlDrvPort drvport, erts_aint32_t *statep) { - int ix = (int) drvport; + Port *prt = erts_drvport2port_raw(drvport); erts_aint32_t state; - if (ix < 0 || erts_max_ports <= ix) + if (!prt) return NULL; - state = erts_smp_atomic32_read_nob(&erts_port[ix].state); + state = erts_atomic32_read_nob(&prt->state); if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) return NULL; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix])); if (statep) *statep = state; - return &erts_port[ix]; + return prt; } ERTS_GLB_INLINE Port* erts_drvportid2port(Eterm id) { - int ix; + Port *prt; erts_aint32_t state; if (is_not_internal_port(id)) return NULL; - ix = (int) internal_port_index(id); - if (erts_max_ports <= ix) + prt = (Port *) erts_ptab_pix2intptr_nob(&erts_port, + internal_port_index(id)); + if (!prt) return NULL; - state = erts_smp_atomic32_read_nob(&erts_port[ix].state); - if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + if (prt->common.id != id) return NULL; - if (erts_port[ix].common.id != id) + state = erts_atomic32_read_nob(&prt->state); + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) return NULL; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix])); - return &erts_port[ix]; + return prt; +} + +ERTS_GLB_INLINE Eterm +erts_drvport2id(ErlDrvPort drvport) +{ + Port *prt = erts_drvport2port_raw(drvport); + if (!prt) + return am_undefined; + else + return prt->common.id; } ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm id) { - if (is_not_internal_port(id)) + Port *prt = erts_port_lookup_raw(id); + if (prt) + return (Uint32) erts_atomic32_read_acqb(&prt->state); + else return ERTS_PORT_SFLG_INVALID; - else { - erts_aint32_t state; - int ix = internal_port_index(id); - if (erts_max_ports <= ix) - return ERTS_PORT_SFLG_INVALID; - state = erts_smp_atomic32_read_ddrb(&erts_port[ix].state); - if (erts_port[ix].common.id != id) - return ERTS_PORT_SFLG_INVALID; - return state; - } } ERTS_GLB_INLINE int |