diff options
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r-- | erts/emulator/beam/io.c | 489 |
1 files changed, 367 insertions, 122 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 3309b77086..df5f8b22a3 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1996-2010. All Rights Reserved. + * Copyright Ericsson AB 1996-2011. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in @@ -56,7 +56,6 @@ static erts_smp_tsd_key_t driver_list_last_error_key; /* Save last DDLL error o per thread basis (for BC interfaces) */ Port* erts_port; /* The port table */ -erts_smp_atomic_t erts_ports_alive; erts_smp_atomic_t erts_bytes_out; /* No bytes sent out of the system */ erts_smp_atomic_t erts_bytes_in; /* No bytes gotten into the system */ @@ -72,6 +71,18 @@ erts_driver_t fd_driver; static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *); static void terminate_port(Port *p); static void pdl_init(void); +#ifdef ERTS_SMP +static void driver_monitor_lock_pdl(Port *p); +static void driver_monitor_unlock_pdl(Port *p); +#define DRV_MONITOR_LOCK_PDL(Port) driver_monitor_lock_pdl(Port) +#define DRV_MONITOR_UNLOCK_PDL(Port) driver_monitor_unlock_pdl(Port) +#else +#define DRV_MONITOR_LOCK_PDL(Port) /* nothing */ +#define DRV_MONITOR_UNLOCK_PDL(Port) /* nothing */ +#endif + +#define ERL_SMALL_IO_BIN_LIMIT (4*ERL_ONHEAP_BIN_LIMIT) +#define SMALL_WRITE_VEC 16 static ERTS_INLINE ErlIOQueue* drvport2ioq(ErlDrvPort drvport) @@ -181,7 +192,7 @@ typedef struct line_buf_context { static erts_smp_spinlock_t get_free_port_lck; static Uint last_port_num; static Uint port_num_mask; -erts_smp_atomic_t erts_ports_snapshot; /* Identifies the _next_ snapshot (not the ongoing) */ +erts_smp_atomic32_t erts_ports_snapshot; /* Identifies the _next_ snapshot (not the ongoing) */ static ERTS_INLINE void @@ -271,10 +282,36 @@ erts_test_next_port(int set, Uint next) return res; } + +static void port_cleanup(Port *prt); + +#ifdef ERTS_SMP + +static void +sched_port_cleanup(void *vprt) +{ + Port *prt = (Port *) vprt; + erts_smp_mtx_lock(prt->lock); + port_cleanup(prt); +} + +#endif + void erts_port_cleanup(Port *prt) { #ifdef ERTS_SMP + if (erts_smp_mtx_trylock(prt->lock) == EBUSY) + erts_schedule_misc_op(sched_port_cleanup, (void *) prt); + else +#endif + port_cleanup(prt); +} + +void +port_cleanup(Port *prt) +{ +#ifdef ERTS_SMP Uint32 port_specific; erts_smp_mtx_t *mtx; #endif @@ -386,14 +423,13 @@ setup_port(Port* prt, Eterm pid, erts_driver_t *driver, new_name = (char*) erts_alloc(ERTS_ALC_T_PORT_NAME, sys_strlen(name)+1); sys_strcpy(new_name, name); erts_smp_runq_lock(runq); - erts_smp_atomic_inc(&erts_ports_alive); erts_smp_port_state_lock(prt); prt->status = ERTS_PORT_SFLG_CONNECTED | xstatus; - prt->snapshot = (Uint32) erts_smp_atomic_read(&erts_ports_snapshot); + prt->snapshot = erts_smp_atomic32_read(&erts_ports_snapshot); old_name = prt->name; prt->name = new_name; #ifdef ERTS_SMP - erts_smp_atomic_set(&prt->run_queue, (long) runq); + erts_smp_atomic_set(&prt->run_queue, (erts_aint_t) runq); #endif ASSERT(!prt->drv_ptr); prt->drv_ptr = driver; @@ -635,7 +671,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ #ifdef ERTS_SMP erts_cancel_smp_ptimer(port->ptimer); #else - erl_cancel_timer(&(port->tm)); + erts_cancel_timer(&(port->tm)); #endif stopq(port); kill_port(port); @@ -919,13 +955,14 @@ do { \ int _bitoffs; \ int _bitsize; \ ERTS_GET_REAL_BIN(obj, _real, _offset, _bitoffs, _bitsize); \ - ASSERT(_bitsize == 0); \ + if (_bitsize != 0) goto L_type_error; \ if (thing_subtag(*binary_val(_real)) == REFC_BINARY_SUBTAG && \ _bitoffs == 0) { \ b_size += _size; \ + if (b_size < _size) goto L_overflow_error; \ in_clist = 0; \ v_size++; \ - if (_size >= bin_limit) { \ + if (_size >= ERL_SMALL_IO_BIN_LIMIT) { \ p_in_clist = 0; \ p_v_size++; \ } else { \ @@ -937,6 +974,7 @@ do { \ } \ } else { \ c_size += _size; \ + if (c_size < _size) goto L_overflow_error; \ if (!in_clist) { \ in_clist = 1; \ v_size++; \ @@ -951,29 +989,30 @@ do { \ /* -** Size of a io list in bytes -** return -1 if error -** returns: - Total size of io list -** vsize - SysIOVec size needed for a writev -** csize - Number of bytes not in binary (in the common binary) -** pvsize - SysIOVec size needed if packing small binaries -** pcsize - Number of bytes in the common binary if packing -*/ + * Returns 0 if successful and a non-zero value otherwise. + * + * Return values through pointers: + * *vsize - SysIOVec size needed for a writev + * *csize - Number of bytes not in binary (in the common binary) + * *pvsize - SysIOVec size needed if packing small binaries + * *pcsize - Number of bytes in the common binary if packing + * *total_size - Total size of iolist in bytes + */ static int -io_list_vec_len(Eterm obj, int* vsize, int* csize, - int bin_limit, /* small binaries limit */ - int * pvsize, int * pcsize) +io_list_vec_len(Eterm obj, Uint* vsize, Uint* csize, + Uint* pvsize, Uint* pcsize, Uint* total_size) { DECLARE_ESTACK(s); Eterm* objp; - int v_size = 0; - int c_size = 0; - int b_size = 0; - int in_clist = 0; - int p_v_size = 0; - int p_c_size = 0; - int p_in_clist = 0; + Uint v_size = 0; + Uint c_size = 0; + Uint b_size = 0; + Uint in_clist = 0; + Uint p_v_size = 0; + Uint p_c_size = 0; + Uint p_in_clist = 0; + Uint total; goto L_jump_start; /* avoid a push */ @@ -987,6 +1026,9 @@ io_list_vec_len(Eterm obj, int* vsize, int* csize, if (is_byte(obj)) { c_size++; + if (c_size == 0) { + goto L_overflow_error; + } if (!in_clist) { in_clist = 1; v_size++; @@ -1026,32 +1068,31 @@ io_list_vec_len(Eterm obj, int* vsize, int* csize, } } + total = c_size + b_size; + if (total < c_size) { + goto L_overflow_error; + } + *total_size = total; + DESTROY_ESTACK(s); - if (vsize != NULL) - *vsize = v_size; - if (csize != NULL) - *csize = c_size; - if (pvsize != NULL) - *pvsize = p_v_size; - if (pcsize != NULL) - *pcsize = p_c_size; - return c_size + b_size; + *vsize = v_size; + *csize = c_size; + *pvsize = p_v_size; + *pcsize = p_c_size; + return 0; L_type_error: + L_overflow_error: DESTROY_ESTACK(s); - return -1; + return 1; } -#define ERL_SMALL_IO_BIN_LIMIT (4*ERL_ONHEAP_BIN_LIMIT) -#define SMALL_WRITE_VEC 16 - - /* write data to a port */ int erts_write_to_port(Eterm caller_id, Port *p, Eterm list) { char *buf; erts_driver_t *drv = p->drv_ptr; - int size; + Uint size; int fpe_was_unmasked; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); @@ -1059,10 +1100,10 @@ int erts_write_to_port(Eterm caller_id, Port *p, Eterm list) p->caller = caller_id; if (drv->outputv != NULL) { - int vsize; - int csize; - int pvsize; - int pcsize; + Uint vsize; + Uint csize; + Uint pvsize; + Uint pcsize; int blimit; SysIOVec iv[SMALL_WRITE_VEC]; ErlDrvBinary* bv[SMALL_WRITE_VEC]; @@ -1071,9 +1112,8 @@ int erts_write_to_port(Eterm caller_id, Port *p, Eterm list) ErlDrvBinary* cbin; ErlIOVec ev; - if ((size = io_list_vec_len(list, &vsize, &csize, - ERL_SMALL_IO_BIN_LIMIT, - &pvsize, &pcsize)) < 0) { + if (io_list_vec_len(list, &vsize, &csize, + &pvsize, &pcsize, &size)) { goto bad_value; } /* To pack or not to pack (small binaries) ...? */ @@ -1148,7 +1188,7 @@ int erts_write_to_port(Eterm caller_id, Port *p, Eterm list) else { ASSERT(r == -1); /* Overflow */ erts_free(ERTS_ALC_T_TMP, buf); - if ((size = io_list_len(list)) < 0) { + if (erts_iolist_size(list, &size)) { goto bad_value; } @@ -1191,7 +1231,6 @@ void init_io(void) { int i; ErlDrvEntry** dp; - ErlDrvEntry* drv; char maxports[21]; /* enough for any 64-bit integer */ size_t maxportssize = sizeof(maxports); Uint ports_bits = ERTS_PORTS_BITS; @@ -1240,7 +1279,6 @@ void init_io(void) erts_smp_atomic_init(&erts_bytes_out, 0); erts_smp_atomic_init(&erts_bytes_in, 0); - erts_smp_atomic_init(&erts_ports_alive, 0); for (i = 0; i < erts_max_ports; i++) { erts_port_task_init_sched(&erts_port[i].sched); @@ -1262,7 +1300,7 @@ void init_io(void) erts_port[i].port_data_lock = NULL; } - erts_smp_atomic_init(&erts_ports_snapshot, (long) 0); + erts_smp_atomic32_init(&erts_ports_snapshot, (erts_aint32_t) 0); last_port_num = 0; erts_smp_spinlock_init(&get_free_port_lck, "get_free_port"); @@ -1274,10 +1312,8 @@ void init_io(void) init_driver(&fd_driver, &fd_driver_entry, NULL); init_driver(&vanilla_driver, &vanilla_driver_entry, NULL); init_driver(&spawn_driver, &spawn_driver_entry, NULL); - for (dp = driver_tab; *dp != NULL; dp++) { - drv = *dp; + for (dp = driver_tab; *dp != NULL; dp++) erts_add_driver_entry(*dp, NULL, 1); - } erts_smp_tsd_set(driver_list_lock_status_key, NULL); erts_smp_mtx_unlock(&erts_driver_list_lock); @@ -1540,14 +1576,14 @@ static void deliver_read_message(Port* prt, Eterm to, pb = (ProcBin *) hp; pb->thing_word = HEADER_PROC_BIN; pb->size = len; - pb->next = ohp->mso; - ohp->mso = pb; + pb->next = ohp->first; + ohp->first = (struct erl_off_heap_header*)pb; pb->val = bptr; pb->bytes = (byte*) bptr->orig_bytes; pb->flags = 0; hp += PROC_BIN_SIZE; - ohp->overhead += pb->size / sizeof(Eterm); + OH_OVERHEAD(ohp, pb->size / sizeof(Eterm)); listp = make_binary(pb); } @@ -1690,14 +1726,14 @@ deliver_vec_message(Port* prt, /* Port */ } pb->thing_word = HEADER_PROC_BIN; pb->size = iov->iov_len; - pb->next = ohp->mso; - ohp->mso = pb; + pb->next = ohp->first; + ohp->first = (struct erl_off_heap_header*)pb; pb->val = ErlDrvBinary2Binary(b); pb->bytes = base; pb->flags = 0; hp += PROC_BIN_SIZE; - ohp->overhead += iov->iov_len / sizeof(Eterm); + OH_OVERHEAD(ohp, iov->iov_len / sizeof(Eterm)); if (listp == NIL) { /* compatible with deliver_bin_message */ listp = make_binary(pb); @@ -1804,7 +1840,7 @@ terminate_port(Port *prt) #ifdef ERTS_SMP erts_cancel_smp_ptimer(prt->ptimer); #else - erl_cancel_timer(&prt->tm); + erts_cancel_timer(&prt->tm); #endif drv = prt->drv_ptr; @@ -1998,12 +2034,13 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) p->nlinks = NULL; erts_sweep_links(lnk, &sweep_one_link, &sc); } + DRV_MONITOR_LOCK_PDL(p); { ErtsMonitor *moni = p->monitors; p->monitors = NULL; erts_sweep_monitors(moni, &sweep_one_monitor, NULL); } - + DRV_MONITOR_UNLOCK_PDL(p); if ((p->status & ERTS_PORT_SFLG_DISTRIBUTION) && p->dist_entry) { erts_do_net_exits(p->dist_entry, rreason); @@ -2114,7 +2151,7 @@ erts_port_control(Process* p, Port* prt, Uint command, Eterm iolist) byte* to_port = NULL; /* Buffer to write to port. */ /* Initialization is for shutting up warning about use before set. */ - int to_len = 0; /* Length of buffer. */ + Uint to_len = 0; /* Length of buffer. */ int must_free = 0; /* True if the buffer should be freed. */ char port_result[ERL_ONHEAP_BIN_LIMIT]; /* Default buffer for result from port. */ char* port_resp; /* Pointer to result buffer. */ @@ -2159,7 +2196,7 @@ erts_port_control(Process* p, Port* prt, Uint command, Eterm iolist) } else { ASSERT(r == -1); /* Overflow */ erts_free(ERTS_ALC_T_TMP, (void *) to_port); - if ((to_len = io_list_len(iolist)) < 0) { /* Type error */ + if (erts_iolist_size(iolist, &to_len)) { /* Type error */ return THE_NON_VALUE; } must_free = 1; @@ -2223,12 +2260,12 @@ erts_port_control(Process* p, Port* prt, Uint command, Eterm iolist) ProcBin* pb = (ProcBin *) HAlloc(p, PROC_BIN_SIZE); pb->thing_word = HEADER_PROC_BIN; pb->size = dbin->orig_size; - pb->next = MSO(p).mso; - MSO(p).mso = pb; + pb->next = MSO(p).first; + MSO(p).first = (struct erl_off_heap_header*)pb; pb->val = ErlDrvBinary2Binary(dbin); pb->bytes = (byte*) dbin->orig_bytes; pb->flags = 0; - MSO(p).overhead += dbin->orig_size / sizeof(Eterm); + OH_OVERHEAD(&(MSO(p)), dbin->orig_size / sizeof(Eterm)); return make_binary(pb); } port_resp = dbin->orig_bytes; @@ -2384,7 +2421,7 @@ void erts_raw_port_command(Port* p, byte* buf, Uint len) if (len > (Uint) INT_MAX) erl_exit(ERTS_ABORT_EXIT, - "Absurdly large data buffer (%bpu bytes) passed to" + "Absurdly large data buffer (%beu bytes) passed to" "output callback of %s driver.\n", len, p->drv_ptr->name ? p->drv_ptr->name : "unknown"); @@ -2766,17 +2803,25 @@ driver_deliver_term(ErlDrvPort port, break; case ERL_DRV_INT: /* signed int argument */ ERTS_DDT_CHK_ENOUGH_ARGS(1); +#if HALFWORD_HEAP + erts_bld_sint64(NULL, &need, (Sint64)ptr[0]); +#else /* check for bignum */ if (!IS_SSMALL((Sint)ptr[0])) need += BIG_UINT_HEAP_SIZE; /* use small_to_big */ +#endif ptr++; depth++; break; case ERL_DRV_UINT: /* unsigned int argument */ ERTS_DDT_CHK_ENOUGH_ARGS(1); +#if HALFWORD_HEAP + erts_bld_uint64(NULL, &need, (Uint64)ptr[0]); +#else /* check for bignum */ if (!IS_USMALL(0, (Uint)ptr[0])) need += BIG_UINT_HEAP_SIZE; /* use small_to_big */ +#endif ptr++; depth++; break; @@ -2943,22 +2988,30 @@ driver_deliver_term(ErlDrvPort port, break; case ERL_DRV_INT: /* signed int argument */ +#if HALFWORD_HEAP + mess = erts_bld_sint64(&hp, NULL, (Sint64)ptr[0]); +#else if (IS_SSMALL((Sint)ptr[0])) mess = make_small((Sint)ptr[0]); else { mess = small_to_big((Sint)ptr[0], hp); hp += BIG_UINT_HEAP_SIZE; } +#endif ptr++; break; case ERL_DRV_UINT: /* unsigned int argument */ +#if HALFWORD_HEAP + mess = erts_bld_uint64(&hp, NULL, (Uint64)ptr[0]); +#else if (IS_USMALL(0, (Uint)ptr[0])) mess = make_small((Uint)ptr[0]); else { mess = uint_to_big((Uint)ptr[0], hp); hp += BIG_UINT_HEAP_SIZE; } +#endif ptr++; break; @@ -2997,14 +3050,14 @@ driver_deliver_term(ErlDrvPort port, driver_binary_inc_refc(b); /* caller will free binary */ pb->thing_word = HEADER_PROC_BIN; pb->size = size; - pb->next = ohp->mso; - ohp->mso = pb; + pb->next = ohp->first; + ohp->first = (struct erl_off_heap_header*)pb; pb->val = ErlDrvBinary2Binary(b); pb->bytes = ((byte*) b->orig_bytes) + offset; pb->flags = 0; mess = make_binary(pb); hp += PROC_BIN_SIZE; - ohp->overhead += pb->size / sizeof(Eterm); + OH_OVERHEAD(ohp, pb->size / sizeof(Eterm)); } ptr += 3; break; @@ -3036,12 +3089,12 @@ driver_deliver_term(ErlDrvPort port, hp += PROC_BIN_SIZE; pbp->thing_word = HEADER_PROC_BIN; pbp->size = size; - pbp->next = ohp->mso; - ohp->mso = pbp; + pbp->next = ohp->first; + ohp->first = (struct erl_off_heap_header*)pbp; pbp->val = bp; pbp->bytes = (byte*) bp->orig_bytes; pbp->flags = 0; - ohp->overhead += (pbp->size / sizeof(Eterm)); + OH_OVERHEAD(ohp, pbp->size / sizeof(Eterm)); mess = make_binary(pbp); } ptr += 2; @@ -3200,7 +3253,7 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, int hlen, return 0; prt->bytes_in += (hlen + len); - erts_smp_atomic_add(&erts_bytes_in, (long) (hlen + len)); + erts_smp_atomic_add(&erts_bytes_in, (erts_aint_t) (hlen + len)); if (prt->status & ERTS_PORT_SFLG_DISTRIBUTION) { return erts_net_message(prt, prt->dist_entry, @@ -3235,7 +3288,7 @@ int driver_output2(ErlDrvPort ix, char* hbuf, int hlen, char* buf, int len) return 0; prt->bytes_in += (hlen + len); - erts_smp_atomic_add(&erts_bytes_in, (long) (hlen + len)); + erts_smp_atomic_add(&erts_bytes_in, (erts_aint_t) (hlen + len)); if (prt->status & ERTS_PORT_SFLG_DISTRIBUTION) { if (len == 0) return erts_net_message(prt, @@ -3312,7 +3365,7 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, int hlen, ErlIOVec* vec, int skip) /* XXX handle distribution !!! */ prt->bytes_in += (hlen + size); - erts_smp_atomic_add(&erts_bytes_in, (long) (hlen + size)); + erts_smp_atomic_add(&erts_bytes_in, (erts_aint_t) (hlen + size)); deliver_vec_message(prt, prt->connected, hbuf, hlen, binv, iov, n, size); return 0; } @@ -3356,25 +3409,25 @@ int len; * reference count on driver binaries... */ -long +ErlDrvSInt driver_binary_get_refc(ErlDrvBinary *dbp) { Binary* bp = ErlDrvBinary2Binary(dbp); - return erts_refc_read(&bp->refc, 1); + return (ErlDrvSInt) erts_refc_read(&bp->refc, 1); } -long +ErlDrvSInt driver_binary_inc_refc(ErlDrvBinary *dbp) { Binary* bp = ErlDrvBinary2Binary(dbp); - return erts_refc_inctest(&bp->refc, 2); + return (ErlDrvSInt) erts_refc_inctest(&bp->refc, 2); } -long +ErlDrvSInt driver_binary_dec_refc(ErlDrvBinary *dbp) { Binary* bp = ErlDrvBinary2Binary(dbp); - return erts_refc_dectest(&bp->refc, 1); + return (ErlDrvSInt) erts_refc_dectest(&bp->refc, 1); } @@ -3489,12 +3542,12 @@ pdl_init_refc(ErlDrvPDL pdl) erts_atomic_init(&pdl->refc, 1); } -static ERTS_INLINE long +static ERTS_INLINE ErlDrvSInt pdl_read_refc(ErlDrvPDL pdl) { - long refc = erts_atomic_read(&pdl->refc); + erts_aint_t refc = erts_atomic_read(&pdl->refc); ERTS_LC_ASSERT(refc >= 0); - return refc; + return (ErlDrvSInt) refc; } static ERTS_INLINE void @@ -3504,12 +3557,12 @@ pdl_inc_refc(ErlDrvPDL pdl) ERTS_LC_ASSERT(driver_pdl_get_refc(pdl) > 1); } -static ERTS_INLINE long +static ERTS_INLINE ErlDrvSInt pdl_inctest_refc(ErlDrvPDL pdl) { - long refc = erts_atomic_inctest(&pdl->refc); + erts_aint_t refc = erts_atomic_inctest(&pdl->refc); ERTS_LC_ASSERT(refc > 1); - return refc; + return (ErlDrvSInt) refc; } #if 0 /* unused */ @@ -3521,12 +3574,12 @@ pdl_dec_refc(ErlDrvPDL pdl) } #endif -static ERTS_INLINE long +static ERTS_INLINE ErlDrvSInt pdl_dectest_refc(ErlDrvPDL pdl) { - long refc = erts_atomic_dectest(&pdl->refc); + erts_aint_t refc = erts_atomic_dectest(&pdl->refc); ERTS_LC_ASSERT(refc >= 0); - return refc; + return (ErlDrvSInt) refc; } static ERTS_INLINE void pdl_destroy(ErlDrvPDL pdl) @@ -3536,6 +3589,32 @@ static ERTS_INLINE void pdl_destroy(ErlDrvPDL pdl) erts_free(ERTS_ALC_T_PORT_DATA_LOCK, pdl); } +#ifdef ERTS_SMP + +static void driver_monitor_lock_pdl(Port *p) { + if (p->port_data_lock) { + driver_pdl_lock(p->port_data_lock); + } + /* Now we either have the port lock or the port_data_lock */ + ERTS_LC_ASSERT(!p->port_data_lock + || erts_lc_mtx_is_locked(&(p->port_data_lock->mtx))); + ERTS_SMP_LC_ASSERT(p->port_data_lock + || erts_lc_is_port_locked(p)); +} + +static void driver_monitor_unlock_pdl(Port *p) { + /* We should either have the port lock or the port_data_lock */ + ERTS_LC_ASSERT(!p->port_data_lock + || erts_lc_mtx_is_locked(&(p->port_data_lock->mtx))); + ERTS_SMP_LC_ASSERT(p->port_data_lock + || erts_lc_is_port_locked(p)); + if (p->port_data_lock) { + driver_pdl_unlock(p->port_data_lock); + } +} + +#endif + /* * exported driver_pdl_* functions ... */ @@ -3571,7 +3650,7 @@ driver_pdl_lock(ErlDrvPDL pdl) void driver_pdl_unlock(ErlDrvPDL pdl) { - long refc; + ErlDrvSInt refc; #ifdef HARDDEBUG erts_fprintf(stderr, "driver_pdl_unlock(0x%08X)\r\n",(unsigned) pdl); #endif @@ -3581,28 +3660,30 @@ driver_pdl_unlock(ErlDrvPDL pdl) pdl_destroy(pdl); } -long +ErlDrvSInt driver_pdl_get_refc(ErlDrvPDL pdl) { return pdl_read_refc(pdl); } -long +ErlDrvSInt driver_pdl_inc_refc(ErlDrvPDL pdl) { - long refc = pdl_inctest_refc(pdl); + ErlDrvSInt refc = pdl_inctest_refc(pdl); #ifdef HARDDEBUG - erts_fprintf(stderr, "driver_pdl_inc_refc(0x%08X) -> %ld\r\n",(unsigned) pdl, refc); + erts_fprintf(stderr, "driver_pdl_inc_refc(%p) -> %bed\r\n", + pdl, refc); #endif return refc; } -long +ErlDrvSInt driver_pdl_dec_refc(ErlDrvPDL pdl) { - long refc = pdl_dectest_refc(pdl); + ErlDrvSInt refc = pdl_dectest_refc(pdl); #ifdef HARDDEBUG - erts_fprintf(stderr, "driver_pdl_dec_refc(0x%08X) -> %ld\r\n",(unsigned) pdl, refc); + erts_fprintf(stderr, "driver_pdl_dec_refc(%p) -> %bpd\r\n", + pdl, refc); #endif if (!refc) pdl_destroy(pdl); @@ -3988,13 +4069,13 @@ drv_cancel_timer(Port *prt) #ifdef ERTS_SMP erts_cancel_smp_ptimer(prt->ptimer); #else - erl_cancel_timer(&prt->tm); + erts_cancel_timer(&prt->tm); #endif if (erts_port_task_is_scheduled(&prt->timeout_task)) erts_port_task_abort(prt->id, &prt->timeout_task); } -int driver_set_timer(ErlDrvPort ix, Uint t) +int driver_set_timer(ErlDrvPort ix, UWord t) { Port* prt = erts_drvport2port(ix); @@ -4012,7 +4093,7 @@ int driver_set_timer(ErlDrvPort ix, Uint t) (ErlTimeoutProc) schedule_port_timeout, t); #else - erl_set_timer(&prt->tm, + erts_set_timer(&prt->tm, (ErlTimeoutProc) schedule_port_timeout, NULL, prt, @@ -4043,9 +4124,9 @@ driver_read_timer(ErlDrvPort ix, unsigned long* t) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); #ifdef ERTS_SMP - *t = prt->ptimer ? time_left(&prt->ptimer->timer.tm) : 0; + *t = prt->ptimer ? erts_time_left(&prt->ptimer->timer.tm) : 0; #else - *t = time_left(&prt->tm); + *t = erts_time_left(&prt->tm); #endif return 0; } @@ -4053,12 +4134,16 @@ driver_read_timer(ErlDrvPort ix, unsigned long* t) int driver_get_now(ErlDrvNowData *now_data) { + Uint mega,secs,micro; ERTS_SMP_CHK_NO_PROC_LOCKS; if (now_data == NULL) { return -1; } - get_now(&(now_data->megasecs),&(now_data->secs),&(now_data->microsecs)); + get_now(&mega,&secs,µ); + now_data->megasecs = (unsigned long) mega; + now_data->secs = (unsigned long) secs; + now_data->microsecs = (unsigned long) micro; return 0; } @@ -4072,14 +4157,15 @@ static void ref_to_driver_monitor(Eterm ref, ErlDrvMonitor *mon) memcpy(mon,refp,sizeof(RefThing)); } -int driver_monitor_process(ErlDrvPort port, - ErlDrvTermData process, - ErlDrvMonitor *monitor) + +static int do_driver_monitor_process(Port *prt, + Eterm *buf, + ErlDrvTermData process, + ErlDrvMonitor *monitor) { - Port *prt = erts_drvport2port(port); Process *rp; Eterm ref; - Eterm buf[REF_THING_SIZE]; + if (prt->drv_ptr->process_exit == NULL) { return -1; } @@ -4089,22 +4175,76 @@ int driver_monitor_process(ErlDrvPort port, if (!rp) { return 1; } + ref = erts_make_ref_in_buffer(buf); erts_add_monitor(&(prt->monitors), MON_ORIGIN, ref, rp->id, NIL); erts_add_monitor(&(rp->monitors), MON_TARGET, ref, prt->id, NIL); - + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); ref_to_driver_monitor(ref,monitor); return 0; } -int driver_demonitor_process(ErlDrvPort port, - const ErlDrvMonitor *monitor) +/* + * This can be called from a non scheduler thread iff a port_data_lock exists + */ +int driver_monitor_process(ErlDrvPort port, + ErlDrvTermData process, + ErlDrvMonitor *monitor) +{ + Port *prt; + int ret; + Uint32 status; + ErtsSchedulerData *sched = erts_get_scheduler_data(); + int ix = (int) port; + if (ix < 0 || erts_max_ports <= ix) { + return -1; + } + prt = &erts_port[ix]; + + DRV_MONITOR_LOCK_PDL(prt); + + if (sched) { + status = erts_port[ix].status; + } else { + erts_smp_port_state_lock(prt); + status = erts_port[ix].status; + erts_smp_port_state_unlock(prt); + } + + if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { + DRV_MONITOR_UNLOCK_PDL(prt); + return -1; + } + + /* Now (in SMP) we should have either the port lock (if we have a scheduler) or the port data lock + (if we're a driver thread) */ + ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock)); + +#if !HEAP_ON_C_STACK + if (!sched) { + /* Need a separate allocation for the ref :( */ + Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM, + sizeof(Eterm)*REF_THING_SIZE); + ret = do_driver_monitor_process(prt,buf,process,monitor); + erts_free(ERTS_ALC_T_TEMP_TERM,buf); + } else +#endif + { + DeclareTmpHeapNoproc(buf,REF_THING_SIZE); + UseTmpHeapNoproc(REF_THING_SIZE); + ret = do_driver_monitor_process(prt,buf,process,monitor); + UnUseTmpHeapNoproc(REF_THING_SIZE); + } + DRV_MONITOR_UNLOCK_PDL(prt); + return ret; +} + +static int do_driver_demonitor_process(Port *prt, Eterm *buf, + const ErlDrvMonitor *monitor) { - Port *prt = erts_drvport2port(port); Process *rp; Eterm ref; - Eterm buf[REF_THING_SIZE]; ErtsMonitor *mon; Eterm to; @@ -4137,12 +4277,60 @@ int driver_demonitor_process(ErlDrvPort port, return 0; } -ErlDrvTermData driver_get_monitored_process(ErlDrvPort port, +int driver_demonitor_process(ErlDrvPort port, + const ErlDrvMonitor *monitor) +{ + Port *prt; + int ret; + Uint32 status; + ErtsSchedulerData *sched = erts_get_scheduler_data(); + int ix = (int) port; + if (ix < 0 || erts_max_ports <= ix) { + return -1; + } + prt = &erts_port[ix]; + + DRV_MONITOR_LOCK_PDL(prt); + + if (sched) { + status = erts_port[ix].status; + } else { + erts_smp_port_state_lock(prt); + status = erts_port[ix].status; + erts_smp_port_state_unlock(prt); + } + + if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { + DRV_MONITOR_UNLOCK_PDL(prt); + return -1; + } + + /* Now we should have either the port lock (if we have a scheduler) or the port data lock + (if we're a driver thread) */ + ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock)); +#if !HEAP_ON_C_STACK + if (!sched) { + /* Need a separate allocation for the ref :( */ + Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM, + sizeof(Eterm)*REF_THING_SIZE); + ret = do_driver_demonitor_process(prt,buf,monitor); + erts_free(ERTS_ALC_T_TEMP_TERM,buf); + } else +#endif + { + DeclareTmpHeapNoproc(buf,REF_THING_SIZE); + UseTmpHeapNoproc(REF_THING_SIZE); + ret = do_driver_demonitor_process(prt,buf,monitor); + UnUseTmpHeapNoproc(REF_THING_SIZE); + } + DRV_MONITOR_UNLOCK_PDL(prt); + return ret; +} + +static ErlDrvTermData do_driver_get_monitored_process(Port *prt, Eterm *buf, const ErlDrvMonitor *monitor) { - Port *prt = erts_drvport2port(port); Eterm ref; - Eterm buf[REF_THING_SIZE]; ErtsMonitor *mon; Eterm to; @@ -4158,6 +4346,59 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort port, return (ErlDrvTermData) to; } + +ErlDrvTermData driver_get_monitored_process(ErlDrvPort port, + const ErlDrvMonitor *monitor) +{ + Port *prt; + ErlDrvTermData ret; + Uint32 status; + ErtsSchedulerData *sched = erts_get_scheduler_data(); + int ix = (int) port; + if (ix < 0 || erts_max_ports <= ix) { + return driver_term_nil; + } + prt = &erts_port[ix]; + + DRV_MONITOR_LOCK_PDL(prt); + + if (sched) { + status = erts_port[ix].status; + } else { + erts_smp_port_state_lock(prt); + status = erts_port[ix].status; + erts_smp_port_state_unlock(prt); + } + + if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { + DRV_MONITOR_UNLOCK_PDL(prt); + return driver_term_nil; + } + + /* Now we should have either the port lock (if we have a scheduler) or the port data lock + (if we're a driver thread) */ + ERTS_SMP_LC_ASSERT((sched != NULL || prt->port_data_lock)); + +#if !HEAP_ON_C_STACK + if (!sched) { + /* Need a separate allocation for the ref :( */ + Eterm *buf = erts_alloc(ERTS_ALC_T_TEMP_TERM, + sizeof(Eterm)*REF_THING_SIZE); + ret = do_driver_get_monitored_process(prt,buf,monitor); + erts_free(ERTS_ALC_T_TEMP_TERM,buf); + } else +#endif + { + DeclareTmpHeapNoproc(buf,REF_THING_SIZE); + UseTmpHeapNoproc(REF_THING_SIZE); + ret = do_driver_get_monitored_process(prt,buf,monitor); + UnUseTmpHeapNoproc(REF_THING_SIZE); + } + DRV_MONITOR_UNLOCK_PDL(prt); + return ret; +} + + int driver_compare_monitors(const ErlDrvMonitor *monitor1, const ErlDrvMonitor *monitor2) { @@ -4173,18 +4414,22 @@ void erts_fire_port_monitor(Port *prt, Eterm ref) ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); ASSERT(prt->drv_ptr != NULL); - + DRV_MONITOR_LOCK_PDL(prt); if (erts_lookup_monitor(prt->monitors,ref) == NULL) { + DRV_MONITOR_UNLOCK_PDL(prt); return; } callback = prt->drv_ptr->process_exit; ASSERT(callback != NULL); ref_to_driver_monitor(ref,&drv_monitor); + DRV_MONITOR_UNLOCK_PDL(prt); fpe_was_unmasked = erts_block_fpe(); (*callback)((ErlDrvData) (prt->drv_data), &drv_monitor); erts_unblock_fpe(fpe_was_unmasked); + DRV_MONITOR_LOCK_PDL(prt); /* remove monitor *after* callback */ rmon = erts_remove_monitor(&(prt->monitors),ref); + DRV_MONITOR_UNLOCK_PDL(prt); if (rmon) { erts_destroy_monitor(rmon); } |