/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 2012-2018. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* %CopyrightEnd%
*/
#ifndef ERL_PORT_TYPE__
#define ERL_PORT_TYPE__
typedef struct _erl_drv_port Port;
typedef struct ErtsProc2PortSigData_ ErtsProc2PortSigData;
#endif
#if !defined(ERL_PORT_H__) && !defined(ERL_PORT_GET_PORT_TYPE_ONLY__)
#define ERL_PORT_H__
#include "erl_port_task.h"
#include "erl_ptab.h"
#include "erl_thr_progress.h"
#include "erl_trace.h"
#define ERTS_IO_QUEUE_TYPES_ONLY__
#include "erl_io_queue.h"
#undef ERTS_IO_QUEUE_TYPES_ONLY__
#ifndef __WIN32__
#define ERTS_DEFAULT_MAX_PORTS (1 << 16)
#else
/*
* Do not default to as many max ports on Windows
* as there are no os limits to stop system
* from running amok. If allowed to go too high
* windows rarely recovers from the errors and
* other OS processes can be effected.
*/
#define ERTS_DEFAULT_MAX_PORTS (1 << 13)
#endif /* __WIN32__ */
#define ERTS_MIN_PORTS 1024
extern int erts_port_synchronous_ops;
extern int erts_port_schedule_all_ops;
extern int erts_port_parallelism;
typedef struct erts_driver_t_ erts_driver_t;
/*
* It would have been preferred to use NULL as value of
* ERTS_INVALID_ERL_DRV_PORT. That would, however, not be
* backward compatible. In pre-R16 systems, 0 was a valid
* port handle and -1 was used as invalid handle, so we
* are stuck with it.
*/
#define ERTS_INVALID_ERL_DRV_PORT ((struct _erl_drv_port *) ((SWord) -1))
#ifdef DEBUG
/* Make sure we use this api, and do not cast directly */
#define ERTS_ErlDrvPort2Port(PH) \
((PH) == ERTS_INVALID_ERL_DRV_PORT \
? ERTS_INVALID_ERL_DRV_PORT \
: ((Port *) ((PH) - 4711)))
#define ERTS_Port2ErlDrvPort(PH) \
((PH) == ERTS_INVALID_ERL_DRV_PORT \
? ERTS_INVALID_ERL_DRV_PORT \
: ((ErlDrvPort) ((PH) + 4711)))
#else
#define ERTS_ErlDrvPort2Port(PH) ((Port *) (PH))
#define ERTS_Port2ErlDrvPort(PH) ((ErlDrvPort) (PH))
#endif
typedef ErtsIOQueue ErlPortIOQueue;
typedef struct line_buf { /* Buffer used in line oriented I/O */
ErlDrvSizeT bufsiz; /* Size of character buffer */
ErlDrvSizeT ovlen; /* Length of overflow data */
ErlDrvSizeT ovsiz; /* Actual size of overflow buffer */
char data[1]; /* Starting point of buffer data,
data[0] is a flag indicating an unprocess CR,
The rest is the overflow buffer. */
} LineBuf;
/*
* Items part of erlang:port_info/1 result. Note am_registered_name
* *need* to be first.
*/
#define ERTS_PORT_INFO_1_ITEMS \
{ am_registered_name, /* Needs to be first */ \
am_name, \
am_links, \
am_id, \
am_connected, \
am_input, \
am_output, \
am_os_pid }
/*
* Port Specific Data.
*
* Only use PrtSD for very rarely used data.
*/
#define ERTS_PRTSD_SCHED_ID 0
#define ERTS_PRTSD_DIST_ENTRY 1
#define ERTS_PRTSD_CONN_ID 2
#define ERTS_PRTSD_SIZE 3
typedef struct {
void *data[ERTS_PRTSD_SIZE];
} ErtsPrtSD;
typedef struct ErtsXPortsList_ ErtsXPortsList;
/*
* Port locking:
*
* Locking is done either driver specific or port specific. When
* driver specific locking is used, all instances of the driver,
* i.e. ports running the driver, share the same lock. When port
* specific locking is used each instance have its own lock.
*
* Most fields in the Port structure are protected by the lock
* referred to by the 'lock' field. This lock is shared between
* all ports running the same driver when driver specific locking
* is used.
*
* The 'sched' field is protected by the run queue lock that the
* port currently is assigned to.
*
*/
struct _erl_drv_port {
ErtsPTabElementCommon common; /* *Need* to be first in struct */
ErtsPortTaskSched sched;
ErtsPortTaskHandle timeout_task;
erts_mtx_t *lock;
ErtsXPortsList *xports;
erts_atomic_t run_queue;
erts_atomic_t connected; /* A connected process */
Eterm caller; /* Current caller. */
erts_atomic_t data; /* Data associated with port. */
Uint bytes_in; /* Number of bytes read */
Uint bytes_out; /* Number of bytes written */
ErlPortIOQueue ioq; /* driver accessible i/o queue */
char *name; /* String used in the open */
erts_driver_t* drv_ptr;
UWord drv_data;
SWord os_pid; /* Child process ID */
ErtsProcList *suspended; /* List of suspended processes. */
LineBuf *linebuf; /* Buffer to hold data not ready for
process to get (line oriented I/O)*/
erts_atomic32_t state; /* Status and type flags */
int control_flags; /* Flags for port_control() */
ErlDrvPDL port_data_lock;
erts_atomic_t psd; /* Port specific data */
int reds; /* Only used while executing driver callbacks */
struct {
Eterm to;
Uint32 ref[ERTS_MAX_REF_NUMBERS];
} *async_open_port; /* Reference used with async open port */
};
void erts_init_port_data(Port *);
void erts_cleanup_port_data(Port *);
Uint erts_port_data_size(Port *);
ErlOffHeap *erts_port_data_offheap(Port *);
Eterm erts_port_data_read(Port* prt);
#define ERTS_PORT_GET_CONNECTED(PRT) \
((Eterm) erts_atomic_read_nob(&(PRT)->connected))
#define ERTS_PORT_SET_CONNECTED(PRT, PID) \
erts_atomic_set_relb(&(PRT)->connected, (erts_aint_t) (PID))
#define ERTS_PORT_INIT_CONNECTED(PRT, PID) \
erts_atomic_init_nob(&(PRT)->connected, (erts_aint_t) (PID))
struct erl_drv_port_data_lock {
erts_mtx_t mtx;
erts_atomic_t refc;
Port *prt;
};
ERTS_GLB_INLINE void erts_init_runq_port(Port *prt, ErtsRunQueue *runq);
ERTS_GLB_INLINE void erts_set_runq_port(Port *prt, ErtsRunQueue *runq);
ERTS_GLB_INLINE ErtsRunQueue *erts_get_runq_port(Port *prt);
ERTS_GLB_INLINE ErtsRunQueue *erts_port_runq(Port *prt);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE void
erts_init_runq_port(Port *prt, ErtsRunQueue *runq)
{
if (!runq)
ERTS_INTERNAL_ERROR("Missing run-queue");
erts_atomic_init_nob(&prt->run_queue, (erts_aint_t) runq);
}
ERTS_GLB_INLINE void
erts_set_runq_port(Port *prt, ErtsRunQueue *runq)
{
if (!runq)
ERTS_INTERNAL_ERROR("Missing run-queue");
erts_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq);
}
ERTS_GLB_INLINE ErtsRunQueue *
erts_get_runq_port(Port *prt)
{
ErtsRunQueue *runq;
runq = (ErtsRunQueue *) erts_atomic_read_nob(&prt->run_queue);
if (!runq)
ERTS_INTERNAL_ERROR("Missing run-queue");
return runq;
}
ERTS_GLB_INLINE ErtsRunQueue *
erts_port_runq(Port *prt)
{
ErtsRunQueue *rq1, *rq2;
rq1 = erts_get_runq_port(prt);
while (1) {
erts_runq_lock(rq1);
rq2 = erts_get_runq_port(prt);
if (rq1 == rq2)
return rq1;
erts_runq_unlock(rq1);
rq1 = rq2;
}
}
#endif
ERTS_GLB_INLINE void *erts_prtsd_get(Port *p, int ix);
ERTS_GLB_INLINE void *erts_prtsd_set(Port *p, int ix, void *new);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE void *
erts_prtsd_get(Port *prt, int ix)
{
ErtsPrtSD *psd = (ErtsPrtSD *) erts_atomic_read_nob(&prt->psd);
ASSERT((unsigned)ix < ERTS_PRTSD_SIZE);
if (!psd)
return NULL;
ERTS_THR_DATA_DEPENDENCY_READ_MEMORY_BARRIER;
return psd->data[ix];
}
ERTS_GLB_INLINE void *
erts_prtsd_set(Port *prt, int ix, void *data)
{
ErtsPrtSD *psd, *new_psd;
void *old;
int i;
psd = (ErtsPrtSD *) erts_atomic_read_nob(&prt->psd);
ASSERT((unsigned)ix < ERTS_PRTSD_SIZE);
if (psd) {
#ifdef ETHR_ORDERED_READ_DEPEND
ETHR_MEMBAR(ETHR_LoadStore|ETHR_StoreStore);
#else
ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore|ETHR_StoreStore);
#endif
old = psd->data[ix];
psd->data[ix] = data;
return old;
}
if (!data)
return NULL;
new_psd = erts_alloc(ERTS_ALC_T_PRTSD, sizeof(ErtsPrtSD));
for (i = 0; i < ERTS_PRTSD_SIZE; i++)
new_psd->data[i] = NULL;
psd = (ErtsPrtSD *) erts_atomic_cmpxchg_mb(&prt->psd,
(erts_aint_t) new_psd,
(erts_aint_t) NULL);
if (psd)
erts_free(ERTS_ALC_T_PRTSD, new_psd);
else
psd = new_psd;
old = psd->data[ix];
psd->data[ix] = data;
return old;
}
#endif
Eterm erts_request_io_bytes(Process *c_p);
/* port status flags */
#define ERTS_PORT_SFLG_CONNECTED ((Uint32) (1 << 0))
/* Port have begun exiting */
#define ERTS_PORT_SFLG_EXITING ((Uint32) (1 << 1))
/* Distribution port */
#define ERTS_PORT_SFLG_DISTRIBUTION ((Uint32) (1 << 2))
#define ERTS_PORT_SFLG_BINARY_IO ((Uint32) (1 << 3))
#define ERTS_PORT_SFLG_SOFT_EOF ((Uint32) (1 << 4))
/* Flow control */
/* Port is closing (no i/o accepted) */
#define ERTS_PORT_SFLG_CLOSING ((Uint32) (1 << 5))
/* Send a closed message when terminating */
#define ERTS_PORT_SFLG_SEND_CLOSED ((Uint32) (1 << 6))
/* Line orinted io on port */
#define ERTS_PORT_SFLG_LINEBUF_IO ((Uint32) (1 << 7))
/* Immortal port (only certain system ports) */
#define ERTS_PORT_SFLG_FREE ((Uint32) (1 << 8))
#define ERTS_PORT_SFLG_INITIALIZING ((Uint32) (1 << 9))
/* Port uses port specific locking (opposed to driver specific locking) */
#define ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK ((Uint32) (1 << 10))
#define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 11))
/* Last port to terminate halts the emulator */
#define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 12))
/* Check if the event in ready_input should be cleaned */
#define ERTS_PORT_SFLG_CHECK_FD_CLEANUP ((Uint32) (1 << 13))
#ifdef DEBUG
/* Only debug: make sure all flags aren't cleared unintentionally */
#define ERTS_PORT_SFLG_PORT_DEBUG ((Uint32) (1 << 31))
#endif
/* Combinations of port status flags */
#define ERTS_PORT_SFLGS_DEAD \
(ERTS_PORT_SFLG_FREE | ERTS_PORT_SFLG_INITIALIZING)
#define ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \
(ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_INVALID)
#define ERTS_PORT_SFLGS_INVALID_LOOKUP \
(ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \
| ERTS_PORT_SFLG_EXITING \
| ERTS_PORT_SFLG_CLOSING)
#define ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP \
(ERTS_PORT_SFLGS_INVALID_LOOKUP \
| ERTS_PORT_SFLG_DISTRIBUTION)
/*
* Costs in reductions for some port operations.
*/
#define ERTS_PORT_REDS_EXECUTE (CONTEXT_REDS/4)
#define ERTS_PORT_REDS_FREE (CONTEXT_REDS/400)
#define ERTS_PORT_REDS_TIMEOUT (CONTEXT_REDS/100)
#define ERTS_PORT_REDS_INPUT (CONTEXT_REDS/100)
#define ERTS_PORT_REDS_OUTPUT (CONTEXT_REDS/100)
#define ERTS_PORT_REDS_EVENT (CONTEXT_REDS/100)
#define ERTS_PORT_REDS_CMD_OUTPUTV (CONTEXT_REDS/100)
#define ERTS_PORT_REDS_CMD_OUTPUT (CONTEXT_REDS/100)
#define ERTS_PORT_REDS_EXIT (CONTEXT_REDS/100)
#define ERTS_PORT_REDS_CONNECT (CONTEXT_REDS/200)
#define ERTS_PORT_REDS_UNLINK (CONTEXT_REDS/200)
#define ERTS_PORT_REDS_LINK (CONTEXT_REDS/200)
#define ERTS_PORT_REDS_MONITOR (CONTEXT_REDS/200)
#define ERTS_PORT_REDS_DEMONITOR (CONTEXT_REDS/200)
#define ERTS_PORT_REDS_BADSIG (CONTEXT_REDS/200)
#define ERTS_PORT_REDS_CONTROL (CONTEXT_REDS/100)
#define ERTS_PORT_REDS_CALL (CONTEXT_REDS/50)
#define ERTS_PORT_REDS_INFO (CONTEXT_REDS/100)
#define ERTS_PORT_REDS_TERMINATE (CONTEXT_REDS/50)
void print_port_info(Port *, fmtfn_t, void *);
void erts_port_free(Port *);
void erts_fire_port_monitor(Port *prt, ErtsMonitor *tmon);
int erts_port_handle_xports(Port *);
#if defined(ERTS_ENABLE_LOCK_CHECK)
int erts_lc_is_port_locked(Port *);
#endif
ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt);
ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt);
ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc);
ERTS_GLB_INLINE Sint erts_port_read_refc(Port *prt);
ERTS_GLB_INLINE int erts_port_trylock(Port *prt);
ERTS_GLB_INLINE void erts_port_lock(Port *prt);
ERTS_GLB_INLINE void erts_port_unlock(Port *prt);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt)
{
erts_ptab_atmc_inc_refc(&prt->common);
}
ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt)
{
int referred = erts_ptab_atmc_dec_test_refc(&prt->common);
if (!referred)
erts_port_free(prt);
}
ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc)
{
int referred = erts_ptab_atmc_add_test_refc(&prt->common, add_refc);
if (!referred)
erts_port_free(prt);
}
ERTS_GLB_INLINE Sint erts_port_read_refc(Port *prt)
{
return erts_ptab_atmc_read_refc(&prt->common);
}
ERTS_GLB_INLINE int
erts_port_trylock(Port *prt)
{
/* *Need* to be a managed thread */
ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread());
return erts_mtx_trylock(prt->lock);
}
ERTS_GLB_INLINE void
erts_port_lock(Port *prt)
{
/* *Need* to be a managed thread */
ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread());
erts_mtx_lock(prt->lock);
}
ERTS_GLB_INLINE void
erts_port_unlock(Port *prt)
{
/* *Need* to be a managed thread */
ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread());
erts_mtx_unlock(prt->lock);
}
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
#define ERTS_INVALID_PORT_OPT(PP, ID, FLGS) \
(!(PP) \
|| (erts_atomic32_read_nob(&(PP)->state) & (FLGS)) \
|| (PP)->common.id != (ID))
/* port lookup */
#define INVALID_PORT(PP, ID) \
ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_LOOKUP)
/* Invalidate trace port if anything suspicious, for instance
* that the port is a distribution port or it is busy.
*/
#define INVALID_TRACER_PORT(PP, ID) \
ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP)
#define ERTS_PORT_SCHED_ID(P, ID) \
((Uint) (UWord) erts_prtsd_set((P), ERTS_PRTSD_SCHED_ID, (void *) (UWord) (ID)))
extern const Port erts_invalid_port;
#define ERTS_PORT_LOCK_BUSY ((Port *) &erts_invalid_port)
int erts_is_port_ioq_empty(Port *);
void erts_terminate_port(Port *);
Port *erts_de2port(DistEntry *, Process *, ErtsProcLocks);
ERTS_GLB_INLINE Port *erts_pix2port(int);
ERTS_GLB_INLINE Port *erts_port_lookup_raw(Eterm);
ERTS_GLB_INLINE Port *erts_port_lookup(Eterm, Uint32);
ERTS_GLB_INLINE Port*erts_id2port(Eterm id);
ERTS_GLB_INLINE Port *erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32);
ERTS_GLB_INLINE void erts_port_release(Port *);
ERTS_GLB_INLINE Port *erts_thr_port_lookup(Eterm id, Uint32 invalid_sflgs);
ERTS_GLB_INLINE Port *erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs);
ERTS_GLB_INLINE void erts_thr_port_release(Port *prt);
ERTS_GLB_INLINE Port *erts_thr_drvport2port(ErlDrvPort, int);
ERTS_GLB_INLINE Port *erts_drvport2port_state(ErlDrvPort, erts_aint32_t *);
ERTS_GLB_INLINE Eterm erts_drvport2id(ErlDrvPort);
ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm);
ERTS_GLB_INLINE int erts_is_port_alive(Eterm);
ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm);
ERTS_GLB_INLINE int erts_port_driver_callback_epilogue(Port *, erts_aint32_t *);
ERTS_GLB_INLINE Port *erts_get_current_port(void);
ERTS_GLB_INLINE Eterm erts_get_current_port_id(void);
#define erts_drvport2port(Prt) erts_drvport2port_state((Prt), NULL)
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE Port *erts_pix2port(int ix)
{
Port *prt;
ASSERT(0 <= ix && ix < erts_ptab_max(&erts_port));
prt = (Port *) erts_ptab_pix2intptr_nob(&erts_port, ix);
return prt == ERTS_PORT_LOCK_BUSY ? NULL : prt;
}
ERTS_GLB_INLINE Port *
erts_port_lookup_raw(Eterm id)
{
Port *prt;
ERTS_LC_ASSERT(erts_thr_progress_lc_is_delaying());
if (is_not_internal_port(id))
return NULL;
prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
internal_port_index(id));
return prt && prt->common.id == id ? prt : NULL;
}
ERTS_GLB_INLINE Port *
erts_port_lookup(Eterm id, Uint32 invalid_sflgs)
{
Port *prt = erts_port_lookup_raw(id);
return (!prt
? NULL
: ((invalid_sflgs & erts_atomic32_read_nob(&prt->state))
? NULL
: prt));
}
ERTS_GLB_INLINE Port*
erts_id2port(Eterm id)
{
erts_aint32_t state;
Port *prt;
/* Only allowed to be called from managed threads */
ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread());
if (is_not_internal_port(id))
return NULL;
prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
internal_port_index(id));
if (!prt || prt->common.id != id)
return NULL;
erts_port_lock(prt);
state = erts_atomic32_read_nob(&prt->state);
if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) {
erts_port_unlock(prt);
return NULL;
}
return prt;
}
ERTS_GLB_INLINE Port*
erts_id2port_sflgs(Eterm id,
Process *c_p, ErtsProcLocks c_p_locks,
Uint32 invalid_sflgs)
{
int no_proc_locks = !c_p || !c_p_locks;
erts_aint32_t state;
Port *prt;
/* Only allowed to be called from managed threads */
ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread());
if (is_not_internal_port(id))
return NULL;
prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
internal_port_index(id));
if (!prt || prt->common.id != id)
return NULL;
if (no_proc_locks)
erts_port_lock(prt);
else if (erts_port_trylock(prt) == EBUSY) {
/* Unlock process locks, and acquire locks in lock order... */
erts_proc_unlock(c_p, c_p_locks);
erts_port_lock(prt);
erts_proc_lock(c_p, c_p_locks);
}
state = erts_atomic32_read_nob(&prt->state);
if (state & invalid_sflgs) {
erts_port_unlock(prt);
return NULL;
}
return prt;
}
ERTS_GLB_INLINE void
erts_port_release(Port *prt)
{
/* Only allowed to be called from managed threads */
ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread());
erts_port_unlock(prt);
}
/*
* erts_thr_id2port_sflgs() and erts_port_dec_refc(prt) can
* be used by unmanaged threads in the SMP case.
*/
ERTS_GLB_INLINE Port *
erts_thr_port_lookup(Eterm id, Uint32 invalid_sflgs)
{
Port *prt;
ErtsThrPrgrDelayHandle dhndl;
if (is_not_internal_port(id))
return NULL;
dhndl = erts_thr_progress_unmanaged_delay();
prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
internal_port_index(id));
if (!prt || prt->common.id != id) {
erts_thr_progress_unmanaged_continue(dhndl);
return NULL;
}
else {
erts_aint32_t state;
erts_port_inc_refc(prt);
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_thr_progress_unmanaged_continue(dhndl);
state = erts_atomic32_read_acqb(&prt->state);
if (state & invalid_sflgs) {
erts_port_dec_refc(prt);
return NULL;
}
return prt;
}
}
/*
* erts_thr_id2port_sflgs() and erts_thr_port_release() can
* be used by unmanaged threads in the SMP case.
*/
ERTS_GLB_INLINE Port *
erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs)
{
Port *prt;
ErtsThrPrgrDelayHandle dhndl;
if (is_not_internal_port(id))
return NULL;
dhndl = erts_thr_progress_unmanaged_delay();
prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port,
internal_port_index(id));
if (!prt || prt->common.id != id) {
erts_thr_progress_unmanaged_continue(dhndl);
prt = NULL;
}
else {
erts_aint32_t state;
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
erts_port_inc_refc(prt);
erts_thr_progress_unmanaged_continue(dhndl);
}
erts_mtx_lock(prt->lock);
state = erts_atomic32_read_nob(&prt->state);
if (state & invalid_sflgs) {
erts_mtx_unlock(prt->lock);
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_port_dec_refc(prt);
prt = NULL;
}
}
return prt;
}
ERTS_GLB_INLINE void
erts_thr_port_release(Port *prt)
{
erts_mtx_unlock(prt->lock);
if (!erts_thr_progress_is_managed_thread())
erts_port_dec_refc(prt);
}
ERTS_GLB_INLINE Port *
erts_thr_drvport2port(ErlDrvPort drvport, int lock_pdl)
{
Port *prt = ERTS_ErlDrvPort2Port(drvport);
ASSERT(prt != NULL);
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return ERTS_INVALID_ERL_DRV_PORT;
if (lock_pdl && prt->port_data_lock)
driver_pdl_lock(prt->port_data_lock);
#ifdef ERTS_ENABLE_LOCK_CHECK
if (!ERTS_IS_CRASH_DUMPING) {
if (erts_lc_is_emu_thr()) {
ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
ERTS_LC_ASSERT(!prt->port_data_lock
|| erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
}
else {
ERTS_LC_ASSERT(prt->port_data_lock);
ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
}
}
#endif
if (erts_atomic32_read_nob(&prt->state)
& ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
if (lock_pdl && prt->port_data_lock)
driver_pdl_unlock(prt->port_data_lock);
return ERTS_INVALID_ERL_DRV_PORT;
}
return prt;
}
ERTS_GLB_INLINE Port *
erts_drvport2port_state(ErlDrvPort drvport, erts_aint32_t *statep)
{
Port *prt = ERTS_ErlDrvPort2Port(drvport);
erts_aint32_t state;
ASSERT(prt);
// ERTS_LC_ASSERT(erts_lc_is_emu_thr());
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return ERTS_INVALID_ERL_DRV_PORT;
ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)
|| ERTS_IS_CRASH_DUMPING);
/*
* This state check is only needed since a driver callback
* might terminate the port, and then call back into the
* emulator. Drivers should preferably have been forbidden
* to call into the emulator after terminating the port,
* but it has been like this for ages. Perhaps forbid this
* in some future major release?
*/
state = erts_atomic32_read_nob(&prt->state);
if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
return ERTS_INVALID_ERL_DRV_PORT;
if (statep)
*statep = state;
return prt;
}
ERTS_GLB_INLINE Eterm
erts_drvport2id(ErlDrvPort drvport)
{
Port *prt = erts_drvport2port(drvport);
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return am_undefined;
else
return prt->common.id;
}
ERTS_GLB_INLINE Uint32
erts_portid2status(Eterm id)
{
Port *prt = erts_port_lookup_raw(id);
if (prt)
return (Uint32) erts_atomic32_read_acqb(&prt->state);
else
return ERTS_PORT_SFLG_INVALID;
}
ERTS_GLB_INLINE int
erts_is_port_alive(Eterm id)
{
return !(erts_portid2status(id) & (ERTS_PORT_SFLG_INVALID
| ERTS_PORT_SFLGS_DEAD));
}
ERTS_GLB_INLINE int
erts_is_valid_tracer_port(Eterm id)
{
return !(erts_portid2status(id) & ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP);
}
ERTS_GLB_INLINE int
erts_port_driver_callback_epilogue(Port *prt, erts_aint32_t *statep)
{
int reds = 0;
erts_aint32_t state;
ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
state = erts_atomic32_read_nob(&prt->state);
if ((state & ERTS_PORT_SFLG_CLOSING) && erts_is_port_ioq_empty(prt)) {
reds += ERTS_PORT_REDS_TERMINATE;
erts_terminate_port(prt);
state = erts_atomic32_read_nob(&prt->state);
ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
}
if (prt->xports) {
reds += erts_port_handle_xports(prt);
ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
ASSERT(!prt->xports);
}
if (statep)
*statep = state;
return reds;
}
ERTS_GLB_INLINE
Port *erts_get_current_port(void)
{
ErtsSchedulerData *esdp = erts_get_scheduler_data();
return esdp ? esdp->current_port : NULL;
}
ERTS_GLB_INLINE
Eterm erts_get_current_port_id(void)
{
Port *port = erts_get_current_port();
return port ? port->common.id : THE_NON_VALUE;
}
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
void erts_port_resume_procs(Port *);
struct binary;
enum {
ERTS_P2P_SIG_TYPE_BAD = 0,
ERTS_P2P_SIG_TYPE_OUTPUT = 1,
ERTS_P2P_SIG_TYPE_OUTPUTV = 2,
ERTS_P2P_SIG_TYPE_CONNECT = 3,
ERTS_P2P_SIG_TYPE_EXIT = 4,
ERTS_P2P_SIG_TYPE_CONTROL = 5,
ERTS_P2P_SIG_TYPE_CALL = 6,
ERTS_P2P_SIG_TYPE_INFO = 7,
ERTS_P2P_SIG_TYPE_LINK = 8,
ERTS_P2P_SIG_TYPE_UNLINK = 9,
ERTS_P2P_SIG_TYPE_MONITOR = 10,
ERTS_P2P_SIG_TYPE_DEMONITOR = 11
};
#define ERTS_P2P_SIG_TYPE_BITS 4
#define ERTS_P2P_SIG_TYPE_MASK \
((1 << ERTS_P2P_SIG_TYPE_BITS) - 1)
#define ERTS_P2P_SIG_DATA_FLG(N) \
(1 << (ERTS_P2P_SIG_TYPE_BITS + (N)))
#define ERTS_P2P_SIG_DATA_FLG_BANG_OP ERTS_P2P_SIG_DATA_FLG(0)
#define ERTS_P2P_SIG_DATA_FLG_REPLY ERTS_P2P_SIG_DATA_FLG(1)
#define ERTS_P2P_SIG_DATA_FLG_NOSUSPEND ERTS_P2P_SIG_DATA_FLG(2)
#define ERTS_P2P_SIG_DATA_FLG_FORCE ERTS_P2P_SIG_DATA_FLG(3)
#define ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT ERTS_P2P_SIG_DATA_FLG(4)
#define ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK ERTS_P2P_SIG_DATA_FLG(5)
#define ERTS_P2P_SIG_DATA_FLG_SCHED ERTS_P2P_SIG_DATA_FLG(6)
#define ERTS_P2P_SIG_DATA_FLG_ASYNC ERTS_P2P_SIG_DATA_FLG(7)
struct ErtsProc2PortSigData_ {
int flags;
Eterm caller;
Uint32 ref[ERTS_MAX_REF_NUMBERS];
union {
struct {
Eterm from;
ErlIOVec *evp;
ErlDrvBinary *cbinp;
} outputv;
struct {
Eterm from;
char *bufp;
ErlDrvSizeT size;
} output;
struct {
Eterm from;
Eterm connected;
} connect;
struct {
Eterm from;
Eterm reason;
ErlHeapFragment *bp;
} exit;
struct {
struct binary *binp;
unsigned int command;
char *bufp;
ErlDrvSizeT size;
} control;
struct {
unsigned int command;
char *bufp;
ErlDrvSizeT size;
} call;
struct {
Eterm item;
} info;
struct {
Eterm port_id;
ErtsLink *lnk;
} link;
struct {
Eterm port_id;
ErtsLink *lnk;
} unlink;
struct {
Eterm port_id;
ErtsMonitor *mon;
} monitor;
struct {
Eterm port_id;
ErtsMonitor *mon;
} demonitor;
} u;
} ;
ERTS_GLB_INLINE int
erts_proc2port_sig_is_command_op(ErtsProc2PortSigData *sigdp);
ERTS_GLB_INLINE ErlDrvSizeT
erts_proc2port_sig_command_data_size(ErtsProc2PortSigData *sigdp);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE int
erts_proc2port_sig_is_command_op(ErtsProc2PortSigData *sigdp)
{
switch (sigdp->flags & ERTS_P2P_SIG_TYPE_MASK) {
case ERTS_P2P_SIG_TYPE_OUTPUT: return !0;
case ERTS_P2P_SIG_TYPE_OUTPUTV: return !0;
default: return 0;
}
}
ERTS_GLB_INLINE ErlDrvSizeT
erts_proc2port_sig_command_data_size(ErtsProc2PortSigData *sigdp)
{
switch (sigdp->flags & ERTS_P2P_SIG_TYPE_MASK) {
case ERTS_P2P_SIG_TYPE_OUTPUT: return sigdp->u.output.size;
case ERTS_P2P_SIG_TYPE_OUTPUTV: return sigdp->u.outputv.evp->size;
default: return (ErlDrvSizeT) 0;
}
}
#endif
#define ERTS_PROC2PORT_SIG_EXEC 0
#define ERTS_PROC2PORT_SIG_ABORT 1
#define ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND 2
#define ERTS_PROC2PORT_SIG_ABORT_CLOSED 3
typedef int (*ErtsProc2PortSigCallback)(Port *,
erts_aint32_t,
int,
ErtsProc2PortSigData *);
typedef enum {
ERTS_PORT_OP_BADARG,
ERTS_PORT_OP_BUSY,
ERTS_PORT_OP_BUSY_SCHEDULED,
ERTS_PORT_OP_SCHEDULED,
ERTS_PORT_OP_DROPPED,
ERTS_PORT_OP_DONE
} ErtsPortOpResult;
int erts_deliver_port_exit(Port *, Eterm, Eterm, int, int);
/*
* Port signal flags
*/
#define ERTS_PORT_SIG_FLG_BANG_OP ERTS_P2P_SIG_DATA_FLG_BANG_OP
#define ERTS_PORT_SIG_FLG_NOSUSPEND ERTS_P2P_SIG_DATA_FLG_NOSUSPEND
#define ERTS_PORT_SIG_FLG_FORCE ERTS_P2P_SIG_DATA_FLG_FORCE
#define ERTS_PORT_SIG_FLG_BROKEN_LINK ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK
#define ERTS_PORT_SIG_FLG_BAD_OUTPUT ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT
#define ERTS_PORT_SIG_FLG_FORCE_SCHED ERTS_P2P_SIG_DATA_FLG_SCHED
#define ERTS_PORT_SIG_FLG_ASYNC ERTS_P2P_SIG_DATA_FLG_ASYNC
/* ERTS_PORT_SIG_FLG_FORCE_IMM_CALL only when crash dumping... */
#define ERTS_PORT_SIG_FLG_FORCE_IMM_CALL ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT
/*
* Port ! {Owner, {command, Data}}
* Port ! {Owner, {connect, NewOwner}}
* Port ! {Owner, close}
*/
ErtsPortOpResult erts_port_command(Process *, int, Port *, Eterm, Eterm *);
/*
* Signals from processes to ports.
*/
ErtsPortOpResult erts_port_output(Process *, int, Port *, Eterm, Eterm, Eterm *);
ErtsPortOpResult erts_port_exit(Process *, int, Port *, Eterm, Eterm, Eterm *);
ErtsPortOpResult erts_port_connect(Process *, int, Port *, Eterm, Eterm, Eterm *);
ErtsPortOpResult erts_port_link(Process *, Port *, ErtsLink *, Eterm *);
ErtsPortOpResult erts_port_unlink(Process *, Port *, ErtsLink *, Eterm *);
ErtsPortOpResult erts_port_control(Process *, Port *, unsigned int, Eterm, Eterm *);
ErtsPortOpResult erts_port_call(Process *, Port *, unsigned int, Eterm, Eterm *);
ErtsPortOpResult erts_port_info(Process *, Port *, Eterm, Eterm *);
ErtsPortOpResult erts_port_monitor(Process *, Port *, ErtsMonitor *);
ErtsPortOpResult erts_port_demonitor(Process *, Port *, ErtsMonitor *);
/* defined in erl_bif_port.c */
Port *erts_sig_lookup_port(Process *c_p, Eterm id_or_name);
int erts_port_output_async(Port *, Eterm, Eterm);
/*
* Signals from ports to ports. Used by sys drivers.
*/
int erl_drv_port_control(Eterm, unsigned int, char*, ErlDrvSizeT);
#endif