/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 2012. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
* compliance with the License. You should have received a copy of the
* Erlang Public License along with this software. If not, it can be
* retrieved online at http://www.erlang.org/.
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* %CopyrightEnd%
*/
#ifndef ERL_PORT_TYPE__
#define ERL_PORT_TYPE__
typedef struct _erl_drv_port Port;
#endif
#if !defined(ERL_PORT_H__) && !defined(ERL_PORT_GET_PORT_TYPE_ONLY__)
#define ERL_PORT_H__
#include "erl_port_task.h"
#include "erl_ptab.h"
#include "erl_thr_progress.h"
#include "erl_trace.h"
#define ERTS_DEFAULT_MAX_PORTS (1 << 16)
#define ERTS_MIN_PORTS 1024
typedef struct erts_driver_t_ erts_driver_t;
#define ERTS_INVALID_ERL_DRV_PORT ((ErlDrvPort) (SWord) -1)
#define SMALL_IO_QUEUE 5 /* Number of fixed elements */
/*
 * Driver accessible I/O queue (embedded in Port as the 'ioq' field).
 *
 * The structure keeps two parallel groups of pointers: one over
 * SysIOVec elements (the v_ members) and one over ErlDrvBinary
 * pointers (the b_ members). Each group also has inline storage for
 * SMALL_IO_QUEUE elements (v_small and b_small).
 *
 * NOTE(review): presumably the start/end members bound the storage
 * currently in use and the head/tail members delimit the queued
 * elements -- confirm against the queue implementation.
 */
typedef struct {
ErlDrvSizeT size; /* total size in bytes */
SysIOVec* v_start;
SysIOVec* v_end;
SysIOVec* v_head;
SysIOVec* v_tail;
SysIOVec v_small[SMALL_IO_QUEUE]; /* inline SysIOVec storage */
ErlDrvBinary** b_start;
ErlDrvBinary** b_end;
ErlDrvBinary** b_head;
ErlDrvBinary** b_tail;
ErlDrvBinary* b_small[SMALL_IO_QUEUE]; /* inline binary pointer storage */
} ErlIOQueue;
/*
 * Buffer used for line oriented I/O on a port (see the 'linebuf'
 * field of the Port structure). The 'data' member is declared with
 * one element and marks where the buffer data begins.
 */
typedef struct line_buf { /* Buffer used in line oriented I/O */
ErlDrvSizeT bufsiz; /* Size of character buffer */
ErlDrvSizeT ovlen; /* Length of overflow data */
ErlDrvSizeT ovsiz; /* Actual size of overflow buffer */
char data[1]; /* Starting point of buffer data,
data[0] is a flag indicating an unprocessed CR,
the rest is the overflow buffer. */
} LineBuf;
/*
* Port Specific Data.
*
* Only use PrtSD for very rarely used data.
*/
/* Index of the scheduler id slot in ErtsPrtSD.data[] */
#define ERTS_PRTSD_SCHED_ID 0
/* Number of slots in ErtsPrtSD.data[]; valid indices are 0..ERTS_PRTSD_SIZE-1 */
#define ERTS_PRTSD_SIZE 1
/*
 * Container for port specific data. It is referenced from the 'psd'
 * field of the Port structure and is allocated on first use by
 * erts_prtsd_set(); read access goes through erts_prtsd_get().
 */
typedef struct {
void *data[ERTS_PRTSD_SIZE];
} ErtsPrtSD;
#ifdef ERTS_SMP
typedef struct ErtsXPortsList_ ErtsXPortsList;
#endif
/*
* Port locking:
*
* Locking is done either driver specific or port specific. When
* driver specific locking is used, all instances of the driver,
* i.e. ports running the driver, share the same lock. When port
* specific locking is used, each instance has its own lock.
*
* Most fields in the Port structure are protected by the lock
* referred to by the 'lock' field. This lock is shared between
* all ports running the same driver when driver specific locking
* is used.
*
* The 'sched' field is protected by the run queue lock that the
* port currently is assigned to.
*
*/
/*
 * The port structure (typedef'ed as 'Port').
 *
 * The layout differs between the SMP and the non-SMP emulator:
 * the SMP build carries a lock pointer, an xports list and an atomic
 * run queue pointer, while the non-SMP build carries its own
 * reference counter and a 'cleanup' flag.
 */
