aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_port.h
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-08-08 02:20:05 +0200
committerRickard Green <[email protected]>2012-12-03 21:18:05 +0100
commitb434a3ab242dde66e23a72122474854f51a61eff (patch)
treef205873230fb86943e6667c0b0de2e2563c82d1e /erts/emulator/beam/erl_port.h
parent3b523c25af0df45fbf68ab3cf50c0556f1d4e0a1 (diff)
downloadotp-b434a3ab242dde66e23a72122474854f51a61eff.tar.gz
otp-b434a3ab242dde66e23a72122474854f51a61eff.tar.bz2
otp-b434a3ab242dde66e23a72122474854f51a61eff.zip
Prepare for use of ptab functionality also for ports
Diffstat (limited to 'erts/emulator/beam/erl_port.h')
-rw-r--r--erts/emulator/beam/erl_port.h503
1 files changed, 503 insertions, 0 deletions
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h
new file mode 100644
index 0000000000..1a5bc636e2
--- /dev/null
+++ b/erts/emulator/beam/erl_port.h
@@ -0,0 +1,503 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2012. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+#ifndef ERL_PORT_H__
+#define ERL_PORT_H__
+
+typedef struct port Port;
+#include "erl_port_task.h"
+#include "erl_ptab.h"
+
+typedef struct erts_driver_t_ erts_driver_t;
+
+#define SMALL_IO_QUEUE 5 /* Number of fixed elements */
+
+typedef struct {
+ ErlDrvSizeT size; /* total size in bytes */
+
+ SysIOVec* v_start;
+ SysIOVec* v_end;
+ SysIOVec* v_head;
+ SysIOVec* v_tail;
+ SysIOVec v_small[SMALL_IO_QUEUE];
+
+ ErlDrvBinary** b_start;
+ ErlDrvBinary** b_end;
+ ErlDrvBinary** b_head;
+ ErlDrvBinary** b_tail;
+ ErlDrvBinary* b_small[SMALL_IO_QUEUE];
+} ErlIOQueue;
+
+typedef struct line_buf { /* Buffer used in line oriented I/O */
+ ErlDrvSizeT bufsiz; /* Size of character buffer */
+ ErlDrvSizeT ovlen; /* Length of overflow data */
+ ErlDrvSizeT ovsiz; /* Actual size of overflow buffer */
+ char data[1]; /* Starting point of buffer data,
+ data[0] is a flag indicating an unprocess CR,
+ The rest is the overflow buffer. */
+} LineBuf;
+
+
+/*
+ * Port Specific Data.
+ *
+ * Only use PrtSD for very rarely used data.
+ */
+
+#define ERTS_PRTSD_SCHED_ID 0
+
+#define ERTS_PRTSD_SIZE 1
+
+typedef struct {
+ void *data[ERTS_PRTSD_SIZE];
+} ErtsPrtSD;
+
+#ifdef ERTS_SMP
+typedef struct ErtsXPortsList_ ErtsXPortsList;
+#endif
+
+/*
+ * Port locking:
+ *
+ * Locking is done either driver specific or port specific. When
+ * driver specific locking is used, all instances of the driver,
+ * i.e. ports running the driver, share the same lock. When port
+ * specific locking is used each instance have its own lock.
+ *
+ * Most fields in the Port structure are protected by the lock
+ * referred to by the 'lock' field. This lock is shared between
+ * all ports running the same driver when driver specific locking
+ * is used.
+ *
+ * The 'sched' field is protected by the run queue lock that the
+ * port currently is assigned to.
+ *
+ */
+
+struct port {
+ ErtsPTabElementCommon common; /* *Need* to be first in struct */
+
+ ErtsPortTaskSched sched;
+ ErtsPortTaskHandle timeout_task;
+#ifdef ERTS_SMP
+ erts_smp_mtx_t *lock;
+ ErtsXPortsList *xports;
+ erts_smp_atomic_t run_queue;
+ erts_smp_spinlock_t state_lck; /* protects: id, status, snapshot */
+#endif
+ Eterm connected; /* A connected process */
+ Eterm caller; /* Current caller. */
+ Eterm data; /* Data associated with port. */
+ ErlHeapFragment* bp; /* Heap fragment holding data (NULL if imm data). */
+ Uint bytes_in; /* Number of bytes read */
+ Uint bytes_out; /* Number of bytes written */
+
+ ErlIOQueue ioq; /* driver accessible i/o queue */
+ DistEntry *dist_entry; /* Dist entry used in DISTRIBUTION */
+ char *name; /* String used in the open */
+ erts_driver_t* drv_ptr;
+ UWord drv_data;
+ ErtsProcList *suspended; /* List of suspended processes. */
+ LineBuf *linebuf; /* Buffer to hold data not ready for
+ process to get (line oriented I/O)*/
+ erts_smp_atomic32_t state; /* Status and type flags */
+ int control_flags; /* Flags for port_control() */
+ erts_aint32_t snapshot; /* Next snapshot that port should be part of */
+ ErlDrvPDL port_data_lock;
+
+ ErtsPrtSD *psd; /* Port specific data */
+};
+
+
+ERTS_GLB_INLINE ErtsRunQueue *erts_port_runq(Port *prt);
+
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_INLINE ErtsRunQueue *
+erts_port_runq(Port *prt)
+{
+#ifdef ERTS_SMP
+ ErtsRunQueue *rq1, *rq2;
+ rq1 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue);
+ if (!rq1)
+ return NULL;
+ while (1) {
+ erts_smp_runq_lock(rq1);
+ rq2 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue);
+ if (rq1 == rq2)
+ return rq1;
+ erts_smp_runq_unlock(rq1);
+ rq1 = rq2;
+ if (!rq1)
+ return NULL;
+ }
+#else
+ return ERTS_RUNQ_IX(0);
+#endif
+}
+
+#endif
+
+
+ERTS_GLB_INLINE void *erts_prtsd_get(Port *p, int ix);
+ERTS_GLB_INLINE void *erts_prtsd_set(Port *p, int ix, void *new);
+
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_INLINE void *
+erts_prtsd_get(Port *prt, int ix)
+{
+ return prt->psd ? prt->psd->data[ix] : NULL;
+}
+
+ERTS_GLB_INLINE void *
+erts_prtsd_set(Port *prt, int ix, void *data)
+{
+ if (prt->psd) {
+ void *old = prt->psd->data[ix];
+ prt->psd->data[ix] = data;
+ return old;
+ }
+ else {
+ prt->psd = erts_alloc(ERTS_ALC_T_PRTSD, sizeof(ErtsPrtSD));
+ prt->psd->data[ix] = data;
+ return NULL;
+ }
+}
+
+#endif
+
+/* arrays that get malloced at startup */
+extern Port* erts_port;
+
+extern Uint erts_max_ports;
+extern Uint erts_port_tab_index_mask;
+extern erts_smp_atomic32_t erts_ports_snapshot;
+extern erts_smp_atomic_t erts_dead_ports_ptr;
+
+ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt);
+
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt)
+{
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&prt->state_lck));
+ if (prt->snapshot != erts_smp_atomic32_read_acqb(&erts_ports_snapshot)) {
+ /* Dead ports are added from the end of the snapshot buffer */
+ Eterm* tombstone;
+ tombstone = (Eterm*) erts_smp_atomic_add_read_nob(&erts_dead_ports_ptr,
+ -(erts_aint_t)sizeof(Eterm));
+ ASSERT(tombstone+1 != NULL);
+ ASSERT(prt->snapshot == erts_smp_atomic32_read_nob(&erts_ports_snapshot) - 1);
+ *tombstone = prt->common.id;
+ }
+ /*else no ongoing snapshot or port was already included or created after snapshot */
+}
+
+#endif
+
+extern erts_smp_atomic_t erts_bytes_out; /* no bytes written out */
+extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */
+
+
+/* port status flags */
+
+#define ERTS_PORT_SFLG_CONNECTED ((Uint32) (1 << 0))
+/* Port have begun exiting */
+#define ERTS_PORT_SFLG_EXITING ((Uint32) (1 << 1))
+/* Distribution port */
+#define ERTS_PORT_SFLG_DISTRIBUTION ((Uint32) (1 << 2))
+#define ERTS_PORT_SFLG_BINARY_IO ((Uint32) (1 << 3))
+#define ERTS_PORT_SFLG_SOFT_EOF ((Uint32) (1 << 4))
+/* Flow control */
+#define ERTS_PORT_SFLG_PORT_BUSY ((Uint32) (1 << 5))
+/* Port is closing (no i/o accepted) */
+#define ERTS_PORT_SFLG_CLOSING ((Uint32) (1 << 6))
+/* Send a closed message when terminating */
+#define ERTS_PORT_SFLG_SEND_CLOSED ((Uint32) (1 << 7))
+/* Line orinted io on port */
+#define ERTS_PORT_SFLG_LINEBUF_IO ((Uint32) (1 << 8))
+/* Immortal port (only certain system ports) */
+#define ERTS_PORT_SFLG_IMMORTAL ((Uint32) (1 << 9))
+#define ERTS_PORT_SFLG_FREE ((Uint32) (1 << 10))
+#define ERTS_PORT_SFLG_FREE_SCHEDULED ((Uint32) (1 << 11))
+#define ERTS_PORT_SFLG_INITIALIZING ((Uint32) (1 << 12))
+/* Port uses port specific locking (opposed to driver specific locking) */
+#define ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK ((Uint32) (1 << 13))
+#define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 14))
+/* Last port to terminate halts the emulator */
+#define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 15))
+#ifdef DEBUG
+/* Only debug: make sure all flags aren't cleared unintentionally */
+#define ERTS_PORT_SFLG_PORT_DEBUG ((Uint32) (1 << 31))
+#endif
+
+/* Combinations of port status flags */
+#define ERTS_PORT_SFLGS_DEAD \
+ (ERTS_PORT_SFLG_FREE \
+ | ERTS_PORT_SFLG_FREE_SCHEDULED \
+ | ERTS_PORT_SFLG_INITIALIZING)
+#define ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \
+ (ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_INVALID)
+#define ERTS_PORT_SFLGS_INVALID_LOOKUP \
+ (ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \
+ | ERTS_PORT_SFLG_CLOSING)
+#define ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP \
+ (ERTS_PORT_SFLGS_INVALID_LOOKUP \
+ | ERTS_PORT_SFLG_PORT_BUSY \
+ | ERTS_PORT_SFLG_DISTRIBUTION)
+
+
+void erts_port_cleanup(Port *);
+void erts_fire_port_monitor(Port *prt, Eterm ref);
+#ifdef ERTS_SMP
+void erts_smp_xports_unlock(Port *);
+#endif
+
+#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
+int erts_lc_is_port_locked(Port *);
+#endif
+
+ERTS_GLB_INLINE void erts_smp_port_minor_lock(Port*);
+ERTS_GLB_INLINE void erts_smp_port_minor_unlock(Port*);
+
+ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt);
+ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt);
+ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt);
+
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_INLINE void
+erts_smp_port_minor_lock(Port* prt)
+{
+#ifdef ERTS_SMP
+ erts_smp_spin_lock(&prt->state_lck);
+#endif
+}
+
+ERTS_GLB_INLINE void
+erts_smp_port_minor_unlock(Port *prt)
+{
+#ifdef ERTS_SMP
+ erts_smp_spin_unlock(&prt->state_lck);
+#endif
+}
+
+
+ERTS_GLB_INLINE int
+erts_smp_port_trylock(Port *prt)
+{
+ int res;
+
+ ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) > 0);
+ erts_smp_atomic32_inc_nob(&prt->common.refc);
+
+#ifdef ERTS_SMP
+ res = erts_smp_mtx_trylock(prt->lock);
+ if (res == EBUSY) {
+ erts_smp_atomic32_dec_nob(&prt->common.refc);
+ }
+#else
+ res = 0;
+#endif
+
+ return res;
+}
+
+ERTS_GLB_INLINE void
+erts_smp_port_lock(Port *prt)
+{
+ ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) > 0);
+ erts_smp_atomic32_inc_nob(&prt->common.refc);
+#ifdef ERTS_SMP
+ erts_smp_mtx_lock(prt->lock);
+#endif
+}
+
+ERTS_GLB_INLINE void
+erts_smp_port_unlock(Port *prt)
+{
+ erts_aint32_t refc;
+#ifdef ERTS_SMP
+ erts_smp_mtx_unlock(prt->lock);
+#endif
+ refc = erts_smp_atomic32_dec_read_nob(&prt->common.refc);
+ ASSERT(refc >= 0);
+ if (refc == 0)
+ erts_port_cleanup(prt);
+}
+
+#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
+
+
+#define ERTS_INVALID_PORT_OPT(PP, ID, FLGS) \
+ (!(PP) \
+ || (erts_smp_atomic32_read_nob(&(PP)->state) & (FLGS)) \
+ || (PP)->common.id != (ID))
+
+/* port lookup */
+
+#define INVALID_PORT(PP, ID) \
+ ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_LOOKUP)
+
+/* Invalidate trace port if anything suspicious, for instance
+ * that the port is a distribution port or it is busy.
+ */
+#define INVALID_TRACER_PORT(PP, ID) \
+ ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP)
+
+#define ERTS_PORT_SCHED_ID(P, ID) \
+ ((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID)))
+
+#ifdef ERTS_SMP
+Port *erts_de2port(DistEntry *, Process *, ErtsProcLocks);
+#endif
+
+#define erts_id2port(ID, P, PL) \
+ erts_id2port_sflgs((ID), (P), (PL), ERTS_PORT_SFLGS_INVALID_LOOKUP)
+
+ERTS_GLB_INLINE Port*erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32);
+ERTS_GLB_INLINE void erts_port_release(Port *);
+ERTS_GLB_INLINE Port*erts_drvport2port(ErlDrvPort, erts_aint32_t *);
+ERTS_GLB_INLINE Port*erts_drvportid2port(Eterm);
+ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm id);
+ERTS_GLB_INLINE int erts_is_port_alive(Eterm id);
+ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm id);
+
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_INLINE Port*
+erts_id2port_sflgs(Eterm id, Process *c_p, ErtsProcLocks c_p_locks, Uint32 sflgs)
+{
+#ifdef ERTS_SMP
+ int no_proc_locks = !c_p || !c_p_locks;
+#endif
+ Port *prt;
+
+ if (is_not_internal_port(id))
+ return NULL;
+
+ prt = &erts_port[internal_port_index(id)];
+
+ erts_smp_port_minor_lock(prt);
+ if (ERTS_INVALID_PORT_OPT(prt, id, sflgs)) {
+ erts_smp_port_minor_unlock(prt);
+ prt = NULL;
+ }
+ else {
+ erts_smp_atomic32_inc_nob(&prt->common.refc);
+ erts_smp_port_minor_unlock(prt);
+
+#ifdef ERTS_SMP
+ if (no_proc_locks)
+ erts_smp_mtx_lock(prt->lock);
+ else if (erts_smp_mtx_trylock(prt->lock) == EBUSY) {
+ /* Unlock process locks, and acquire locks in lock order... */
+ erts_smp_proc_unlock(c_p, c_p_locks);
+ erts_smp_mtx_lock(prt->lock);
+ erts_smp_proc_lock(c_p, c_p_locks);
+ }
+
+ /* The id may not have changed... */
+ ERTS_SMP_LC_ASSERT(prt->common.id == id);
+ /* ... but state may have... */
+ if (erts_smp_atomic32_read_nob(&prt->state) & sflgs) {
+ erts_smp_port_unlock(prt); /* Also decrements common.refc... */
+ prt = NULL;
+ }
+#endif
+
+ }
+
+ return prt;
+}
+
+ERTS_GLB_INLINE void
+erts_port_release(Port *prt)
+{
+ erts_smp_port_unlock(prt);
+}
+
+ERTS_GLB_INLINE Port*
+erts_drvport2port(ErlDrvPort drvport, erts_aint32_t *statep)
+{
+ int ix = (int) drvport;
+ erts_aint32_t state;
+ if (ix < 0 || erts_max_ports <= ix)
+ return NULL;
+ state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
+ if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ return NULL;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix]));
+ if (statep)
+ *statep = state;
+ return &erts_port[ix];
+}
+
+ERTS_GLB_INLINE Port*
+erts_drvportid2port(Eterm id)
+{
+ int ix;
+ erts_aint32_t state;
+ if (is_not_internal_port(id))
+ return NULL;
+ ix = (int) internal_port_index(id);
+ if (erts_max_ports <= ix)
+ return NULL;
+ state = erts_smp_atomic32_read_nob(&erts_port[ix].state);
+ if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ return NULL;
+ if (erts_port[ix].common.id != id)
+ return NULL;
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix]));
+ return &erts_port[ix];
+}
+
+ERTS_GLB_INLINE Uint32
+erts_portid2status(Eterm id)
+{
+ if (is_not_internal_port(id))
+ return ERTS_PORT_SFLG_INVALID;
+ else {
+ erts_aint32_t state;
+ int ix = internal_port_index(id);
+ if (erts_max_ports <= ix)
+ return ERTS_PORT_SFLG_INVALID;
+ state = erts_smp_atomic32_read_ddrb(&erts_port[ix].state);
+ if (erts_port[ix].common.id != id)
+ return ERTS_PORT_SFLG_INVALID;
+ return state;
+ }
+}
+
+ERTS_GLB_INLINE int
+erts_is_port_alive(Eterm id)
+{
+ return !(erts_portid2status(id) & (ERTS_PORT_SFLG_INVALID
+ | ERTS_PORT_SFLGS_DEAD));
+}
+
+ERTS_GLB_INLINE int
+erts_is_valid_tracer_port(Eterm id)
+{
+ return !(erts_portid2status(id) & ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP);
+}
+#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
+
+#endif