/* * %CopyrightBegin% * * Copyright Ericsson AB 2012. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in * compliance with the License. You should have received a copy of the * Erlang Public License along with this software. If not, it can be * retrieved online at http://www.erlang.org/. * * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See * the License for the specific language governing rights and limitations * under the License. * * %CopyrightEnd% */ #ifndef ERL_PORT_H__ #define ERL_PORT_H__ typedef struct port Port; #include "erl_port_task.h" #include "erl_ptab.h" typedef struct erts_driver_t_ erts_driver_t; #define SMALL_IO_QUEUE 5 /* Number of fixed elements */ typedef struct { ErlDrvSizeT size; /* total size in bytes */ SysIOVec* v_start; SysIOVec* v_end; SysIOVec* v_head; SysIOVec* v_tail; SysIOVec v_small[SMALL_IO_QUEUE]; ErlDrvBinary** b_start; ErlDrvBinary** b_end; ErlDrvBinary** b_head; ErlDrvBinary** b_tail; ErlDrvBinary* b_small[SMALL_IO_QUEUE]; } ErlIOQueue; typedef struct line_buf { /* Buffer used in line oriented I/O */ ErlDrvSizeT bufsiz; /* Size of character buffer */ ErlDrvSizeT ovlen; /* Length of overflow data */ ErlDrvSizeT ovsiz; /* Actual size of overflow buffer */ char data[1]; /* Starting point of buffer data, data[0] is a flag indicating an unprocess CR, The rest is the overflow buffer. */ } LineBuf; /* * Port Specific Data. * * Only use PrtSD for very rarely used data. */ #define ERTS_PRTSD_SCHED_ID 0 #define ERTS_PRTSD_SIZE 1 typedef struct { void *data[ERTS_PRTSD_SIZE]; } ErtsPrtSD; #ifdef ERTS_SMP typedef struct ErtsXPortsList_ ErtsXPortsList; #endif /* * Port locking: * * Locking is done either driver specific or port specific. When * driver specific locking is used, all instances of the driver, * i.e. ports running the driver, share the same lock. When port * specific locking is used each instance have its own lock. * * Most fields in the Port structure are protected by the lock * referred to by the 'lock' field. This lock is shared between * all ports running the same driver when driver specific locking * is used. * * The 'sched' field is protected by the run queue lock that the * port currently is assigned to. * */ struct port { ErtsPTabElementCommon common; /* *Need* to be first in struct */ ErtsPortTaskSched sched; ErtsPortTaskHandle timeout_task; #ifdef ERTS_SMP erts_smp_mtx_t *lock; ErtsXPortsList *xports; erts_smp_atomic_t run_queue; erts_smp_spinlock_t state_lck; /* protects: id, status, snapshot */ #endif Eterm connected; /* A connected process */ Eterm caller; /* Current caller. */ Eterm data; /* Data associated with port. */ ErlHeapFragment* bp; /* Heap fragment holding data (NULL if imm data). */ Uint bytes_in; /* Number of bytes read */ Uint bytes_out; /* Number of bytes written */ ErlIOQueue ioq; /* driver accessible i/o queue */ DistEntry *dist_entry; /* Dist entry used in DISTRIBUTION */ char *name; /* String used in the open */ erts_driver_t* drv_ptr; UWord drv_data; ErtsProcList *suspended; /* List of suspended processes. */ LineBuf *linebuf; /* Buffer to hold data not ready for process to get (line oriented I/O)*/ erts_smp_atomic32_t state; /* Status and type flags */ int control_flags; /* Flags for port_control() */ erts_aint32_t snapshot; /* Next snapshot that port should be part of */ ErlDrvPDL port_data_lock; ErtsPrtSD *psd; /* Port specific data */ }; ERTS_GLB_INLINE ErtsRunQueue *erts_port_runq(Port *prt); #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE ErtsRunQueue * erts_port_runq(Port *prt) { #ifdef ERTS_SMP ErtsRunQueue *rq1, *rq2; rq1 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue); if (!rq1) return NULL; while (1) { erts_smp_runq_lock(rq1); rq2 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue); if (rq1 == rq2) return rq1; erts_smp_runq_unlock(rq1); rq1 = rq2; if (!rq1) return NULL; } #else return ERTS_RUNQ_IX(0); #endif } #endif ERTS_GLB_INLINE void *erts_prtsd_get(Port *p, int ix); ERTS_GLB_INLINE void *erts_prtsd_set(Port *p, int ix, void *new); #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE void * erts_prtsd_get(Port *prt, int ix) { return prt->psd ? prt->psd->data[ix] : NULL; } ERTS_GLB_INLINE void * erts_prtsd_set(Port *prt, int ix, void *data) { if (prt->psd) { void *old = prt->psd->data[ix]; prt->psd->data[ix] = data; return old; } else { prt->psd = erts_alloc(ERTS_ALC_T_PRTSD, sizeof(ErtsPrtSD)); prt->psd->data[ix] = data; return NULL; } } #endif /* arrays that get malloced at startup */ extern Port* erts_port; extern Uint erts_max_ports; extern Uint erts_port_tab_index_mask; extern erts_smp_atomic32_t erts_ports_snapshot; extern erts_smp_atomic_t erts_dead_ports_ptr; ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt); #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt) { ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&prt->state_lck)); if (prt->snapshot != erts_smp_atomic32_read_acqb(&erts_ports_snapshot)) { /* Dead ports are added from the end of the snapshot buffer */ Eterm* tombstone; tombstone = (Eterm*) erts_smp_atomic_add_read_nob(&erts_dead_ports_ptr, -(erts_aint_t)sizeof(Eterm)); ASSERT(tombstone+1 != NULL); ASSERT(prt->snapshot == erts_smp_atomic32_read_nob(&erts_ports_snapshot) - 1); *tombstone = prt->common.id; } /*else no ongoing snapshot or port was already included or created after snapshot */ } #endif extern erts_smp_atomic_t erts_bytes_out; /* no bytes written out */ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ /* port status flags */ #define ERTS_PORT_SFLG_CONNECTED ((Uint32) (1 << 0)) /* Port have begun exiting */ #define ERTS_PORT_SFLG_EXITING ((Uint32) (1 << 1)) /* Distribution port */ #define ERTS_PORT_SFLG_DISTRIBUTION ((Uint32) (1 << 2)) #define ERTS_PORT_SFLG_BINARY_IO ((Uint32) (1 << 3)) #define ERTS_PORT_SFLG_SOFT_EOF ((Uint32) (1 << 4)) /* Flow control */ #define ERTS_PORT_SFLG_PORT_BUSY ((Uint32) (1 << 5)) /* Port is closing (no i/o accepted) */ #define ERTS_PORT_SFLG_CLOSING ((Uint32) (1 << 6)) /* Send a closed message when terminating */ #define ERTS_PORT_SFLG_SEND_CLOSED ((Uint32) (1 << 7)) /* Line orinted io on port */ #define ERTS_PORT_SFLG_LINEBUF_IO ((Uint32) (1 << 8)) /* Immortal port (only certain system ports) */ #define ERTS_PORT_SFLG_IMMORTAL ((Uint32) (1 << 9)) #define ERTS_PORT_SFLG_FREE ((Uint32) (1 << 10)) #define ERTS_PORT_SFLG_FREE_SCHEDULED ((Uint32) (1 << 11)) #define ERTS_PORT_SFLG_INITIALIZING ((Uint32) (1 << 12)) /* Port uses port specific locking (opposed to driver specific locking) */ #define ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK ((Uint32) (1 << 13)) #define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 14)) /* Last port to terminate halts the emulator */ #define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 15)) #ifdef DEBUG /* Only debug: make sure all flags aren't cleared unintentionally */ #define ERTS_PORT_SFLG_PORT_DEBUG ((Uint32) (1 << 31)) #endif /* Combinations of port status flags */ #define ERTS_PORT_SFLGS_DEAD \ (ERTS_PORT_SFLG_FREE \ | ERTS_PORT_SFLG_FREE_SCHEDULED \ | ERTS_PORT_SFLG_INITIALIZING) #define ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \ (ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_INVALID) #define ERTS_PORT_SFLGS_INVALID_LOOKUP \ (ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \ | ERTS_PORT_SFLG_CLOSING) #define ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP \ (ERTS_PORT_SFLGS_INVALID_LOOKUP \ | ERTS_PORT_SFLG_PORT_BUSY \ | ERTS_PORT_SFLG_DISTRIBUTION) void erts_port_cleanup(Port *); void erts_fire_port_monitor(Port *prt, Eterm ref); #ifdef ERTS_SMP void erts_smp_xports_unlock(Port *); #endif #if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK) int erts_lc_is_port_locked(Port *); #endif ERTS_GLB_INLINE void erts_smp_port_minor_lock(Port*); ERTS_GLB_INLINE void erts_smp_port_minor_unlock(Port*); ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt); ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt); ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt); #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE void erts_smp_port_minor_lock(Port* prt) { #ifdef ERTS_SMP erts_smp_spin_lock(&prt->state_lck); #endif } ERTS_GLB_INLINE void erts_smp_port_minor_unlock(Port *prt) { #ifdef ERTS_SMP erts_smp_spin_unlock(&prt->state_lck); #endif } ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt) { int res; ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) > 0); erts_smp_atomic32_inc_nob(&prt->common.refc); #ifdef ERTS_SMP res = erts_smp_mtx_trylock(prt->lock); if (res == EBUSY) { erts_smp_atomic32_dec_nob(&prt->common.refc); } #else res = 0; #endif return res; } ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt) { ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) > 0); erts_smp_atomic32_inc_nob(&prt->common.refc); #ifdef ERTS_SMP erts_smp_mtx_lock(prt->lock); #endif } ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt) { erts_aint32_t refc; #ifdef ERTS_SMP erts_smp_mtx_unlock(prt->lock); #endif refc = erts_smp_atomic32_dec_read_nob(&prt->common.refc); ASSERT(refc >= 0); if (refc == 0) erts_port_cleanup(prt); } #endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ #define ERTS_INVALID_PORT_OPT(PP, ID, FLGS) \ (!(PP) \ || (erts_smp_atomic32_read_nob(&(PP)->state) & (FLGS)) \ || (PP)->common.id != (ID)) /* port lookup */ #define INVALID_PORT(PP, ID) \ ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_LOOKUP) /* Invalidate trace port if anything suspicious, for instance * that the port is a distribution port or it is busy. */ #define INVALID_TRACER_PORT(PP, ID) \ ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP) #define ERTS_PORT_SCHED_ID(P, ID) \ ((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID))) #ifdef ERTS_SMP Port *erts_de2port(DistEntry *, Process *, ErtsProcLocks); #endif #define erts_id2port(ID, P, PL) \ erts_id2port_sflgs((ID), (P), (PL), ERTS_PORT_SFLGS_INVALID_LOOKUP) ERTS_GLB_INLINE Port*erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32); ERTS_GLB_INLINE void erts_port_release(Port *); ERTS_GLB_INLINE Port*erts_drvport2port(ErlDrvPort, erts_aint32_t *); ERTS_GLB_INLINE Port*erts_drvportid2port(Eterm); ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm id); ERTS_GLB_INLINE int erts_is_port_alive(Eterm id); ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm id); #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE Port* erts_id2port_sflgs(Eterm id, Process *c_p, ErtsProcLocks c_p_locks, Uint32 sflgs) { #ifdef ERTS_SMP int no_proc_locks = !c_p || !c_p_locks; #endif Port *prt; if (is_not_internal_port(id)) return NULL; prt = &erts_port[internal_port_index(id)]; erts_smp_port_minor_lock(prt); if (ERTS_INVALID_PORT_OPT(prt, id, sflgs)) { erts_smp_port_minor_unlock(prt); prt = NULL; } else { erts_smp_atomic32_inc_nob(&prt->common.refc); erts_smp_port_minor_unlock(prt); #ifdef ERTS_SMP if (no_proc_locks) erts_smp_mtx_lock(prt->lock); else if (erts_smp_mtx_trylock(prt->lock) == EBUSY) { /* Unlock process locks, and acquire locks in lock order... */ erts_smp_proc_unlock(c_p, c_p_locks); erts_smp_mtx_lock(prt->lock); erts_smp_proc_lock(c_p, c_p_locks); } /* The id may not have changed... */ ERTS_SMP_LC_ASSERT(prt->common.id == id); /* ... but state may have... */ if (erts_smp_atomic32_read_nob(&prt->state) & sflgs) { erts_smp_port_unlock(prt); /* Also decrements common.refc... */ prt = NULL; } #endif } return prt; } ERTS_GLB_INLINE void erts_port_release(Port *prt) { erts_smp_port_unlock(prt); } ERTS_GLB_INLINE Port* erts_drvport2port(ErlDrvPort drvport, erts_aint32_t *statep) { int ix = (int) drvport; erts_aint32_t state; if (ix < 0 || erts_max_ports <= ix) return NULL; state = erts_smp_atomic32_read_nob(&erts_port[ix].state); if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) return NULL; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix])); if (statep) *statep = state; return &erts_port[ix]; } ERTS_GLB_INLINE Port* erts_drvportid2port(Eterm id) { int ix; erts_aint32_t state; if (is_not_internal_port(id)) return NULL; ix = (int) internal_port_index(id); if (erts_max_ports <= ix) return NULL; state = erts_smp_atomic32_read_nob(&erts_port[ix].state); if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) return NULL; if (erts_port[ix].common.id != id) return NULL; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix])); return &erts_port[ix]; } ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm id) { if (is_not_internal_port(id)) return ERTS_PORT_SFLG_INVALID; else { erts_aint32_t state; int ix = internal_port_index(id); if (erts_max_ports <= ix) return ERTS_PORT_SFLG_INVALID; state = erts_smp_atomic32_read_ddrb(&erts_port[ix].state); if (erts_port[ix].common.id != id) return ERTS_PORT_SFLG_INVALID; return state; } } ERTS_GLB_INLINE int erts_is_port_alive(Eterm id) { return !(erts_portid2status(id) & (ERTS_PORT_SFLG_INVALID | ERTS_PORT_SFLGS_DEAD)); } ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm id) { return !(erts_portid2status(id) & ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP); } #endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ #endif