struct _erl_drv_port {
ErtsPTabElementCommon common; /* *Need* to be first in struct */
ErtsPortTaskSched sched;
ErtsPortTaskHandle timeout_task;
#ifdef ERTS_SMP
erts_mtx_t *lock; /* Taken/released by erts_smp_port_lock()/unlock() */
ErtsXPortsList *xports;
erts_smp_atomic_t run_queue; /* Read by erts_port_runq() */
#else
erts_atomic32_t refc; /* Updated by erts_port_{inc,dec,add}_refc() */
int cleanup; /* When set, erts_port_release() clears it and calls erts_port_cleanup() */
#endif
Eterm connected; /* A connected process */
Eterm caller; /* Current caller. */
Eterm data; /* Data associated with port. */
ErlHeapFragment* bp; /* Heap fragment holding data (NULL if imm data). */
Uint bytes_in; /* Number of bytes read */
Uint bytes_out; /* Number of bytes written */
ErlIOQueue ioq; /* driver accessible i/o queue */
DistEntry *dist_entry; /* Dist entry used in DISTRIBUTION */
char *name; /* String used in the open */
erts_driver_t* drv_ptr;
UWord drv_data;
ErtsProcList *suspended; /* List of suspended processes. */
LineBuf *linebuf; /* Buffer to hold data not ready for
process to get (line oriented I/O)*/
erts_atomic32_t state; /* Status and type flags (ERTS_PORT_SFLG_*) */
int control_flags; /* Flags for port_control() */
ErlDrvPDL port_data_lock;
ErtsPrtSD *psd; /* Port specific data; NULL until erts_prtsd_set() allocates it */
};
/*
 * Port data lock, referenced from the 'port_data_lock' field of the
 * Port structure. The lock checker assertions in
 * erts_thr_drvport2port_raw() inspect 'mtx'.
 * NOTE(review): 'refc' presumably counts references to this lock
 * structure and 'prt' presumably points back at the owning port --
 * confirm against the driver API implementation.
 */
struct erl_drv_port_data_lock {
erts_mtx_t mtx;
erts_atomic_t refc;
Port *prt;
};
ERTS_GLB_INLINE ErtsRunQueue *erts_port_runq(Port *prt);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
/*
 * Return the run queue that 'prt' is assigned to.
 *
 * SMP: the run queue pointer is read, the run queue is locked, and the
 * pointer is read again. If the port was reassigned in between, the
 * lock is dropped and the procedure is repeated with the new run
 * queue. On success the returned run queue is locked. NULL is
 * returned (with no run queue lock held) if the pointer is NULL.
 *
 * Non-SMP: returns run queue 0 without any locking.
 */
ERTS_GLB_INLINE ErtsRunQueue *
erts_port_runq(Port *prt)
{
#ifdef ERTS_SMP
    ErtsRunQueue *locked_rq;
    ErtsRunQueue *current_rq;

    locked_rq = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue);
    while (locked_rq) {
        erts_smp_runq_lock(locked_rq);
        current_rq = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue);
        if (current_rq == locked_rq)
            return locked_rq; /* Still assigned here; return it locked */
        /* Port moved while we waited for the lock; retry with new queue */
        erts_smp_runq_unlock(locked_rq);
        locked_rq = current_rq;
    }
    return NULL;
#else
    return ERTS_RUNQ_IX(0);
#endif
}
#endif
ERTS_GLB_INLINE void *erts_prtsd_get(Port *p, int ix);
ERTS_GLB_INLINE void *erts_prtsd_set(Port *p, int ix, void *new);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
/*
 * Read slot 'ix' of the port specific data of 'prt'.
 * Returns NULL if no port specific data has been allocated yet.
 * 'ix' is not range checked.
 */
ERTS_GLB_INLINE void *
erts_prtsd_get(Port *prt, int ix)
{
    if (!prt->psd)
        return NULL;
    return prt->psd->data[ix];
}
/*
 * Store 'data' in slot 'ix' of the port specific data of 'prt',
 * allocating the data block on first use.
 * Returns the previous content of the slot, or NULL if the block
 * had to be allocated by this call. 'ix' is not range checked.
 */
ERTS_GLB_INLINE void *
erts_prtsd_set(Port *prt, int ix, void *data)
{
    void *previous = NULL;

    if (!prt->psd)
        prt->psd = erts_alloc(ERTS_ALC_T_PRTSD, sizeof(ErtsPrtSD));
    else
        previous = prt->psd->data[ix];

    prt->psd->data[ix] = data;
    return previous;
}
#endif
extern erts_smp_atomic_t erts_bytes_out; /* no bytes written out */
extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */
/*
 * Port status flags. Each flag is a single bit stored in the
 * 'state' field of the Port structure.
 */
