aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_port.h
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_port.h')
-rw-r--r--erts/emulator/beam/erl_port.h225
1 files changed, 99 insertions, 126 deletions
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h
index c30bbbca06..9b52b648e5 100644
--- a/erts/emulator/beam/erl_port.h
+++ b/erts/emulator/beam/erl_port.h
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 2012-2017. All Rights Reserved.
+ * Copyright Ericsson AB 2012-2018. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -119,9 +119,7 @@ typedef struct {
void *data[ERTS_PRTSD_SIZE];
} ErtsPrtSD;
-#ifdef ERTS_SMP
typedef struct ErtsXPortsList_ ErtsXPortsList;
-#endif
/*
* Port locking:
@@ -146,17 +144,12 @@ struct _erl_drv_port {
ErtsPortTaskSched sched;
ErtsPortTaskHandle timeout_task;
-#ifdef ERTS_SMP
erts_mtx_t *lock;
ErtsXPortsList *xports;
- erts_smp_atomic_t run_queue;
-#else
- erts_atomic32_t refc;
- int cleanup;
-#endif
+ erts_atomic_t run_queue;
erts_atomic_t connected; /* A connected process */
Eterm caller; /* Current caller. */
- erts_smp_atomic_t data; /* Data associated with port. */
+ erts_atomic_t data; /* Data associated with port. */
Uint bytes_in; /* Number of bytes read */
Uint bytes_out; /* Number of bytes written */
@@ -173,7 +166,7 @@ struct _erl_drv_port {
int control_flags; /* Flags for port_control() */
ErlDrvPDL port_data_lock;
- erts_smp_atomic_t psd; /* Port specific data */
+ erts_atomic_t psd; /* Port specific data */
int reds; /* Only used while executing driver callbacks */
struct {
@@ -203,31 +196,53 @@ struct erl_drv_port_data_lock {
Port *prt;
};
+ERTS_GLB_INLINE void erts_init_runq_port(Port *prt, ErtsRunQueue *runq);
+ERTS_GLB_INLINE void erts_set_runq_port(Port *prt, ErtsRunQueue *runq);
+ERTS_GLB_INLINE ErtsRunQueue *erts_get_runq_port(Port *prt);
ERTS_GLB_INLINE ErtsRunQueue *erts_port_runq(Port *prt);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+ERTS_GLB_INLINE void
+erts_init_runq_port(Port *prt, ErtsRunQueue *runq)
+{
+ if (!runq)
+ ERTS_INTERNAL_ERROR("Missing run-queue");
+ erts_atomic_init_nob(&prt->run_queue, (erts_aint_t) runq);
+}
+
+ERTS_GLB_INLINE void
+erts_set_runq_port(Port *prt, ErtsRunQueue *runq)
+{
+ if (!runq)
+ ERTS_INTERNAL_ERROR("Missing run-queue");
+ erts_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq);
+}
+
+ERTS_GLB_INLINE ErtsRunQueue *
+erts_get_runq_port(Port *prt)
+{
+ ErtsRunQueue *runq;
+ runq = (ErtsRunQueue *) erts_atomic_read_nob(&prt->run_queue);
+ if (!runq)
+ ERTS_INTERNAL_ERROR("Missing run-queue");
+ return runq;
+}
+
+
ERTS_GLB_INLINE ErtsRunQueue *
erts_port_runq(Port *prt)
{
-#ifdef ERTS_SMP
ErtsRunQueue *rq1, *rq2;
- rq1 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue);
- if (!rq1)
- return NULL;
+ rq1 = erts_get_runq_port(prt);
while (1) {
- erts_smp_runq_lock(rq1);
- rq2 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue);
+ erts_runq_lock(rq1);
+ rq2 = erts_get_runq_port(prt);
if (rq1 == rq2)
return rq1;
- erts_smp_runq_unlock(rq1);
+ erts_runq_unlock(rq1);
rq1 = rq2;
- if (!rq1)
- return NULL;
}
-#else
- return ERTS_RUNQ_IX(0);
-#endif
}
#endif
@@ -241,10 +256,10 @@ ERTS_GLB_INLINE void *erts_prtsd_set(Port *p, int ix, void *new);
ERTS_GLB_INLINE void *
erts_prtsd_get(Port *prt, int ix)
{
- ErtsPrtSD *psd = (ErtsPrtSD *) erts_smp_atomic_read_nob(&prt->psd);
+ ErtsPrtSD *psd = (ErtsPrtSD *) erts_atomic_read_nob(&prt->psd);
if (!psd)
return NULL;
- ERTS_SMP_DATA_DEPENDENCY_READ_MEMORY_BARRIER;
+ ERTS_THR_DATA_DEPENDENCY_READ_MEMORY_BARRIER;
return psd->data[ix];
}
@@ -255,16 +270,14 @@ erts_prtsd_set(Port *prt, int ix, void *data)
void *old;
int i;
- psd = (ErtsPrtSD *) erts_smp_atomic_read_nob(&prt->psd);
+ psd = (ErtsPrtSD *) erts_atomic_read_nob(&prt->psd);
if (psd) {
-#ifdef ERTS_SMP
#ifdef ETHR_ORDERED_READ_DEPEND
ETHR_MEMBAR(ETHR_LoadStore|ETHR_StoreStore);
#else
ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore|ETHR_StoreStore);
#endif
-#endif
old = psd->data[ix];
psd->data[ix] = data;
return old;
@@ -276,7 +289,7 @@ erts_prtsd_set(Port *prt, int ix, void *data)
new_psd = erts_alloc(ERTS_ALC_T_PRTSD, sizeof(ErtsPrtSD));
for (i = 0; i < ERTS_PRTSD_SIZE; i++)
new_psd->data[i] = NULL;
- psd = (ErtsPrtSD *) erts_smp_atomic_cmpxchg_mb(&prt->psd,
+ psd = (ErtsPrtSD *) erts_atomic_cmpxchg_mb(&prt->psd,
(erts_aint_t) new_psd,
(erts_aint_t) NULL);
if (psd)
@@ -360,15 +373,10 @@ Eterm erts_request_io_bytes(Process *c_p);
void print_port_info(Port *, fmtfn_t, void *);
void erts_port_free(Port *);
-#ifndef ERTS_SMP
-void erts_port_cleanup(Port *);
-#endif
-void erts_fire_port_monitor(Port *prt, Eterm ref);
-#ifdef ERTS_SMP
+void erts_fire_port_monitor(Port *prt, ErtsMonitor *tmon);
int erts_port_handle_xports(Port *);
-#endif
-#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
+#if defined(ERTS_ENABLE_LOCK_CHECK)
int erts_lc_is_port_locked(Port *);
#endif
@@ -377,9 +385,9 @@ ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt);
ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc);
ERTS_GLB_INLINE Sint erts_port_read_refc(Port *prt);
-ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt);
-ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt);
-ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt);
+ERTS_GLB_INLINE int erts_port_trylock(Port *prt);
+ERTS_GLB_INLINE void erts_port_lock(Port *prt);
+ERTS_GLB_INLINE void erts_port_unlock(Port *prt);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
@@ -408,35 +416,27 @@ ERTS_GLB_INLINE Sint erts_port_read_refc(Port *prt)
}
ERTS_GLB_INLINE int
-erts_smp_port_trylock(Port *prt)
+erts_port_trylock(Port *prt)
{
-#ifdef ERTS_SMP
/* *Need* to be a managed thread */
- ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+ ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread());
return erts_mtx_trylock(prt->lock);
-#else
- return 0;
-#endif
}
ERTS_GLB_INLINE void
-erts_smp_port_lock(Port *prt)
+erts_port_lock(Port *prt)
{
-#ifdef ERTS_SMP
/* *Need* to be a managed thread */
- ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+ ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread());
erts_mtx_lock(prt->lock);
-#endif
}
ERTS_GLB_INLINE void
-erts_smp_port_unlock(Port *prt)
+erts_port_unlock(Port *prt)
{
-#ifdef ERTS_SMP
/* *Need* to be a managed thread */
- ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+ ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread());
erts_mtx_unlock(prt->lock);
-#endif
}
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
@@ -467,9 +467,7 @@ extern const Port erts_invalid_port;
int erts_is_port_ioq_empty(Port *);
void erts_terminate_port(Port *);
-#ifdef ERTS_SMP
Port *erts_de2port(DistEntry *, Process *, ErtsProcLocks);
-#endif
ERTS_GLB_INLINE Port *erts_pix2port(int);
ERTS_GLB_INLINE Port *erts_port_lookup_raw(Eterm);
@@ -477,11 +475,9 @@ ERTS_GLB_INLINE Port *erts_port_lookup(Eterm, Uint32);
ERTS_GLB_INLINE Port*erts_id2port(Eterm id);
ERTS_GLB_INLINE Port *erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32);
ERTS_GLB_INLINE void erts_port_release(Port *);
-#ifdef ERTS_SMP
ERTS_GLB_INLINE Port *erts_thr_port_lookup(Eterm id, Uint32 invalid_sflgs);
ERTS_GLB_INLINE Port *erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs);
ERTS_GLB_INLINE void erts_thr_port_release(Port *prt);
-#endif
ERTS_GLB_INLINE Port *erts_thr_drvport2port(ErlDrvPort, int);
ERTS_GLB_INLINE Port *erts_drvport2port_state(ErlDrvPort, erts_aint32_t *);
ERTS_GLB_INLINE Eterm erts_drvport2id(ErlDrvPort);
@@ -489,6 +485,8 @@ ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm);
ERTS_GLB_INLINE int erts_is_port_alive(Eterm);
ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm);
ERTS_GLB_INLINE int erts_port_driver_callback_epilogue(Port *, erts_aint32_t *);
+ERTS_GLB_INLINE Port *erts_get_current_port(void);
+ERTS_GLB_INLINE Eterm erts_get_current_port_id(void);
#define erts_drvport2port(Prt) erts_drvport2port_state((Prt), NULL)
@@ -507,7 +505,7 @@ erts_port_lookup_raw(Eterm id)
{
Port *prt;
- ERTS_SMP_LC_ASSERT(erts_thr_progress_lc_is_delaying());
+ ERTS_LC_ASSERT(erts_thr_progress_lc_is_delaying());
if (is_not_internal_port(id))
return NULL;
@@ -536,7 +534,7 @@ erts_id2port(Eterm id)
Port *prt;
/* Only allowed to be called from managed threads */
- ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+ ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread());
if (is_not_internal_port(id))
return NULL;
@@ -547,10 +545,10 @@ erts_id2port(Eterm id)
if (!prt || prt->common.id != id)
return NULL;
- erts_smp_port_lock(prt);
+ erts_port_lock(prt);
state = erts_atomic32_read_nob(&prt->state);
if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) {
- erts_smp_port_unlock(prt);
+ erts_port_unlock(prt);
return NULL;
}
@@ -563,14 +561,12 @@ erts_id2port_sflgs(Eterm id,
Process *c_p, ErtsProcLocks c_p_locks,
Uint32 invalid_sflgs)
{
-#ifdef ERTS_SMP
int no_proc_locks = !c_p || !c_p_locks;
-#endif
erts_aint32_t state;
Port *prt;
/* Only allowed to be called from managed threads */
- ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
+ ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread());
if (is_not_internal_port(id))
return NULL;
@@ -581,21 +577,17 @@ erts_id2port_sflgs(Eterm id,
if (!prt || prt->common.id != id)
return NULL;
-#ifdef ERTS_SMP
if (no_proc_locks)
- erts_smp_port_lock(prt);
- else if (erts_smp_port_trylock(prt) == EBUSY) {
+ erts_port_lock(prt);
+ else if (erts_port_trylock(prt) == EBUSY) {
/* Unlock process locks, and acquire locks in lock order... */
- erts_smp_proc_unlock(c_p, c_p_locks);
- erts_smp_port_lock(prt);
- erts_smp_proc_lock(c_p, c_p_locks);
+ erts_proc_unlock(c_p, c_p_locks);
+ erts_port_lock(prt);
+ erts_proc_lock(c_p, c_p_locks);
}
-#endif
state = erts_atomic32_read_nob(&prt->state);
if (state & invalid_sflgs) {
-#ifdef ERTS_SMP
- erts_smp_port_unlock(prt);
-#endif
+ erts_port_unlock(prt);
return NULL;
}
@@ -606,18 +598,10 @@ ERTS_GLB_INLINE void
erts_port_release(Port *prt)
{
/* Only allowed to be called from managed threads */
- ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread());
-#ifdef ERTS_SMP
- erts_smp_port_unlock(prt);
-#else
- if (prt->cleanup) {
- prt->cleanup = 0;
- erts_port_cleanup(prt);
- }
-#endif
+ ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread());
+ erts_port_unlock(prt);
}
-#ifdef ERTS_SMP
/*
* erts_thr_id2port_sflgs() and erts_port_dec_refc(prt) can
* be used by unmanaged threads in the SMP case.
@@ -703,13 +687,10 @@ ERTS_GLB_INLINE void
erts_thr_port_release(Port *prt)
{
erts_mtx_unlock(prt->lock);
-#ifdef ERTS_SMP
if (!erts_thr_progress_is_managed_thread())
erts_port_dec_refc(prt);
-#endif
}
-#endif
ERTS_GLB_INLINE Port *
erts_thr_drvport2port(ErlDrvPort drvport, int lock_pdl)
@@ -725,7 +706,7 @@ erts_thr_drvport2port(ErlDrvPort drvport, int lock_pdl)
#ifdef ERTS_ENABLE_LOCK_CHECK
if (!ERTS_IS_CRASH_DUMPING) {
if (erts_lc_is_emu_thr()) {
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
ERTS_LC_ASSERT(!prt->port_data_lock
|| erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
}
@@ -754,7 +735,7 @@ erts_drvport2port_state(ErlDrvPort drvport, erts_aint32_t *statep)
// ERTS_LC_ASSERT(erts_lc_is_emu_thr());
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return ERTS_INVALID_ERL_DRV_PORT;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)
|| ERTS_IS_CRASH_DUMPING);
/*
* This state check is only needed since a driver callback
@@ -811,23 +792,21 @@ erts_port_driver_callback_epilogue(Port *prt, erts_aint32_t *statep)
int reds = 0;
erts_aint32_t state;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
state = erts_atomic32_read_nob(&prt->state);
if ((state & ERTS_PORT_SFLG_CLOSING) && erts_is_port_ioq_empty(prt)) {
reds += ERTS_PORT_REDS_TERMINATE;
erts_terminate_port(prt);
state = erts_atomic32_read_nob(&prt->state);
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
}
-#ifdef ERTS_SMP
if (prt->xports) {
reds += erts_port_handle_xports(prt);
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
ASSERT(!prt->xports);
}
-#endif
if (statep)
*statep = state;
@@ -835,6 +814,20 @@ erts_port_driver_callback_epilogue(Port *prt, erts_aint32_t *statep)
return reds;
}
+ERTS_GLB_INLINE
+Port *erts_get_current_port(void)
+{
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ return esdp ? esdp->current_port : NULL;
+}
+
+ERTS_GLB_INLINE
+Eterm erts_get_current_port_id(void)
+{
+ Port *port = erts_get_current_port();
+ return port ? port->common.id : THE_NON_VALUE;
+}
+
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
void erts_port_resume_procs(Port *);
@@ -910,20 +903,20 @@ struct ErtsProc2PortSigData_ {
Eterm item;
} info;
struct {
- Eterm port;
- Eterm to;
+ Eterm port_id;
+ ErtsLink *lnk;
} link;
struct {
- Eterm from;
+ Eterm port_id;
+ ErtsLink *lnk;
} unlink;
struct {
- Eterm origin; /* who receives monitor event, pid */
- Eterm name; /* either name for named monitor, or port id */
+ Eterm port_id;
+ ErtsMonitor *mon;
} monitor;
struct {
- Eterm origin; /* who is at the other end of the monitor, pid */
- Eterm name; /* port id */
- Uint32 ref[ERTS_MAX_REF_NUMBERS]; /* box contents of a ref */
+ Eterm port_id;
+ ErtsMonitor *mon;
} demonitor;
} u;
} ;
@@ -969,7 +962,6 @@ typedef int (*ErtsProc2PortSigCallback)(Port *,
typedef enum {
ERTS_PORT_OP_BADARG,
- ERTS_PORT_OP_CALLER_EXIT,
ERTS_PORT_OP_BUSY,
ERTS_PORT_OP_BUSY_SCHEDULED,
ERTS_PORT_OP_SCHEDULED,
@@ -1005,32 +997,13 @@ ErtsPortOpResult erts_port_command(Process *, int, Port *, Eterm, Eterm *);
ErtsPortOpResult erts_port_output(Process *, int, Port *, Eterm, Eterm, Eterm *);
ErtsPortOpResult erts_port_exit(Process *, int, Port *, Eterm, Eterm, Eterm *);
ErtsPortOpResult erts_port_connect(Process *, int, Port *, Eterm, Eterm, Eterm *);
-ErtsPortOpResult erts_port_link(Process *, Port *, Eterm, Eterm *);
-ErtsPortOpResult erts_port_unlink(Process *, Port *, Eterm, Eterm *);
+ErtsPortOpResult erts_port_link(Process *, Port *, ErtsLink *, Eterm *);
+ErtsPortOpResult erts_port_unlink(Process *, Port *, ErtsLink *, Eterm *);
ErtsPortOpResult erts_port_control(Process *, Port *, unsigned int, Eterm, Eterm *);
ErtsPortOpResult erts_port_call(Process *, Port *, unsigned int, Eterm, Eterm *);
ErtsPortOpResult erts_port_info(Process *, Port *, Eterm, Eterm *);
-
-/* Creates monitor between Origin and Target. Ref must be initialized to
- * a reference (ref may be rewritten to be used to serve additionally as a
- * signal id). Name is atom if user monitors port by name or NIL */
-ErtsPortOpResult erts_port_monitor(Process *origin, Port *target, Eterm name,
- Eterm *ref);
-
-typedef enum {
- /* Normal demonitor rules apply with locking and reductions bump */
- ERTS_PORT_DEMONITOR_NORMAL = 1,
- /* Relaxed demonitor rules when process is about to die, which means that
- * pid lookup won't work, locks won't work, no reductions bump. */
- ERTS_PORT_DEMONITOR_ORIGIN_ON_DEATHBED = 2,
-} ErtsDemonitorMode;
-
-/* Removes monitor between origin and target, identified by ref.
- * origin_is_dying can be 0 (false, normal locking rules and reductions bump
- * apply) or 1 (true, in case when we avoid origin locking) */
-ErtsPortOpResult erts_port_demonitor(Process *origin, ErtsDemonitorMode mode,
- Port *target, Eterm ref,
- Eterm *trap_ref);
+ErtsPortOpResult erts_port_monitor(Process *, Port *, ErtsMonitor *);
+ErtsPortOpResult erts_port_demonitor(Process *, Port *, ErtsMonitor *);
/* defined in erl_bif_port.c */
Port *erts_sig_lookup_port(Process *c_p, Eterm id_or_name);