aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_port.h
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-09-21 15:12:07 +0200
committerRickard Green <[email protected]>2012-12-07 00:24:26 +0100
commit6e01408aba71e26884c5db81b8e4fa89bd803576 (patch)
tree709bc0a2da80ffdc73fb7426a3de80a55774ff58 /erts/emulator/beam/erl_port.h
parent23c6f9e07a3cae7c05e55abd01ff798384241538 (diff)
downloadotp-6e01408aba71e26884c5db81b8e4fa89bd803576.tar.gz
otp-6e01408aba71e26884c5db81b8e4fa89bd803576.tar.bz2
otp-6e01408aba71e26884c5db81b8e4fa89bd803576.zip
Implement true asynchronous signaling between processes and ports
Diffstat (limited to 'erts/emulator/beam/erl_port.h')
-rw-r--r--erts/emulator/beam/erl_port.h207
1 files changed, 183 insertions, 24 deletions
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h
index 6ad92dcd7d..beeddd09a0 100644
--- a/erts/emulator/beam/erl_port.h
+++ b/erts/emulator/beam/erl_port.h
@@ -20,6 +20,7 @@
#ifndef ERL_PORT_TYPE__
#define ERL_PORT_TYPE__
typedef struct _erl_drv_port Port;
+typedef struct ErtsProc2PortSigData_ ErtsProc2PortSigData;
#endif
#if !defined(ERL_PORT_H__) && !defined(ERL_PORT_GET_PORT_TYPE_ONLY__)
@@ -33,6 +34,10 @@ typedef struct _erl_drv_port Port;
#define ERTS_DEFAULT_MAX_PORTS (1 << 16)
#define ERTS_MIN_PORTS 1024
+extern int erts_port_synchronous_ops;
+extern int erts_port_schedule_all_ops;
+extern int erts_port_parallelism;
+
typedef struct erts_driver_t_ erts_driver_t;
#define ERTS_INVALID_ERL_DRV_PORT ((ErlDrvPort) (SWord) -1)
@@ -63,6 +68,19 @@ typedef struct line_buf { /* Buffer used in line oriented I/O */
The rest is the overflow buffer. */
} LineBuf;
+/*
+ * Items part of erlang:port_info/1 result. Note am_registered_name
+ * *need* to be first.
+ */
+
+#define ERTS_PORT_INFO_1_ITEMS \
+ { am_registered_name, /* Needs to be first */ \
+ am_name, \
+ am_links, \
+ am_id, \
+ am_connected, \
+ am_input, \
+ am_output }
/*
* Port Specific Data.
@@ -113,7 +131,7 @@ struct _erl_drv_port {
erts_atomic32_t refc;
int cleanup;
#endif
- erts_smp_atomic_t connected;/* A connected process */
+ erts_atomic_t connected; /* A connected process */
Eterm caller; /* Current caller. */
Eterm data; /* Data associated with port. */
ErlHeapFragment* bp; /* Heap fragment holding data (NULL if imm data). */
@@ -135,6 +153,13 @@ struct _erl_drv_port {
ErtsPrtSD *psd; /* Port specific data */
};
+#define ERTS_PORT_GET_CONNECTED(PRT) \
+ ((Eterm) erts_atomic_read_nob(&(PRT)->connected))
+#define ERTS_PORT_SET_CONNECTED(PRT, PID) \
+ erts_atomic_set_relb(&(PRT)->connected, (erts_aint_t) (PID))
+#define ERTS_PORT_INIT_CONNECTED(PRT, PID) \
+ erts_atomic_init_nob(&(PRT)->connected, (erts_aint_t) (PID))
+
struct erl_drv_port_data_lock {
erts_mtx_t mtx;
@@ -240,6 +265,7 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */
(ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_INVALID)
#define ERTS_PORT_SFLGS_INVALID_LOOKUP \
(ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \
+ | ERTS_PORT_SFLG_EXITING \
| ERTS_PORT_SFLG_CLOSING)
#define ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP \
(ERTS_PORT_SFLGS_INVALID_LOOKUP \
@@ -358,17 +384,6 @@ erts_smp_port_unlock(Port *prt)
#define ERTS_PORT_SCHED_ID(P, ID) \
((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID)))
-#define ERTS_PORT_INIT_CONNECTED(PRT, PID) \
- erts_smp_atomic_init_nob(&(PRT)->connected, (erts_aint_t) (PID))
-#define ERTS_PORT_SET_CONNECTED(PRT, PID) \
- erts_smp_atomic_set_nob(&(PRT)->connected, (erts_aint_t) (PID))
-#define ERTS_PORT_SET_CONNECTED_RELB(PRT, PID) \
- erts_smp_atomic_set_relb(&(PRT)->connected, (erts_aint_t) (PID))
-#define ERTS_PORT_GET_CONNECTED(PRT) \
- ((Eterm) erts_smp_atomic_read_nob(&(PRT)->connected))
-#define ERTS_PORT_GET_CONNECTED_ACQB(PRT) \
- ((Eterm) erts_smp_atomic_read_acqb(&(PRT)->connected))
-
extern const Port erts_invalid_port;
#define ERTS_PORT_LOCK_BUSY ((Port *) &erts_invalid_port)
@@ -424,8 +439,11 @@ ERTS_GLB_INLINE Port *
erts_port_lookup(Eterm id, Uint32 invalid_sflgs)
{
Port *prt = erts_port_lookup_raw(id);
- erts_aint32_t state = erts_atomic32_read_nob(&prt->state);
- return (state & invalid_sflgs) ? NULL : prt;
+ return (!prt
+ ? NULL
+ : ((invalid_sflgs & erts_atomic32_read_nob(&prt->state))
+ ? NULL
+ : prt));
}
@@ -584,14 +602,16 @@ erts_thr_drvport2port_raw(ErlDrvPort drvport)
else {
Port *prt = (Port *) drvport;
#if ERTS_ENABLE_LOCK_CHECK
- if (emu_thread) {
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- ERTS_LC_ASSERT(!prt->port_data_lock
- || erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
- }
- else {
- ERTS_LC_ASSERT(prt->port_data_lock);
- ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
+ if (!ERTS_IS_CRASH_DUMPING) {
+ if (emu_thread) {
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(!prt->port_data_lock
+ || erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
+ }
+ else {
+ ERTS_LC_ASSERT(prt->port_data_lock);
+ ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&prt->port_data_lock->mtx));
+ }
}
#endif
return prt;
@@ -606,7 +626,8 @@ erts_drvport2port_raw(ErlDrvPort drvport)
return NULL;
else {
Port *prt = (Port *) drvport;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
+ || ERTS_IS_CRASH_DUMPING);
return prt;
}
}
@@ -637,7 +658,8 @@ erts_drvportid2port(Eterm id)
internal_port_index(id));
if (!prt)
return NULL;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
+ || ERTS_IS_CRASH_DUMPING);
if (prt->common.id != id)
return NULL;
state = erts_atomic32_read_nob(&prt->state);
@@ -680,4 +702,141 @@ erts_is_valid_tracer_port(Eterm id)
}
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
+struct binary;
+
+#define ERTS_P2P_SIG_DATA_FLG_BANG_OP (1 << 0)
+#define ERTS_P2P_SIG_DATA_FLG_REPLY (1 << 1)
+#define ERTS_P2P_SIG_DATA_FLG_NOSUSPEND (1 << 2)
+#define ERTS_P2P_SIG_DATA_FLG_FORCE (1 << 3)
+#define ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT (1 << 4)
+#define ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK (1 << 5)
+#define ERTS_P2P_SIG_DATA_FLG_SCHED (1 << 6)
+
+struct ErtsProc2PortSigData_ {
+ int flags;
+ Eterm caller;
+ Uint32 ref[ERTS_MAX_REF_NUMBERS];
+ union {
+ struct {
+ Eterm from;
+ ErlIOVec *evp;
+ ErlDrvBinary *cbinp;
+ } outputv;
+ struct {
+ Eterm from;
+ char *bufp;
+ ErlDrvSizeT size;
+ } output;
+ struct {
+ Eterm from;
+ Eterm connected;
+ } connect;
+ struct {
+ Eterm from;
+ Eterm reason;
+ ErlHeapFragment *bp;
+ } exit;
+ struct {
+ struct binary *binp;
+ unsigned int command;
+ char *bufp;
+ ErlDrvSizeT size;
+ } control;
+ struct {
+ unsigned int command;
+ char *bufp;
+ ErlDrvSizeT size;
+ } call;
+ struct {
+ Eterm item;
+ } info;
+ struct {
+ Eterm port;
+ Eterm to;
+ } link;
+ struct {
+ Eterm from;
+ } unlink;
+ struct {
+ ErlHeapFragment *bp;
+ Eterm data;
+ } set_data;
+ } u;
+} ;
+
+#define ERTS_PROC2PORT_SIG_EXEC 0
+#define ERTS_PROC2PORT_SIG_ABORT 1
+#define ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND 2
+#define ERTS_PROC2PORT_SIG_ABORT_CLOSED 3
+
+typedef int (*ErtsProc2PortSigCallback)(Port *,
+ erts_aint32_t,
+ int,
+ ErtsProc2PortSigData *);
+
+#define ERTS_PORT_REDS_CMD_OUTPUTV 400
+#define ERTS_PORT_REDS_CMD_OUTPUT 400
+#define ERTS_PORT_REDS_EXIT 300
+#define ERTS_PORT_REDS_CONNECT 40
+#define ERTS_PORT_REDS_UNLINK 40
+#define ERTS_PORT_REDS_LINK 40
+#define ERTS_PORT_REDS_BADSIG 40
+#define ERTS_PORT_REDS_CONTROL 400
+#define ERTS_PORT_REDS_CALL 400
+#define ERTS_PORT_REDS_INFO 100
+#define ERTS_PORT_REDS_SET_DATA 40
+#define ERTS_PORT_REDS_GET_DATA 40
+
+typedef enum {
+ ERTS_PORT_OP_BADARG,
+ ERTS_PORT_OP_CALLER_EXIT,
+ ERTS_PORT_OP_BUSY,
+ ERTS_PORT_OP_BUSY_SCHEDULED,
+ ERTS_PORT_OP_SCHEDULED,
+ ERTS_PORT_OP_DROPPED,
+ ERTS_PORT_OP_DONE
+} ErtsPortOpResult;
+
+ErtsPortOpResult
+erts_schedule_proc2port_signal(Process *,
+ Port *,
+ Eterm,
+ Eterm *,
+ ErtsProc2PortSigData *,
+ int,
+ ErtsProc2PortSigCallback);
+
+int erts_deliver_port_exit(Port *, Eterm, Eterm, int);
+
+/*
+ * Port signal flags
+ */
+#define ERTS_PORT_SIG_FLG_BANG_OP ERTS_P2P_SIG_DATA_FLG_BANG_OP
+#define ERTS_PORT_SIG_FLG_NOSUSPEND ERTS_P2P_SIG_DATA_FLG_NOSUSPEND
+#define ERTS_PORT_SIG_FLG_FORCE ERTS_P2P_SIG_DATA_FLG_FORCE
+#define ERTS_PORT_SIG_FLG_BROKEN_LINK ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK
+#define ERTS_PORT_SIG_FLG_BAD_OUTPUT ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT
+#define ERTS_PORT_SIG_FLG_FORCE_SCHED ERTS_P2P_SIG_DATA_FLG_SCHED
+
+/*
+ * Port ! {Owner, {command, Data}}
+ * Port ! {Owner, {connect, NewOwner}}
+ * Port ! {Owner, close}
+ */
+ErtsPortOpResult erts_port_command(Process *, int, Port *, Eterm, Eterm *);
+
+/*
+ * Signals from processes to ports.
+ */
+ErtsPortOpResult erts_port_output(Process *, int, Port *, Eterm, Eterm, Eterm *);
+ErtsPortOpResult erts_port_exit(Process *, int, Port *, Eterm, Eterm, Eterm *);
+ErtsPortOpResult erts_port_connect(Process *, int, Port *, Eterm, Eterm, Eterm *);
+ErtsPortOpResult erts_port_link(Process *, Port *, Eterm, Eterm *);
+ErtsPortOpResult erts_port_unlink(Process *, Port *, Eterm, Eterm *);
+ErtsPortOpResult erts_port_control(Process *, Port *, unsigned int, Eterm, Eterm *);
+ErtsPortOpResult erts_port_call(Process *, Port *, unsigned int, Eterm, Eterm *);
+ErtsPortOpResult erts_port_info(Process *, Port *, Eterm, Eterm *);
+ErtsPortOpResult erts_port_set_data(Process *, Port *, Eterm, Eterm *);
+ErtsPortOpResult erts_port_get_data(Process *, Port *, Eterm *);
+
#endif