#define ERTS_PORT_SFLG_CONNECTED ((Uint32) (1 << 0))
/* Port has begun exiting */
#define ERTS_PORT_SFLG_EXITING ((Uint32) (1 << 1))
/* Distribution port */
#define ERTS_PORT_SFLG_DISTRIBUTION ((Uint32) (1 << 2))
#define ERTS_PORT_SFLG_BINARY_IO ((Uint32) (1 << 3))
#define ERTS_PORT_SFLG_SOFT_EOF ((Uint32) (1 << 4))
/* Flow control */
/* Port is closing (no i/o accepted) */
#define ERTS_PORT_SFLG_CLOSING ((Uint32) (1 << 5))
/* Send a closed message when terminating */
#define ERTS_PORT_SFLG_SEND_CLOSED ((Uint32) (1 << 6))
/* Line oriented I/O on port */
#define ERTS_PORT_SFLG_LINEBUF_IO ((Uint32) (1 << 7))
/* Immortal port (only certain system ports) */
/* NOTE(review): the "immortal port" comment above does not match the
 * name of the flag that follows and looks stale -- confirm. */
#define ERTS_PORT_SFLG_FREE ((Uint32) (1 << 8))
#define ERTS_PORT_SFLG_INITIALIZING ((Uint32) (1 << 9))
/* Port uses port specific locking (opposed to driver specific locking) */
#define ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK ((Uint32) (1 << 10))
#define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 11))
/* Last port to terminate halts the emulator */
#define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 12))
#ifdef DEBUG
/*
 * Only debug: make sure all flags aren't cleared unintentionally.
 *
 * The constant is converted to Uint32 *before* shifting: shifting a
 * signed int 1 into the sign bit (1 << 31) is undefined behaviour.
 */
#define ERTS_PORT_SFLG_PORT_DEBUG (((Uint32) 1) << 31)
#endif
/*
 * Combinations of port status flags.
 *
 * Each lookup mask below extends the previous one:
 *   DEAD                  = FREE | INITIALIZING
 *   INVALID_DRIVER_LOOKUP = DEAD | INVALID
 *   INVALID_LOOKUP        = INVALID_DRIVER_LOOKUP | CLOSING
 *   INVALID_TRACER_LOOKUP = INVALID_LOOKUP | DISTRIBUTION
 * A port whose state has any bit of the mask set is rejected by the
 * lookup functions that use that mask.
 */
#define ERTS_PORT_SFLGS_DEAD \
(ERTS_PORT_SFLG_FREE | ERTS_PORT_SFLG_INITIALIZING)
#define ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \
(ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_INVALID)
#define ERTS_PORT_SFLGS_INVALID_LOOKUP \
(ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \
| ERTS_PORT_SFLG_CLOSING)
#define ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP \
(ERTS_PORT_SFLGS_INVALID_LOOKUP \
| ERTS_PORT_SFLG_DISTRIBUTION)
void print_port_info(Port *, int, void *);
void erts_port_free(Port *);
#ifndef ERTS_SMP
void erts_port_cleanup(Port *);
#endif
void erts_fire_port_monitor(Port *prt, Eterm ref);
#ifdef ERTS_SMP
void erts_smp_xports_unlock(Port *);
#endif
#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
int erts_lc_is_port_locked(Port *);
#endif
ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt);
ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt);
ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc);
ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt);
ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt);
ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
/*
 * Add one reference to 'prt'.
 * The SMP build counts references in the common port table element;
 * the non-SMP build uses the port's own 'refc' field.
 */
ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt)
{
#ifndef ERTS_SMP
    erts_atomic32_inc_nob(&prt->refc);
#else
    erts_ptab_inc_refc(&prt->common);
#endif
}
/*
 * Drop one reference to 'prt'. When the port is no longer referred
 * to, it is handed to erts_port_free(); the caller must not touch
 * 'prt' after this call unless it holds another reference.
 */
ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt)
{
#ifndef ERTS_SMP
    if (erts_atomic32_dec_read_nob(&prt->refc) == 0)
        erts_port_free(prt);
#else
    if (!erts_ptab_dec_test_refc(&prt->common))
        erts_port_free(prt);
#endif
}
/*
 * Adjust the reference count of 'prt' by 'add_refc' (which may be
 * negative). When the port is no longer referred to after the
 * adjustment, it is handed to erts_port_free().
 */
ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc)
{
#ifndef ERTS_SMP
    if (erts_atomic32_add_read_nob(&prt->refc, add_refc) == 0)
        erts_port_free(prt);
#else
    if (!erts_ptab_add_test_refc(&prt->common, add_refc))
        erts_port_free(prt);
#endif
}
/*
 * Try to take the lock of 'prt' without blocking.
 * SMP: returns the result of erts_mtx_trylock() on the port lock;
 * the calling thread must be a managed thread.
 * Non-SMP: there is no lock; always returns 0.
 */
