diff options
author | Rickard Green <[email protected]> | 2012-09-21 15:12:07 +0200 |
---|---|---|
committer | Rickard Green <[email protected]> | 2012-12-07 00:24:26 +0100 |
commit | 6e01408aba71e26884c5db81b8e4fa89bd803576 (patch) | |
tree | 709bc0a2da80ffdc73fb7426a3de80a55774ff58 /erts/emulator/beam/erl_port.h | |
parent | 23c6f9e07a3cae7c05e55abd01ff798384241538 (diff) | |
download | otp-6e01408aba71e26884c5db81b8e4fa89bd803576.tar.gz otp-6e01408aba71e26884c5db81b8e4fa89bd803576.tar.bz2 otp-6e01408aba71e26884c5db81b8e4fa89bd803576.zip |
Implement true asynchronous signaling between processes and ports
Diffstat (limited to 'erts/emulator/beam/erl_port.h')
-rw-r--r-- | erts/emulator/beam/erl_port.h | 207 |
1 files changed, 183 insertions, 24 deletions
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h index 6ad92dcd7d..beeddd09a0 100644 --- a/erts/emulator/beam/erl_port.h +++ b/erts/emulator/beam/erl_port.h @@ -20,6 +20,7 @@ #ifndef ERL_PORT_TYPE__ #define ERL_PORT_TYPE__ typedef struct _erl_drv_port Port; +typedef struct ErtsProc2PortSigData_ ErtsProc2PortSigData; #endif #if !defined(ERL_PORT_H__) && !defined(ERL_PORT_GET_PORT_TYPE_ONLY__) @@ -33,6 +34,10 @@ typedef struct _erl_drv_port Port; #define ERTS_DEFAULT_MAX_PORTS (1 << 16) #define ERTS_MIN_PORTS 1024 +extern int erts_port_synchronous_ops; +extern int erts_port_schedule_all_ops; +extern int erts_port_parallelism; + typedef struct erts_driver_t_ erts_driver_t; #define ERTS_INVALID_ERL_DRV_PORT ((ErlDrvPort) (SWord) -1) @@ -63,6 +68,19 @@ typedef struct line_buf { /* Buffer used in line oriented I/O */ The rest is the overflow buffer. */ } LineBuf; +/* + * Items part of erlang:port_info/1 result. Note am_registered_name + * *need* to be first. + */ + +#define ERTS_PORT_INFO_1_ITEMS \ + { am_registered_name, /* Needs to be first */ \ + am_name, \ + am_links, \ + am_id, \ + am_connected, \ + am_input, \ + am_output } /* * Port Specific Data. @@ -113,7 +131,7 @@ struct _erl_drv_port { erts_atomic32_t refc; int cleanup; #endif - erts_smp_atomic_t connected;/* A connected process */ + erts_atomic_t connected; /* A connected process */ Eterm caller; /* Current caller. */ Eterm data; /* Data associated with port. */ ErlHeapFragment* bp; /* Heap fragment holding data (NULL if imm data). */ @@ -135,6 +153,13 @@ struct _erl_drv_port { ErtsPrtSD *psd; /* Port specific data */ }; +#define ERTS_PORT_GET_CONNECTED(PRT) \ + ((Eterm) erts_atomic_read_nob(&(PRT)->connected)) +#define ERTS_PORT_SET_CONNECTED(PRT, PID) \ + erts_atomic_set_relb(&(PRT)->connected, (erts_aint_t) (PID)) +#define ERTS_PORT_INIT_CONNECTED(PRT, PID) \ + erts_atomic_init_nob(&(PRT)->connected, (erts_aint_t) (PID)) + struct erl_drv_port_data_lock { erts_mtx_t mtx; @@ -240,6 +265,7 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ (ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_INVALID) #define ERTS_PORT_SFLGS_INVALID_LOOKUP \ (ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \ + | ERTS_PORT_SFLG_EXITING \ | ERTS_PORT_SFLG_CLOSING) #define ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP \ (ERTS_PORT_SFLGS_INVALID_LOOKUP \ @@ -358,17 +384,6 @@ erts_smp_port_unlock(Port *prt) #define ERTS_PORT_SCHED_ID(P, ID) \ ((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID))) -#define ERTS_PORT_INIT_CONNECTED(PRT, PID) \ - erts_smp_atomic_init_nob(&(PRT)->connected, (erts_aint_t) (PID)) -#define ERTS_PORT_SET_CONNECTED(PRT, PID) \ - erts_smp_atomic_set_nob(&(PRT)->connected, (erts_aint_t) (PID)) -#define ERTS_PORT_SET_CONNECTED_RELB(PRT, PID) \ - erts_smp_atomic_set_relb(&(PRT)->connected, (erts_aint_t) (PID)) -#define ERTS_PORT_GET_CONNECTED(PRT) \ - ((Eterm) erts_smp_atomic_read_nob(&(PRT)->connected)) -#define ERTS_PORT_GET_CONNECTED_ACQB(PRT) \ - ((Eterm) erts_smp_atomic_read_acqb(&(PRT)->connected)) - extern const Port erts_invalid_port; #define ERTS_PORT_LOCK_BUSY ((Port *) &erts_invalid_port) @@ -424,8 +439,11 @@ ERTS_GLB_INLINE Port * erts_port_lookup(Eterm id, Uint32 invalid_sflgs) { Port *prt = erts_port_lookup_raw(id); - erts_aint32_t state = erts_atomic32_read_nob(&prt->state); - return (state & invalid_sflgs) ? NULL : prt; + return (!prt + ? NULL + : ((invalid_sflgs & erts_atomic32_read_nob(&prt->state)) + ? NULL + : prt)); } @@ -584,14 +602,16 @@ erts_thr_drvport2port_raw(ErlDrvPort drvport) else { Port *prt = (Port *) drvport; #if ERTS_ENABLE_LOCK_CHECK - if (emu_thread) { - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - ERTS_LC_ASSERT(!prt->port_data_lock - || erts_lc_mtx_is_locked(&prt->port_data_lock->mtx)); - } - else { - ERTS_LC_ASSERT(prt->port_data_lock); - ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&prt->port_data_lock->mtx)); + if (!ERTS_IS_CRASH_DUMPING) { + if (emu_thread) { + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_LC_ASSERT(!prt->port_data_lock + || erts_lc_mtx_is_locked(&prt->port_data_lock->mtx)); + } + else { + ERTS_LC_ASSERT(prt->port_data_lock); + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&prt->port_data_lock->mtx)); + } } #endif return prt; @@ -606,7 +626,8 @@ erts_drvport2port_raw(ErlDrvPort drvport) return NULL; else { Port *prt = (Port *) drvport; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt) + || ERTS_IS_CRASH_DUMPING); return prt; } } @@ -637,7 +658,8 @@ erts_drvportid2port(Eterm id) internal_port_index(id)); if (!prt) return NULL; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt) + || ERTS_IS_CRASH_DUMPING); if (prt->common.id != id) return NULL; state = erts_atomic32_read_nob(&prt->state); @@ -680,4 +702,141 @@ erts_is_valid_tracer_port(Eterm id) } #endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ +struct binary; + +#define ERTS_P2P_SIG_DATA_FLG_BANG_OP (1 << 0) +#define ERTS_P2P_SIG_DATA_FLG_REPLY (1 << 1) +#define ERTS_P2P_SIG_DATA_FLG_NOSUSPEND (1 << 2) +#define ERTS_P2P_SIG_DATA_FLG_FORCE (1 << 3) +#define ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT (1 << 4) +#define ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK (1 << 5) +#define ERTS_P2P_SIG_DATA_FLG_SCHED (1 << 6) + +struct ErtsProc2PortSigData_ { + int flags; + Eterm caller; + Uint32 ref[ERTS_MAX_REF_NUMBERS]; + union { + struct { + Eterm from; + ErlIOVec *evp; + ErlDrvBinary *cbinp; + } outputv; + struct { + Eterm from; + char *bufp; + ErlDrvSizeT size; + } output; + struct { + Eterm from; + Eterm connected; + } connect; + struct { + Eterm from; + Eterm reason; + ErlHeapFragment *bp; + } exit; + struct { + struct binary *binp; + unsigned int command; + char *bufp; + ErlDrvSizeT size; + } control; + struct { + unsigned int command; + char *bufp; + ErlDrvSizeT size; + } call; + struct { + Eterm item; + } info; + struct { + Eterm port; + Eterm to; + } link; + struct { + Eterm from; + } unlink; + struct { + ErlHeapFragment *bp; + Eterm data; + } set_data; + } u; +} ; + +#define ERTS_PROC2PORT_SIG_EXEC 0 +#define ERTS_PROC2PORT_SIG_ABORT 1 +#define ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND 2 +#define ERTS_PROC2PORT_SIG_ABORT_CLOSED 3 + +typedef int (*ErtsProc2PortSigCallback)(Port *, + erts_aint32_t, + int, + ErtsProc2PortSigData *); + +#define ERTS_PORT_REDS_CMD_OUTPUTV 400 +#define ERTS_PORT_REDS_CMD_OUTPUT 400 +#define ERTS_PORT_REDS_EXIT 300 +#define ERTS_PORT_REDS_CONNECT 40 +#define ERTS_PORT_REDS_UNLINK 40 +#define ERTS_PORT_REDS_LINK 40 +#define ERTS_PORT_REDS_BADSIG 40 +#define ERTS_PORT_REDS_CONTROL 400 +#define ERTS_PORT_REDS_CALL 400 +#define ERTS_PORT_REDS_INFO 100 +#define ERTS_PORT_REDS_SET_DATA 40 +#define ERTS_PORT_REDS_GET_DATA 40 + +typedef enum { + ERTS_PORT_OP_BADARG, + ERTS_PORT_OP_CALLER_EXIT, + ERTS_PORT_OP_BUSY, + ERTS_PORT_OP_BUSY_SCHEDULED, + ERTS_PORT_OP_SCHEDULED, + ERTS_PORT_OP_DROPPED, + ERTS_PORT_OP_DONE +} ErtsPortOpResult; + +ErtsPortOpResult +erts_schedule_proc2port_signal(Process *, + Port *, + Eterm, + Eterm *, + ErtsProc2PortSigData *, + int, + ErtsProc2PortSigCallback); + +int erts_deliver_port_exit(Port *, Eterm, Eterm, int); + +/* + * Port signal flags + */ +#define ERTS_PORT_SIG_FLG_BANG_OP ERTS_P2P_SIG_DATA_FLG_BANG_OP +#define ERTS_PORT_SIG_FLG_NOSUSPEND ERTS_P2P_SIG_DATA_FLG_NOSUSPEND +#define ERTS_PORT_SIG_FLG_FORCE ERTS_P2P_SIG_DATA_FLG_FORCE +#define ERTS_PORT_SIG_FLG_BROKEN_LINK ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK +#define ERTS_PORT_SIG_FLG_BAD_OUTPUT ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT +#define ERTS_PORT_SIG_FLG_FORCE_SCHED ERTS_P2P_SIG_DATA_FLG_SCHED + +/* + * Port ! {Owner, {command, Data}} + * Port ! {Owner, {connect, NewOwner}} + * Port ! {Owner, close} + */ +ErtsPortOpResult erts_port_command(Process *, int, Port *, Eterm, Eterm *); + +/* + * Signals from processes to ports. + */ +ErtsPortOpResult erts_port_output(Process *, int, Port *, Eterm, Eterm, Eterm *); +ErtsPortOpResult erts_port_exit(Process *, int, Port *, Eterm, Eterm, Eterm *); +ErtsPortOpResult erts_port_connect(Process *, int, Port *, Eterm, Eterm, Eterm *); +ErtsPortOpResult erts_port_link(Process *, Port *, Eterm, Eterm *); +ErtsPortOpResult erts_port_unlink(Process *, Port *, Eterm, Eterm *); +ErtsPortOpResult erts_port_control(Process *, Port *, unsigned int, Eterm, Eterm *); +ErtsPortOpResult erts_port_call(Process *, Port *, unsigned int, Eterm, Eterm *); +ErtsPortOpResult erts_port_info(Process *, Port *, Eterm, Eterm *); +ErtsPortOpResult erts_port_set_data(Process *, Port *, Eterm, Eterm *); +ErtsPortOpResult erts_port_get_data(Process *, Port *, Eterm *); + #endif |