diff options
Diffstat (limited to 'erts/emulator/beam/erl_port.h')
-rw-r--r-- | erts/emulator/beam/erl_port.h | 418 |
1 files changed, 242 insertions, 176 deletions
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h index ad3f104a68..25976d38cc 100644 --- a/erts/emulator/beam/erl_port.h +++ b/erts/emulator/beam/erl_port.h @@ -1,18 +1,19 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 2012-2013. All Rights Reserved. + * Copyright Ericsson AB 2012-2018. All Rights Reserved. * - * The contents of this file are subject to the Erlang Public License, - * Version 1.1, (the "License"); you may not use this file except in - * compliance with the License. You should have received a copy of the - * Erlang Public License along with this software. If not, it can be - * retrieved online at http://www.erlang.org/. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * * %CopyrightEnd% */ @@ -30,6 +31,9 @@ typedef struct ErtsProc2PortSigData_ ErtsProc2PortSigData; #include "erl_ptab.h" #include "erl_thr_progress.h" #include "erl_trace.h" +#define ERTS_IO_QUEUE_TYPES_ONLY__ +#include "erl_io_queue.h" +#undef ERTS_IO_QUEUE_TYPES_ONLY__ #ifndef __WIN32__ #define ERTS_DEFAULT_MAX_PORTS (1 << 16) @@ -74,23 +78,8 @@ typedef struct erts_driver_t_ erts_driver_t; #define ERTS_Port2ErlDrvPort(PH) ((ErlDrvPort) (PH)) #endif -#define SMALL_IO_QUEUE 5 /* Number of fixed elements */ - -typedef struct { - ErlDrvSizeT size; /* total size in bytes */ - - SysIOVec* v_start; - SysIOVec* v_end; - SysIOVec* v_head; - SysIOVec* v_tail; - SysIOVec v_small[SMALL_IO_QUEUE]; +typedef ErtsIOQueue ErlPortIOQueue; - ErlDrvBinary** b_start; - ErlDrvBinary** b_end; - ErlDrvBinary** b_head; - ErlDrvBinary** b_tail; - ErlDrvBinary* b_small[SMALL_IO_QUEUE]; -} ErlIOQueue; typedef struct line_buf { /* Buffer used in line oriented I/O */ ErlDrvSizeT bufsiz; /* Size of character buffer */ @@ -123,16 +112,16 @@ typedef struct line_buf { /* Buffer used in line oriented I/O */ */ #define ERTS_PRTSD_SCHED_ID 0 +#define ERTS_PRTSD_DIST_ENTRY 1 +#define ERTS_PRTSD_CONN_ID 2 -#define ERTS_PRTSD_SIZE 1 +#define ERTS_PRTSD_SIZE 3 typedef struct { void *data[ERTS_PRTSD_SIZE]; } ErtsPrtSD; -#ifdef ERTS_SMP typedef struct ErtsXPortsList_ ErtsXPortsList; -#endif /* * Port locking: @@ -157,22 +146,16 @@ struct _erl_drv_port { ErtsPortTaskSched sched; ErtsPortTaskHandle timeout_task; -#ifdef ERTS_SMP erts_mtx_t *lock; ErtsXPortsList *xports; - erts_smp_atomic_t run_queue; -#else - erts_atomic32_t refc; - int cleanup; -#endif + erts_atomic_t run_queue; erts_atomic_t connected; /* A connected process */ Eterm caller; /* Current caller. */ - erts_smp_atomic_t data; /* Data associated with port. */ + erts_atomic_t data; /* Data associated with port. */ Uint bytes_in; /* Number of bytes read */ Uint bytes_out; /* Number of bytes written */ - ErlIOQueue ioq; /* driver accessible i/o queue */ - DistEntry *dist_entry; /* Dist entry used in DISTRIBUTION */ + ErlPortIOQueue ioq; /* driver accessible i/o queue */ char *name; /* String used in the open */ erts_driver_t* drv_ptr; UWord drv_data; @@ -184,8 +167,13 @@ struct _erl_drv_port { int control_flags; /* Flags for port_control() */ ErlDrvPDL port_data_lock; - ErtsPrtSD *psd; /* Port specific data */ + erts_atomic_t psd; /* Port specific data */ int reds; /* Only used while executing driver callbacks */ + + struct { + Eterm to; + Uint32 ref[ERTS_MAX_REF_NUMBERS]; + } *async_open_port; /* Reference used with async open port */ }; @@ -193,6 +181,7 @@ void erts_init_port_data(Port *); void erts_cleanup_port_data(Port *); Uint erts_port_data_size(Port *); ErlOffHeap *erts_port_data_offheap(Port *); +Eterm erts_port_data_read(Port* prt); #define ERTS_PORT_GET_CONNECTED(PRT) \ ((Eterm) erts_atomic_read_nob(&(PRT)->connected)) @@ -208,31 +197,53 @@ struct erl_drv_port_data_lock { Port *prt; }; +ERTS_GLB_INLINE void erts_init_runq_port(Port *prt, ErtsRunQueue *runq); +ERTS_GLB_INLINE void erts_set_runq_port(Port *prt, ErtsRunQueue *runq); +ERTS_GLB_INLINE ErtsRunQueue *erts_get_runq_port(Port *prt); ERTS_GLB_INLINE ErtsRunQueue *erts_port_runq(Port *prt); #if ERTS_GLB_INLINE_INCL_FUNC_DEF +ERTS_GLB_INLINE void +erts_init_runq_port(Port *prt, ErtsRunQueue *runq) +{ + if (!runq) + ERTS_INTERNAL_ERROR("Missing run-queue"); + erts_atomic_init_nob(&prt->run_queue, (erts_aint_t) runq); +} + +ERTS_GLB_INLINE void +erts_set_runq_port(Port *prt, ErtsRunQueue *runq) +{ + if (!runq) + ERTS_INTERNAL_ERROR("Missing run-queue"); + erts_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq); +} + +ERTS_GLB_INLINE ErtsRunQueue * +erts_get_runq_port(Port *prt) +{ + ErtsRunQueue *runq; + runq = (ErtsRunQueue *) erts_atomic_read_nob(&prt->run_queue); + if (!runq) + ERTS_INTERNAL_ERROR("Missing run-queue"); + return runq; +} + + ERTS_GLB_INLINE ErtsRunQueue * erts_port_runq(Port *prt) { -#ifdef ERTS_SMP ErtsRunQueue *rq1, *rq2; - rq1 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue); - if (!rq1) - return NULL; + rq1 = erts_get_runq_port(prt); while (1) { - erts_smp_runq_lock(rq1); - rq2 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue); + erts_runq_lock(rq1); + rq2 = erts_get_runq_port(prt); if (rq1 == rq2) return rq1; - erts_smp_runq_unlock(rq1); + erts_runq_unlock(rq1); rq1 = rq2; - if (!rq1) - return NULL; } -#else - return ERTS_RUNQ_IX(0); -#endif } #endif @@ -246,28 +257,57 @@ ERTS_GLB_INLINE void *erts_prtsd_set(Port *p, int ix, void *new); ERTS_GLB_INLINE void * erts_prtsd_get(Port *prt, int ix) { - return prt->psd ? prt->psd->data[ix] : NULL; + ErtsPrtSD *psd = (ErtsPrtSD *) erts_atomic_read_nob(&prt->psd); + + ASSERT((unsigned)ix < ERTS_PRTSD_SIZE); + if (!psd) + return NULL; + ERTS_THR_DATA_DEPENDENCY_READ_MEMORY_BARRIER; + return psd->data[ix]; } ERTS_GLB_INLINE void * erts_prtsd_set(Port *prt, int ix, void *data) { - if (prt->psd) { - void *old = prt->psd->data[ix]; - prt->psd->data[ix] = data; + ErtsPrtSD *psd, *new_psd; + void *old; + int i; + + psd = (ErtsPrtSD *) erts_atomic_read_nob(&prt->psd); + + ASSERT((unsigned)ix < ERTS_PRTSD_SIZE); + if (psd) { +#ifdef ETHR_ORDERED_READ_DEPEND + ETHR_MEMBAR(ETHR_LoadStore|ETHR_StoreStore); +#else + ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore|ETHR_StoreStore); +#endif + old = psd->data[ix]; + psd->data[ix] = data; return old; } - else { - prt->psd = erts_alloc(ERTS_ALC_T_PRTSD, sizeof(ErtsPrtSD)); - prt->psd->data[ix] = data; + + if (!data) return NULL; - } + + new_psd = erts_alloc(ERTS_ALC_T_PRTSD, sizeof(ErtsPrtSD)); + for (i = 0; i < ERTS_PRTSD_SIZE; i++) + new_psd->data[i] = NULL; + psd = (ErtsPrtSD *) erts_atomic_cmpxchg_mb(&prt->psd, + (erts_aint_t) new_psd, + (erts_aint_t) NULL); + if (psd) + erts_free(ERTS_ALC_T_PRTSD, new_psd); + else + psd = new_psd; + old = psd->data[ix]; + psd->data[ix] = data; + return old; } #endif -extern erts_smp_atomic_t erts_bytes_out; /* no bytes written out */ -extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ +Eterm erts_request_io_bytes(Process *c_p); /* port status flags */ @@ -294,6 +334,8 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ #define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 11)) /* Last port to terminate halts the emulator */ #define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 12)) +/* Check if the event in ready_input should be cleaned */ +#define ERTS_PORT_SFLG_CHECK_FD_CLEANUP ((Uint32) (1 << 13)) #ifdef DEBUG /* Only debug: make sure all flags aren't cleared unintentionally */ #define ERTS_PORT_SFLG_PORT_DEBUG ((Uint32) (1 << 31)) @@ -327,101 +369,80 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ #define ERTS_PORT_REDS_CONNECT (CONTEXT_REDS/200) #define ERTS_PORT_REDS_UNLINK (CONTEXT_REDS/200) #define ERTS_PORT_REDS_LINK (CONTEXT_REDS/200) +#define ERTS_PORT_REDS_MONITOR (CONTEXT_REDS/200) +#define ERTS_PORT_REDS_DEMONITOR (CONTEXT_REDS/200) #define ERTS_PORT_REDS_BADSIG (CONTEXT_REDS/200) #define ERTS_PORT_REDS_CONTROL (CONTEXT_REDS/100) #define ERTS_PORT_REDS_CALL (CONTEXT_REDS/50) #define ERTS_PORT_REDS_INFO (CONTEXT_REDS/100) #define ERTS_PORT_REDS_TERMINATE (CONTEXT_REDS/50) -void print_port_info(Port *, int, void *); +void print_port_info(Port *, fmtfn_t, void *); void erts_port_free(Port *); -#ifndef ERTS_SMP -void erts_port_cleanup(Port *); -#endif -void erts_fire_port_monitor(Port *prt, Eterm ref); -#ifdef ERTS_SMP +void erts_fire_port_monitor(Port *prt, ErtsMonitor *tmon); int erts_port_handle_xports(Port *); -#endif -#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK) +#if defined(ERTS_ENABLE_LOCK_CHECK) int erts_lc_is_port_locked(Port *); #endif ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt); ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt); ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc); +ERTS_GLB_INLINE Sint erts_port_read_refc(Port *prt); -ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt); -ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt); -ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt); +ERTS_GLB_INLINE int erts_port_trylock(Port *prt); +ERTS_GLB_INLINE void erts_port_lock(Port *prt); +ERTS_GLB_INLINE void erts_port_unlock(Port *prt); #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE void erts_port_inc_refc(Port *prt) { -#ifdef ERTS_SMP - erts_ptab_inc_refc(&prt->common); -#else - erts_atomic32_inc_nob(&prt->refc); -#endif + erts_ptab_atmc_inc_refc(&prt->common); } ERTS_GLB_INLINE void erts_port_dec_refc(Port *prt) { -#ifdef ERTS_SMP - int referred = erts_ptab_dec_test_refc(&prt->common); + int referred = erts_ptab_atmc_dec_test_refc(&prt->common); if (!referred) erts_port_free(prt); -#else - int refc = erts_atomic32_dec_read_nob(&prt->refc); - if (refc == 0) - erts_port_free(prt); -#endif } ERTS_GLB_INLINE void erts_port_add_refc(Port *prt, Sint32 add_refc) { -#ifdef ERTS_SMP - int referred = erts_ptab_add_test_refc(&prt->common, add_refc); + int referred = erts_ptab_atmc_add_test_refc(&prt->common, add_refc); if (!referred) erts_port_free(prt); -#else - int refc = erts_atomic32_add_read_nob(&prt->refc, add_refc); - if (refc == 0) - erts_port_free(prt); -#endif +} + +ERTS_GLB_INLINE Sint erts_port_read_refc(Port *prt) +{ + return erts_ptab_atmc_read_refc(&prt->common); } ERTS_GLB_INLINE int -erts_smp_port_trylock(Port *prt) +erts_port_trylock(Port *prt) { -#ifdef ERTS_SMP /* *Need* to be a managed thread */ - ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread()); return erts_mtx_trylock(prt->lock); -#else - return 0; -#endif } ERTS_GLB_INLINE void -erts_smp_port_lock(Port *prt) +erts_port_lock(Port *prt) { -#ifdef ERTS_SMP /* *Need* to be a managed thread */ - ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread()); erts_mtx_lock(prt->lock); -#endif } ERTS_GLB_INLINE void -erts_smp_port_unlock(Port *prt) +erts_port_unlock(Port *prt) { -#ifdef ERTS_SMP /* *Need* to be a managed thread */ - ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread()); erts_mtx_unlock(prt->lock); -#endif } #endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ @@ -444,7 +465,7 @@ erts_smp_port_unlock(Port *prt) ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP) #define ERTS_PORT_SCHED_ID(P, ID) \ - ((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID))) + ((Uint) (UWord) erts_prtsd_set((P), ERTS_PRTSD_SCHED_ID, (void *) (UWord) (ID))) extern const Port erts_invalid_port; #define ERTS_PORT_LOCK_BUSY ((Port *) &erts_invalid_port) @@ -452,9 +473,7 @@ extern const Port erts_invalid_port; int erts_is_port_ioq_empty(Port *); void erts_terminate_port(Port *); -#ifdef ERTS_SMP Port *erts_de2port(DistEntry *, Process *, ErtsProcLocks); -#endif ERTS_GLB_INLINE Port *erts_pix2port(int); ERTS_GLB_INLINE Port *erts_port_lookup_raw(Eterm); @@ -462,10 +481,9 @@ ERTS_GLB_INLINE Port *erts_port_lookup(Eterm, Uint32); ERTS_GLB_INLINE Port*erts_id2port(Eterm id); ERTS_GLB_INLINE Port *erts_id2port_sflgs(Eterm, Process *, ErtsProcLocks, Uint32); ERTS_GLB_INLINE void erts_port_release(Port *); -#ifdef ERTS_SMP +ERTS_GLB_INLINE Port *erts_thr_port_lookup(Eterm id, Uint32 invalid_sflgs); ERTS_GLB_INLINE Port *erts_thr_id2port_sflgs(Eterm id, Uint32 invalid_sflgs); ERTS_GLB_INLINE void erts_thr_port_release(Port *prt); -#endif ERTS_GLB_INLINE Port *erts_thr_drvport2port(ErlDrvPort, int); ERTS_GLB_INLINE Port *erts_drvport2port_state(ErlDrvPort, erts_aint32_t *); ERTS_GLB_INLINE Eterm erts_drvport2id(ErlDrvPort); @@ -473,6 +491,8 @@ ERTS_GLB_INLINE Uint32 erts_portid2status(Eterm); ERTS_GLB_INLINE int erts_is_port_alive(Eterm); ERTS_GLB_INLINE int erts_is_valid_tracer_port(Eterm); ERTS_GLB_INLINE int erts_port_driver_callback_epilogue(Port *, erts_aint32_t *); +ERTS_GLB_INLINE Port *erts_get_current_port(void); +ERTS_GLB_INLINE Eterm erts_get_current_port_id(void); #define erts_drvport2port(Prt) erts_drvport2port_state((Prt), NULL) @@ -491,7 +511,7 @@ erts_port_lookup_raw(Eterm id) { Port *prt; - ERTS_SMP_LC_ASSERT(erts_thr_progress_lc_is_delaying()); + ERTS_LC_ASSERT(erts_thr_progress_lc_is_delaying()); if (is_not_internal_port(id)) return NULL; @@ -520,7 +540,7 @@ erts_id2port(Eterm id) Port *prt; /* Only allowed to be called from managed threads */ - ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread()); if (is_not_internal_port(id)) return NULL; @@ -531,10 +551,10 @@ erts_id2port(Eterm id) if (!prt || prt->common.id != id) return NULL; - erts_smp_port_lock(prt); + erts_port_lock(prt); state = erts_atomic32_read_nob(&prt->state); if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) { - erts_smp_port_unlock(prt); + erts_port_unlock(prt); return NULL; } @@ -547,14 +567,12 @@ erts_id2port_sflgs(Eterm id, Process *c_p, ErtsProcLocks c_p_locks, Uint32 invalid_sflgs) { -#ifdef ERTS_SMP int no_proc_locks = !c_p || !c_p_locks; -#endif erts_aint32_t state; Port *prt; /* Only allowed to be called from managed threads */ - ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); + ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread()); if (is_not_internal_port(id)) return NULL; @@ -565,21 +583,17 @@ erts_id2port_sflgs(Eterm id, if (!prt || prt->common.id != id) return NULL; -#ifdef ERTS_SMP if (no_proc_locks) - erts_smp_port_lock(prt); - else if (erts_smp_port_trylock(prt) == EBUSY) { + erts_port_lock(prt); + else if (erts_port_trylock(prt) == EBUSY) { /* Unlock process locks, and acquire locks in lock order... */ - erts_smp_proc_unlock(c_p, c_p_locks); - erts_smp_port_lock(prt); - erts_smp_proc_lock(c_p, c_p_locks); + erts_proc_unlock(c_p, c_p_locks); + erts_port_lock(prt); + erts_proc_lock(c_p, c_p_locks); } -#endif state = erts_atomic32_read_nob(&prt->state); if (state & invalid_sflgs) { -#ifdef ERTS_SMP - erts_smp_port_unlock(prt); -#endif + erts_port_unlock(prt); return NULL; } @@ -590,18 +604,48 @@ ERTS_GLB_INLINE void erts_port_release(Port *prt) { /* Only allowed to be called from managed threads */ - ERTS_SMP_LC_ASSERT(erts_thr_progress_is_managed_thread()); -#ifdef ERTS_SMP - erts_smp_port_unlock(prt); -#else - if (prt->cleanup) { - prt->cleanup = 0; - erts_port_cleanup(prt); - } -#endif + ERTS_LC_ASSERT(erts_thr_progress_is_managed_thread()); + erts_port_unlock(prt); } -#ifdef ERTS_SMP +/* + * erts_thr_id2port_sflgs() and erts_port_dec_refc(prt) can + * be used by unmanaged threads in the SMP case. + */ +ERTS_GLB_INLINE Port * +erts_thr_port_lookup(Eterm id, Uint32 invalid_sflgs) +{ + Port *prt; + ErtsThrPrgrDelayHandle dhndl; + + if (is_not_internal_port(id)) + return NULL; + + dhndl = erts_thr_progress_unmanaged_delay(); + + prt = (Port *) erts_ptab_pix2intptr_ddrb(&erts_port, + internal_port_index(id)); + + if (!prt || prt->common.id != id) { + erts_thr_progress_unmanaged_continue(dhndl); + return NULL; + } + else { + erts_aint32_t state; + erts_port_inc_refc(prt); + + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + erts_thr_progress_unmanaged_continue(dhndl); + + state = erts_atomic32_read_acqb(&prt->state); + if (state & invalid_sflgs) { + erts_port_dec_refc(prt); + return NULL; + } + + return prt; + } +} /* * erts_thr_id2port_sflgs() and erts_thr_port_release() can @@ -649,13 +693,10 @@ ERTS_GLB_INLINE void erts_thr_port_release(Port *prt) { erts_mtx_unlock(prt->lock); -#ifdef ERTS_SMP if (!erts_thr_progress_is_managed_thread()) erts_port_dec_refc(prt); -#endif } -#endif ERTS_GLB_INLINE Port * erts_thr_drvport2port(ErlDrvPort drvport, int lock_pdl) @@ -668,10 +709,10 @@ erts_thr_drvport2port(ErlDrvPort drvport, int lock_pdl) if (lock_pdl && prt->port_data_lock) driver_pdl_lock(prt->port_data_lock); -#if ERTS_ENABLE_LOCK_CHECK +#ifdef ERTS_ENABLE_LOCK_CHECK if (!ERTS_IS_CRASH_DUMPING) { if (erts_lc_is_emu_thr()) { - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); ERTS_LC_ASSERT(!prt->port_data_lock || erts_lc_mtx_is_locked(&prt->port_data_lock->mtx)); } @@ -697,10 +738,10 @@ erts_drvport2port_state(ErlDrvPort drvport, erts_aint32_t *statep) Port *prt = ERTS_ErlDrvPort2Port(drvport); erts_aint32_t state; ASSERT(prt); - ERTS_LC_ASSERT(erts_lc_is_emu_thr()); +// ERTS_LC_ASSERT(erts_lc_is_emu_thr()); if (prt == ERTS_INVALID_ERL_DRV_PORT) return ERTS_INVALID_ERL_DRV_PORT; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt) + ERTS_LC_ASSERT(erts_lc_is_port_locked(prt) || ERTS_IS_CRASH_DUMPING); /* * This state check is only needed since a driver callback @@ -757,23 +798,21 @@ erts_port_driver_callback_epilogue(Port *prt, erts_aint32_t *statep) int reds = 0; erts_aint32_t state; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); state = erts_atomic32_read_nob(&prt->state); if ((state & ERTS_PORT_SFLG_CLOSING) && erts_is_port_ioq_empty(prt)) { reds += ERTS_PORT_REDS_TERMINATE; erts_terminate_port(prt); state = erts_atomic32_read_nob(&prt->state); - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); } -#ifdef ERTS_SMP if (prt->xports) { reds += erts_port_handle_xports(prt); - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); ASSERT(!prt->xports); } -#endif if (statep) *statep = state; @@ -781,22 +820,40 @@ erts_port_driver_callback_epilogue(Port *prt, erts_aint32_t *statep) return reds; } +ERTS_GLB_INLINE +Port *erts_get_current_port(void) +{ + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + return esdp ? esdp->current_port : NULL; +} + +ERTS_GLB_INLINE +Eterm erts_get_current_port_id(void) +{ + Port *port = erts_get_current_port(); + return port ? port->common.id : THE_NON_VALUE; +} + #endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ void erts_port_resume_procs(Port *); struct binary; -#define ERTS_P2P_SIG_TYPE_BAD 0 -#define ERTS_P2P_SIG_TYPE_OUTPUT 1 -#define ERTS_P2P_SIG_TYPE_OUTPUTV 2 -#define ERTS_P2P_SIG_TYPE_CONNECT 3 -#define ERTS_P2P_SIG_TYPE_EXIT 4 -#define ERTS_P2P_SIG_TYPE_CONTROL 5 -#define ERTS_P2P_SIG_TYPE_CALL 6 -#define ERTS_P2P_SIG_TYPE_INFO 7 -#define ERTS_P2P_SIG_TYPE_LINK 8 -#define ERTS_P2P_SIG_TYPE_UNLINK 9 +enum { + ERTS_P2P_SIG_TYPE_BAD = 0, + ERTS_P2P_SIG_TYPE_OUTPUT = 1, + ERTS_P2P_SIG_TYPE_OUTPUTV = 2, + ERTS_P2P_SIG_TYPE_CONNECT = 3, + ERTS_P2P_SIG_TYPE_EXIT = 4, + ERTS_P2P_SIG_TYPE_CONTROL = 5, + ERTS_P2P_SIG_TYPE_CALL = 6, + ERTS_P2P_SIG_TYPE_INFO = 7, + ERTS_P2P_SIG_TYPE_LINK = 8, + ERTS_P2P_SIG_TYPE_UNLINK = 9, + ERTS_P2P_SIG_TYPE_MONITOR = 10, + ERTS_P2P_SIG_TYPE_DEMONITOR = 11 +}; #define ERTS_P2P_SIG_TYPE_BITS 4 #define ERTS_P2P_SIG_TYPE_MASK \ @@ -852,12 +909,21 @@ struct ErtsProc2PortSigData_ { Eterm item; } info; struct { - Eterm port; - Eterm to; + Eterm port_id; + ErtsLink *lnk; } link; struct { - Eterm from; + Eterm port_id; + ErtsLink *lnk; } unlink; + struct { + Eterm port_id; + ErtsMonitor *mon; + } monitor; + struct { + Eterm port_id; + ErtsMonitor *mon; + } demonitor; } u; } ; @@ -902,7 +968,6 @@ typedef int (*ErtsProc2PortSigCallback)(Port *, typedef enum { ERTS_PORT_OP_BADARG, - ERTS_PORT_OP_CALLER_EXIT, ERTS_PORT_OP_BUSY, ERTS_PORT_OP_BUSY_SCHEDULED, ERTS_PORT_OP_SCHEDULED, @@ -910,17 +975,7 @@ typedef enum { ERTS_PORT_OP_DONE } ErtsPortOpResult; -ErtsPortOpResult -erts_schedule_proc2port_signal(Process *, - Port *, - Eterm, - Eterm *, - ErtsProc2PortSigData *, - int, - ErtsPortTaskHandle *, - ErtsProc2PortSigCallback); - -int erts_deliver_port_exit(Port *, Eterm, Eterm, int); +int erts_deliver_port_exit(Port *, Eterm, Eterm, int, int); /* * Port signal flags @@ -948,10 +1003,21 @@ ErtsPortOpResult erts_port_command(Process *, int, Port *, Eterm, Eterm *); ErtsPortOpResult erts_port_output(Process *, int, Port *, Eterm, Eterm, Eterm *); ErtsPortOpResult erts_port_exit(Process *, int, Port *, Eterm, Eterm, Eterm *); ErtsPortOpResult erts_port_connect(Process *, int, Port *, Eterm, Eterm, Eterm *); -ErtsPortOpResult erts_port_link(Process *, Port *, Eterm, Eterm *); -ErtsPortOpResult erts_port_unlink(Process *, Port *, Eterm, Eterm *); +ErtsPortOpResult erts_port_link(Process *, Port *, ErtsLink *, Eterm *); +ErtsPortOpResult erts_port_unlink(Process *, Port *, ErtsLink *, Eterm *); ErtsPortOpResult erts_port_control(Process *, Port *, unsigned int, Eterm, Eterm *); ErtsPortOpResult erts_port_call(Process *, Port *, unsigned int, Eterm, Eterm *); ErtsPortOpResult erts_port_info(Process *, Port *, Eterm, Eterm *); +ErtsPortOpResult erts_port_monitor(Process *, Port *, ErtsMonitor *); +ErtsPortOpResult erts_port_demonitor(Process *, Port *, ErtsMonitor *); +/* defined in erl_bif_port.c */ +Port *erts_sig_lookup_port(Process *c_p, Eterm id_or_name); + +int erts_port_output_async(Port *, Eterm, Eterm); + +/* + * Signals from ports to ports. Used by sys drivers. + */ +int erl_drv_port_control(Eterm, char, char*, ErlDrvSizeT); #endif |