ERTS_GLB_INLINE int
erts_smp_port_trylock(Port *prt)
{
    int res = 0;
#ifdef ERTS_SMP
    /* *Need* to be a managed thread */
    ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
    res = erts_mtx_trylock(prt->lock);
#endif
    return res;
}
/*
 * Take the lock of 'prt', blocking until it is available.
 * Must be called from a managed thread. No-op in the non-SMP build.
 */
ERTS_GLB_INLINE void
erts_smp_port_lock(Port *prt)
{
#if defined(ERTS_SMP)
    /* *Need* to be a managed thread */
    ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
    erts_mtx_lock(prt->lock);
#endif
}
/*
 * Release the lock of 'prt'.
 * Must be called from a managed thread. No-op in the non-SMP build.
 */
ERTS_GLB_INLINE void
erts_smp_port_unlock(Port *prt)
{
#if defined(ERTS_SMP)
    /* *Need* to be a managed thread */
    ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
    erts_mtx_unlock(prt->lock);
#endif
}
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
/*
 * Evaluates to true when the port pointer PP must not be used for
 * the port identifier ID: PP is NULL, any state flag in FLGS is set,
 * or the identifier stored in the port differs from ID.
 * Note that PP is evaluated more than once, so it must not have
 * side effects.
 */
#define ERTS_INVALID_PORT_OPT(PP, ID, FLGS) \
(!(PP) \
|| (erts_atomic32_read_nob(&(PP)->state) & (FLGS)) \
|| (PP)->common.id != (ID))
/* port lookup: validity check using ERTS_PORT_SFLGS_INVALID_LOOKUP */
#define INVALID_PORT(PP, ID) \
ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_LOOKUP)
/* Invalidate trace port if anything suspicious, for instance
* that the port is a distribution port or it is busy.
*/
#define INVALID_TRACER_PORT(PP, ID) \
ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP)
/*
 * Store ID as the scheduler id in the port specific data of port P
 * and evaluate to the previously stored value (0 if none was stored).
 *
 * The slot index must be the *port* specific data index
 * ERTS_PRTSD_SCHED_ID. The array in ErtsPrtSD only has
 * ERTS_PRTSD_SIZE slots, so any other index would read and write
 * outside of it.
 */
#define ERTS_PORT_SCHED_ID(P, ID) \
((Uint) (UWord) erts_prtsd_set((P), ERTS_PRTSD_SCHED_ID, (void *) (UWord) (ID)))
/*
 * Sentinel port. Its address is used as the marker value
 * ERTS_PORT_LOCK_BUSY; erts_pix2port() translates that marker to
 * NULL, so the sentinel is never handed out as a real port.
 */
extern const Port erts_invalid_port;
#define ERTS_PORT_LOCK_BUSY ((Port *) &erts_invalid_port)
#ifdef ERTS_SMP
Port *erts_de2port(DistEntry *, Process *, ErtsProcLocks);
#endif
ERTS_GLB_INLINE Port *erts_pix2port(int);
ERTS_GLB_INLINE Port *erts_port_lookup_raw(Eterm);
ERTS_GLB_INLINE Port *erts_port_lookup(Eterm, Uint32);
ERTS_GLB_INLINE Port*erts_id2port(Eterm id);
ERTS_GLB_INLINE Port *erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32);
ERTS_GLB_INLINE void erts_port_release(Port *);
#ifdef ERTS_SMP
ERTS_GLB_INLINE Port *erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs);
ERTS_GLB_INLINE void erts_thr_port_release(Port *prt);
#endif
ERTS_GLB_INLINE Port *erts_thr_drvport2port_raw(ErlDrvPort);
ERTS_GLB_INLINE Port *erts_drvport2port_raw(ErlDrvPort drvport);
ERTS_GLB_INLINE Port *erts_drvport2port(ErlDrvPort, erts_aint32_t *);
ERTS_GLB_INLINE Port *erts_drvportid2port(Eterm);
ERTS_GLB_INLINE Eterm erts_drvport2id(ErlDrvPort);
ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm);
ERTS_GLB_INLINE int erts_is_port_alive(Eterm);
ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
/*
 * Fetch the port stored at port table index 'ix'.
 * Returns NULL when the slot holds the ERTS_PORT_LOCK_BUSY marker;
 * otherwise returns the slot content as is (which may be NULL).
 * 'ix' must be within [0, erts_ptab_max(&erts_port)).
 */
