aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c290
1 files changed, 258 insertions, 32 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 3309b77086..79022d5dd7 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -72,6 +72,15 @@ erts_driver_t fd_driver;
static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *);
static void terminate_port(Port *p);
static void pdl_init(void);
+#ifdef ERTS_SMP
+static void driver_monitor_lock_pdl(Port *p);
+static void driver_monitor_unlock_pdl(Port *p);
+#define DRV_MONITOR_LOCK_PDL(Port) driver_monitor_lock_pdl(Port)
+#define DRV_MONITOR_UNLOCK_PDL(Port) driver_monitor_unlock_pdl(Port)
+#else
+#define DRV_MONITOR_LOCK_PDL(Port) /* nothing */
+#define DRV_MONITOR_UNLOCK_PDL(Port) /* nothing */
+#endif
static ERTS_INLINE ErlIOQueue*
drvport2ioq(ErlDrvPort drvport)
@@ -271,10 +280,36 @@ erts_test_next_port(int set, Uint next)
return res;
}
+
+static void port_cleanup(Port *prt);
+
+#ifdef ERTS_SMP
+
+static void
+sched_port_cleanup(void *vprt)
+{
+ Port *prt = (Port *) vprt;
+ erts_smp_mtx_lock(prt->lock);
+ port_cleanup(prt);
+}
+
+#endif
+
void
erts_port_cleanup(Port *prt)
{
#ifdef ERTS_SMP
+ if (erts_smp_mtx_trylock(prt->lock) == EBUSY)
+ erts_schedule_misc_op(sched_port_cleanup, (void *) prt);
+ else
+#endif
+ port_cleanup(prt);
+}
+
+void
+port_cleanup(Port *prt)
+{
+#ifdef ERTS_SMP
Uint32 port_specific;
erts_smp_mtx_t *mtx;
#endif
@@ -1540,14 +1575,14 @@ static void deliver_read_message(Port* prt, Eterm to,
pb = (ProcBin *) hp;
pb->thing_word = HEADER_PROC_BIN;
pb->size = len;
- pb->next = ohp->mso;
- ohp->mso = pb;
+ pb->next = ohp->first;
+ ohp->first = (struct erl_off_heap_header*)pb;
pb->val = bptr;
pb->bytes = (byte*) bptr->orig_bytes;
pb->flags = 0;
hp += PROC_BIN_SIZE;
- ohp->overhead += pb->size / sizeof(Eterm);
+ OH_OVERHEAD(ohp, pb->size / sizeof(Eterm));
listp = make_binary(pb);
}
@@ -1690,14 +1725,14 @@ deliver_vec_message(Port* prt, /* Port */
}
pb->thing_word = HEADER_PROC_BIN;
pb->size = iov->iov_len;
- pb->next = ohp->mso;
- ohp->mso = pb;
+ pb->next = ohp->first;
+ ohp->first = (struct erl_off_heap_header*)pb;
pb->val = ErlDrvBinary2Binary(b);
pb->bytes = base;
pb->flags = 0;
hp += PROC_BIN_SIZE;
- ohp->overhead += iov->iov_len / sizeof(Eterm);
+ OH_OVERHEAD(ohp, iov->iov_len / sizeof(Eterm));
if (listp == NIL) { /* compatible with deliver_bin_message */
listp = make_binary(pb);
@@ -1998,12 +2033,13 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason)
p->nlinks = NULL;
erts_sweep_links(lnk, &sweep_one_link, &sc);
}
+ DRV_MONITOR_LOCK_PDL(p);
{
ErtsMonitor *moni = p->monitors;
p->monitors = NULL;
erts_sweep_monitors(moni, &sweep_one_monitor, NULL);
}
-
+ DRV_MONITOR_UNLOCK_PDL(p);
if ((p->status & ERTS_PORT_SFLG_DISTRIBUTION) && p->dist_entry) {
erts_do_net_exits(p->dist_entry, rreason);
@@ -2223,12 +2259,12 @@ erts_port_control(Process* p, Port* prt, Uint command, Eterm iolist)
ProcBin* pb = (ProcBin *) HAlloc(p, PROC_BIN_SIZE);
pb->thing_word = HEADER_PROC_BIN;
pb->size = dbin->orig_size;
- pb->next = MSO(p).mso;
- MSO(p).mso = pb;
+ pb->next = MSO(p).first;
+ MSO(p).first = (struct erl_off_heap_header*)pb;
pb->val = ErlDrvBinary2Binary(dbin);
pb->bytes = (byte*) dbin->orig_bytes;
pb->flags = 0;
- MSO(p).overhead += dbin->orig_size / sizeof(Eterm);
+ OH_OVERHEAD(&(MSO(p)), dbin->orig_size / sizeof(Eterm));
return make_binary(pb);
}
port_resp = dbin->orig_bytes;
@@ -2997,14 +3033,14 @@ driver_deliver_term(ErlDrvPort port,
driver_binary_inc_refc(b); /* caller will free binary */
pb->thing_word = HEADER_PROC_BIN;
pb->size = size;
- pb->next = ohp->mso;
- ohp->mso = pb;
+ pb->next = ohp->first;
+ ohp->first = (struct erl_off_heap_header*)pb;
pb->val = ErlDrvBinary2Binary(b);
pb->bytes = ((byte*) b->orig_bytes) + offset;
pb->flags = 0;
mess = make_binary(pb);
hp += PROC_BIN_SIZE;
- ohp->overhead += pb->size / sizeof(Eterm);
+ OH_OVERHEAD(ohp, pb->size / sizeof(Eterm));
}
ptr += 3;
break;
@@ -3036,12 +3072,12 @@ driver_deliver_term(ErlDrvPort port,
hp += PROC_BIN_SIZE;
pbp->thing_word = HEADER_PROC_BIN;
pbp->size = size;
- pbp->next = ohp->mso;
- ohp->mso = pbp;
+ pbp->next = ohp->first;
+ ohp->first = (struct erl_off_heap_header*)pbp;
pbp->val = bp;
pbp->bytes = (byte*) bp->orig_bytes;
pbp->flags = 0;
- ohp->overhead += (pbp->size / sizeof(Eterm));
+ OH_OVERHEAD(ohp, pbp->size / sizeof(Eterm));
mess = make_binary(pbp);
}
ptr += 2;
@@ -3536,6 +3572,32 @@ static ERTS_INLINE void pdl_destroy(ErlDrvPDL pdl)
erts_free(ERTS_ALC_T_PORT_DATA_LOCK, pdl);
}
+#ifdef ERTS_SMP
+
+static void driver_monitor_lock_pdl(Port *p) {
+ if (p->port_data_lock) {
+ driver_pdl_lock(p->port_data_lock);
+ }
+ /* Now we either have the port lock or the port_data_lock */
+ ERTS_LC_ASSERT(!p->port_data_lock
+ || erts_lc_mtx_is_locked(&(p->port_data_lock->mtx)));
+ ERTS_SMP_LC_ASSERT(p->port_data_lock
+ || erts_lc_is_port_locked(p));
+}
+
+static void driver_monitor_unlock_pdl(Port *p) {
+ /* We should either have the port lock or the port_data_lock */
+ ERTS_LC_ASSERT(!p->port_data_lock
+ || erts_lc_mtx_is_locked(&(p->port_data_lock->mtx)));
+ ERTS_SMP_LC_ASSERT(p->port_data_lock
+ || erts_lc_is_port_locked(p));
+ if (p->port_data_lock) {
+ driver_pdl_unlock(p->port_data_lock);
+ }
+}
+
+#endif
+
/*
* exported driver_pdl_* functions ...
*/
@@ -3994,7 +4056,7 @@ drv_cancel_timer(Port *prt)
erts_port_task_abort(prt->id, &prt->timeout_task);
}
-int driver_set_timer(ErlDrvPort ix, Uint t)
+int driver_set_timer(ErlDrvPort ix, UWord t)
{
Port* prt = erts_drvport2port(ix);
@@ -4053,12 +4115,16 @@ driver_read_timer(ErlDrvPort ix, unsigned long* t)
int
driver_get_now(ErlDrvNowData *now_data)
{
+ Uint mega,secs,micro;
ERTS_SMP_CHK_NO_PROC_LOCKS;
if (now_data == NULL) {
return -1;
}
- get_now(&(now_data->megasecs),&(now_data->secs),&(now_data->microsecs));
+ get_now(&mega,&secs,&micro);
+ now_data->megasecs = (unsigned long) mega;
+ now_data->secs = (unsigned long) secs;
+ now_data->microsecs = (unsigned long) micro;
return 0;
}
@@ -4072,14 +4138,15 @@ static void ref_to_driver_monitor(Eterm ref, ErlDrvMonitor *mon)
memcpy(mon,refp,sizeof(RefThing));
}
-int driver_monitor_process(ErlDrvPort port,
- ErlDrvTermData process,
- ErlDrvMonitor *monitor)
+
+static int do_driver_monitor_process(Port *prt,
+ Eterm *buf,
+ ErlDrvTermData process,
+ ErlDrvMonitor *monitor)
{
- Port *prt = erts_drvport2port(port);
Process *rp;
Eterm ref;
- Eterm buf[REF_THING_SIZE];
+
if (prt->drv_ptr->process_exit == NULL) {
return -1;
}
@@ -4089,22 +4156,76 @@ int driver_monitor_process(ErlDrvPort port,
if (!rp) {
return 1;
}
+
ref = erts_make_ref_in_buffer(buf);
erts_add_monitor(&(prt->monitors), MON_ORIGIN, ref, rp->id, NIL);
erts_add_monitor(&(rp->monitors), MON_TARGET, ref, prt->id, NIL);
-
+
erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
ref_to_driver_monitor(ref,monitor);
return 0;
}
-int driver_demonitor_process(ErlDrvPort port,
- const ErlDrvMonitor *monitor)
+/*
+ * This can be called from a non scheduler thread iff a port_data_lock exists
+ */
+int driver_monitor_process(ErlDrvPort port,
+ ErlDrvTermData process,
+ ErlDrvMonitor *monitor)
+{
+ Port *prt;
+ int ret;
+ Uint32 status;
+ ErtsSchedulerData *sched = erts_get_scheduler_data();
+ int ix = (int) port;
+ if (ix < 0 || erts_max_ports <= ix) {
+ return -1;
+ }
+ prt = &erts_port[ix];
+
+ DRV_MONITOR_LOCK_PDL(prt);
+
+ if (sched) {
+ status = erts_port[ix].status;
+ } else {
+ erts_smp_port_state_lock(prt);
+ status = erts_port[ix].status;
+ erts_smp_port_state_unlock(prt);
+ }
+
+ if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
+ DRV_MONITOR_UNLOCK_PDL(prt);
+ return -1;
+ }
+
+ /* Now (in SMP) we should have either the port lock (if we have a scheduler) or the port data lock
+ (if we're a driver thread) */
+ ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
+
+#if !HEAP_ON_C_STACK
+ if (!sched) {
+ /* Need a separate allocation for the ref :( */
+ Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM,
+ sizeof(Eterm)*REF_THING_SIZE);
+ ret = do_driver_monitor_process(prt,buf,process,monitor);
+ erts_free(ERTS_ALC_T_TEMP_TERM,buf);
+ } else
+#endif
+ {
+ DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
+ UseTmpHeapNoproc(REF_THING_SIZE);
+ ret = do_driver_monitor_process(prt,buf,process,monitor);
+ UnUseTmpHeapNoproc(REF_THING_SIZE);
+ }
+ DRV_MONITOR_UNLOCK_PDL(prt);
+ return ret;
+}
+
+static int do_driver_demonitor_process(Port *prt, Eterm *buf,
+ const ErlDrvMonitor *monitor)
{
- Port *prt = erts_drvport2port(port);
Process *rp;
Eterm ref;
- Eterm buf[REF_THING_SIZE];
ErtsMonitor *mon;
Eterm to;
@@ -4137,12 +4258,60 @@ int driver_demonitor_process(ErlDrvPort port,
return 0;
}
-ErlDrvTermData driver_get_monitored_process(ErlDrvPort port,
+int driver_demonitor_process(ErlDrvPort port,
+ const ErlDrvMonitor *monitor)
+{
+ Port *prt;
+ int ret;
+ Uint32 status;
+ ErtsSchedulerData *sched = erts_get_scheduler_data();
+ int ix = (int) port;
+ if (ix < 0 || erts_max_ports <= ix) {
+ return -1;
+ }
+ prt = &erts_port[ix];
+
+ DRV_MONITOR_LOCK_PDL(prt);
+
+ if (sched) {
+ status = erts_port[ix].status;
+ } else {
+ erts_smp_port_state_lock(prt);
+ status = erts_port[ix].status;
+ erts_smp_port_state_unlock(prt);
+ }
+
+ if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
+ DRV_MONITOR_UNLOCK_PDL(prt);
+ return -1;
+ }
+
+ /* Now we should have either the port lock (if we have a scheduler) or the port data lock
+ (if we're a driver thread) */
+ ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
+#if !HEAP_ON_C_STACK
+ if (!sched) {
+ /* Need a separate allocation for the ref :( */
+ Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM,
+ sizeof(Eterm)*REF_THING_SIZE);
+ ret = do_driver_demonitor_process(prt,buf,monitor);
+ erts_free(ERTS_ALC_T_TEMP_TERM,buf);
+ } else
+#endif
+ {
+ DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
+ UseTmpHeapNoproc(REF_THING_SIZE);
+ ret = do_driver_demonitor_process(prt,buf,monitor);
+ UnUseTmpHeapNoproc(REF_THING_SIZE);
+ }
+ DRV_MONITOR_UNLOCK_PDL(prt);
+ return ret;
+}
+
+static ErlDrvTermData do_driver_get_monitored_process(Port *prt, Eterm *buf,
const ErlDrvMonitor *monitor)
{
- Port *prt = erts_drvport2port(port);
Eterm ref;
- Eterm buf[REF_THING_SIZE];
ErtsMonitor *mon;
Eterm to;
@@ -4158,6 +4327,59 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort port,
return (ErlDrvTermData) to;
}
+
+ErlDrvTermData driver_get_monitored_process(ErlDrvPort port,
+ const ErlDrvMonitor *monitor)
+{
+ Port *prt;
+ ErlDrvTermData ret;
+ Uint32 status;
+ ErtsSchedulerData *sched = erts_get_scheduler_data();
+ int ix = (int) port;
+ if (ix < 0 || erts_max_ports <= ix) {
+ return driver_term_nil;
+ }
+ prt = &erts_port[ix];
+
+ DRV_MONITOR_LOCK_PDL(prt);
+
+ if (sched) {
+ status = erts_port[ix].status;
+ } else {
+ erts_smp_port_state_lock(prt);
+ status = erts_port[ix].status;
+ erts_smp_port_state_unlock(prt);
+ }
+
+ if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) {
+ DRV_MONITOR_UNLOCK_PDL(prt);
+ return driver_term_nil;
+ }
+
+ /* Now we should have either the port lock (if we have a scheduler) or the port data lock
+ (if we're a driver thread) */
+ ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock));
+
+#if !HEAP_ON_C_STACK
+ if (!sched) {
+ /* Need a separate allocation for the ref :( */
+ Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM,
+ sizeof(Eterm)*REF_THING_SIZE);
+ ret = do_driver_get_monitored_process(prt,buf,monitor);
+ erts_free(ERTS_ALC_T_TEMP_TERM,buf);
+ } else
+#endif
+ {
+ DeclareTmpHeapNoproc(buf,REF_THING_SIZE);
+ UseTmpHeapNoproc(REF_THING_SIZE);
+ ret = do_driver_get_monitored_process(prt,buf,monitor);
+ UnUseTmpHeapNoproc(REF_THING_SIZE);
+ }
+ DRV_MONITOR_UNLOCK_PDL(prt);
+ return ret;
+}
+
+
int driver_compare_monitors(const ErlDrvMonitor *monitor1,
const ErlDrvMonitor *monitor2)
{
@@ -4173,18 +4395,22 @@ void erts_fire_port_monitor(Port *prt, Eterm ref)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
ASSERT(prt->drv_ptr != NULL);
-
+ DRV_MONITOR_LOCK_PDL(prt);
if (erts_lookup_monitor(prt->monitors,ref) == NULL) {
+ DRV_MONITOR_UNLOCK_PDL(prt);
return;
}
callback = prt->drv_ptr->process_exit;
ASSERT(callback != NULL);
ref_to_driver_monitor(ref,&drv_monitor);
+ DRV_MONITOR_UNLOCK_PDL(prt);
fpe_was_unmasked = erts_block_fpe();
(*callback)((ErlDrvData) (prt->drv_data), &drv_monitor);
erts_unblock_fpe(fpe_was_unmasked);
+ DRV_MONITOR_LOCK_PDL(prt);
/* remove monitor *after* callback */
rmon = erts_remove_monitor(&(prt->monitors),ref);
+ DRV_MONITOR_UNLOCK_PDL(prt);
if (rmon) {
erts_destroy_monitor(rmon);
}