diff options
author | Rickard Green <[email protected]> | 2012-08-08 02:20:05 +0200 |
---|---|---|
committer | Rickard Green <[email protected]> | 2012-12-03 21:18:05 +0100 |
commit | b434a3ab242dde66e23a72122474854f51a61eff (patch) | |
tree | f205873230fb86943e6667c0b0de2e2563c82d1e /erts/emulator/beam/erl_port.h | |
parent | 3b523c25af0df45fbf68ab3cf50c0556f1d4e0a1 (diff) | |
download | otp-b434a3ab242dde66e23a72122474854f51a61eff.tar.gz otp-b434a3ab242dde66e23a72122474854f51a61eff.tar.bz2 otp-b434a3ab242dde66e23a72122474854f51a61eff.zip |
Prepare for use of ptab functionality also for ports
Diffstat (limited to 'erts/emulator/beam/erl_port.h')
-rw-r--r-- | erts/emulator/beam/erl_port.h | 503 |
1 files changed, 503 insertions, 0 deletions
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h new file mode 100644 index 0000000000..1a5bc636e2 --- /dev/null +++ b/erts/emulator/beam/erl_port.h @@ -0,0 +1,503 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2012. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +#ifndef ERL_PORT_H__ +#define ERL_PORT_H__ + +typedef struct port Port; +#include "erl_port_task.h" +#include "erl_ptab.h" + +typedef struct erts_driver_t_ erts_driver_t; + +#define SMALL_IO_QUEUE 5 /* Number of fixed elements */ + +typedef struct { + ErlDrvSizeT size; /* total size in bytes */ + + SysIOVec* v_start; + SysIOVec* v_end; + SysIOVec* v_head; + SysIOVec* v_tail; + SysIOVec v_small[SMALL_IO_QUEUE]; + + ErlDrvBinary** b_start; + ErlDrvBinary** b_end; + ErlDrvBinary** b_head; + ErlDrvBinary** b_tail; + ErlDrvBinary* b_small[SMALL_IO_QUEUE]; +} ErlIOQueue; + +typedef struct line_buf { /* Buffer used in line oriented I/O */ + ErlDrvSizeT bufsiz; /* Size of character buffer */ + ErlDrvSizeT ovlen; /* Length of overflow data */ + ErlDrvSizeT ovsiz; /* Actual size of overflow buffer */ + char data[1]; /* Starting point of buffer data, + data[0] is a flag indicating an unprocess CR, + The rest is the overflow buffer. */ +} LineBuf; + + +/* + * Port Specific Data. + * + * Only use PrtSD for very rarely used data. + */ + +#define ERTS_PRTSD_SCHED_ID 0 + +#define ERTS_PRTSD_SIZE 1 + +typedef struct { + void *data[ERTS_PRTSD_SIZE]; +} ErtsPrtSD; + +#ifdef ERTS_SMP +typedef struct ErtsXPortsList_ ErtsXPortsList; +#endif + +/* + * Port locking: + * + * Locking is done either driver specific or port specific. When + * driver specific locking is used, all instances of the driver, + * i.e. ports running the driver, share the same lock. When port + * specific locking is used each instance have its own lock. + * + * Most fields in the Port structure are protected by the lock + * referred to by the 'lock' field. This lock is shared between + * all ports running the same driver when driver specific locking + * is used. + * + * The 'sched' field is protected by the run queue lock that the + * port currently is assigned to. + * + */ + +struct port { + ErtsPTabElementCommon common; /* *Need* to be first in struct */ + + ErtsPortTaskSched sched; + ErtsPortTaskHandle timeout_task; +#ifdef ERTS_SMP + erts_smp_mtx_t *lock; + ErtsXPortsList *xports; + erts_smp_atomic_t run_queue; + erts_smp_spinlock_t state_lck; /* protects: id, status, snapshot */ +#endif + Eterm connected; /* A connected process */ + Eterm caller; /* Current caller. */ + Eterm data; /* Data associated with port. */ + ErlHeapFragment* bp; /* Heap fragment holding data (NULL if imm data). */ + Uint bytes_in; /* Number of bytes read */ + Uint bytes_out; /* Number of bytes written */ + + ErlIOQueue ioq; /* driver accessible i/o queue */ + DistEntry *dist_entry; /* Dist entry used in DISTRIBUTION */ + char *name; /* String used in the open */ + erts_driver_t* drv_ptr; + UWord drv_data; + ErtsProcList *suspended; /* List of suspended processes. */ + LineBuf *linebuf; /* Buffer to hold data not ready for + process to get (line oriented I/O)*/ + erts_smp_atomic32_t state; /* Status and type flags */ + int control_flags; /* Flags for port_control() */ + erts_aint32_t snapshot; /* Next snapshot that port should be part of */ + ErlDrvPDL port_data_lock; + + ErtsPrtSD *psd; /* Port specific data */ +}; + + +ERTS_GLB_INLINE ErtsRunQueue *erts_port_runq(Port *prt); + +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_INLINE ErtsRunQueue * +erts_port_runq(Port *prt) +{ +#ifdef ERTS_SMP + ErtsRunQueue *rq1, *rq2; + rq1 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue); + if (!rq1) + return NULL; + while (1) { + erts_smp_runq_lock(rq1); + rq2 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue); + if (rq1 == rq2) + return rq1; + erts_smp_runq_unlock(rq1); + rq1 = rq2; + if (!rq1) + return NULL; + } +#else + return ERTS_RUNQ_IX(0); +#endif +} + +#endif + + +ERTS_GLB_INLINE void *erts_prtsd_get(Port *p, int ix); +ERTS_GLB_INLINE void *erts_prtsd_set(Port *p, int ix, void *new); + +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_INLINE void * +erts_prtsd_get(Port *prt, int ix) +{ + return prt->psd ? prt->psd->data[ix] : NULL; +} + +ERTS_GLB_INLINE void * +erts_prtsd_set(Port *prt, int ix, void *data) +{ + if (prt->psd) { + void *old = prt->psd->data[ix]; + prt->psd->data[ix] = data; + return old; + } + else { + prt->psd = erts_alloc(ERTS_ALC_T_PRTSD, sizeof(ErtsPrtSD)); + prt->psd->data[ix] = data; + return NULL; + } +} + +#endif + +/* arrays that get malloced at startup */ +extern Port* erts_port; + +extern Uint erts_max_ports; +extern Uint erts_port_tab_index_mask; +extern erts_smp_atomic32_t erts_ports_snapshot; +extern erts_smp_atomic_t erts_dead_ports_ptr; + +ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt); + +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt) +{ + ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&prt->state_lck)); + if (prt->snapshot != erts_smp_atomic32_read_acqb(&erts_ports_snapshot)) { + /* Dead ports are added from the end of the snapshot buffer */ + Eterm* tombstone; + tombstone = (Eterm*) erts_smp_atomic_add_read_nob(&erts_dead_ports_ptr, + -(erts_aint_t)sizeof(Eterm)); + ASSERT(tombstone+1 != NULL); + ASSERT(prt->snapshot == erts_smp_atomic32_read_nob(&erts_ports_snapshot) - 1); + *tombstone = prt->common.id; + } + /*else no ongoing snapshot or port was already included or created after snapshot */ +} + +#endif + +extern erts_smp_atomic_t erts_bytes_out; /* no bytes written out */ +extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ + + +/* port status flags */ + +#define ERTS_PORT_SFLG_CONNECTED ((Uint32) (1 << 0)) +/* Port have begun exiting */ +#define ERTS_PORT_SFLG_EXITING ((Uint32) (1 << 1)) +/* Distribution port */ +#define ERTS_PORT_SFLG_DISTRIBUTION ((Uint32) (1 << 2)) +#define ERTS_PORT_SFLG_BINARY_IO ((Uint32) (1 << 3)) +#define ERTS_PORT_SFLG_SOFT_EOF ((Uint32) (1 << 4)) +/* Flow control */ +#define ERTS_PORT_SFLG_PORT_BUSY ((Uint32) (1 << 5)) +/* Port is closing (no i/o accepted) */ +#define ERTS_PORT_SFLG_CLOSING ((Uint32) (1 << 6)) +/* Send a closed message when terminating */ +#define ERTS_PORT_SFLG_SEND_CLOSED ((Uint32) (1 << 7)) +/* Line orinted io on port */ +#define ERTS_PORT_SFLG_LINEBUF_IO ((Uint32) (1 << 8)) +/* Immortal port (only certain system ports) */ +#define ERTS_PORT_SFLG_IMMORTAL ((Uint32) (1 << 9)) +#define ERTS_PORT_SFLG_FREE ((Uint32) (1 << 10)) +#define ERTS_PORT_SFLG_FREE_SCHEDULED ((Uint32) (1 << 11)) +#define ERTS_PORT_SFLG_INITIALIZING ((Uint32) (1 << 12)) +/* Port uses port specific locking (opposed to driver specific locking) */ +#define ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK ((Uint32) (1 << 13)) +#define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 14)) +/* Last port to terminate halts the emulator */ +#define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 15)) +#ifdef DEBUG +/* Only debug: make sure all flags aren't cleared unintentionally */ +#define ERTS_PORT_SFLG_PORT_DEBUG ((Uint32) (1 << 31)) +#endif + +/* Combinations of port status flags */ +#define ERTS_PORT_SFLGS_DEAD \ + (ERTS_PORT_SFLG_FREE \ + | ERTS_PORT_SFLG_FREE_SCHEDULED \ + | ERTS_PORT_SFLG_INITIALIZING) +#define ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \ + (ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_INVALID) +#define ERTS_PORT_SFLGS_INVALID_LOOKUP \ + (ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \ + | ERTS_PORT_SFLG_CLOSING) +#define ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP \ + (ERTS_PORT_SFLGS_INVALID_LOOKUP \ + | ERTS_PORT_SFLG_PORT_BUSY \ + | ERTS_PORT_SFLG_DISTRIBUTION) + + +void erts_port_cleanup(Port *); +void erts_fire_port_monitor(Port *prt, Eterm ref); +#ifdef ERTS_SMP +void erts_smp_xports_unlock(Port *); +#endif + +#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK) +int erts_lc_is_port_locked(Port *); +#endif + +ERTS_GLB_INLINE void erts_smp_port_minor_lock(Port*); +ERTS_GLB_INLINE void erts_smp_port_minor_unlock(Port*); + +ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt); +ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt); +ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt); + +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_INLINE void +erts_smp_port_minor_lock(Port* prt) +{ +#ifdef ERTS_SMP + erts_smp_spin_lock(&prt->state_lck); +#endif +} + +ERTS_GLB_INLINE void +erts_smp_port_minor_unlock(Port *prt) +{ +#ifdef ERTS_SMP + erts_smp_spin_unlock(&prt->state_lck); +#endif +} + + +ERTS_GLB_INLINE int +erts_smp_port_trylock(Port *prt) +{ + int res; + + ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) > 0); + erts_smp_atomic32_inc_nob(&prt->common.refc); + +#ifdef ERTS_SMP + res = erts_smp_mtx_trylock(prt->lock); + if (res == EBUSY) { + erts_smp_atomic32_dec_nob(&prt->common.refc); + } +#else + res = 0; +#endif + + return res; +} + +ERTS_GLB_INLINE void +erts_smp_port_lock(Port *prt) +{ + ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) > 0); + erts_smp_atomic32_inc_nob(&prt->common.refc); +#ifdef ERTS_SMP + erts_smp_mtx_lock(prt->lock); +#endif +} + +ERTS_GLB_INLINE void +erts_smp_port_unlock(Port *prt) +{ + erts_aint32_t refc; +#ifdef ERTS_SMP + erts_smp_mtx_unlock(prt->lock); +#endif + refc = erts_smp_atomic32_dec_read_nob(&prt->common.refc); + ASSERT(refc >= 0); + if (refc == 0) + erts_port_cleanup(prt); +} + +#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ + + +#define ERTS_INVALID_PORT_OPT(PP, ID, FLGS) \ + (!(PP) \ + || (erts_smp_atomic32_read_nob(&(PP)->state) & (FLGS)) \ + || (PP)->common.id != (ID)) + +/* port lookup */ + +#define INVALID_PORT(PP, ID) \ + ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_LOOKUP) + +/* Invalidate trace port if anything suspicious, for instance + * that the port is a distribution port or it is busy. + */ +#define INVALID_TRACER_PORT(PP, ID) \ + ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP) + +#define ERTS_PORT_SCHED_ID(P, ID) \ + ((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID))) + +#ifdef ERTS_SMP +Port *erts_de2port(DistEntry *, Process *, ErtsProcLocks); +#endif + +#define erts_id2port(ID, P, PL) \ + erts_id2port_sflgs((ID), (P), (PL), ERTS_PORT_SFLGS_INVALID_LOOKUP) + +ERTS_GLB_INLINE Port*erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32); +ERTS_GLB_INLINE void erts_port_release(Port *); +ERTS_GLB_INLINE Port*erts_drvport2port(ErlDrvPort, erts_aint32_t *); +ERTS_GLB_INLINE Port*erts_drvportid2port(Eterm); +ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm id); +ERTS_GLB_INLINE int erts_is_port_alive(Eterm id); +ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm id); + +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_INLINE Port* +erts_id2port_sflgs(Eterm id, Process *c_p, ErtsProcLocks c_p_locks, Uint32 sflgs) +{ +#ifdef ERTS_SMP + int no_proc_locks = !c_p || !c_p_locks; +#endif + Port *prt; + + if (is_not_internal_port(id)) + return NULL; + + prt = &erts_port[internal_port_index(id)]; + + erts_smp_port_minor_lock(prt); + if (ERTS_INVALID_PORT_OPT(prt, id, sflgs)) { + erts_smp_port_minor_unlock(prt); + prt = NULL; + } + else { + erts_smp_atomic32_inc_nob(&prt->common.refc); + erts_smp_port_minor_unlock(prt); + +#ifdef ERTS_SMP + if (no_proc_locks) + erts_smp_mtx_lock(prt->lock); + else if (erts_smp_mtx_trylock(prt->lock) == EBUSY) { + /* Unlock process locks, and acquire locks in lock order... */ + erts_smp_proc_unlock(c_p, c_p_locks); + erts_smp_mtx_lock(prt->lock); + erts_smp_proc_lock(c_p, c_p_locks); + } + + /* The id may not have changed... */ + ERTS_SMP_LC_ASSERT(prt->common.id == id); + /* ... but state may have... */ + if (erts_smp_atomic32_read_nob(&prt->state) & sflgs) { + erts_smp_port_unlock(prt); /* Also decrements common.refc... */ + prt = NULL; + } +#endif + + } + + return prt; +} + +ERTS_GLB_INLINE void +erts_port_release(Port *prt) +{ + erts_smp_port_unlock(prt); +} + +ERTS_GLB_INLINE Port* +erts_drvport2port(ErlDrvPort drvport, erts_aint32_t *statep) +{ + int ix = (int) drvport; + erts_aint32_t state; + if (ix < 0 || erts_max_ports <= ix) + return NULL; + state = erts_smp_atomic32_read_nob(&erts_port[ix].state); + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + return NULL; + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix])); + if (statep) + *statep = state; + return &erts_port[ix]; +} + +ERTS_GLB_INLINE Port* +erts_drvportid2port(Eterm id) +{ + int ix; + erts_aint32_t state; + if (is_not_internal_port(id)) + return NULL; + ix = (int) internal_port_index(id); + if (erts_max_ports <= ix) + return NULL; + state = erts_smp_atomic32_read_nob(&erts_port[ix].state); + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + return NULL; + if (erts_port[ix].common.id != id) + return NULL; + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix])); + return &erts_port[ix]; +} + +ERTS_GLB_INLINE Uint32 +erts_portid2status(Eterm id) +{ + if (is_not_internal_port(id)) + return ERTS_PORT_SFLG_INVALID; + else { + erts_aint32_t state; + int ix = internal_port_index(id); + if (erts_max_ports <= ix) + return ERTS_PORT_SFLG_INVALID; + state = erts_smp_atomic32_read_ddrb(&erts_port[ix].state); + if (erts_port[ix].common.id != id) + return ERTS_PORT_SFLG_INVALID; + return state; + } +} + +ERTS_GLB_INLINE int +erts_is_port_alive(Eterm id) +{ + return !(erts_portid2status(id) & (ERTS_PORT_SFLG_INVALID + | ERTS_PORT_SFLGS_DEAD)); +} + +ERTS_GLB_INLINE int +erts_is_valid_tracer_port(Eterm id) +{ + return !(erts_portid2status(id) & ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP); +} +#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ + +#endif |