ERTS_GLB_INLINE Port *erts_pix2port(int ix)
{
    Port *entry;
    ASSERT(0 <= ix && ix < erts_ptab_max(&erts_port));
    entry = (Port *) erts_ptab_pix2intptr_nob(&erts_port, ix);
    if (entry == ERTS_PORT_LOCK_BUSY)
        return NULL;
    return entry;
}
/*
 * Look up the port with identifier 'id' without taking any lock and
 * without inspecting the port state.
 * Returns NULL if 'id' is not an internal port identifier, if the
 * table slot is empty, or if the slot holds a port with another id.
 * The caller must be delaying thread progress (asserted when lock
 * checking is enabled).
 */
ERTS_GLB_INLINE Port *
erts_port_lookup_raw(Eterm id)
{
    Port *found;
    ERTS_SMP_LC_ASSERT(erts_thr_progress_lc_is_delaying());
    if (is_not_internal_port(id))
        return NULL;
    found = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
                                               internal_port_index(id));
    if (!found || found->common.id != id)
        return NULL;
    return found;
}
/*
 * Look up the port with identifier 'id' without taking any lock.
 *
 * Returns NULL if no port with that identifier is found, or if any
 * of the state flags in 'invalid_sflgs' is set on the port;
 * otherwise returns the port.
 */
ERTS_GLB_INLINE Port *
erts_port_lookup(Eterm id, Uint32 invalid_sflgs)
{
    Port *prt = erts_port_lookup_raw(id);
    erts_aint32_t state;

    /* erts_port_lookup_raw() returns NULL for non-port or stale ids;
     * the state must not be read through a NULL pointer. */
    if (!prt)
        return NULL;

    state = erts_atomic32_read_nob(&prt->state);
    return (state & invalid_sflgs) ? NULL : prt;
}
/*
 * Look up the port with identifier 'id' and take its lock.
 *
 * Returns the port with its lock held, or NULL (no lock held) if
 * 'id' is not an internal port identifier, no matching port exists,
 * or the port has any of the ERTS_PORT_SFLGS_INVALID_LOOKUP flags
 * set. Release a returned port with erts_port_release().
 */
ERTS_GLB_INLINE Port*
erts_id2port(Eterm id)
{
    Port *prt;
    erts_aint32_t sflgs;
    /* Only allowed to be called from managed threads */
    ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
    if (is_not_internal_port(id))
        return NULL;
    prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
                                             internal_port_index(id));
    if (!prt || prt->common.id != id)
        return NULL;

    /* The state is checked only after the port lock has been taken */
    erts_smp_port_lock(prt);
    sflgs = erts_atomic32_read_nob(&prt->state);
    if (!(sflgs & ERTS_PORT_SFLGS_INVALID_LOOKUP))
        return prt;

    erts_smp_port_unlock(prt);
    return NULL;
}
/*
 * Look up the port with identifier 'id', take its lock (SMP only)
 * and reject it if any state flag in 'invalid_sflgs' is set.
 *
 * 'c_p' and 'c_p_locks' describe the calling process and the process
 * locks it currently holds; they may be NULL/0. They are only used
 * in the SMP build.
 *
 * Returns the port (locked in the SMP build), or NULL if 'id' is not
 * an internal port identifier, no matching port exists, or the state
 * check fails (in which case the port lock has been released again).
 */
ERTS_GLB_INLINE Port*
erts_id2port_sflgs(Eterm id,
Process *c_p, ErtsProcLocks c_p_locks,
Uint32 invalid_sflgs)
{
#ifdef ERTS_SMP
/* True when the caller holds no process locks that could conflict */
int no_proc_locks = !c_p || !c_p_locks;
#endif
erts_aint32_t state;
Port *prt;
/* Only allowed to be called from managed threads */
ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
if (is_not_internal_port(id))
return NULL;
prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
internal_port_index(id));
if (!prt || prt->common.id != id)
return NULL;
#ifdef ERTS_SMP
if (no_proc_locks)
erts_smp_port_lock(prt);
/* With process locks held, first try without blocking */
else if (erts_smp_port_trylock(prt) == EBUSY) {
/* Unlock process locks, and acquire locks in lock order... */
erts_smp_proc_unlock(c_p, c_p_locks);
erts_smp_port_lock(prt);
erts_smp_proc_lock(c_p, c_p_locks);
}
#endif
/* The state is inspected after the port lock has been acquired */
state = erts_atomic32_read_nob(&prt->state);
if (state & invalid_sflgs) {
#ifdef ERTS_SMP
erts_smp_port_unlock(prt);
#endif
return NULL;
}
return prt;
}
/*
 * Release a port obtained from erts_id2port() or
 * erts_id2port_sflgs().
 * SMP: releases the port lock.
 * Non-SMP: if the port's 'cleanup' flag is set, the flag is cleared
 * and erts_port_cleanup() is called.
 */
ERTS_GLB_INLINE void
erts_port_release(Port *prt)
{
    /* Only allowed to be called from managed threads */
    ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
#ifdef ERTS_SMP
    erts_smp_port_unlock(prt);
#else
    if (!prt->cleanup)
        return;
    /* Clear the flag before running the cleanup */
    prt->cleanup = 0;
    erts_port_cleanup(prt);
#endif
}
#ifdef ERTS_SMP
/*
* erts_thr_id2port_sflgs() and erts_thr_port_release() can
* be used by unmanaged threads in the SMP case.
*/
/*
 * Look up the port with identifier 'id', lock it, and reject it if
 * any state flag in 'invalid_sflgs' is set.
 *
 * Returns the locked port, or NULL if 'id' is not an internal port
 * identifier, no matching port exists, or the state check fails.
 * When the delay handle is not ERTS_THR_PRGR_DHANDLE_MANAGED, a port
 * reference is taken before thread progress is allowed to continue;
 * that reference is kept while the port is returned and dropped
 * again if the state check fails.
 * Release a returned port with erts_thr_port_release().
 */
ERTS_GLB_INLINE Port *
erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs)
{
Port *prt;
ErtsThrPrgrDelayHandle dhndl;
if (is_not_internal_port(id))
return NULL;
/* Delay thread progress while the port table is read */
dhndl = erts_thr_progress_unmanaged_delay();
prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
internal_port_index(id));
if (!prt || prt->common.id != id) {
erts_thr_progress_unmanaged_continue(dhndl);
prt = NULL;
}
else {
erts_aint32_t state;
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
/* NOTE(review): presumably the reference keeps the port structure
 * allocated once thread progress is no longer delayed -- confirm */
erts_port_inc_refc(prt);
erts_thr_progress_unmanaged_continue(dhndl);
}
erts_mtx_lock(prt->lock);
state = erts_atomic32_read_nob(&prt->state);
if (state & invalid_sflgs) {
/* Unlock before dropping the reference taken above */
erts_mtx_unlock(prt->lock);
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_port_dec_refc(prt);
prt = NULL;
}
}
return prt;
}
/*
 * Release a port obtained from erts_thr_id2port_sflgs(): unlock it
 * and, when called from a thread that is not managed, drop one port
 * reference. The lock is released before the reference is dropped.
 *
 * This function only exists in the SMP build (it is defined inside
 * an ERTS_SMP conditional), so no further conditional is needed here.
 */
ERTS_GLB_INLINE void
erts_thr_port_release(Port *prt)
{
    erts_mtx_unlock(prt->lock);
    if (!erts_thr_progress_is_managed_thread()) {
        erts_port_dec_refc(prt);
    }
}
#endif
/*
 * Convert a driver port handle to a Port pointer without checking
 * the port state. Returns NULL for ERTS_INVALID_ERL_DRV_PORT.
 *
 * With lock checking enabled this asserts the expected locking:
 * on an emulator thread the port must be locked and the port data
 * lock, if there is one, must be held; on any other thread the port
 * must have a port data lock and it must be held.
 */
ERTS_GLB_INLINE Port*
erts_thr_drvport2port_raw(ErlDrvPort drvport)
{
    Port *prt;
#if ERTS_ENABLE_LOCK_CHECK
    int on_emu_thread = erts_lc_is_emu_thr();
#endif
    if (drvport == ERTS_INVALID_ERL_DRV_PORT)
        return NULL;

    prt = (Port *) drvport;
#if ERTS_ENABLE_LOCK_CHECK
    if (!on_emu_thread) {
        ERTS_LC_ASSERT(prt->port_data_lock);
        ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
    }
    else {
        ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
        ERTS_LC_ASSERT(!prt->port_data_lock
                       || erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
    }
#endif
    return prt;
}
/*
 * Convert a driver port handle to a Port pointer without checking
 * the port state. Returns NULL for ERTS_INVALID_ERL_DRV_PORT.
 * With lock checking enabled, asserts that the caller is an emulator
 * thread and that the port is locked.
 */
ERTS_GLB_INLINE Port*
erts_drvport2port_raw(ErlDrvPort drvport)
{
    Port *prt;
    ERTS_LC_ASSERT(erts_lc_is_emu_thr());
    if (drvport == ERTS_INVALID_ERL_DRV_PORT)
        return NULL;
    prt = (Port *) drvport;
    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
    return prt;
}
/*
 * Convert a driver port handle to a Port pointer, rejecting ports
 * that have any of the ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP flags
 * set.
 * Returns NULL for an invalid handle or a rejected port. On success
 * the state that was read is stored in '*statep' if 'statep' is
 * non-NULL; '*statep' is left untouched on failure.
 */
ERTS_GLB_INLINE Port*
erts_drvport2port(ErlDrvPort drvport, erts_aint32_t *statep)
{
    erts_aint32_t sflgs;
    Port *prt = erts_drvport2port_raw(drvport);

    if (prt == NULL)
        return NULL;

    sflgs = erts_atomic32_read_nob(&prt->state);
    if ((sflgs & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) != 0)
        return NULL;

    if (statep != NULL)
        *statep = sflgs;
    return prt;
}
/*
 * Look up a port from its identifier on behalf of a driver.
 * No lock is taken; with lock checking enabled it is asserted that
 * the port found in the table slot is already locked.
 * Returns NULL if 'id' is not an internal port identifier, the slot
 * is empty, the slot holds a port with another id, or the port has
 * any of the ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP flags set.
 */
ERTS_GLB_INLINE Port*
erts_drvportid2port(Eterm id)
{
    erts_aint32_t sflgs;
    Port *prt;

    if (is_not_internal_port(id))
        return NULL;

    prt = (Port *) erts_ptab_pix2intptr_nob(&erts_port,
                                            internal_port_index(id));
    if (prt == NULL)
        return NULL;
    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
    if (prt->common.id != id)
        return NULL;

    sflgs = erts_atomic32_read_nob(&prt->state);
    return (sflgs & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) ? NULL : prt;
}
/*
 * Return the port identifier for a driver port handle, or the atom
 * 'undefined' if the handle is ERTS_INVALID_ERL_DRV_PORT.
 */
ERTS_GLB_INLINE Eterm
erts_drvport2id(ErlDrvPort drvport)
{
    Port *prt = erts_drvport2port_raw(drvport);
    return prt ? prt->common.id : am_undefined;
}
/*
 * Return the state flags of the port with identifier 'id', or
 * ERTS_PORT_SFLG_INVALID if no such port is found.
 */
ERTS_GLB_INLINE Uint32
erts_portid2status(Eterm id)
{
    Port *prt = erts_port_lookup_raw(id);
    if (!prt)
        return ERTS_PORT_SFLG_INVALID;
    return (Uint32) erts_atomic32_read_acqb(&prt->state);
}
/*
 * Return non-zero if the port with identifier 'id' exists and has
 * neither the INVALID flag nor any of the DEAD flags set.
 */
ERTS_GLB_INLINE int
erts_is_port_alive(Eterm id)
{
    Uint32 not_alive_mask = ERTS_PORT_SFLG_INVALID | ERTS_PORT_SFLGS_DEAD;
    return (erts_portid2status(id) & not_alive_mask) == 0;
}
/*
 * Return non-zero if the port with identifier 'id' may be used as a
 * tracer port, i.e. it exists and none of the
 * ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP flags is set.
 */
ERTS_GLB_INLINE int
erts_is_valid_tracer_port(Eterm id)
{
    Uint32 status = erts_portid2status(id);
    return (status & ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP) == 0;
}
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
#endif