From 6da93c20472f5d13b34a40ca53cba4fe6f352d24 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Thu, 2 Aug 2012 18:21:50 +0200 Subject: Generalize process table implementation --- erts/emulator/beam/io.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 4dd60b4d23..f9e1b62f14 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -477,7 +477,7 @@ setup_port(Port* prt, Eterm pid, erts_driver_t *driver, prt->bp = NULL; prt->data = am_undefined; /* Set default tracing */ - erts_get_default_tracing(&(prt->trace_flags), &(prt->tracer_proc)); + erts_get_default_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER_PROC(prt)); prt->psd = NULL; @@ -1345,8 +1345,8 @@ void init_io(void) erts_port[i].xports = NULL; erts_smp_spinlock_init_x(&erts_port[i].state_lck, "port_state", make_small(i)); #endif - erts_port[i].tracer_proc = NIL; - erts_port[i].trace_flags = 0; + ERTS_TRACER_PROC(&erts_port[i]) = NIL; + ERTS_TRACE_FLAGS(&erts_port[i]) = 0; erts_port[i].drv_ptr = NULL; erts_port[i].status = ERTS_PORT_SFLG_FREE; @@ -1556,9 +1556,7 @@ deliver_result(Eterm sender, Eterm pid, Eterm res) ERTS_SMP_CHK_NO_PROC_LOCKS; - ASSERT(is_internal_port(sender) - && is_internal_pid(pid) - && internal_pid_index(pid) < erts_max_processes); + ASSERT(is_internal_port(sender) && is_internal_pid(pid)); rp = (scheduler ? erts_proc_lookup(pid) -- cgit v1.2.3 From 3b523c25af0df45fbf68ab3cf50c0556f1d4e0a1 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Mon, 4 Jun 2012 20:47:08 +0200 Subject: Atomic port state --- erts/emulator/beam/io.c | 343 +++++++++++++++++++++++++----------------------- 1 file changed, 180 insertions(+), 163 deletions(-) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index f9e1b62f14..00ecf1fca2 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -90,35 +90,35 @@ static ERTS_INLINE ErlIOQueue* drvport2ioq(ErlDrvPort drvport) { int ix = (int) drvport; - Uint32 status; + erts_aint32_t state; if (ix < 0 || erts_max_ports <= ix) return NULL; + state = erts_smp_atomic32_read_nob(&erts_port[ix].state); + +#ifdef ERTS_ENABLE_LOCK_CHECK + if (erts_get_scheduler_data()) { ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix])); ERTS_LC_ASSERT(!erts_port[ix].port_data_lock || erts_lc_mtx_is_locked( &erts_port[ix].port_data_lock->mtx)); - - status = erts_port[ix].status; } else { - erts_smp_port_state_lock(&erts_port[ix]); - status = erts_port[ix].status; - erts_smp_port_state_unlock(&erts_port[ix]); - - ERTS_LC_ASSERT((status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + ERTS_LC_ASSERT((state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) || erts_port[ix].port_data_lock); ERTS_LC_ASSERT(!erts_port[ix].port_data_lock || erts_lc_mtx_is_locked( &erts_port[ix].port_data_lock->mtx)); - } - return ((status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) - ? NULL - : &erts_port[ix].ioq); +#endif + + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + return NULL; + else + return &erts_port[ix].ioq; } static ERTS_INLINE int @@ -216,7 +216,7 @@ kill_port(Port *pp) { ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); erts_port_task_free_port(pp); - ASSERT(pp->status & ERTS_PORT_SFLGS_DEAD); + ASSERT(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD); } #ifdef ERTS_SMP @@ -242,27 +242,32 @@ get_free_port(void) erts_smp_spin_lock(&get_free_port_lck); num = last_port_num + 1; - for (;; ++num) { + for (;; ++num) { + erts_aint32_t act; + port = &erts_port[num & erts_port_tab_index_mask]; - erts_smp_port_state_lock(port); - if (port->status & ERTS_PORT_SFLG_FREE) { - last_port_num = num; - erts_smp_spin_unlock(&get_free_port_lck); - break; + act = erts_smp_atomic32_read_nob(&port->state); + + while (act & ERTS_PORT_SFLG_FREE) { + erts_aint32_t exp = act; + act = erts_smp_atomic32_cmpxchg_relb(&port->state, + ERTS_PORT_SFLG_INITIALIZING, + exp); + if (act == exp) { + last_port_num = num; + erts_smp_spin_unlock(&get_free_port_lck); + ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 0); + erts_smp_atomic_set_nob(&port->refc, 2); /* Port alive + lock */ + return num & port_num_mask; + } } - erts_smp_port_state_unlock(port); if (--tries == 0) { erts_smp_spin_unlock(&get_free_port_lck); return -1; } } - port->status = ERTS_PORT_SFLG_INITIALIZING; - ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 0); - erts_smp_atomic_set_nob(&port->refc, 2); /* Port alive + lock */ - erts_smp_port_state_unlock(port); - return num & port_num_mask; } /* @@ -284,13 +289,12 @@ erts_test_next_port(int set, Uint next) Port* port = &erts_port[num & erts_port_tab_index_mask]; - erts_smp_port_state_lock(port); + erts_aint32_t state = erts_smp_atomic32_read_nob(&port->state); - if (port->status & ERTS_PORT_SFLG_FREE) { + if (state & ERTS_PORT_SFLG_FREE) { last_port_num = num - 1; res = num & port_num_mask; } - erts_smp_port_state_unlock(port); } erts_smp_spin_unlock(&get_free_port_lck); return res; @@ -326,38 +330,37 @@ void port_cleanup(Port *prt) { #ifdef ERTS_SMP - Uint32 port_specific; erts_smp_mtx_t *mtx; +#endif +#if defined(ERTS_SMP) || defined(ERTS_ENABLE_LOCK_CHECK) + erts_aint32_t state = erts_smp_atomic32_read_nob(&prt->state); #endif erts_driver_t *driver; - erts_smp_port_state_lock(prt); - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); driver = prt->drv_ptr; prt->drv_ptr = NULL; ASSERT(driver); - ASSERT(prt->status & ERTS_PORT_SFLG_FREE_SCHEDULED); - ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&prt->refc) == 0); + ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_FREE_SCHEDULED); + ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG); + ERTS_LC_ASSERT(!(state & ERTS_PORT_SFLG_FREE)); - ASSERT(prt->status & ERTS_PORT_SFLG_PORT_DEBUG); - ASSERT(!(prt->status & ERTS_PORT_SFLG_FREE)); - prt->status = ERTS_PORT_SFLG_FREE; + ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&prt->refc) == 0); #ifdef ERTS_SMP - - port_specific = (prt->status & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK); - mtx = prt->lock; ASSERT(mtx); prt->lock = NULL; - erts_smp_port_state_unlock(prt); erts_smp_mtx_unlock(mtx); +#endif - if (port_specific) { + erts_smp_atomic32_set_relb(&prt->state, ERTS_PORT_SFLG_FREE); + +#ifdef ERTS_SMP + if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) { erts_smp_mtx_destroy(mtx); erts_free(ERTS_ALC_T_PORT_LOCK, mtx); } @@ -422,13 +425,13 @@ static void stopq(Port* prt) static void setup_port(Port* prt, Eterm pid, erts_driver_t *driver, - ErlDrvData drv_data, char *name, Uint32 xstatus) + ErlDrvData drv_data, char *name, erts_aint32_t xstate) { ErtsRunQueue *runq = erts_get_runq_current(NULL); char *new_name, *old_name; #ifdef DEBUG /* Make sure the debug flags survives until port is freed */ - xstatus |= ERTS_PORT_SFLG_PORT_DEBUG; + xstate |= ERTS_PORT_SFLG_PORT_DEBUG; #endif ASSERT(runq); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); @@ -437,8 +440,6 @@ setup_port(Port* prt, Eterm pid, erts_driver_t *driver, new_name = (char*) erts_alloc(ERTS_ALC_T_PORT_NAME, sys_strlen(name)+1); sys_strcpy(new_name, name); erts_smp_runq_lock(runq); - erts_smp_port_state_lock(prt); - prt->status = ERTS_PORT_SFLG_CONNECTED | xstatus; prt->snapshot = erts_smp_atomic32_read_nob(&erts_ports_snapshot); old_name = prt->name; prt->name = new_name; @@ -447,7 +448,8 @@ setup_port(Port* prt, Eterm pid, erts_driver_t *driver, #endif ASSERT(!prt->drv_ptr); prt->drv_ptr = driver; - erts_smp_port_state_unlock(prt); + erts_smp_atomic32_set_relb(&prt->state, + ERTS_PORT_SFLG_CONNECTED | xstate); erts_smp_runq_unlock(runq); #ifdef ERTS_SMP ASSERT(!prt->xports); @@ -492,7 +494,7 @@ erts_wake_process_later(Port *prt, Process *process) ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (prt->status & ERTS_PORT_SFLGS_DEAD) + if (erts_smp_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD) return; for (p = &(prt->suspended); *p != NULL; p = &((*p)->next)) @@ -522,7 +524,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ int port_num; int port_ix; ErlDrvData drv_data = 0; - Uint32 xstatus = 0; + erts_aint32_t xstate = 0; Port *port; int fpe_was_unmasked; @@ -601,11 +603,9 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ *error_number_ptr = BADARG; } /* Need to mark the port as free again */ - erts_smp_port_state_lock(port); - port->status = ERTS_PORT_SFLG_FREE; ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 2); erts_smp_atomic_set_nob(&port->refc, 0); - erts_smp_port_state_unlock(port); + erts_smp_atomic32_set_relb(&port->state, ERTS_PORT_SFLG_FREE); return -3; } @@ -623,7 +623,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ erts_smp_mtx_init_x(port->lock, "port_lock", port->id); - xstatus |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; + xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; } #endif @@ -637,7 +637,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ erts_smp_mtx_lock(port->lock); #endif - setup_port(port, pid, driver, drv_data, name, xstatus); + setup_port(port, pid, driver, drv_data, name, xstate); if (IS_TRACED_FL(port, F_TRACE_PORTS)) { trace_port_open(port, @@ -737,11 +737,11 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ Process *rp; int port_num; Eterm port_id; - Uint32 xstatus = 0; + erts_aint32_t xstate = 0; ERTS_SMP_CHK_NO_PROC_LOCKS; - creator_port = erts_drvport2port(creator_port_ix); + creator_port = erts_drvport2port(creator_port_ix, NULL); if (!creator_port) return (ErlDrvTermData) -1; @@ -780,7 +780,7 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ port->lock = erts_alloc(ERTS_ALC_T_PORT_LOCK, sizeof(erts_smp_mtx_t)); erts_smp_mtx_init_locked_x(port->lock, "port_lock", port_id); - xstatus |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; + xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; } #endif @@ -793,7 +793,7 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(port)); - setup_port(port, pid, driver, drv_data, name, xstatus); + setup_port(port, pid, driver, drv_data, name, xstate); port->id = port_id; erts_add_link(&(port->nlinks), LINK_PID, pid); @@ -1349,7 +1349,7 @@ void init_io(void) ERTS_TRACE_FLAGS(&erts_port[i]) = 0; erts_port[i].drv_ptr = NULL; - erts_port[i].status = ERTS_PORT_SFLG_FREE; + erts_smp_atomic32_init_nob(&erts_port[i].state, ERTS_PORT_SFLG_FREE); erts_port[i].name = NULL; erts_port[i].nlinks = NULL; erts_port[i].monitors = NULL; @@ -1593,7 +1593,7 @@ deliver_result(Eterm sender, Eterm pid, Eterm res) * len -- length of data */ -static void deliver_read_message(Port* prt, Eterm to, +static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to, char *hbuf, ErlDrvSizeT hlen, char *buf, ErlDrvSizeT len, int eol) { @@ -1611,10 +1611,11 @@ static void deliver_read_message(Port* prt, Eterm to, ERTS_SMP_CHK_NO_PROC_LOCKS; need = 3 + 3 + 2*hlen; - if (prt->status & ERTS_PORT_SFLG_LINEBUF_IO) { + + if (state & ERTS_PORT_SFLG_LINEBUF_IO) { need += 3; } - if (prt->status & ERTS_PORT_SFLG_BINARY_IO && buf != NULL) { + if ((state & ERTS_PORT_SFLG_BINARY_IO) && buf != NULL) { need += PROC_BIN_SIZE; } else { need += 2*len; @@ -1630,7 +1631,7 @@ static void deliver_read_message(Port* prt, Eterm to, hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks); listp = NIL; - if ((prt->status & ERTS_PORT_SFLG_BINARY_IO) == 0) { + if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) { listp = buf_to_intlist(&hp, buf, len, listp); } else if (buf != NULL) { ProcBin* pb; @@ -1661,7 +1662,7 @@ static void deliver_read_message(Port* prt, Eterm to, listp = buf_to_intlist(&hp, hbuf, hlen, listp); } - if (prt->status & ERTS_PORT_SFLG_LINEBUF_IO){ + if (state & ERTS_PORT_SFLG_LINEBUF_IO){ listp = TUPLE2(hp, (eol) ? am_eol : am_noeol, listp); hp += 3; } @@ -1686,7 +1687,8 @@ static void deliver_read_message(Port* prt, Eterm to, * Deliver all lines in a line buffer, repeats calls to * deliver_read_message, and takes the same parameters. */ -static void deliver_linebuf_message(Port* prt, Eterm to, +static void deliver_linebuf_message(Port* prt, erts_aint_t state, + Eterm to, char* hbuf, ErlDrvSizeT hlen, char *buf, ErlDrvSizeT len) { @@ -1695,7 +1697,7 @@ static void deliver_linebuf_message(Port* prt, Eterm to, if(init_linebuf_context(&lc,&(prt->linebuf), buf, len) < 0) return; while((ret = read_linebuf(&lc)) > LINEBUF_EMPTY) - deliver_read_message(prt, to, hbuf, hlen, LINEBUF_DATA(lc), + deliver_read_message(prt, state, to, hbuf, hlen, LINEBUF_DATA(lc), LINEBUF_DATALEN(lc), (ret == LINEBUF_EOL)); } @@ -1706,19 +1708,24 @@ static void deliver_linebuf_message(Port* prt, Eterm to, * Parameters: * prt - Pointer to a Port structure for this port. */ -static void flush_linebuf_messages(Port *prt) +static void flush_linebuf_messages(Port *prt, erts_aint32_t state) { LineBufContext lc; int ret; ERTS_SMP_LC_ASSERT(!prt || erts_lc_is_port_locked(prt)); - if(prt == NULL || !(prt->status & ERTS_PORT_SFLG_LINEBUF_IO)) + + if (!prt) + return; + + if (!(state & ERTS_PORT_SFLG_LINEBUF_IO)) return; if(init_linebuf_context(&lc,&(prt->linebuf), NULL, 0) < 0) return; while((ret = flush_linebuf(&lc)) > LINEBUF_EMPTY) deliver_read_message(prt, + state, prt->connected, NULL, 0, @@ -1747,6 +1754,7 @@ deliver_vec_message(Port* prt, /* Port */ ErlOffHeap *ohp; ErtsProcLocks rp_locks = 0; int scheduler = erts_get_scheduler_id() != 0; + erts_aint32_t state; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -1762,12 +1770,13 @@ deliver_vec_message(Port* prt, /* Port */ if (!rp) return; + state = erts_smp_atomic32_read_nob(&prt->state); /* * Calculate the exact number of heap words needed. */ need = 3 + 3; /* Heap space for two tuples */ - if (prt->status & ERTS_PORT_SFLG_BINARY_IO) { + if (state & ERTS_PORT_SFLG_BINARY_IO) { need += (2+PROC_BIN_SIZE)*vsize - 2 + hlen*2; } else { need += (hlen+csize)*2; @@ -1778,7 +1787,7 @@ deliver_vec_message(Port* prt, /* Port */ listp = NIL; iov += vsize; - if ((prt->status & ERTS_PORT_SFLG_BINARY_IO) == 0) { + if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) { Eterm* thp = hp; while (vsize--) { iov--; @@ -1864,7 +1873,7 @@ static void deliver_bin_message(Port* prt, /* port */ /* * Note. * - * The test for (p->status & ERTS_PORT_SFLGS_DEAD) == 0 is important since the + * The test for ERTS_PORT_SFLGS_DEAD is important since the * driver's flush function might call driver_async, which when using no * threads and being short circuited will notice that the io queue is empty * (after calling the driver's async_ready) and recursively call @@ -1899,7 +1908,8 @@ static void flush_port(Port *p) ASSERT(!p->xports); #endif } - if ((p->status & ERTS_PORT_SFLGS_DEAD) == 0 && is_port_ioq_empty(p)) { + if ((erts_smp_atomic32_read_nob(&p->state) & ERTS_PORT_SFLGS_DEAD) == 0 + && is_port_ioq_empty(p)) { terminate_port(p); } } @@ -1911,7 +1921,7 @@ terminate_port(Port *prt) Eterm send_closed_port_id; Eterm connected_id = NIL /* Initialize to silence compiler */; erts_driver_t *drv; - int halt; + erts_aint32_t state; ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); @@ -1919,10 +1929,10 @@ terminate_port(Port *prt) ASSERT(!prt->nlinks); ASSERT(!prt->monitors); - /* prt->status may be altered by kill_port()below */ - halt = (prt->status & ERTS_PORT_SFLG_HALT) != 0; - if (prt->status & ERTS_PORT_SFLG_SEND_CLOSED) { - erts_port_status_band_set(prt, ~ERTS_PORT_SFLG_SEND_CLOSED); + /* state may be altered by kill_port() below */ + state = erts_smp_atomic32_read_band_nob(&prt->state, + ~ERTS_PORT_SFLG_SEND_CLOSED); + if (state & ERTS_PORT_SFLG_SEND_CLOSED) { send_closed_port_id = prt->id; connected_id = prt->connected; } @@ -1978,7 +1988,8 @@ terminate_port(Port *prt) * We don't want to send the closed message until after the * port has been removed from the port table (in kill_port()). */ - if (halt && (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0)) { + if ((state & ERTS_PORT_SFLG_HALT) + && (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0)) { erts_smp_port_unlock(prt); /* We will exit and never return */ erl_exit_flush_async(erts_halt_code, ""); } @@ -2100,6 +2111,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) { ErtsLink *lnk; Eterm rreason; + erts_aint32_t state; ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); @@ -2119,9 +2131,10 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) } #endif - if ((p->status & (ERTS_PORT_SFLGS_DEAD - | ERTS_PORT_SFLG_EXITING - | ERTS_PORT_SFLG_IMMORTAL)) + state = erts_smp_atomic32_read_nob(&p->state); + if ((state & (ERTS_PORT_SFLGS_DEAD + | ERTS_PORT_SFLG_EXITING + | ERTS_PORT_SFLG_IMMORTAL)) || ((reason == am_normal) && ((from != p->connected) && (from != p->id)))) { return; @@ -2142,7 +2155,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) if (p->reg != NULL) (void) erts_unregister_name(NULL, 0, p, p->reg->name); - erts_port_status_bor_set(p, ERTS_PORT_SFLG_EXITING); + state = erts_smp_atomic32_read_bor_relb(&p->state, ERTS_PORT_SFLG_EXITING); { SweepContext sc = {p->id, rreason}; @@ -2158,17 +2171,20 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) } DRV_MONITOR_UNLOCK_PDL(p); - if ((p->status & ERTS_PORT_SFLG_DISTRIBUTION) && p->dist_entry) { + if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && p->dist_entry) { erts_do_net_exits(p->dist_entry, rreason); erts_deref_dist_entry(p->dist_entry); - p->dist_entry = NULL; - erts_port_status_band_set(p, ~ERTS_PORT_SFLG_DISTRIBUTION); + p->dist_entry = NULL; + erts_smp_atomic32_read_band_relb(&p->state, + ~ERTS_PORT_SFLG_DISTRIBUTION); } if ((reason != am_kill) && !is_port_ioq_empty(p)) { - erts_port_status_bandor_set(p, - ~ERTS_PORT_SFLG_EXITING, /* must turn it off */ - ERTS_PORT_SFLG_CLOSING); + /* must turn exiting flag off */ + erts_smp_atomic32_read_bset_relb(&p->state, + (ERTS_PORT_SFLG_EXITING + | ERTS_PORT_SFLG_CLOSING), + ERTS_PORT_SFLG_CLOSING); flush_port(p); } else { @@ -2217,7 +2233,8 @@ void erts_port_command(Process *proc, if ((pid = port->connected) == tp[1]) { /* PID must be connected */ if (tp[2] == am_close) { - erts_port_status_bor_set(port, ERTS_PORT_SFLG_SEND_CLOSED); + erts_smp_atomic32_read_bor_relb(&port->state, + ERTS_PORT_SFLG_SEND_CLOSED); erts_do_exit_port(port, pid, am_normal); #ifdef USE_VM_PROBES @@ -2443,13 +2460,14 @@ void print_port_info(int to, void *arg, int i) { Port* p = &erts_port[i]; + erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state); - if (p->status & ERTS_PORT_SFLGS_DEAD) + if (state & ERTS_PORT_SFLGS_DEAD) return; erts_print(to, arg, "=port:%T\n", p->id); erts_print(to, arg, "Slot: %d\n", i); - if (p->status & ERTS_PORT_SFLG_CONNECTED) { + if (state & ERTS_PORT_SFLG_CONNECTED) { erts_print(to, arg, "Connected: %T", p->connected); erts_print(to, arg, "\n"); } @@ -2497,8 +2515,8 @@ set_busy_port(ErlDrvPort port_num, int on) ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num])); if (on) { - erts_port_status_bor_set(&erts_port[port_num], - ERTS_PORT_SFLG_PORT_BUSY); + erts_smp_atomic32_read_bor_relb(&erts_port[port_num].state, + ERTS_PORT_SFLG_PORT_BUSY); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_busy)) { erts_snprintf(port_str, sizeof(port_str), @@ -2508,8 +2526,8 @@ set_busy_port(ErlDrvPort port_num, int on) #endif } else { ErtsProcList* plp = erts_port[port_num].suspended; - erts_port_status_band_set(&erts_port[port_num], - ~ERTS_PORT_SFLG_PORT_BUSY); + erts_smp_atomic32_read_band_relb(&erts_port[port_num].state, + ~ERTS_PORT_SFLG_PORT_BUSY); erts_port[port_num].suspended = NULL; #ifdef USE_VM_PROBES @@ -2576,19 +2594,27 @@ void set_port_control_flags(ErlDrvPort port_num, int flags) erts_port[port_num].control_flags = flags; } -int get_port_flags(ErlDrvPort ix) { - Port* prt = erts_drvport2port(ix); +int get_port_flags(ErlDrvPort ix) +{ + int flags; + Port *prt; + erts_aint32_t state; + + prt = erts_drvport2port(ix, &state); + if (!prt) + return 0; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (prt == NULL) - return 0; + flags = 0; + if (state & ERTS_PORT_SFLG_BINARY_IO) + flags |= PORT_FLAG_BINARY; + if (state & ERTS_PORT_SFLG_LINEBUF_IO) + flags |= PORT_FLAG_LINE; - return (prt->status & ERTS_PORT_SFLG_BINARY_IO ? PORT_FLAG_BINARY : 0) - | (prt->status & ERTS_PORT_SFLG_LINEBUF_IO ? PORT_FLAG_LINE : 0); + return flags; } - void erts_raw_port_command(Port* p, byte* buf, Uint len) { int fpe_was_unmasked; @@ -2624,8 +2650,9 @@ int async_ready(Port *p, void* data) ERTS_SMP_CHK_NO_PROC_LOCKS; if (p) { + erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); - ASSERT(!(p->status & ERTS_PORT_SFLGS_DEAD)); + ASSERT(!(state & ERTS_PORT_SFLGS_DEAD)); if (p->drv_ptr->ready_async != NULL) { #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_ready_async)) { @@ -2641,7 +2668,7 @@ int async_ready(Port *p, void* data) ASSERT(!p->xports); #endif } - if ((p->status & ERTS_PORT_SFLG_CLOSING) && is_port_ioq_empty(p)) { + if ((state & ERTS_PORT_SFLG_CLOSING) && is_port_ioq_empty(p)) { terminate_port(p); } } @@ -2736,7 +2763,7 @@ erts_get_port_names(Eterm id) pnp_len = sizeof(ErtsPortNames) + len; pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, pnp_len); } - erts_smp_port_state_lock(prt); + erts_smp_port_minor_lock(prt); if (id != prt->id) { len = nlen = 0; name = driver_name = NULL; @@ -2764,7 +2791,7 @@ erts_get_port_names(Eterm id) } do_realloc = 0; } - erts_smp_port_state_unlock(prt); + erts_smp_port_minor_unlock(prt); } while (do_realloc); } return pnp; @@ -2803,7 +2830,7 @@ ErlDrvTermData driver_mk_term_nil(void) void driver_report_exit(int ix, int status) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); Eterm* hp; Eterm tuple; Process *rp; @@ -2852,14 +2879,13 @@ deliver_term_check_port(ErlDrvPort drvport) res = -1; /* invalid */ else { Port* prt = &erts_port[ix]; - erts_smp_port_state_lock(prt); - if (!(prt->status & ERTS_PORT_SFLGS_INVALID_LOOKUP)) + erts_aint32_t state = erts_smp_atomic32_read_nob(&prt->state); + if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) res = 1; /* ok */ - else if (prt->status & ERTS_PORT_SFLG_CLOSING) + else if (state & ERTS_PORT_SFLG_CLOSING) res = 0; /* closing */ else res = -1; /* invalid (dead) */ - erts_smp_port_state_unlock(prt); } return res; } @@ -3428,7 +3454,7 @@ driver_deliver_term(ErlDrvPort port, int driver_output_term(ErlDrvPort ix, ErlDrvTermData* data, int len) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); @@ -3454,19 +3480,20 @@ driver_send_term(ErlDrvPort ix, ErlDrvTermData to, ErlDrvTermData* data, int len int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, ErlDrvBinary* bin, ErlDrvSizeT offs, ErlDrvSizeT len) { - Port* prt = erts_drvport2port(ix); + erts_aint32_t state; + Port* prt = erts_drvport2port(ix, &state); ERTS_SMP_CHK_NO_PROC_LOCKS; if (prt == NULL) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (prt->status & ERTS_PORT_SFLG_CLOSING) + if (state & ERTS_PORT_SFLG_CLOSING) return 0; prt->bytes_in += (hlen + len); erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + len)); - if (prt->status & ERTS_PORT_SFLG_DISTRIBUTION) { + if (state & ERTS_PORT_SFLG_DISTRIBUTION) { return erts_net_message(prt, prt->dist_entry, (byte*) hbuf, hlen, @@ -3488,7 +3515,8 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, char* buf, ErlDrvSizeT len) { - Port* prt = erts_drvport2port(ix); + erts_aint32_t state; + Port* prt = erts_drvport2port(ix, &state); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -3497,12 +3525,12 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (prt->status & ERTS_PORT_SFLG_CLOSING) + if (state & ERTS_PORT_SFLG_CLOSING) return 0; prt->bytes_in += (hlen + len); erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + len)); - if (prt->status & ERTS_PORT_SFLG_DISTRIBUTION) { + if (state & ERTS_PORT_SFLG_DISTRIBUTION) { if (len == 0) return erts_net_message(prt, prt->dist_entry, @@ -3514,10 +3542,10 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, (byte*) hbuf, hlen, (byte*) buf, len); } - else if(prt->status & ERTS_PORT_SFLG_LINEBUF_IO) - deliver_linebuf_message(prt, prt->connected, hbuf, hlen, buf, len); + else if (state & ERTS_PORT_SFLG_LINEBUF_IO) + deliver_linebuf_message(prt, state, prt->connected, hbuf, hlen, buf, len); else - deliver_read_message(prt, prt->connected, hbuf, hlen, buf, len, 0); + deliver_read_message(prt, state, prt->connected, hbuf, hlen, buf, len, 0); return 0; } @@ -3538,6 +3566,7 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, SysIOVec* iov; ErlDrvBinary** binv; Port* prt; + erts_aint32_t state; ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -3550,13 +3579,13 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, if (hlen < 0) hlen = 0; - prt = erts_drvport2port(ix); + prt = erts_drvport2port(ix, &state); if (prt == NULL) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (prt->status & ERTS_PORT_SFLG_CLOSING) + if (state & ERTS_PORT_SFLG_CLOSING) return 0; /* size > 0 ! */ @@ -3826,7 +3855,7 @@ ErlDrvPDL driver_pdl_create(ErlDrvPort dp) { ErlDrvPDL pdl; - Port *pp = erts_drvport2port(dp); + Port *pp = erts_drvport2port(dp, NULL); if (!pp || pp->port_data_lock) return NULL; pdl = erts_alloc(ERTS_ALC_T_PORT_DATA_LOCK, @@ -4283,7 +4312,7 @@ drv_cancel_timer(Port *prt) int driver_set_timer(ErlDrvPort ix, unsigned long t) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -4310,7 +4339,7 @@ int driver_set_timer(ErlDrvPort ix, unsigned long t) int driver_cancel_timer(ErlDrvPort ix) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); if (prt == NULL) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); @@ -4322,7 +4351,7 @@ int driver_cancel_timer(ErlDrvPort ix) int driver_read_timer(ErlDrvPort ix, unsigned long* t) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -4400,8 +4429,10 @@ int driver_monitor_process(ErlDrvPort port, { Port *prt; int ret; - Uint32 status; + erts_aint32_t state; +#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) ErtsSchedulerData *sched = erts_get_scheduler_data(); +#endif int ix = (int) port; if (ix < 0 || erts_max_ports <= ix) { return -1; @@ -4410,15 +4441,9 @@ int driver_monitor_process(ErlDrvPort port, DRV_MONITOR_LOCK_PDL(prt); - if (sched) { - status = erts_port[ix].status; - } else { - erts_smp_port_state_lock(prt); - status = erts_port[ix].status; - erts_smp_port_state_unlock(prt); - } + state = erts_smp_atomic32_read_nob(&erts_port[ix].state); - if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { DRV_MONITOR_UNLOCK_PDL(prt); return -1; } @@ -4488,8 +4513,10 @@ int driver_demonitor_process(ErlDrvPort port, { Port *prt; int ret; - Uint32 status; + erts_aint32_t state; +#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) ErtsSchedulerData *sched = erts_get_scheduler_data(); +#endif int ix = (int) port; if (ix < 0 || erts_max_ports <= ix) { return -1; @@ -4498,15 +4525,9 @@ int driver_demonitor_process(ErlDrvPort port, DRV_MONITOR_LOCK_PDL(prt); - if (sched) { - status = erts_port[ix].status; - } else { - erts_smp_port_state_lock(prt); - status = erts_port[ix].status; - erts_smp_port_state_unlock(prt); - } + state = erts_smp_atomic32_read_nob(&erts_port[ix].state); - if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { DRV_MONITOR_UNLOCK_PDL(prt); return -1; } @@ -4558,8 +4579,10 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort port, { Port *prt; ErlDrvTermData ret; - Uint32 status; + erts_aint32_t state; +#if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) ErtsSchedulerData *sched = erts_get_scheduler_data(); +#endif int ix = (int) port; if (ix < 0 || erts_max_ports <= ix) { return driver_term_nil; @@ -4568,15 +4591,8 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort port, DRV_MONITOR_LOCK_PDL(prt); - if (sched) { - status = erts_port[ix].status; - } else { - erts_smp_port_state_lock(prt); - status = erts_port[ix].status; - erts_smp_port_state_unlock(prt); - } - - if (status & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { + state = erts_smp_atomic32_read_nob(&erts_port[ix].state); + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { DRV_MONITOR_UNLOCK_PDL(prt); return driver_term_nil; } @@ -4651,7 +4667,8 @@ void erts_fire_port_monitor(Port *prt, Eterm ref) static int driver_failure_term(ErlDrvPort ix, Eterm term, int eof) { - Port* prt = erts_drvport2port(ix); + erts_aint32_t state; + Port* prt = erts_drvport2port(ix, &state); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -4659,10 +4676,10 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); if (eof) - flush_linebuf_messages(prt); - if (prt->status & ERTS_PORT_SFLG_CLOSING) { + flush_linebuf_messages(prt, state); + if (state & ERTS_PORT_SFLG_CLOSING) { terminate_port(prt); - } else if (eof && (prt->status & ERTS_PORT_SFLG_SOFT_EOF)) { + } else if (eof && (state & ERTS_PORT_SFLG_SOFT_EOF)) { deliver_result(prt->id, prt->connected, am_eof); } else { /* XXX UGLY WORK AROUND, Let do_exit_port terminate the port */ @@ -4684,7 +4701,7 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof) */ int driver_exit(ErlDrvPort ix, int err) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); Process* rp; ErtsLink *lnk, *rlnk = NULL; @@ -4757,14 +4774,14 @@ ErlDrvTermData driver_mk_atom(char* string) ErlDrvTermData driver_mk_port(ErlDrvPort ix) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); return (ErlDrvTermData) prt->id; } ErlDrvTermData driver_connected(ErlDrvPort ix) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); ERTS_SMP_CHK_NO_PROC_LOCKS; if (prt == NULL) return NIL; @@ -4774,7 +4791,7 @@ ErlDrvTermData driver_connected(ErlDrvPort ix) ErlDrvTermData driver_caller(ErlDrvPort ix) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); ERTS_SMP_CHK_NO_PROC_LOCKS; if (prt == NULL) return NIL; @@ -4784,7 +4801,7 @@ ErlDrvTermData driver_caller(ErlDrvPort ix) int driver_lock_driver(ErlDrvPort ix) { - Port* prt = erts_drvport2port(ix); + Port* prt = erts_drvport2port(ix, NULL); DE_Handle* dh; ERTS_SMP_CHK_NO_PROC_LOCKS; -- cgit v1.2.3 From b434a3ab242dde66e23a72122474854f51a61eff Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Wed, 8 Aug 2012 02:20:05 +0200 Subject: Prepare for use of ptab functionality also for ports --- erts/emulator/beam/io.c | 163 ++++++++++++++++++++++++------------------------ 1 file changed, 82 insertions(+), 81 deletions(-) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 00ecf1fca2..b772783f64 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -257,8 +257,8 @@ get_free_port(void) if (act == exp) { last_port_num = num; erts_smp_spin_unlock(&get_free_port_lck); - ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 0); - erts_smp_atomic_set_nob(&port->refc, 2); /* Port alive + lock */ + ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&port->common.refc) == 0); + erts_smp_atomic32_set_nob(&port->common.refc, 2); /* Port alive + lock */ return num & port_num_mask; } } @@ -300,7 +300,6 @@ erts_test_next_port(int set, Uint next) return res; } - static void port_cleanup(Port *prt); #ifdef ERTS_SMP @@ -346,7 +345,7 @@ port_cleanup(Port *prt) ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG); ERTS_LC_ASSERT(!(state & ERTS_PORT_SFLG_FREE)); - ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&prt->refc) == 0); + ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) == 0); #ifdef ERTS_SMP mtx = prt->lock; @@ -464,17 +463,17 @@ setup_port(Port* prt, Eterm pid, erts_driver_t *driver, prt->bytes_in = 0; prt->bytes_out = 0; prt->dist_entry = NULL; - prt->reg = NULL; + prt->common.u.alive.reg = NULL; #ifdef ERTS_SMP - prt->ptimer = NULL; + prt->common.u.alive.ptimer = NULL; #else - sys_memset(&prt->tm, 0, sizeof(ErlTimer)); + sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer)); #endif erts_port_task_handle_init(&prt->timeout_task); prt->suspended = NULL; sys_strcpy(prt->name, name); - prt->nlinks = NULL; - prt->monitors = NULL; + ERTS_P_LINKS(prt) = NULL; + ERTS_P_MONITORS(prt) = NULL; prt->linebuf = NULL; prt->bp = NULL; prt->data = am_undefined; @@ -542,7 +541,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ port_ix = port_num & erts_port_tab_index_mask; port = &erts_port[port_ix]; - port->id = make_internal_port(port_num); + port->common.id = make_internal_port(port_num); erts_smp_mtx_lock(&erts_driver_list_lock); if (!driver) { @@ -603,8 +602,8 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ *error_number_ptr = BADARG; } /* Need to mark the port as free again */ - ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 2); - erts_smp_atomic_set_nob(&port->refc, 0); + ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&port->common.refc) == 2); + erts_smp_atomic32_set_nob(&port->common.refc, 0); erts_smp_atomic32_set_relb(&port->state, ERTS_PORT_SFLG_FREE); return -3; } @@ -622,7 +621,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ sizeof(erts_smp_mtx_t)); erts_smp_mtx_init_x(port->lock, "port_lock", - port->id); + port->common.id); xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; } #endif @@ -686,9 +685,9 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ * Must clean up the port. */ #ifdef ERTS_SMP - erts_cancel_smp_ptimer(port->ptimer); + erts_cancel_smp_ptimer(port->common.u.alive.ptimer); #else - erts_cancel_timer(&(port->tm)); + erts_cancel_timer(&(port->common.u.alive.tm)); #endif stopq(port); kill_port(port); @@ -794,10 +793,10 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(port)); setup_port(port, pid, driver, drv_data, name, xstate); - port->id = port_id; + port->common.id = port_id; - erts_add_link(&(port->nlinks), LINK_PID, pid); - erts_add_link(&(rp->nlinks), LINK_PID, port_id); + erts_add_link(&ERTS_P_LINKS(port), LINK_PID, pid); + erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, port_id); erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); return port_num & erts_port_tab_index_mask; } @@ -1339,7 +1338,7 @@ void init_io(void) for (i = 0; i < erts_max_ports; i++) { erts_port_task_init_sched(&erts_port[i].sched); - erts_smp_atomic_init_nob(&erts_port[i].refc, 0); + erts_smp_atomic32_init_nob(&erts_port[i].common.refc, 0); #ifdef ERTS_SMP erts_port[i].lock = NULL; erts_port[i].xports = NULL; @@ -1351,8 +1350,8 @@ void init_io(void) erts_port[i].drv_ptr = NULL; erts_smp_atomic32_init_nob(&erts_port[i].state, ERTS_PORT_SFLG_FREE); erts_port[i].name = NULL; - erts_port[i].nlinks = NULL; - erts_port[i].monitors = NULL; + ERTS_P_LINKS(&erts_port[i]) = NULL; + ERTS_P_MONITORS(&erts_port[i]) = NULL; erts_port[i].linebuf = NULL; erts_port[i].port_data_lock = NULL; } @@ -1669,7 +1668,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to, tuple = TUPLE2(hp, am_data, listp); hp += 3; - tuple = TUPLE2(hp, prt->id, tuple); + tuple = TUPLE2(hp, prt->common.id, tuple); hp += 3; erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined @@ -1840,7 +1839,7 @@ deliver_vec_message(Port* prt, /* Port */ tuple = TUPLE2(hp, am_data, listp); hp += 3; - tuple = TUPLE2(hp, prt->id, tuple); + tuple = TUPLE2(hp, prt->common.id, tuple); hp += 3; erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined @@ -1926,14 +1925,14 @@ terminate_port(Port *prt) ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - ASSERT(!prt->nlinks); - ASSERT(!prt->monitors); + ASSERT(!ERTS_P_LINKS(prt)); + ASSERT(!ERTS_P_MONITORS(prt)); /* state may be altered by kill_port() below */ state = erts_smp_atomic32_read_band_nob(&prt->state, ~ERTS_PORT_SFLG_SEND_CLOSED); if (state & ERTS_PORT_SFLG_SEND_CLOSED) { - send_closed_port_id = prt->id; + send_closed_port_id = prt->common.id; connected_id = prt->connected; } else { @@ -1941,9 +1940,9 @@ terminate_port(Port *prt) } #ifdef ERTS_SMP - erts_cancel_smp_ptimer(prt->ptimer); + erts_cancel_smp_ptimer(prt->common.u.alive.ptimer); #else - erts_cancel_timer(&prt->tm); + erts_cancel_timer(&prt->common.u.alive.tm); #endif drv = prt->drv_ptr; @@ -2016,7 +2015,7 @@ static void sweep_one_monitor(ErtsMonitor *mon, void *vpsc) if (!rp) { goto done; } - rmon = erts_remove_monitor(&(rp->monitors),mon->ref); + rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref); erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); if (rmon == NULL) { goto done; @@ -2070,7 +2069,7 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc) ASSERT(is_internal_pid(lnk->pid)); rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks); if (rp) { - ErtsLink *rlnk = erts_remove_link(&(rp->nlinks), psc->port); + ErtsLink *rlnk = erts_remove_link(&ERTS_P_LINKS(rp), psc->port); if (rlnk) { int xres = erts_send_exit_signal(NULL, @@ -2136,7 +2135,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) | ERTS_PORT_SFLG_EXITING | ERTS_PORT_SFLG_IMMORTAL)) || ((reason == am_normal) && - ((from != p->connected) && (from != p->id)))) { + ((from != p->connected) && (from != p->common.id)))) { return; } @@ -2144,29 +2143,29 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) trace_port(p, am_closed, reason); } - erts_trace_check_exiting(p->id); + erts_trace_check_exiting(p->common.id); /* * Setting the port to not busy here, frees the list of pending * processes and makes them runnable. */ - set_busy_port((ErlDrvPort)internal_port_index(p->id), 0); + set_busy_port((ErlDrvPort)internal_port_index(p->common.id), 0); - if (p->reg != NULL) - (void) erts_unregister_name(NULL, 0, p, p->reg->name); + if (p->common.u.alive.reg != NULL) + (void) erts_unregister_name(NULL, 0, p, p->common.u.alive.reg->name); state = erts_smp_atomic32_read_bor_relb(&p->state, ERTS_PORT_SFLG_EXITING); { - SweepContext sc = {p->id, rreason}; - lnk = p->nlinks; - p->nlinks = NULL; + SweepContext sc = {p->common.id, rreason}; + lnk = ERTS_P_LINKS(p); + ERTS_P_LINKS(p) = NULL; erts_sweep_links(lnk, &sweep_one_link, &sc); } DRV_MONITOR_LOCK_PDL(p); { - ErtsMonitor *moni = p->monitors; - p->monitors = NULL; + ErtsMonitor *moni = ERTS_P_MONITORS(p); + ERTS_P_MONITORS(p) = NULL; erts_sweep_monitors(moni, &sweep_one_monitor, NULL); } DRV_MONITOR_UNLOCK_PDL(p); @@ -2226,7 +2225,7 @@ void erts_port_command(Process *proc, erts_smp_proc_unlock(proc, ERTS_PROC_LOCK_MAIN); ERTS_SMP_CHK_NO_PROC_LOCKS; - ASSERT(!INVALID_PORT(port, port->id)); + ASSERT(!INVALID_PORT(port, port->common.id)); if (is_tuple_arity(command, 2)) { tp = tuple_val(command); @@ -2257,7 +2256,7 @@ void erts_port_command(Process *proc, } #endif port->connected = tp[2]; - deliver_result(port->id, pid, am_connected); + deliver_result(port->common.id, pid, am_connected); goto done; } } @@ -2270,7 +2269,7 @@ void erts_port_command(Process *proc, port->connected, rp_locks); if (rp) { (void) erts_send_exit_signal(NULL, - port->id, + port->common.id, rp, &rp_locks, am_badsig, @@ -2351,7 +2350,7 @@ erts_port_control(Process* p, Port* prt, Uint command, Eterm iolist) } } - prt->caller = p->id; /* Internal pid */ + prt->caller = p->common.id; /* Internal pid */ erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -2465,32 +2464,32 @@ print_port_info(int to, void *arg, int i) if (state & ERTS_PORT_SFLGS_DEAD) return; - erts_print(to, arg, "=port:%T\n", p->id); + erts_print(to, arg, "=port:%T\n", p->common.id); erts_print(to, arg, "Slot: %d\n", i); if (state & ERTS_PORT_SFLG_CONNECTED) { erts_print(to, arg, "Connected: %T", p->connected); erts_print(to, arg, "\n"); } - if (p->nlinks != NULL) { + if (ERTS_P_LINKS(p)) { prt_one_lnk_data prtd; prtd.to = to; prtd.arg = arg; erts_print(to, arg, "Links: "); - erts_doforall_links(p->nlinks, &prt_one_lnk, &prtd); + erts_doforall_links(ERTS_P_LINKS(p), &prt_one_lnk, &prtd); erts_print(to, arg, "\n"); } - if (p->monitors != NULL) { + if (ERTS_P_MONITORS(p)) { prt_one_lnk_data prtd; prtd.to = to; prtd.arg = arg; erts_print(to, arg, "Monitors: "); - erts_doforall_monitors(p->monitors, &prt_one_monitor, &prtd); + erts_doforall_monitors(ERTS_P_MONITORS(p), &prt_one_monitor, &prtd); erts_print(to, arg, "\n"); } - if (p->reg != NULL) - erts_print(to, arg, "Registered as: %T\n", p->reg->name); + if (p->common.u.alive.reg != NULL) + erts_print(to, arg, "Registered as: %T\n", p->common.u.alive.reg->name); if (p->drv_ptr == &fd_driver) { erts_print(to, arg, "Port is UNIX fd not opened by emulator: %s\n", p->name); @@ -2678,12 +2677,12 @@ int async_ready(Port *p, void* data) static void report_missing_drv_callback(Port *p, char *drv_type, char *callback) { - ErtsPortNames *pnp = erts_get_port_names(p->id); + ErtsPortNames *pnp = erts_get_port_names(p->common.id); char *unknown = ""; char *drv_name = pnp->driver_name ? pnp->driver_name : unknown; char *prt_name = pnp->name ? pnp->name : unknown; erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); - erts_dsprintf(dsbufp, "%T: %s driver '%s' ", p->id, drv_type, drv_name); + erts_dsprintf(dsbufp, "%T: %s driver '%s' ", p->common.id, drv_type, drv_name); if (sys_strcmp(drv_name, prt_name) != 0) erts_dsprintf(dsbufp, "(%s) ", prt_name); erts_dsprintf(dsbufp, "does not implement the %s callback!\n", callback); @@ -2764,7 +2763,7 @@ erts_get_port_names(Eterm id) pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, pnp_len); } erts_smp_port_minor_lock(prt); - if (id != prt->id) { + if (id != prt->common.id) { len = nlen = 0; name = driver_name = NULL; } @@ -2816,7 +2815,7 @@ static void schedule_port_timeout(Port *p) * /Rickard */ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); - (void) erts_port_task_schedule(p->id, + (void) erts_port_task_schedule(p->common.id, &p->timeout_task, ERTS_PORT_TASK_TIMEOUT, (ErlDrvEvent) -1, @@ -2856,7 +2855,7 @@ void driver_report_exit(int ix, int status) tuple = TUPLE2(hp, am_exit_status, make_small(status)); hp += 3; - tuple = TUPLE2(hp, prt->id, tuple); + tuple = TUPLE2(hp, prt->common.id, tuple); erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined #ifdef USE_VM_PROBES @@ -3864,7 +3863,7 @@ driver_pdl_create(ErlDrvPort dp) pdl_init_refc(pdl); pp->port_data_lock = pdl; #ifdef HARDDEBUG - erts_fprintf(stderr, "driver_pdl_create(%T) -> 0x%08X\r\n",pp->id,(unsigned) pdl); + erts_fprintf(stderr, "driver_pdl_create(%T) -> 0x%08X\r\n",pp->common.id,(unsigned) pdl); #endif return pdl; } @@ -4302,12 +4301,12 @@ static ERTS_INLINE void drv_cancel_timer(Port *prt) { #ifdef ERTS_SMP - erts_cancel_smp_ptimer(prt->ptimer); + erts_cancel_smp_ptimer(prt->common.u.alive.ptimer); #else - erts_cancel_timer(&prt->tm); + erts_cancel_timer(&prt->common.u.alive.tm); #endif if (erts_port_task_is_scheduled(&prt->timeout_task)) - erts_port_task_abort(prt->id, &prt->timeout_task); + erts_port_task_abort(prt->common.id, &prt->timeout_task); } int driver_set_timer(ErlDrvPort ix, unsigned long t) @@ -4323,12 +4322,12 @@ int driver_set_timer(ErlDrvPort ix, unsigned long t) return -1; drv_cancel_timer(prt); #ifdef ERTS_SMP - erts_create_smp_ptimer(&prt->ptimer, - prt->id, + erts_create_smp_ptimer(&prt->common.u.alive.ptimer, + prt->common.id, (ErlTimeoutProc) schedule_port_timeout, t); #else - erts_set_timer(&prt->tm, + erts_set_timer(&prt->common.u.alive.tm, (ErlTimeoutProc) schedule_port_timeout, NULL, prt, @@ -4359,9 +4358,11 @@ driver_read_timer(ErlDrvPort ix, unsigned long* t) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); #ifdef ERTS_SMP - *t = prt->ptimer ? erts_time_left(&prt->ptimer->timer.tm) : 0; + *t = (prt->common.u.alive.ptimer + ? erts_time_left(&prt->common.u.alive.ptimer->timer.tm) + : 0); #else - *t = erts_time_left(&prt->tm); + *t = erts_time_left(&prt->common.u.alive.tm); #endif return 0; } @@ -4412,8 +4413,8 @@ static int do_driver_monitor_process(Port *prt, } ref = erts_make_ref_in_buffer(buf); - erts_add_monitor(&(prt->monitors), MON_ORIGIN, ref, rp->id, NIL); - erts_add_monitor(&(rp->monitors), MON_TARGET, ref, prt->id, NIL); + erts_add_monitor(&ERTS_P_MONITORS(prt), MON_ORIGIN, ref, rp->common.id, NIL); + erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, ref, prt->common.id, NIL); erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); ref_to_driver_monitor(ref,monitor); @@ -4481,7 +4482,7 @@ static int do_driver_demonitor_process(Port *prt, Eterm *buf, memcpy(buf,monitor,sizeof(Eterm)*REF_THING_SIZE); ref = make_internal_ref(buf); - mon = erts_lookup_monitor(prt->monitors, ref); + mon = erts_lookup_monitor(ERTS_P_MONITORS(prt), ref); if (mon == NULL) { return 1; } @@ -4493,13 +4494,13 @@ static int do_driver_demonitor_process(Port *prt, Eterm *buf, to, ERTS_PROC_LOCK_LINK, ERTS_P2P_FLG_ALLOW_OTHER_X); - mon = erts_remove_monitor(&(prt->monitors), ref); + mon = erts_remove_monitor(&ERTS_P_MONITORS(prt), ref); if (mon) { erts_destroy_monitor(mon); } if (rp) { ErtsMonitor *rmon; - rmon = erts_remove_monitor(&(rp->monitors), ref); + rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref); erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); if (rmon != NULL) { erts_destroy_monitor(rmon); @@ -4563,7 +4564,7 @@ static ErlDrvTermData do_driver_get_monitored_process(Port *prt, Eterm *buf, memcpy(buf,monitor,sizeof(Eterm)*REF_THING_SIZE); ref = make_internal_ref(buf); - mon = erts_lookup_monitor(prt->monitors, ref); + mon = erts_lookup_monitor(ERTS_P_MONITORS(prt), ref); if (mon == NULL) { return driver_term_nil; } @@ -4637,7 +4638,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref) ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); ASSERT(prt->drv_ptr != NULL); DRV_MONITOR_LOCK_PDL(prt); - if (erts_lookup_monitor(prt->monitors,ref) == NULL) { + if (erts_lookup_monitor(ERTS_P_MONITORS(prt), ref) == NULL) { DRV_MONITOR_UNLOCK_PDL(prt); return; } @@ -4656,7 +4657,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref) erts_unblock_fpe(fpe_was_unmasked); DRV_MONITOR_LOCK_PDL(prt); /* remove monitor *after* callback */ - rmon = erts_remove_monitor(&(prt->monitors),ref); + rmon = erts_remove_monitor(&ERTS_P_MONITORS(prt), ref); DRV_MONITOR_UNLOCK_PDL(prt); if (rmon) { erts_destroy_monitor(rmon); @@ -4680,7 +4681,7 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof) if (state & ERTS_PORT_SFLG_CLOSING) { terminate_port(prt); } else if (eof && (state & ERTS_PORT_SFLG_SOFT_EOF)) { - deliver_result(prt->id, prt->connected, am_eof); + deliver_result(prt->common.id, prt->connected, am_eof); } else { /* XXX UGLY WORK AROUND, Let do_exit_port terminate the port */ if (prt->port_data_lock) @@ -4688,7 +4689,7 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof) prt->ioq.size = 0; if (prt->port_data_lock) driver_pdl_unlock(prt->port_data_lock); - erts_do_exit_port(prt, prt->id, eof ? am_normal : term); + erts_do_exit_port(prt, prt->common.id, eof ? am_normal : term); } return 0; } @@ -4714,10 +4715,10 @@ int driver_exit(ErlDrvPort ix, int err) rp = erts_pid2proc(NULL, 0, prt->connected, ERTS_PROC_LOCK_LINK); if (rp) { - rlnk = erts_remove_link(&(rp->nlinks),prt->id); + rlnk = erts_remove_link(&ERTS_P_LINKS(rp),prt->common.id); } - lnk = erts_remove_link(&(prt->nlinks),prt->connected); + lnk = erts_remove_link(&ERTS_P_LINKS(prt),prt->connected); #ifdef ERTS_SMP if (rp) @@ -4776,7 +4777,7 @@ ErlDrvTermData driver_mk_port(ErlDrvPort ix) { Port* prt = erts_drvport2port(ix, NULL); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - return (ErlDrvTermData) prt->id; + return (ErlDrvTermData) prt->common.id; } ErlDrvTermData driver_connected(ErlDrvPort ix) @@ -5006,7 +5007,7 @@ no_event_callback(ErlDrvData drv_data, ErlDrvEvent event, ErlDrvEventData event_ { Port *prt = get_current_port(); report_missing_drv_callback(prt, "Event", "event()"); - driver_event((ErlDrvPort) internal_port_index(prt->id), event, NULL); + driver_event((ErlDrvPort) internal_port_index(prt->common.id), event, NULL); } static void @@ -5014,7 +5015,7 @@ no_ready_input_callback(ErlDrvData drv_data, ErlDrvEvent event) { Port *prt = get_current_port(); report_missing_drv_callback(prt, "Input", "ready_input()"); - driver_select((ErlDrvPort) internal_port_index(prt->id), event, + driver_select((ErlDrvPort) internal_port_index(prt->common.id), event, (ERL_DRV_READ | ERL_DRV_USE_NO_CALLBACK), 0); } @@ -5023,7 +5024,7 @@ no_ready_output_callback(ErlDrvData drv_data, ErlDrvEvent event) { Port *prt = get_current_port(); report_missing_drv_callback(prt, "Output", "ready_output()"); - driver_select((ErlDrvPort) internal_port_index(prt->id), event, + driver_select((ErlDrvPort) internal_port_index(prt->common.id), event, (ERL_DRV_WRITE | ERL_DRV_USE_NO_CALLBACK), 0); } -- cgit v1.2.3 From 50cb7c24f061fd3d7df5970d8202f47c470a4047 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Wed, 15 Aug 2012 01:28:55 +0200 Subject: Use ptab functionality also for ports --- erts/emulator/beam/io.c | 876 +++++++++++++++++++++--------------------------- 1 file changed, 383 insertions(+), 493 deletions(-) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index b772783f64..4dcec356a9 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -57,15 +57,14 @@ static erts_smp_tsd_key_t driver_list_lock_status_key; /*stop recursive locks wh static erts_smp_tsd_key_t driver_list_last_error_key; /* Save last DDLL error on a per thread basis (for BC interfaces) */ -Port* erts_port; /* The port table */ +ErtsPTab erts_port erts_align_attribute(ERTS_CACHE_LINE_SIZE); /* The port table */ erts_smp_atomic_t erts_bytes_out; /* No bytes sent out of the system */ erts_smp_atomic_t erts_bytes_in; /* No bytes gotten into the system */ -Uint erts_max_ports; -Uint erts_port_tab_index_mask; - const ErlDrvTermData driver_term_nil = (ErlDrvTermData)NIL; +const Port erts_invalid_port = {{ERTS_INVALID_PORT}}; + erts_driver_t vanilla_driver; erts_driver_t spawn_driver; erts_driver_t fd_driver; @@ -89,36 +88,12 @@ static void driver_monitor_unlock_pdl(Port *p); static ERTS_INLINE ErlIOQueue* drvport2ioq(ErlDrvPort drvport) { - int ix = (int) drvport; - erts_aint32_t state; - - if (ix < 0 || erts_max_ports <= ix) - return NULL; - - state = erts_smp_atomic32_read_nob(&erts_port[ix].state); - -#ifdef ERTS_ENABLE_LOCK_CHECK - - if (erts_get_scheduler_data()) { - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[ix])); - ERTS_LC_ASSERT(!erts_port[ix].port_data_lock - || erts_lc_mtx_is_locked( - &erts_port[ix].port_data_lock->mtx)); - } - else { - ERTS_LC_ASSERT((state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) - || erts_port[ix].port_data_lock); - ERTS_LC_ASSERT(!erts_port[ix].port_data_lock - || erts_lc_mtx_is_locked( - &erts_port[ix].port_data_lock->mtx)); - } - -#endif - + Port *prt = erts_thr_drvport2port_raw(drvport); + erts_aint32_t state = erts_atomic32_read_nob(&prt->state); if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) return NULL; else - return &erts_port[ix].ioq; + return &prt->ioq; } static ERTS_INLINE int @@ -196,27 +171,13 @@ typedef struct line_buf_context { dtrace_port_str((PORT), port_str); #endif -/* The 'number' field in a port now has two parts: the lowest bits - contain the index in the port table, and the higher bits are a counter - which is incremented each time we look for a free port and start from - the beginning of the table. erts_max_ports is the number of file descriptors, - rounded up to a power of 2. - To get the index from a port, use the macro 'internal_port_index'; - 'port_number' returns the whole number field. -*/ - -static erts_smp_spinlock_t get_free_port_lck; -static Uint last_port_num; -static Uint port_num_mask; -erts_smp_atomic32_t erts_ports_snapshot; /* Identifies the _next_ snapshot (not the ongoing) */ - - static ERTS_INLINE void kill_port(Port *pp) { ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); + erts_ptab_delete_element(&erts_port, &pp->common); /* Time of death */ erts_port_task_free_port(pp); - ASSERT(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD); + /* In non-smp case the port structure may have been deallocated now */ } #ifdef ERTS_SMP @@ -227,149 +188,194 @@ erts_lc_is_port_locked(Port *prt) { if (!prt) return 0; + ERTS_SMP_LC_ASSERT(prt->lock); return erts_smp_lc_mtx_is_locked(prt->lock); } #endif #endif /* #ifdef ERTS_SMP */ -static int -get_free_port(void) -{ - Uint num; - Uint tries = erts_max_ports; - Port* port; - - erts_smp_spin_lock(&get_free_port_lck); - num = last_port_num + 1; - for (;; ++num) { - erts_aint32_t act; - - port = &erts_port[num & erts_port_tab_index_mask]; - - act = erts_smp_atomic32_read_nob(&port->state); - - while (act & ERTS_PORT_SFLG_FREE) { - erts_aint32_t exp = act; - act = erts_smp_atomic32_cmpxchg_relb(&port->state, - ERTS_PORT_SFLG_INITIALIZING, - exp); - if (act == exp) { - last_port_num = num; - erts_smp_spin_unlock(&get_free_port_lck); - ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&port->common.refc) == 0); - erts_smp_atomic32_set_nob(&port->common.refc, 2); /* Port alive + lock */ - return num & port_num_mask; - } - } +static void initq(Port* prt); - if (--tries == 0) { - erts_smp_spin_unlock(&get_free_port_lck); - return -1; - } - } +static void insert_port_struct(void *vprt, Eterm data) +{ + Port *prt = (Port *) vprt; + Eterm id = make_internal_port(data); +#ifdef ERTS_SMP + ASSERT(prt->drv_ptr && prt->lock); + /* + * We are breaking lock order in the port specific locking + * case. This is, however, safe since the lock has not been + * published, yet. + */ + if (!prt->drv_ptr->lock) + erts_mtx_init_locked_x(prt->lock, "port_lock", id); +#endif + prt->common.id = id; + erts_atomic32_init_relb(&prt->state, ERTS_PORT_SFLG_INITIALIZING); } -/* - * erts_test_next_port() is only used for testing. - */ -Sint -erts_test_next_port(int set, Uint next) +static Port *create_port(char *name, + erts_driver_t *driver, + erts_mtx_t *driver_lock, + Eterm pid, + int *enop) { - Uint i, num; - Sint res = -1; + Port *prt; + char *p; + size_t port_size, size; + erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED; +#ifdef DEBUG + /* Make sure the debug flags survives until port is freed */ + state |= ERTS_PORT_SFLG_PORT_DEBUG; +#endif - erts_smp_spin_lock(&get_free_port_lck); - if (set) { - last_port_num = (next - 1) & port_num_mask; +#ifdef ERTS_SMP + if (!driver_lock) { + /* Align size for mutex following port struct */ + port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); + size += sizeof(erts_mtx_t); } - num = last_port_num + 1; - - for (i=0; i < erts_max_ports && res<0; ++i, ++num) { - - Port* port = &erts_port[num & erts_port_tab_index_mask]; + else +#endif + port_size = size = sizeof(Port); - erts_aint32_t state = erts_smp_atomic32_read_nob(&port->state); + size += sys_strlen(name) + 1; - if (state & ERTS_PORT_SFLG_FREE) { - last_port_num = num - 1; - res = num & port_num_mask; - } + p = erts_alloc_fnf(ERTS_ALC_T_PORT, size); + if (!p) { + if (enop) + *enop = ENOMEM; + return NULL; } - erts_smp_spin_unlock(&get_free_port_lck); - return res; -} -static void port_cleanup(Port *prt); + prt = (Port *) p; + p += port_size; #ifdef ERTS_SMP + if (driver_lock) { + prt->lock = driver_lock; + erts_mtx_lock(driver_lock); + } + else { + prt->lock = (erts_mtx_t *) p; + p += sizeof(erts_mtx_t); + state |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; + } + erts_smp_atomic_set_nob(&prt->run_queue, + (erts_aint_t) erts_get_runq_current(NULL)); + prt->xports = NULL; +#else + erts_atomic32_init_nob(&prt->refc, 1); + prt->cleanup = 0; +#endif + + prt->name = p; + sys_strcpy(p, name); + prt->drv_ptr = driver; + ERTS_P_LINKS(prt) = NULL; + ERTS_P_MONITORS(prt) = NULL; + prt->linebuf = NULL; + prt->bp = NULL; + prt->suspended = NULL; + prt->data = am_undefined; + prt->port_data_lock = NULL; + prt->control_flags = 0; + prt->bytes_in = 0; + prt->bytes_out = 0; + prt->dist_entry = NULL; + prt->connected = pid; + prt->common.u.alive.reg = NULL; +#ifdef ERTS_SMP + prt->common.u.alive.ptimer = NULL; +#else + sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer)); +#endif + erts_port_task_handle_init(&prt->timeout_task); + erts_port_task_init_sched(&prt->sched); + prt->psd = NULL; + prt->drv_data = (SWord) 0; -static void -sched_port_cleanup(void *vprt) -{ - Port *prt = (Port *) vprt; - erts_smp_mtx_lock(prt->lock); - port_cleanup(prt); -} + /* Set default tracing */ + erts_get_default_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER_PROC(prt)); + + ASSERT(((char *) prt) == ((char *) &prt->common)); + if (!erts_ptab_new_element(&erts_port, + &prt->common, + (void *) prt, + insert_port_struct)) { +#ifdef ERTS_SMP + if (driver_lock) + erts_mtx_unlock(driver_lock); #endif + if (enop) + *enop = 0; + return NULL; + } + + ASSERT(prt == (Port *) (erts_ptab_pix2intptr_nob( + &erts_port, + internal_port_index(prt->common.id)))); + initq(prt); + + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + + erts_atomic32_set_relb(&prt->state, state); + return prt; +} + +#ifndef ERTS_SMP void erts_port_cleanup(Port *prt) { -#ifdef ERTS_SMP - if (erts_smp_mtx_trylock(prt->lock) == EBUSY) - erts_schedule_misc_op(sched_port_cleanup, (void *) prt); - else -#endif - port_cleanup(prt); + if (prt->drv_ptr && prt->drv_ptr->handle) + erts_ddll_dereference_driver(prt->drv_ptr->handle); + prt->drv_ptr = NULL; + erts_port_dec_refc(prt); } +#endif void -port_cleanup(Port *prt) +erts_port_free(Port *prt) { -#ifdef ERTS_SMP - erts_smp_mtx_t *mtx; -#endif #if defined(ERTS_SMP) || defined(ERTS_ENABLE_LOCK_CHECK) - erts_aint32_t state = erts_smp_atomic32_read_nob(&prt->state); + erts_aint32_t state = erts_atomic32_read_nob(&prt->state); #endif - erts_driver_t *driver; - - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - driver = prt->drv_ptr; - prt->drv_ptr = NULL; - ASSERT(driver); - - ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_FREE_SCHEDULED); - ERTS_LC_ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG); + ERTS_LC_ASSERT(state & (ERTS_PORT_SFLG_FREE_SCHEDULED + | ERTS_PORT_SFLG_INITIALIZING)); + ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG); ERTS_LC_ASSERT(!(state & ERTS_PORT_SFLG_FREE)); - ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&prt->common.refc) == 0); - #ifdef ERTS_SMP - mtx = prt->lock; - ASSERT(mtx); - - prt->lock = NULL; - - erts_smp_mtx_unlock(mtx); + ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->common.refc) == 0); +#else + ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->refc) == 0); #endif - erts_smp_atomic32_set_relb(&prt->state, ERTS_PORT_SFLG_FREE); - #ifdef ERTS_SMP - if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) { - erts_smp_mtx_destroy(mtx); - erts_free(ERTS_ALC_T_PORT_LOCK, mtx); - } -#endif + ASSERT(prt->lock); + if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) + erts_mtx_destroy(prt->lock); - if (driver->handle) - erts_ddll_dereference_driver(driver->handle); + /* + * We cannot dereference a driver using driver + * locking until here in smp case. Otherwise, + * the driver lock may still be in use by others. + * + * In the non-smp case we cannot do it here since + * this function may be called by non-scheduler + * threads. This is done in erts_port_cleanup() + * in the non-smp case. + */ + if (prt->drv_ptr->handle) + erts_ddll_dereference_driver(prt->drv_ptr->handle); +#endif + erts_atomic32_set_nob(&prt->state, ERTS_PORT_SFLG_FREE); + erts_free(ERTS_ALC_T_PORT, prt); } - /* ** Initialize v_start to point to the small fixed vector. ** Once (reallocated) we never reset the pointer to the small vector @@ -416,73 +422,7 @@ static void stopq(Port* prt) if (prt->port_data_lock) { driver_pdl_unlock(prt->port_data_lock); driver_pdl_dec_refc(prt->port_data_lock); - prt->port_data_lock = NULL; - } -} - - - -static void -setup_port(Port* prt, Eterm pid, erts_driver_t *driver, - ErlDrvData drv_data, char *name, erts_aint32_t xstate) -{ - ErtsRunQueue *runq = erts_get_runq_current(NULL); - char *new_name, *old_name; -#ifdef DEBUG - /* Make sure the debug flags survives until port is freed */ - xstate |= ERTS_PORT_SFLG_PORT_DEBUG; -#endif - ASSERT(runq); - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - - - new_name = (char*) erts_alloc(ERTS_ALC_T_PORT_NAME, sys_strlen(name)+1); - sys_strcpy(new_name, name); - erts_smp_runq_lock(runq); - prt->snapshot = erts_smp_atomic32_read_nob(&erts_ports_snapshot); - old_name = prt->name; - prt->name = new_name; -#ifdef ERTS_SMP - erts_smp_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq); -#endif - ASSERT(!prt->drv_ptr); - prt->drv_ptr = driver; - erts_smp_atomic32_set_relb(&prt->state, - ERTS_PORT_SFLG_CONNECTED | xstate); - erts_smp_runq_unlock(runq); -#ifdef ERTS_SMP - ASSERT(!prt->xports); -#endif - if (old_name) { - erts_free(ERTS_ALC_T_PORT_NAME, (void *) old_name); } - - prt->control_flags = 0; - prt->connected = pid; - prt->drv_data = (SWord) drv_data; - prt->bytes_in = 0; - prt->bytes_out = 0; - prt->dist_entry = NULL; - prt->common.u.alive.reg = NULL; -#ifdef ERTS_SMP - prt->common.u.alive.ptimer = NULL; -#else - sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer)); -#endif - erts_port_task_handle_init(&prt->timeout_task); - prt->suspended = NULL; - sys_strcpy(prt->name, name); - ERTS_P_LINKS(prt) = NULL; - ERTS_P_MONITORS(prt) = NULL; - prt->linebuf = NULL; - prt->bp = NULL; - prt->data = am_undefined; - /* Set default tracing */ - erts_get_default_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER_PROC(prt)); - - prt->psd = NULL; - - initq(prt); } void @@ -493,7 +433,7 @@ erts_wake_process_later(Port *prt, Process *process) ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (erts_smp_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD) + if (erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD) return; for (p = &(prt->suspended); *p != NULL; p = &((*p)->next)) @@ -513,36 +453,34 @@ erts_wake_process_later(Port *prt, Process *process) (*error_number_ptr must contain either BADARG or SYSTEM_LIMIT). The driver start function must obey the same conventions. */ -int +Port * erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ Eterm pid, /* Current process. */ char* name, /* Driver name. */ SysDriverOpts* opts, /* Options. */ - int *error_number_ptr) /* errno in case -2 is returned */ + int *error_type_ptr, /* error type */ + int *error_number_ptr) /* errno in case of error type -2 */ { - int port_num; - int port_ix; + +#undef ERTS_OPEN_DRIVER_RET +#define ERTS_OPEN_DRIVER_RET(Prt, EType, ENo) \ + do { \ + if (error_type_ptr) \ + *error_type_ptr = (EType); \ + if (error_number_ptr) \ + *error_number_ptr = (ENo); \ + return (Prt); \ + } while (0) + ErlDrvData drv_data = 0; - erts_aint32_t xstate = 0; Port *port; int fpe_was_unmasked; - - if (error_number_ptr) - *error_number_ptr = 0; + int error_type, error_number; + int port_errno = 0; + erts_mtx_t *driver_lock = NULL; ERTS_SMP_CHK_NO_PROC_LOCKS; - if ((port_num = get_free_port()) < 0) { - if (error_number_ptr) { - *error_number_ptr = SYSTEM_LIMIT; - } - return -3; - } - - port_ix = port_num & erts_port_tab_index_mask; - port = &erts_port[port_ix]; - port->common.id = make_internal_port(port_num); - erts_smp_mtx_lock(&erts_driver_list_lock); if (!driver) { for (driver = driver_list; driver; driver = driver->next) { @@ -551,9 +489,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ } if (!driver) { erts_smp_mtx_unlock(&erts_driver_list_lock); - if (error_number_ptr) - *error_number_ptr = BADARG; - return -3; + ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG); } } if (driver == &spawn_driver) { @@ -598,32 +534,11 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ if (driver == NULL || (driver != &spawn_driver && opts->exit_status)) { erts_smp_mtx_unlock(&erts_driver_list_lock); - if (error_number_ptr) { - *error_number_ptr = BADARG; - } - /* Need to mark the port as free again */ - ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&port->common.refc) == 2); - erts_smp_atomic32_set_nob(&port->common.refc, 0); - erts_smp_atomic32_set_relb(&port->state, ERTS_PORT_SFLG_FREE); - return -3; + ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG); } - /* - * We'll set up the port before calling the start function, - * to allow message sending and setting timers in the start function. - */ - #ifdef ERTS_SMP - ASSERT(!port->lock); - port->lock = driver->lock; - if (!port->lock) { - port->lock = erts_alloc(ERTS_ALC_T_PORT_LOCK, - sizeof(erts_smp_mtx_t)); - erts_smp_mtx_init_x(port->lock, - "port_lock", - port->common.id); - xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; - } + driver_lock = driver->lock; #endif if (driver->handle != NULL) { @@ -632,18 +547,32 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ } erts_smp_mtx_unlock(&erts_driver_list_lock); -#ifdef ERTS_SMP - erts_smp_mtx_lock(port->lock); -#endif + /* + * We'll set up the port before calling the start function, + * to allow message sending and setting timers in the start function. + */ - setup_port(port, pid, driver, drv_data, name, xstate); + port = create_port(name, driver, driver_lock, pid, &port_errno); + if (!port) { + if (driver->handle) { + erts_smp_mtx_lock(&erts_driver_list_lock); + erts_ddll_decrement_port_count(driver->handle); + erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_ddll_dereference_driver(driver->handle); + } + if (port_errno) + ERTS_OPEN_DRIVER_RET(NULL, -2, port_errno); + else + ERTS_OPEN_DRIVER_RET(NULL, -3, SYSTEM_LIMIT); + } if (IS_TRACED_FL(port, F_TRACE_PORTS)) { trace_port_open(port, pid, am_atom_put(port->name, strlen(port->name))); } - + + error_number = error_type = 0; if (driver->start) { if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) { trace_sched_ports_where(port, am_in, am_start); @@ -656,15 +585,28 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ } #endif fpe_was_unmasked = erts_block_fpe(); - drv_data = (*driver->start)((ErlDrvPort)(port_ix), - name, opts); + drv_data = (*driver->start)((ErlDrvPort) port, name, opts); + if (((SWord) drv_data) == -1) + error_type = -1; + else if (((SWord) drv_data) == -2) { + /* + * We need to save errno quickly after the + * call to the 'start' callback before + * something else modify it. + */ + error_type = -2; + error_number = errno; + } + else if (((SWord) drv_data) == -3) { + error_type = -3; + error_number = BADARG; + } + erts_unblock_fpe(fpe_was_unmasked); port->caller = NIL; if (IS_TRACED_FL(port, F_TRACE_SCHED_PORTS)) { trace_sched_ports_where(port, am_out, am_start); } - if (error_number_ptr && ((SWord) drv_data) == (SWord) -2) - *error_number_ptr = errno; #ifdef ERTS_SMP if (port->xports) erts_smp_xports_unlock(port); @@ -672,15 +614,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ #endif } - if (((SWord)drv_data) == -1 || - ((SWord)drv_data) == -2 || - ((SWord)drv_data) == -3) { - int res = (int) ((SWord) drv_data); - - if (res == -3 && error_number_ptr) { - *error_number_ptr = BADARG; - } - + if (error_type) { /* * Must clean up the port. */ @@ -690,7 +624,6 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ erts_cancel_timer(&(port->common.u.alive.tm)); #endif stopq(port); - kill_port(port); if (port->linebuf != NULL) { erts_free(ERTS_ALC_T_LINEBUF, (void *) port->linebuf); @@ -701,11 +634,14 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ erts_ddll_decrement_port_count(driver->handle); erts_smp_mtx_unlock(&erts_driver_list_lock); } + kill_port(port); erts_port_release(port); - return res; + ERTS_OPEN_DRIVER_RET(NULL, error_type, error_number); } - port->drv_data = (SWord) drv_data; - return port_ix; + port->drv_data = (UWord) drv_data; + ERTS_OPEN_DRIVER_RET(port, 0, 0); + +#undef ERTS_OPEN_DRIVER_RET } #ifdef ERTS_SMP @@ -734,15 +670,21 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ Port* port; erts_driver_t *driver; Process *rp; - int port_num; - Eterm port_id; - erts_aint32_t xstate = 0; + erts_mtx_t *driver_lock = NULL; ERTS_SMP_CHK_NO_PROC_LOCKS; + /* Need to be called from a scheduler thread */ + if (!erts_get_scheduler_id()) + return ERTS_INVALID_ERL_DRV_PORT; + creator_port = erts_drvport2port(creator_port_ix, NULL); if (!creator_port) - return (ErlDrvTermData) -1; + return ERTS_INVALID_ERL_DRV_PORT; + + rp = erts_proc_lookup(pid); + if (!rp) + return ERTS_INVALID_ERL_DRV_PORT; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(creator_port)); @@ -750,55 +692,61 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ erts_smp_mtx_lock(&erts_driver_list_lock); if (!erts_ddll_driver_ok(driver->handle)) { erts_smp_mtx_unlock(&erts_driver_list_lock); - return (ErlDrvTermData) -1; + return ERTS_INVALID_ERL_DRV_PORT; } - rp = erts_pid2proc(NULL, 0, pid, ERTS_PROC_LOCK_LINK); - if (!rp) { - erts_smp_mtx_unlock(&erts_driver_list_lock); - return (ErlDrvTermData) -1; /* pid does not exist */ + if (driver->handle != NULL) { + erts_ddll_increment_port_count(driver->handle); + erts_ddll_reference_referenced_driver(driver->handle); + } + +#ifdef ERTS_SMP + driver_lock = driver->lock; +#endif + + erts_smp_mtx_unlock(&erts_driver_list_lock); + + port = create_port(name, driver, driver_lock, pid, NULL); + if (!port) { + if (driver->handle) { + erts_smp_mtx_lock(&erts_driver_list_lock); + erts_ddll_decrement_port_count(driver->handle); + erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_ddll_dereference_driver(driver->handle); + } + return ERTS_INVALID_ERL_DRV_PORT; } - if ((port_num = get_free_port()) < 0) { - errno = SYSTEM_LIMIT; + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(port)); + + erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK); + if (ERTS_PROC_IS_EXITING(rp)) { erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - erts_smp_mtx_unlock(&erts_driver_list_lock); - return (ErlDrvTermData) -1; + if (driver->handle) { + erts_smp_mtx_lock(&erts_driver_list_lock); + erts_ddll_decrement_port_count(driver->handle); + erts_smp_mtx_unlock(&erts_driver_list_lock); + } + kill_port(port); + erts_port_release(port); + return ERTS_INVALID_ERL_DRV_PORT; } - port_id = make_internal_port(port_num); - port = &erts_port[port_num & erts_port_tab_index_mask]; + erts_add_link(&ERTS_P_LINKS(port), LINK_PID, pid); + erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, port->common.id); + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); #ifdef ERTS_SMP - ASSERT(!port->lock); - port->lock = driver->lock; - if (!port->lock) { + if (!driver_lock) { ErtsXPortsList *xplp = xports_list_alloc(); xplp->port = port; xplp->next = creator_port->xports; creator_port->xports = xplp; - port->lock = erts_alloc(ERTS_ALC_T_PORT_LOCK, - sizeof(erts_smp_mtx_t)); - erts_smp_mtx_init_locked_x(port->lock, "port_lock", port_id); - xstate |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; } - #endif - if (driver->handle != NULL) { - erts_ddll_increment_port_count(driver->handle); - erts_ddll_reference_referenced_driver(driver->handle); - } - erts_smp_mtx_unlock(&erts_driver_list_lock); + port->drv_data = (UWord) drv_data; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(port)); - - setup_port(port, pid, driver, drv_data, name, xstate); - port->common.id = port_id; - - erts_add_link(&ERTS_P_LINKS(port), LINK_PID, pid); - erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, port_id); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - return port_num & erts_port_tab_index_mask; + return (ErlDrvPort) port; } #ifdef ERTS_SMP @@ -1282,15 +1230,22 @@ int erts_write_to_port(Eterm caller_id, Port *p, Eterm list) } } +#ifdef ERTS_SMP +static void +release_port(void *vport) +{ + erts_port_dec_refc((Port *) vport); +} +#endif + /* initialize the port array */ void init_io(void) { - int i; ErlDrvEntry** dp; char maxports[21]; /* enough for any 64-bit integer */ size_t maxportssize = sizeof(maxports); Uint ports_bits = ERTS_PORTS_BITS; - Sint port_extra_shift; + int port_tab_size; #ifdef ERTS_SMP init_xports_list_alloc(); @@ -1299,66 +1254,39 @@ void init_io(void) pdl_init(); if (erts_sys_getenv("ERL_MAX_PORTS", maxports, &maxportssize) == 0) - erts_max_ports = atoi(maxports); + port_tab_size = atoi(maxports); else - erts_max_ports = sys_max_files(); + port_tab_size = sys_max_files(); - if (erts_max_ports > ERTS_MAX_PORTS) - erts_max_ports = ERTS_MAX_PORTS; - if (erts_max_ports < 1024) - erts_max_ports = 1024; + if (port_tab_size > ERTS_MAX_PORTS) + port_tab_size = ERTS_MAX_PORTS; + if (port_tab_size < 1024) + port_tab_size = 1024; if (erts_use_r9_pids_ports) { ports_bits = ERTS_R9_PORTS_BITS; - if (erts_max_ports > ERTS_MAX_R9_PORTS) - erts_max_ports = ERTS_MAX_R9_PORTS; + if (port_tab_size > ERTS_MAX_R9_PORTS) + port_tab_size = ERTS_MAX_R9_PORTS; } - port_extra_shift = erts_fit_in_bits_int32(erts_max_ports - 1); - port_num_mask = (1 << ports_bits) - 1; - - erts_port_tab_index_mask = ~(~((Uint) 0) << port_extra_shift); - erts_max_ports = 1 << port_extra_shift; - erts_smp_mtx_init(&erts_driver_list_lock,"driver_list"); driver_list = NULL; erts_smp_tsd_key_create(&driver_list_lock_status_key); erts_smp_tsd_key_create(&driver_list_last_error_key); - if (erts_max_ports * sizeof(Port) <= erts_max_ports) { - /* More memory needed than the whole address space. */ - erts_alloc_enomem(ERTS_ALC_T_PORT_TABLE, ~((Uint) 0)); - } - - erts_port = (Port *) erts_alloc(ERTS_ALC_T_PORT_TABLE, - erts_max_ports * sizeof(Port)); - - erts_smp_atomic_init_nob(&erts_bytes_out, 0); - erts_smp_atomic_init_nob(&erts_bytes_in, 0); - - for (i = 0; i < erts_max_ports; i++) { - erts_port_task_init_sched(&erts_port[i].sched); - erts_smp_atomic32_init_nob(&erts_port[i].common.refc, 0); + erts_ptab_init_table(&erts_port, + ERTS_ALC_T_PORT_TABLE, #ifdef ERTS_SMP - erts_port[i].lock = NULL; - erts_port[i].xports = NULL; - erts_smp_spinlock_init_x(&erts_port[i].state_lck, "port_state", make_small(i)); + release_port, +#else + NULL, #endif - ERTS_TRACER_PROC(&erts_port[i]) = NIL; - ERTS_TRACE_FLAGS(&erts_port[i]) = 0; - - erts_port[i].drv_ptr = NULL; - erts_smp_atomic32_init_nob(&erts_port[i].state, ERTS_PORT_SFLG_FREE); - erts_port[i].name = NULL; - ERTS_P_LINKS(&erts_port[i]) = NULL; - ERTS_P_MONITORS(&erts_port[i]) = NULL; - erts_port[i].linebuf = NULL; - erts_port[i].port_data_lock = NULL; - } + (ErtsPTabElementCommon *) &erts_invalid_port.common, + port_tab_size, + "port_table"); - erts_smp_atomic32_init_nob(&erts_ports_snapshot, (erts_aint32_t) 0); - last_port_num = 0; - erts_smp_spinlock_init(&get_free_port_lck, "get_free_port"); + erts_smp_atomic_init_nob(&erts_bytes_out, 0); + erts_smp_atomic_init_nob(&erts_bytes_in, 0); sys_init_io(); @@ -1769,7 +1697,7 @@ deliver_vec_message(Port* prt, /* Port */ if (!rp) return; - state = erts_smp_atomic32_read_nob(&prt->state); + state = erts_atomic32_read_nob(&prt->state); /* * Calculate the exact number of heap words needed. */ @@ -1907,7 +1835,7 @@ static void flush_port(Port *p) ASSERT(!p->xports); #endif } - if ((erts_smp_atomic32_read_nob(&p->state) & ERTS_PORT_SFLGS_DEAD) == 0 + if ((erts_atomic32_read_nob(&p->state) & ERTS_PORT_SFLGS_DEAD) == 0 && is_port_ioq_empty(p)) { terminate_port(p); } @@ -1929,8 +1857,8 @@ terminate_port(Port *prt) ASSERT(!ERTS_P_MONITORS(prt)); /* state may be altered by kill_port() below */ - state = erts_smp_atomic32_read_band_nob(&prt->state, - ~ERTS_PORT_SFLG_SEND_CLOSED); + state = erts_atomic32_read_band_nob(&prt->state, + ~ERTS_PORT_SFLG_SEND_CLOSED); if (state & ERTS_PORT_SFLG_SEND_CLOSED) { send_closed_port_id = prt->common.id; connected_id = prt->connected; @@ -1981,6 +1909,8 @@ terminate_port(Port *prt) if (prt->psd) erts_free(ERTS_ALC_T_PRTSD, prt->psd); + ASSERT(prt->dist_entry == NULL); + kill_port(prt); /* @@ -1989,13 +1919,11 @@ terminate_port(Port *prt) */ if ((state & ERTS_PORT_SFLG_HALT) && (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0)) { - erts_smp_port_unlock(prt); /* We will exit and never return */ + erts_port_release(prt); /* We will exit and never return */ erl_exit_flush_async(erts_halt_code, ""); } if (is_internal_port(send_closed_port_id)) deliver_result(send_closed_port_id, connected_id, am_closed); - - ASSERT(prt->dist_entry == NULL); } void @@ -2130,7 +2058,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) } #endif - state = erts_smp_atomic32_read_nob(&p->state); + state = erts_atomic32_read_nob(&p->state); if ((state & (ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_EXITING | ERTS_PORT_SFLG_IMMORTAL)) @@ -2149,12 +2077,12 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) * Setting the port to not busy here, frees the list of pending * processes and makes them runnable. */ - set_busy_port((ErlDrvPort)internal_port_index(p->common.id), 0); + set_busy_port((ErlDrvPort) p, 0); if (p->common.u.alive.reg != NULL) (void) erts_unregister_name(NULL, 0, p, p->common.u.alive.reg->name); - state = erts_smp_atomic32_read_bor_relb(&p->state, ERTS_PORT_SFLG_EXITING); + state = erts_atomic32_read_bor_relb(&p->state, ERTS_PORT_SFLG_EXITING); { SweepContext sc = {p->common.id, rreason}; @@ -2174,16 +2102,16 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) erts_do_net_exits(p->dist_entry, rreason); erts_deref_dist_entry(p->dist_entry); p->dist_entry = NULL; - erts_smp_atomic32_read_band_relb(&p->state, - ~ERTS_PORT_SFLG_DISTRIBUTION); + erts_atomic32_read_band_relb(&p->state, + ~ERTS_PORT_SFLG_DISTRIBUTION); } if ((reason != am_kill) && !is_port_ioq_empty(p)) { /* must turn exiting flag off */ - erts_smp_atomic32_read_bset_relb(&p->state, - (ERTS_PORT_SFLG_EXITING - | ERTS_PORT_SFLG_CLOSING), - ERTS_PORT_SFLG_CLOSING); + erts_atomic32_read_bset_relb(&p->state, + (ERTS_PORT_SFLG_EXITING + | ERTS_PORT_SFLG_CLOSING), + ERTS_PORT_SFLG_CLOSING); flush_port(p); } else { @@ -2232,8 +2160,8 @@ void erts_port_command(Process *proc, if ((pid = port->connected) == tp[1]) { /* PID must be connected */ if (tp[2] == am_close) { - erts_smp_atomic32_read_bor_relb(&port->state, - ERTS_PORT_SFLG_SEND_CLOSED); + erts_atomic32_read_bor_relb(&port->state, + ERTS_PORT_SFLG_SEND_CLOSED); erts_do_exit_port(port, pid, am_normal); #ifdef USE_VM_PROBES @@ -2456,16 +2384,15 @@ static void prt_one_lnk(ErtsLink *lnk, void *vprtd) } void -print_port_info(int to, void *arg, int i) +print_port_info(Port *p, int to, void *arg) { - Port* p = &erts_port[i]; - erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state); + erts_aint32_t state = erts_atomic32_read_nob(&p->state); if (state & ERTS_PORT_SFLGS_DEAD) return; erts_print(to, arg, "=port:%T\n", p->common.id); - erts_print(to, arg, "Slot: %d\n", i); + erts_print(to, arg, "Slot: %d\n", internal_port_index(p->common.id)); if (state & ERTS_PORT_SFLG_CONNECTED) { erts_print(to, arg, "Connected: %T", p->connected); erts_print(to, arg, "\n"); @@ -2505,43 +2432,46 @@ print_port_info(int to, void *arg, int i) void set_busy_port(ErlDrvPort port_num, int on) { + Port *prt; #ifdef USE_VM_PROBES DTRACE_CHARBUF(port_str, 16); #endif ERTS_SMP_CHK_NO_PROC_LOCKS; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num])); + prt = erts_drvport2port_raw(port_num); + if (!prt) + return; if (on) { - erts_smp_atomic32_read_bor_relb(&erts_port[port_num].state, - ERTS_PORT_SFLG_PORT_BUSY); + erts_atomic32_read_bor_relb(&prt->state, + ERTS_PORT_SFLG_PORT_BUSY); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_busy)) { erts_snprintf(port_str, sizeof(port_str), - "%T", erts_port[port_num].id); + "%T", prt->id); DTRACE1(port_busy, port_str); } #endif } else { - ErtsProcList* plp = erts_port[port_num].suspended; - erts_smp_atomic32_read_band_relb(&erts_port[port_num].state, - ~ERTS_PORT_SFLG_PORT_BUSY); - erts_port[port_num].suspended = NULL; + ErtsProcList* plp = prt->suspended; + erts_atomic32_read_band_relb(&prt->state, + ~ERTS_PORT_SFLG_PORT_BUSY); + prt->suspended = NULL; #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_not_busy)) { erts_snprintf(port_str, sizeof(port_str), - "%T", erts_port[port_num].id); + "%T", prt->id); DTRACE1(port_not_busy, port_str); } #endif - if (erts_port[port_num].dist_entry) { + if (prt->dist_entry) { /* * Processes suspended on distribution ports are * normally queued on the dist entry. */ - erts_dist_port_not_busy(&erts_port[port_num]); + erts_dist_port_not_busy(prt); } /* @@ -2587,10 +2517,9 @@ set_busy_port(ErlDrvPort port_num, int on) void set_port_control_flags(ErlDrvPort port_num, int flags) { - - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num])); - - erts_port[port_num].control_flags = flags; + Port *prt = erts_drvport2port_raw(port_num); + if (prt) + prt->control_flags = flags; } int get_port_flags(ErlDrvPort ix) @@ -2649,7 +2578,7 @@ int async_ready(Port *p, void* data) ERTS_SMP_CHK_NO_PROC_LOCKS; if (p) { - erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state); + erts_aint32_t state = erts_atomic32_read_nob(&p->state); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); ASSERT(!(state & ERTS_PORT_SFLGS_DEAD)); if (p->drv_ptr->ready_async != NULL) { @@ -2697,7 +2626,7 @@ erts_stale_drv_select(Eterm port, int deselect) { char *type; - ErlDrvPort drv_port = internal_port_index(port); + ErlDrvPort drv_port = (ErlDrvPort) erts_port_lookup_raw(port); ErtsPortNames *pnp = erts_get_port_names(port); erts_dsprintf_buf_t *dsbufp; @@ -2737,16 +2666,16 @@ erts_stale_drv_select(Eterm port, ErtsPortNames * erts_get_port_names(Eterm id) { + Port *prt = erts_port_lookup_raw(id); ErtsPortNames *pnp; ASSERT(is_nil(id) || is_internal_port(id)); - - if (is_not_internal_port(id)) { + + if (!prt) { pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, sizeof(ErtsPortNames)); pnp->name = NULL; pnp->driver_name = NULL; } else { - Port* prt = &erts_port[internal_port_index(id)]; int do_realloc = 1; int len = -1; size_t pnp_len = sizeof(ErtsPortNames); @@ -2762,17 +2691,10 @@ erts_get_port_names(Eterm id) pnp_len = sizeof(ErtsPortNames) + len; pnp = erts_alloc(ERTS_ALC_T_PORT_NAMES, pnp_len); } - erts_smp_port_minor_lock(prt); - if (id != prt->common.id) { - len = nlen = 0; - name = driver_name = NULL; - } - else { - name = prt->name; - len = nlen = name ? sys_strlen(name) + 1 : 0; - driver_name = (prt->drv_ptr ? prt->drv_ptr->name : NULL); - len += driver_name ? sys_strlen(driver_name) + 1 : 0; - } + name = prt->name; + len = nlen = name ? sys_strlen(name) + 1 : 0; + driver_name = (prt->drv_ptr ? prt->drv_ptr->name : NULL); + len += driver_name ? sys_strlen(driver_name) + 1 : 0; if (len <= pnp_len - sizeof(ErtsPortNames)) { if (!name) pnp->name = NULL; @@ -2790,7 +2712,6 @@ erts_get_port_names(Eterm id) } do_realloc = 0; } - erts_smp_port_minor_unlock(prt); } while (do_realloc); } return pnp; @@ -2827,7 +2748,7 @@ ErlDrvTermData driver_mk_term_nil(void) return driver_term_nil; } -void driver_report_exit(int ix, int status) +void driver_report_exit(ErlDrvPort ix, int status) { Port* prt = erts_drvport2port(ix, NULL); Eterm* hp; @@ -2868,27 +2789,6 @@ void driver_report_exit(int ix, int status) erts_smp_proc_dec_refc(rp); } - -static ERTS_INLINE int -deliver_term_check_port(ErlDrvPort drvport) -{ - int res; - int ix = (int) drvport; - if (ix < 0 || erts_max_ports <= ix) - res = -1; /* invalid */ - else { - Port* prt = &erts_port[ix]; - erts_aint32_t state = erts_smp_atomic32_read_nob(&prt->state); - if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) - res = 1; /* ok */ - else if (state & ERTS_PORT_SFLG_CLOSING) - res = 0; /* closing */ - else - res = -1; /* invalid (dead) */ - } - return res; -} - #define ERTS_B2T_STATES_DEF_STATES_SZ 5 #define ERTS_B2T_STATES_DEF_STATES_INC 100 @@ -2976,10 +2876,7 @@ cleanup_b2t_states(struct b2t_states__ *b2tsp) */ static int -driver_deliver_term(ErlDrvPort port, - Eterm to, - ErlDrvTermData* data, - int len) +driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) { #define ERTS_DDT_FAIL do { res = -1; goto done; } while (0) Uint need = 0; @@ -3181,11 +3078,8 @@ driver_deliver_term(ErlDrvPort port, b2t.ix = 0; /* - * The term is OK. Go ahead and validate the port and process. + * The term is OK. Go ahead and validate the process. */ - res = deliver_term_check_port(port); - if (res <= 0) - goto done; /* * Increase refc on proc if done from a non-scheduler thread. @@ -3449,25 +3343,29 @@ driver_deliver_term(ErlDrvPort port, #undef ERTS_DDT_FAIL } - int -driver_output_term(ErlDrvPort ix, ErlDrvTermData* data, int len) +driver_output_term(ErlDrvPort drvport, ErlDrvTermData* data, int len) { - Port* prt = erts_drvport2port(ix, NULL); - + erts_aint32_t state; + Port* prt = erts_drvport2port(drvport, &state); + if (!prt) + return -1; /* invalid (dead) */ ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - - if (prt == NULL) - return -1; - return driver_deliver_term(ix, prt->connected, data, len); + if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) + return driver_deliver_term(prt->connected, data, len); + else if (state & ERTS_PORT_SFLG_CLOSING) + return 0; /* closing */ + else + return -1; /* invalid (dead) */ } int driver_send_term(ErlDrvPort ix, ErlDrvTermData to, ErlDrvTermData* data, int len) { - return driver_deliver_term(ix, to, data, len); + /* driver_send_term() assume port is ok... */ + return driver_deliver_term(to, data, len); } @@ -3817,6 +3715,7 @@ static ERTS_INLINE void pdl_destroy(ErlDrvPDL pdl) { ERTS_LC_ASSERT(driver_pdl_get_refc(pdl) == 0); erts_mtx_destroy(&pdl->mtx); + erts_port_dec_refc(pdl->prt); erts_free(ERTS_ALC_T_PORT_DATA_LOCK, pdl); } @@ -3861,6 +3760,8 @@ driver_pdl_create(ErlDrvPort dp) sizeof(struct erl_drv_port_data_lock)); erts_mtx_init(&pdl->mtx, "port_data_lock"); pdl_init_refc(pdl); + erts_port_inc_refc(pp); + pdl->prt = pp; pp->port_data_lock = pdl; #ifdef HARDDEBUG erts_fprintf(stderr, "driver_pdl_create(%T) -> 0x%08X\r\n",pp->common.id,(unsigned) pdl); @@ -4317,7 +4218,7 @@ int driver_set_timer(ErlDrvPort ix, unsigned long t) if (prt == NULL) return -1; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + if (prt->drv_ptr->timeout == NULL) return -1; drv_cancel_timer(prt); @@ -4424,7 +4325,7 @@ static int do_driver_monitor_process(Port *prt, /* * This can be called from a non scheduler thread iff a port_data_lock exists */ -int driver_monitor_process(ErlDrvPort port, +int driver_monitor_process(ErlDrvPort drvport, ErlDrvTermData process, ErlDrvMonitor *monitor) { @@ -4434,15 +4335,12 @@ int driver_monitor_process(ErlDrvPort port, #if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) ErtsSchedulerData *sched = erts_get_scheduler_data(); #endif - int ix = (int) port; - if (ix < 0 || erts_max_ports <= ix) { - return -1; - } - prt = &erts_port[ix]; + + prt = erts_thr_drvport2port_raw(drvport); DRV_MONITOR_LOCK_PDL(prt); - state = erts_smp_atomic32_read_nob(&erts_port[ix].state); + state = erts_atomic32_read_nob(&prt->state); if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { DRV_MONITOR_UNLOCK_PDL(prt); @@ -4509,7 +4407,7 @@ static int do_driver_demonitor_process(Port *prt, Eterm *buf, return 0; } -int driver_demonitor_process(ErlDrvPort port, +int driver_demonitor_process(ErlDrvPort drvport, const ErlDrvMonitor *monitor) { Port *prt; @@ -4518,15 +4416,12 @@ int driver_demonitor_process(ErlDrvPort port, #if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) ErtsSchedulerData *sched = erts_get_scheduler_data(); #endif - int ix = (int) port; - if (ix < 0 || erts_max_ports <= ix) { - return -1; - } - prt = &erts_port[ix]; + + prt = erts_thr_drvport2port_raw(drvport); DRV_MONITOR_LOCK_PDL(prt); - state = erts_smp_atomic32_read_nob(&erts_port[ix].state); + state = erts_atomic32_read_nob(&prt->state); if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { DRV_MONITOR_UNLOCK_PDL(prt); @@ -4575,7 +4470,7 @@ static ErlDrvTermData do_driver_get_monitored_process(Port *prt, Eterm *buf, } -ErlDrvTermData driver_get_monitored_process(ErlDrvPort port, +ErlDrvTermData driver_get_monitored_process(ErlDrvPort drvport, const ErlDrvMonitor *monitor) { Port *prt; @@ -4584,15 +4479,12 @@ ErlDrvTermData driver_get_monitored_process(ErlDrvPort port, #if !HEAP_ON_C_STACK || (defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)) ErtsSchedulerData *sched = erts_get_scheduler_data(); #endif - int ix = (int) port; - if (ix < 0 || erts_max_ports <= ix) { - return driver_term_nil; - } - prt = &erts_port[ix]; + + prt = erts_thr_drvport2port_raw(drvport); DRV_MONITOR_LOCK_PDL(prt); - state = erts_smp_atomic32_read_nob(&erts_port[ix].state); + state = erts_atomic32_read_nob(&prt->state); if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) { DRV_MONITOR_UNLOCK_PDL(prt); return driver_term_nil; @@ -4711,8 +4603,6 @@ int driver_exit(ErlDrvPort ix, int err) if (prt == NULL) return -1; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - rp = erts_pid2proc(NULL, 0, prt->connected, ERTS_PROC_LOCK_LINK); if (rp) { rlnk = erts_remove_link(&ERTS_P_LINKS(rp),prt->common.id); @@ -5007,7 +4897,7 @@ no_event_callback(ErlDrvData drv_data, ErlDrvEvent event, ErlDrvEventData event_ { Port *prt = get_current_port(); report_missing_drv_callback(prt, "Event", "event()"); - driver_event((ErlDrvPort) internal_port_index(prt->common.id), event, NULL); + driver_event((ErlDrvPort) prt, event, NULL); } static void @@ -5015,7 +4905,7 @@ no_ready_input_callback(ErlDrvData drv_data, ErlDrvEvent event) { Port *prt = get_current_port(); report_missing_drv_callback(prt, "Input", "ready_input()"); - driver_select((ErlDrvPort) internal_port_index(prt->common.id), event, + driver_select((ErlDrvPort) prt, event, (ERL_DRV_READ | ERL_DRV_USE_NO_CALLBACK), 0); } @@ -5024,7 +4914,7 @@ no_ready_output_callback(ErlDrvData drv_data, ErlDrvEvent event) { Port *prt = get_current_port(); report_missing_drv_callback(prt, "Output", "ready_output()"); - driver_select((ErlDrvPort) internal_port_index(prt->common.id), event, + driver_select((ErlDrvPort) prt, event, (ERL_DRV_WRITE | ERL_DRV_USE_NO_CALLBACK), 0); } @@ -5059,13 +4949,13 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle) drv->lock = NULL; else { drv->lock = erts_alloc(ERTS_ALC_T_DRIVER_LOCK, - sizeof(erts_smp_mtx_t)); - erts_smp_mtx_init_x(drv->lock, - "driver_lock", + sizeof(erts_mtx_t)); + erts_mtx_init_x(drv->lock, + "driver_lock", #if defined(ERTS_ENABLE_LOCK_CHECK) || defined(ERTS_ENABLE_LOCK_COUNT) - am_atom_put(drv->name, sys_strlen(drv->name)) + am_atom_put(drv->name, sys_strlen(drv->name)) #else - NIL + NIL #endif ); } -- cgit v1.2.3 From ef302baca81ceaedbfb128fae60a42e53910f061 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Wed, 24 Oct 2012 11:30:42 +0200 Subject: Remove R9 compatibility features --- erts/emulator/beam/io.c | 7 ------- 1 file changed, 7 deletions(-) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 4dcec356a9..21fc76b8db 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -1244,7 +1244,6 @@ void init_io(void) ErlDrvEntry** dp; char maxports[21]; /* enough for any 64-bit integer */ size_t maxportssize = sizeof(maxports); - Uint ports_bits = ERTS_PORTS_BITS; int port_tab_size; #ifdef ERTS_SMP @@ -1263,12 +1262,6 @@ void init_io(void) if (port_tab_size < 1024) port_tab_size = 1024; - if (erts_use_r9_pids_ports) { - ports_bits = ERTS_R9_PORTS_BITS; - if (port_tab_size > ERTS_MAX_R9_PORTS) - port_tab_size = ERTS_MAX_R9_PORTS; - } - erts_smp_mtx_init(&erts_driver_list_lock,"driver_list"); driver_list = NULL; erts_smp_tsd_key_create(&driver_list_lock_status_key); -- cgit v1.2.3 From 7e789df8dd9c7d86e9cc354521a37aa598aa5ec8 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Wed, 31 Oct 2012 23:31:50 +0100 Subject: Improve configuration of process and port tables --- erts/emulator/beam/io.c | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 21fc76b8db..1177e7e691 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -1238,13 +1238,10 @@ release_port(void *vport) } #endif -/* initialize the port array */ -void init_io(void) +void erts_init_io(int port_tab_size, + int port_tab_size_ignore_files) { ErlDrvEntry** dp; - char maxports[21]; /* enough for any 64-bit integer */ - size_t maxportssize = sizeof(maxports); - int port_tab_size; #ifdef ERTS_SMP init_xports_list_alloc(); @@ -1252,15 +1249,16 @@ void init_io(void) pdl_init(); - if (erts_sys_getenv("ERL_MAX_PORTS", maxports, &maxportssize) == 0) - port_tab_size = atoi(maxports); - else - port_tab_size = sys_max_files(); + if (!port_tab_size_ignore_files) { + int max_files = sys_max_files(); + if (port_tab_size < max_files) + port_tab_size = max_files; + } if (port_tab_size > ERTS_MAX_PORTS) port_tab_size = ERTS_MAX_PORTS; - if (port_tab_size < 1024) - port_tab_size = 1024; + else if (port_tab_size < ERTS_MIN_PORTS) + port_tab_size = ERTS_MIN_PORTS; erts_smp_mtx_init(&erts_driver_list_lock,"driver_list"); driver_list = NULL; -- cgit v1.2.3 From 56cef897ca3ad2377e34a6ea5800a54a28cbeb6e Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Mon, 20 Aug 2012 13:48:29 +0200 Subject: Optimize management of port tasks --- erts/emulator/beam/io.c | 97 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 67 insertions(+), 30 deletions(-) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 1177e7e691..78c27b1385 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -197,19 +197,58 @@ erts_lc_is_port_locked(Port *prt) static void initq(Port* prt); +#if defined(ERTS_ENABLE_LOCK_CHECK) || defined(ERTS_ENABLE_LOCK_COUNT) +#define ERTS_PORT_INIT_INSTR_NEED_ID 1 +#else +#define ERTS_PORT_INIT_INSTR_NEED_ID 0 +#endif + +static ERTS_INLINE void port_init_instr(Port *prt +#if ERTS_PORT_INIT_INSTR_NEED_ID + , Eterm id +#endif + ) +{ +#if !ERTS_PORT_INIT_INSTR_NEED_ID + Eterm id = NIL; /* Not used */ +#endif + + /* + * Stuff that need to be initialized with the port id + * in the instrumented case, but not in the normal case. + */ +#ifdef ERTS_SMP + ASSERT(prt->drv_ptr && prt->lock); + if (!prt->drv_ptr->lock) + erts_mtx_init_locked_x(prt->lock, "port_lock", id); +#endif + erts_port_task_init_sched(&prt->sched, id); +} + +#if !ERTS_PORT_INIT_INSTR_NEED_ID +static ERTS_INLINE void port_init_instr_abort(Port *prt) +{ +#ifdef ERTS_SMP + ASSERT(prt->drv_ptr && prt->lock); + if (!prt->drv_ptr->lock) { + erts_mtx_unlock(prt->lock); + erts_mtx_destroy(prt->lock); + } +#endif + erts_port_task_fini_sched(&prt->sched); +} +#endif + static void insert_port_struct(void *vprt, Eterm data) { Port *prt = (Port *) vprt; Eterm id = make_internal_port(data); -#ifdef ERTS_SMP - ASSERT(prt->drv_ptr && prt->lock); +#if ERTS_PORT_INIT_INSTR_NEED_ID /* - * We are breaking lock order in the port specific locking - * case. This is, however, safe since the lock has not been - * published, yet. + * This cannot be done earlier in the instrumented + * case since we don't now 'id' until now. */ - if (!prt->drv_ptr->lock) - erts_mtx_init_locked_x(prt->lock, "port_lock", id); + port_init_instr(prt, id); #endif prt->common.id = id; erts_atomic32_init_relb(&prt->state, ERTS_PORT_SFLG_INITIALIZING); @@ -292,7 +331,6 @@ static Port *create_port(char *name, sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer)); #endif erts_port_task_handle_init(&prt->timeout_task); - erts_port_task_init_sched(&prt->sched); prt->psd = NULL; prt->drv_data = (SWord) 0; @@ -301,10 +339,23 @@ static Port *create_port(char *name, ASSERT(((char *) prt) == ((char *) &prt->common)); +#if !ERTS_PORT_INIT_INSTR_NEED_ID + /* + * When 'id' isn't needed (the normal case), it is better to + * do the initialization here avoiding unnecessary contention + * on table... + */ + port_init_instr(prt); +#endif + if (!erts_ptab_new_element(&erts_port, &prt->common, (void *) prt, insert_port_struct)) { + +#if !ERTS_PORT_INIT_INSTR_NEED_ID + port_init_instr_abort(prt); +#endif #ifdef ERTS_SMP if (driver_lock) erts_mtx_unlock(driver_lock); @@ -343,10 +394,9 @@ erts_port_free(Port *prt) #if defined(ERTS_SMP) || defined(ERTS_ENABLE_LOCK_CHECK) erts_aint32_t state = erts_atomic32_read_nob(&prt->state); #endif - ERTS_LC_ASSERT(state & (ERTS_PORT_SFLG_FREE_SCHEDULED - | ERTS_PORT_SFLG_INITIALIZING)); + ERTS_LC_ASSERT(state & (ERTS_PORT_SFLG_INITIALIZING + | ERTS_PORT_SFLG_FREE)); ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG); - ERTS_LC_ASSERT(!(state & ERTS_PORT_SFLG_FREE)); #ifdef ERTS_SMP ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->common.refc) == 0); @@ -354,6 +404,8 @@ erts_port_free(Port *prt) ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->refc) == 0); #endif + erts_port_task_fini_sched(&prt->sched); + #ifdef ERTS_SMP ASSERT(prt->lock); if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) @@ -372,7 +424,6 @@ erts_port_free(Port *prt) if (prt->drv_ptr->handle) erts_ddll_dereference_driver(prt->drv_ptr->handle); #endif - erts_atomic32_set_nob(&prt->state, ERTS_PORT_SFLG_FREE); erts_free(ERTS_ALC_T_PORT, prt); } @@ -1230,14 +1281,6 @@ int erts_write_to_port(Eterm caller_id, Port *p, Eterm list) } } -#ifdef ERTS_SMP -static void -release_port(void *vport) -{ - erts_port_dec_refc((Port *) vport); -} -#endif - void erts_init_io(int port_tab_size, int port_tab_size_ignore_files) { @@ -1267,11 +1310,7 @@ void erts_init_io(int port_tab_size, erts_ptab_init_table(&erts_port, ERTS_ALC_T_PORT_TABLE, -#ifdef ERTS_SMP - release_port, -#else NULL, -#endif (ErtsPTabElementCommon *) &erts_invalid_port.common, port_tab_size, "port_table"); @@ -2727,11 +2766,9 @@ static void schedule_port_timeout(Port *p) * /Rickard */ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); - (void) erts_port_task_schedule(p->common.id, - &p->timeout_task, - ERTS_PORT_TASK_TIMEOUT, - (ErlDrvEvent) -1, - NULL); + erts_port_task_schedule(p->common.id, + &p->timeout_task, + ERTS_PORT_TASK_TIMEOUT); } ErlDrvTermData driver_mk_term_nil(void) @@ -4198,7 +4235,7 @@ drv_cancel_timer(Port *prt) erts_cancel_timer(&prt->common.u.alive.tm); #endif if (erts_port_task_is_scheduled(&prt->timeout_task)) - erts_port_task_abort(prt->common.id, &prt->timeout_task); + erts_port_task_abort(&prt->timeout_task); } int driver_set_timer(ErlDrvPort ix, unsigned long t) -- cgit v1.2.3 From 620c8c5bfe4c2b306a7bc0a7d41749bddea4ee62 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Tue, 19 Jun 2012 15:39:26 +0200 Subject: Use rwlock for driver list Conflicts: erts/emulator/beam/io.c --- erts/emulator/beam/io.c | 71 ++++++++++++++++++++++++++----------------------- 1 file changed, 38 insertions(+), 33 deletions(-) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 78c27b1385..9d38e5539e 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -51,7 +51,7 @@ extern ErlDrvEntry spawn_driver_entry; extern ErlDrvEntry *driver_tab[]; /* table of static drivers, only used during initialization */ erts_driver_t *driver_list; /* List of all drivers, static and dynamic. */ -erts_smp_mtx_t erts_driver_list_lock; /* Mutex for driver list */ +erts_smp_rwmtx_t erts_driver_list_lock; /* Mutex for driver list */ static erts_smp_tsd_key_t driver_list_lock_status_key; /*stop recursive locks when calling driver init */ static erts_smp_tsd_key_t driver_list_last_error_key; /* Save last DDLL error on a @@ -532,14 +532,14 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ ERTS_SMP_CHK_NO_PROC_LOCKS; - erts_smp_mtx_lock(&erts_driver_list_lock); + erts_smp_rwmtx_rlock(&erts_driver_list_lock); if (!driver) { for (driver = driver_list; driver; driver = driver->next) { if (sys_strcmp(driver->name, name) == 0) break; } if (!driver) { - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_runlock(&erts_driver_list_lock); ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG); } } @@ -584,7 +584,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ } if (driver == NULL || (driver != &spawn_driver && opts->exit_status)) { - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_runlock(&erts_driver_list_lock); ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG); } @@ -596,7 +596,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ erts_ddll_increment_port_count(driver->handle); erts_ddll_reference_driver(driver->handle); } - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_runlock(&erts_driver_list_lock); /* * We'll set up the port before calling the start function, @@ -606,9 +606,9 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ port = create_port(name, driver, driver_lock, pid, &port_errno); if (!port) { if (driver->handle) { - erts_smp_mtx_lock(&erts_driver_list_lock); + erts_smp_rwmtx_rlock(&erts_driver_list_lock); erts_ddll_decrement_port_count(driver->handle); - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_runlock(&erts_driver_list_lock); erts_ddll_dereference_driver(driver->handle); } if (port_errno) @@ -681,9 +681,9 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ port->linebuf = NULL; } if (driver->handle != NULL) { - erts_smp_mtx_lock(&erts_driver_list_lock); + erts_smp_rwmtx_rlock(&erts_driver_list_lock); erts_ddll_decrement_port_count(driver->handle); - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_runlock(&erts_driver_list_lock); } kill_port(port); erts_port_release(port); @@ -740,9 +740,9 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(creator_port)); driver = creator_port->drv_ptr; - erts_smp_mtx_lock(&erts_driver_list_lock); + erts_smp_rwmtx_rlock(&erts_driver_list_lock); if (!erts_ddll_driver_ok(driver->handle)) { - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_runlock(&erts_driver_list_lock); return ERTS_INVALID_ERL_DRV_PORT; } @@ -755,14 +755,14 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ driver_lock = driver->lock; #endif - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_runlock(&erts_driver_list_lock); port = create_port(name, driver, driver_lock, pid, NULL); if (!port) { if (driver->handle) { - erts_smp_mtx_lock(&erts_driver_list_lock); + erts_smp_rwmtx_rlock(&erts_driver_list_lock); erts_ddll_decrement_port_count(driver->handle); - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_runlock(&erts_driver_list_lock); erts_ddll_dereference_driver(driver->handle); } return ERTS_INVALID_ERL_DRV_PORT; @@ -773,9 +773,9 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ if (ERTS_PROC_IS_EXITING(rp)) { erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); if (driver->handle) { - erts_smp_mtx_lock(&erts_driver_list_lock); + erts_smp_rwmtx_rlock(&erts_driver_list_lock); erts_ddll_decrement_port_count(driver->handle); - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_runlock(&erts_driver_list_lock); } kill_port(port); erts_port_release(port); @@ -1285,6 +1285,9 @@ void erts_init_io(int port_tab_size, int port_tab_size_ignore_files) { ErlDrvEntry** dp; + erts_smp_rwmtx_opt_t drv_list_rwmtx_opts = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER; + drv_list_rwmtx_opts.type = ERTS_SMP_RWMTX_TYPE_EXTREMELY_FREQUENT_READ; + drv_list_rwmtx_opts.lived = ERTS_SMP_RWMTX_LONG_LIVED; #ifdef ERTS_SMP init_xports_list_alloc(); @@ -1303,7 +1306,9 @@ void erts_init_io(int port_tab_size, else if (port_tab_size < ERTS_MIN_PORTS) port_tab_size = ERTS_MIN_PORTS; - erts_smp_mtx_init(&erts_driver_list_lock,"driver_list"); + erts_smp_rwmtx_init_opt(&erts_driver_list_lock, + &drv_list_rwmtx_opts, + "driver_list"); driver_list = NULL; erts_smp_tsd_key_create(&driver_list_lock_status_key); erts_smp_tsd_key_create(&driver_list_last_error_key); @@ -1321,7 +1326,7 @@ void erts_init_io(int port_tab_size, sys_init_io(); erts_smp_tsd_set(driver_list_lock_status_key, (void *) 1); - erts_smp_mtx_lock(&erts_driver_list_lock); + erts_smp_rwmtx_rwlock(&erts_driver_list_lock); init_driver(&fd_driver, &fd_driver_entry, NULL); init_driver(&vanilla_driver, &vanilla_driver_entry, NULL); @@ -1330,7 +1335,7 @@ void erts_init_io(int port_tab_size, erts_add_driver_entry(*dp, NULL, 1); erts_smp_tsd_set(driver_list_lock_status_key, NULL); - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_rwunlock(&erts_driver_list_lock); } /* @@ -1921,9 +1926,9 @@ terminate_port(Port *prt) #endif } if(drv->handle != NULL) { - erts_smp_mtx_lock(&erts_driver_list_lock); + erts_smp_rwmtx_rlock(&erts_driver_list_lock); erts_ddll_decrement_port_count(drv->handle); - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_runlock(&erts_driver_list_lock); } stopq(prt); /* clear queue memory */ if(prt->linebuf != NULL){ @@ -4725,20 +4730,20 @@ int driver_lock_driver(ErlDrvPort ix) ERTS_SMP_CHK_NO_PROC_LOCKS; - erts_smp_mtx_lock(&erts_driver_list_lock); + erts_smp_rwmtx_rwlock(&erts_driver_list_lock); if (prt == NULL) { - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_rwunlock(&erts_driver_list_lock); return -1; } ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); if ((dh = (DE_Handle*)prt->drv_ptr->handle ) == NULL) { - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_rwunlock(&erts_driver_list_lock); return -1; } erts_ddll_lock_driver(dh, prt->drv_ptr->name); - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_rwunlock(&erts_driver_list_lock); return 0; } @@ -4748,7 +4753,7 @@ static int maybe_lock_driver_list(void) void *rec_lock; rec_lock = erts_smp_tsd_get(driver_list_lock_status_key); if (rec_lock == 0) { - erts_smp_mtx_lock(&erts_driver_list_lock); + erts_smp_rwmtx_rwlock(&erts_driver_list_lock); return 1; } return 0; @@ -4756,7 +4761,7 @@ static int maybe_lock_driver_list(void) static void maybe_unlock_driver_list(int doit) { if (doit) { - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_rwunlock(&erts_driver_list_lock); } } /* @@ -5055,7 +5060,7 @@ int erts_add_driver_entry(ErlDrvEntry *de, DE_Handle *handle, int driver_list_lo int res; if (!driver_list_locked) { - erts_smp_mtx_lock(&erts_driver_list_lock); + erts_smp_rwmtx_rwlock(&erts_driver_list_lock); } dp->next = driver_list; @@ -5084,7 +5089,7 @@ int erts_add_driver_entry(ErlDrvEntry *de, DE_Handle *handle, int driver_list_lo if (!driver_list_locked) { erts_smp_tsd_set(driver_list_lock_status_key, NULL); - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_rwunlock(&erts_driver_list_lock); } return res; } @@ -5097,7 +5102,7 @@ int remove_driver_entry(ErlDrvEntry *drv) rec_lock = erts_smp_tsd_get(driver_list_lock_status_key); if (rec_lock == NULL) { - erts_smp_mtx_lock(&erts_driver_list_lock); + erts_smp_rwmtx_rwlock(&erts_driver_list_lock); } dp = driver_list; while (dp && dp->entry != drv) @@ -5105,7 +5110,7 @@ int remove_driver_entry(ErlDrvEntry *drv) if (dp) { if (dp->handle) { if (rec_lock == NULL) { - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_rwunlock(&erts_driver_list_lock); } return -1; } @@ -5119,12 +5124,12 @@ int remove_driver_entry(ErlDrvEntry *drv) } erts_destroy_driver(dp); if (rec_lock == NULL) { - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_rwunlock(&erts_driver_list_lock); } return 1; } if (rec_lock == NULL) { - erts_smp_mtx_unlock(&erts_driver_list_lock); + erts_smp_rwmtx_rwunlock(&erts_driver_list_lock); } return 0; } -- cgit v1.2.3 From 34fc6f243f8a462f4b2370a9fe5376df1ca08f1d Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Fri, 14 Sep 2012 21:42:11 +0200 Subject: Move busy port flag --- erts/emulator/beam/io.c | 60 ++++++++++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 26 deletions(-) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 9d38e5539e..96a89a3b4e 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -476,23 +476,19 @@ static void stopq(Port* prt) } } -void -erts_wake_process_later(Port *prt, Process *process) +int +erts_save_suspend_process_on_port(Port *prt, Process *process) { - ErtsProcList** p; - ErtsProcList* new_p; - - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - - if (erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD) - return; - - for (p = &(prt->suspended); *p != NULL; p = &((*p)->next)) - /* Empty loop body */; - - new_p = erts_proclist_create(process); - new_p->next = NULL; - *p = new_p; + int saved; + erts_aint32_t flags; + erts_port_task_sched_lock(&prt->sched); + flags = erts_smp_atomic32_read_nob(&prt->sched.flags); + saved = ((flags & (ERTS_PTS_FLG_BUSY + | ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY); + if (saved) + erts_proclist_store_last(&prt->suspended, erts_proclist_create(process)); + erts_port_task_sched_unlock(&prt->sched); + return saved; } /* @@ -2094,9 +2090,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) #endif state = erts_atomic32_read_nob(&p->state); - if ((state & (ERTS_PORT_SFLGS_DEAD - | ERTS_PORT_SFLG_EXITING - | ERTS_PORT_SFLG_IMMORTAL)) + if ((state & (ERTS_PORT_SFLGS_DEAD|ERTS_PORT_SFLG_EXITING)) || ((reason == am_normal) && ((from != p->connected) && (from != p->common.id)))) { return; @@ -2468,6 +2462,8 @@ void set_busy_port(ErlDrvPort port_num, int on) { Port *prt; + erts_aint32_t flags; + #ifdef USE_VM_PROBES DTRACE_CHARBUF(port_str, 16); #endif @@ -2479,8 +2475,10 @@ set_busy_port(ErlDrvPort port_num, int on) return; if (on) { - erts_atomic32_read_bor_relb(&prt->state, - ERTS_PORT_SFLG_PORT_BUSY); + flags = erts_smp_atomic32_read_bor_nob(&prt->sched.flags, + ERTS_PTS_FLG_BUSY); + if (flags & ERTS_PTS_FLG_BUSY) + return; /* Already busy */ #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_busy)) { erts_snprintf(port_str, sizeof(port_str), @@ -2489,10 +2487,12 @@ set_busy_port(ErlDrvPort port_num, int on) } #endif } else { - ErtsProcList* plp = prt->suspended; - erts_atomic32_read_band_relb(&prt->state, - ~ERTS_PORT_SFLG_PORT_BUSY); - prt->suspended = NULL; + ErtsProcList *plp; + + flags = erts_smp_atomic32_read_band_nob(&prt->sched.flags, + ~ERTS_PTS_FLG_BUSY); + if (!(flags & ERTS_PTS_FLG_BUSY)) + return; /* Already non-busy */ #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_not_busy)) { @@ -2517,7 +2517,13 @@ set_busy_port(ErlDrvPort port_num, int on) * the first process. */ - if (plp) { + erts_port_task_sched_lock(&prt->sched); + plp = prt->suspended; + prt->suspended = NULL; + erts_port_task_sched_unlock(&prt->sched); + + if (erts_proclist_fetch(&plp, NULL)) { + #ifdef USE_VM_PROBES /* * Hrm, for blocked dist ports, plp always seems to be NULL. @@ -2540,8 +2546,10 @@ set_busy_port(ErlDrvPort port_num, int on) } } #endif + /* First proc should be resumed last */ if (plp->next) { + plp->next->prev = NULL; erts_resume_processes(plp->next); plp->next = NULL; } -- cgit v1.2.3 From 23c6f9e07a3cae7c05e55abd01ff798384241538 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Sun, 16 Sep 2012 02:45:32 +0200 Subject: Add erl_drv_[send|output]_term --- erts/emulator/beam/io.c | 149 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 120 insertions(+), 29 deletions(-) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 96a89a3b4e..6776f72894 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -323,7 +323,7 @@ static Port *create_port(char *name, prt->bytes_in = 0; prt->bytes_out = 0; prt->dist_entry = NULL; - prt->connected = pid; + ERTS_PORT_INIT_CONNECTED(prt, pid); prt->common.u.alive.reg = NULL; #ifdef ERTS_SMP prt->common.u.alive.ptimer = NULL; @@ -1684,7 +1684,7 @@ static void flush_linebuf_messages(Port *prt, erts_aint32_t state) while((ret = flush_linebuf(&lc)) > LINEBUF_EMPTY) deliver_read_message(prt, state, - prt->connected, + ERTS_PORT_GET_CONNECTED(prt), NULL, 0, LINEBUF_DATA(lc), @@ -1847,7 +1847,7 @@ static void flush_port(Port *p) if (p->drv_ptr->flush != NULL) { #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_flush)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(p->connected, p) + DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p) DTRACE3(driver_flush, process_str, port_str, p->name); } #endif @@ -1892,7 +1892,7 @@ terminate_port(Port *prt) ~ERTS_PORT_SFLG_SEND_CLOSED); if (state & ERTS_PORT_SFLG_SEND_CLOSED) { send_closed_port_id = prt->common.id; - connected_id = prt->connected; + connected_id = ERTS_PORT_GET_CONNECTED(prt); } else { send_closed_port_id = NIL; @@ -1909,7 +1909,7 @@ terminate_port(Port *prt) int fpe_was_unmasked = erts_block_fpe(); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_stop)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(prt->connected, prt) + DTRACE_FORMAT_COMMON_PID_AND_PORT(connected_id, prt) DTRACE3(driver_stop, process_str, drv->name, port_str); } #endif @@ -2092,7 +2092,7 @@ erts_do_exit_port(Port *p, Eterm from, Eterm reason) state = erts_atomic32_read_nob(&p->state); if ((state & (ERTS_PORT_SFLGS_DEAD|ERTS_PORT_SFLG_EXITING)) || ((reason == am_normal) && - ((from != p->connected) && (from != p->common.id)))) { + ((from != ERTS_PORT_GET_CONNECTED(p)) && (from != p->common.id)))) { return; } @@ -2186,7 +2186,7 @@ void erts_port_command(Process *proc, if (is_tuple_arity(command, 2)) { tp = tuple_val(command); - if ((pid = port->connected) == tp[1]) { + if ((pid = ERTS_PORT_GET_CONNECTED(port)) == tp[1]) { /* PID must be connected */ if (tp[2] == am_close) { erts_atomic32_read_bor_relb(&port->state, @@ -2212,7 +2212,7 @@ void erts_port_command(Process *proc, DTRACE4(port_command, process_str, port_str, port->name, "connect"); } #endif - port->connected = tp[2]; + ERTS_PORT_SET_CONNECTED_RELB(port, tp[2]); deliver_result(port->common.id, pid, am_connected); goto done; } @@ -2223,7 +2223,7 @@ void erts_port_command(Process *proc, { ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; Process* rp = erts_pid2proc(NULL, 0, - port->connected, rp_locks); + ERTS_PORT_GET_CONNECTED(port), rp_locks); if (rp) { (void) erts_send_exit_signal(NULL, port->common.id, @@ -2423,7 +2423,7 @@ print_port_info(Port *p, int to, void *arg) erts_print(to, arg, "=port:%T\n", p->common.id); erts_print(to, arg, "Slot: %d\n", internal_port_index(p->common.id)); if (state & ERTS_PORT_SFLG_CONNECTED) { - erts_print(to, arg, "Connected: %T", p->connected); + erts_print(to, arg, "Connected: %T", ERTS_PORT_GET_CONNECTED(p)); erts_print(to, arg, "\n"); } @@ -2627,7 +2627,7 @@ int async_ready(Port *p, void* data) if (p->drv_ptr->ready_async != NULL) { #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_ready_async)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(p->connected, p) + DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(p), p) DTRACE3(driver_ready_async, process_str, port_str, p->name); } #endif @@ -2804,7 +2804,7 @@ void driver_report_exit(ErlDrvPort ix, int status) ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - pid = prt->connected; + pid = ERTS_PORT_GET_CONNECTED(prt); ASSERT(is_internal_pid(pid)); rp = (scheduler @@ -3384,28 +3384,114 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) #undef ERTS_DDT_FAIL } +static ERTS_INLINE int +deliver_term_check_port(ErlDrvTermData port_id, Eterm *connected_p) +{ +#ifdef ERTS_SMP + ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay(); +#endif + Port *prt = erts_port_lookup_raw((Eterm) port_id); + erts_aint32_t state = erts_atomic32_read_nob(&prt->state); + if (connected_p) { +#ifdef ERTS_SMP + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + ETHR_MEMBAR(ETHR_LoadLoad); +#endif + *connected_p = ERTS_PORT_GET_CONNECTED(prt); + } +#ifdef ERTS_SMP + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) { + erts_thr_progress_unmanaged_continue(dhndl); + ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore); + } +#endif + ERTS_SMP_LC_ASSERT(dhndl == ERTS_THR_PRGR_DHANDLE_MANAGED + ? erts_lc_is_port_locked(prt) + : !erts_lc_is_port_locked(prt)); + return ((state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + ? -1 + : ((state & ERTS_PORT_SFLG_CLOSING) ? 0 : 1)); +} + +int erl_drv_output_term(ErlDrvTermData port_id, ErlDrvTermData* data, int len) +{ + /* May be called from arbitrary thread */ + Eterm connected; + int res = deliver_term_check_port(port_id, &connected); + if (res <= 0) + return res; + return driver_deliver_term(connected, data, len); +} + +/* + * driver_output_term() is deprecated, and has been scheduled for + * removal in OTP-R17. It is replaced by erl_drv_output_term() + * above. + */ int driver_output_term(ErlDrvPort drvport, ErlDrvTermData* data, int len) { erts_aint32_t state; - Port* prt = erts_drvport2port(drvport, &state); + Port* prt; + + ERTS_SMP_CHK_NO_PROC_LOCKS; + /* NOTE! It *not* safe to access 'drvport' from unmanaged threads. */ + prt = erts_drvport2port(drvport, &state); if (!prt) return -1; /* invalid (dead) */ ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) - return driver_deliver_term(prt->connected, data, len); + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + return -1; else if (state & ERTS_PORT_SFLG_CLOSING) - return 0; /* closing */ - else - return -1; /* invalid (dead) */ + return 0; + + return driver_deliver_term(ERTS_PORT_GET_CONNECTED(prt), data, len); } +int erl_drv_send_term(ErlDrvTermData port_id, + ErlDrvTermData to, + ErlDrvTermData* data, + int len) +{ + /* May be called from arbitrary thread */ + int res = deliver_term_check_port(port_id, NULL); + if (res <= 0) + return res; + return driver_deliver_term(to, data, len); +} +/* + * driver_send_term() is deprecated, and has been scheduled for + * removal in OTP-R17. It is replaced by erl_drv_send_term() above. + */ int -driver_send_term(ErlDrvPort ix, ErlDrvTermData to, ErlDrvTermData* data, int len) +driver_send_term(ErlDrvPort drvport, + ErlDrvTermData to, + ErlDrvTermData* data, + int len) { - /* driver_send_term() assume port is ok... */ + /* + * NOTE! It is *not* safe to access the 'drvport' parameter + * from unmanaged threads. Also note that it is impossible + * to make this access safe without using a less efficient + * internal data representation for ErlDrvPort. + */ + ERTS_SMP_CHK_NO_PROC_LOCKS; +#ifdef ERTS_SMP + if (erts_thr_progress_is_managed_thread()) +#endif + { + erts_aint32_t state; + Port* prt = erts_drvport2port(drvport, &state); + if (!prt) + return -1; /* invalid (dead) */ + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + if (state & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP) + return -1; + else if (state & ERTS_PORT_SFLG_CLOSING) + return 0; + } return driver_deliver_term(to, data, len); } @@ -3438,7 +3524,7 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, (byte*) (bin->orig_bytes+offs), len); } else - deliver_bin_message(prt, prt->connected, + deliver_bin_message(prt, ERTS_PORT_GET_CONNECTED(prt), hbuf, hlen, bin, offs, len); return 0; } @@ -3481,9 +3567,11 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, (byte*) buf, len); } else if (state & ERTS_PORT_SFLG_LINEBUF_IO) - deliver_linebuf_message(prt, state, prt->connected, hbuf, hlen, buf, len); + deliver_linebuf_message(prt, state, ERTS_PORT_GET_CONNECTED(prt), + hbuf, hlen, buf, len); else - deliver_read_message(prt, state, prt->connected, hbuf, hlen, buf, len, 0); + deliver_read_message(prt, state, ERTS_PORT_GET_CONNECTED(prt), + hbuf, hlen, buf, len, 0); return 0; } @@ -3548,7 +3636,8 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, /* XXX handle distribution !!! */ prt->bytes_in += (hlen + size); erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + size)); - deliver_vec_message(prt, prt->connected, hbuf, hlen, binv, iov, n, size); + deliver_vec_message(prt, ERTS_PORT_GET_CONNECTED(prt), hbuf, hlen, + binv, iov, n, size); return 0; } @@ -4581,7 +4670,7 @@ void erts_fire_port_monitor(Port *prt, Eterm ref) DRV_MONITOR_UNLOCK_PDL(prt); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_process_exit)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(prt->connected, prt) + DTRACE_FORMAT_COMMON_PID_AND_PORT(ERTS_PORT_GET_CONNECTED(prt), prt) DTRACE3(driver_process_exit, process_str, port_str, prt->name); } #endif @@ -4614,7 +4703,7 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof) if (state & ERTS_PORT_SFLG_CLOSING) { terminate_port(prt); } else if (eof && (state & ERTS_PORT_SFLG_SOFT_EOF)) { - deliver_result(prt->common.id, prt->connected, am_eof); + deliver_result(prt->common.id, ERTS_PORT_GET_CONNECTED(prt), am_eof); } else { /* XXX UGLY WORK AROUND, Let do_exit_port terminate the port */ if (prt->port_data_lock) @@ -4638,18 +4727,20 @@ int driver_exit(ErlDrvPort ix, int err) Port* prt = erts_drvport2port(ix, NULL); Process* rp; ErtsLink *lnk, *rlnk = NULL; + Eterm connected; ERTS_SMP_CHK_NO_PROC_LOCKS; if (prt == NULL) return -1; - rp = erts_pid2proc(NULL, 0, prt->connected, ERTS_PROC_LOCK_LINK); + connected = ERTS_PORT_GET_CONNECTED(prt); + rp = erts_pid2proc(NULL, 0, connected, ERTS_PROC_LOCK_LINK); if (rp) { rlnk = erts_remove_link(&ERTS_P_LINKS(rp),prt->common.id); } - lnk = erts_remove_link(&ERTS_P_LINKS(prt),prt->connected); + lnk = erts_remove_link(&ERTS_P_LINKS(prt), connected); #ifdef ERTS_SMP if (rp) @@ -4718,7 +4809,7 @@ ErlDrvTermData driver_connected(ErlDrvPort ix) if (prt == NULL) return NIL; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - return prt->connected; + return ERTS_PORT_GET_CONNECTED(prt); } ErlDrvTermData driver_caller(ErlDrvPort ix) -- cgit v1.2.3 From 6e01408aba71e26884c5db81b8e4fa89bd803576 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Fri, 21 Sep 2012 15:12:07 +0200 Subject: Implement true asynchronous signaling between processes and ports --- erts/emulator/beam/io.c | 3215 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 2675 insertions(+), 540 deletions(-) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 6776f72894..2a55a7c09f 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -43,6 +43,8 @@ #include "erl_version.h" #include "error.h" #include "erl_async.h" +#define ERTS_WANT_EXTERNAL_TAGS +#include "external.h" #include "dtrace-wrapper.h" extern ErlDrvEntry fd_driver_entry; @@ -69,6 +71,11 @@ erts_driver_t vanilla_driver; erts_driver_t spawn_driver; erts_driver_t fd_driver; +int erts_port_synchronous_ops = 0; +int erts_port_schedule_all_ops = 0; +int erts_port_parallelism = 0; + +static void deliver_result(Eterm sender, Eterm pid, Eterm res); static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *); static void terminate_port(Port *p); static void pdl_init(void); @@ -254,9 +261,12 @@ static void insert_port_struct(void *vprt, Eterm data) erts_atomic32_init_relb(&prt->state, ERTS_PORT_SFLG_INITIALIZING); } +#define ERTS_CREATE_PORT_FLAG_PARALLELISM (1 << 0) + static Port *create_port(char *name, erts_driver_t *driver, erts_mtx_t *driver_lock, + int create_flags, Eterm pid, int *enop) { @@ -264,6 +274,7 @@ static Port *create_port(char *name, char *p; size_t port_size, size; erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED; + erts_aint32_t x_pts_flgs = 0; #ifdef DEBUG /* Make sure the debug flags survives until port is freed */ state |= ERTS_PORT_SFLG_PORT_DEBUG; @@ -373,6 +384,15 @@ static Port *create_port(char *name, ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + if (erts_port_schedule_all_ops) + x_pts_flgs |= ERTS_PTS_FLG_FORCE_SCHED; + + if (create_flags & ERTS_CREATE_PORT_FLAG_PARALLELISM) + x_pts_flgs |= ERTS_PTS_FLG_PARALLELISM; + + if (x_pts_flgs) + erts_smp_atomic32_read_bor_nob(&prt->sched.flags, x_pts_flgs); + erts_atomic32_set_relb(&prt->state, state); return prt; } @@ -525,6 +545,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ int error_type, error_number; int port_errno = 0; erts_mtx_t *driver_lock = NULL; + int cprt_flgs = 0; ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -599,7 +620,10 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ * to allow message sending and setting timers in the start function. */ - port = create_port(name, driver, driver_lock, pid, &port_errno); + if (opts->parallelism) + cprt_flgs |= ERTS_CREATE_PORT_FLAG_PARALLELISM; + + port = create_port(name, driver, driver_lock, cprt_flgs, pid, &port_errno); if (!port) { if (driver->handle) { erts_smp_rwmtx_rlock(&erts_driver_list_lock); @@ -713,6 +737,7 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ char* name, /* Driver name */ ErlDrvData drv_data) /* Driver data */ { + int cprt_flgs = 0; Port *creator_port; Port* port; erts_driver_t *driver; @@ -753,7 +778,11 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ erts_smp_rwmtx_runlock(&erts_driver_list_lock); - port = create_port(name, driver, driver_lock, pid, NULL); + /* Inherit parallelism flag from parent */ + if (ERTS_PTS_FLG_PARALLELISM & + erts_smp_atomic32_read_nob(&creator_port->sched.flags)) + cprt_flgs |= ERTS_CREATE_PORT_FLAG_PARALLELISM; + port = create_port(name, driver, driver_lock, cprt_flgs, pid, NULL); if (!port) { if (driver->handle) { erts_smp_rwmtx_rlock(&erts_driver_list_lock); @@ -806,10 +835,15 @@ erts_smp_xports_unlock(Port *prt) xplp = prt->xports; ASSERT(xplp); while (xplp) { + Port *rprt = xplp->port; ErtsXPortsList *free_xplp; - if (xplp->port->xports) - erts_smp_xports_unlock(xplp->port); - erts_port_release(xplp->port); + erts_aint32_t state; + if (rprt->xports) + erts_smp_xports_unlock(rprt); + state = erts_atomic32_read_nob(&rprt->state); + if ((state & ERTS_PORT_SFLG_CLOSING) && erts_is_port_ioq_empty(rprt)) + terminate_port(rprt); + erts_port_release(rprt); free_xplp = xplp; xplp = xplp->next; xports_list_free(free_xplp); @@ -849,8 +883,8 @@ io_list_to_vec(Eterm obj, /* io-list */ DECLARE_ESTACK(s); Eterm* objp; char *buf = cbin->orig_bytes; - ErlDrvSizeT len = cbin->orig_size; - ErlDrvSizeT csize = 0; + Uint len = cbin->orig_size; + Uint csize = 0; int vlen = 0; char* cptr = buf; @@ -965,7 +999,7 @@ io_list_to_vec(Eterm obj, /* io-list */ #define IO_LIST_VEC_COUNT(obj) \ do { \ - ErlDrvSizeT _size = binary_size(obj); \ + Uint _size = binary_size(obj); \ Eterm _real; \ ERTS_DECLARE_DUMMY(Uint _offset); \ int _bitoffs; \ @@ -1016,8 +1050,9 @@ do { \ */ static int -io_list_vec_len(Eterm obj, Uint* vsize, Uint* csize, - Uint* pvsize, Uint* pcsize, Uint* total_size) +io_list_vec_len(Eterm obj, int* vsize, Uint* csize, + Uint* pvsize, Uint* pcsize, + ErlDrvSizeT* total_size) { DECLARE_ESTACK(s); Eterm* objp; @@ -1028,7 +1063,7 @@ io_list_vec_len(Eterm obj, Uint* vsize, Uint* csize, Uint p_v_size = 0; Uint p_c_size = 0; Uint p_in_clist = 0; - Uint total; + Uint total; /* Uint due to halfword emulator */ goto L_jump_start; /* avoid a push */ @@ -1088,7 +1123,7 @@ io_list_vec_len(Eterm obj, Uint* vsize, Uint* csize, if (total < c_size) { goto L_overflow_error; } - *total_size = total; + *total_size = (ErlDrvSizeT) total; DESTROY_ESTACK(s); *vsize = v_size; @@ -1103,267 +1138,1477 @@ io_list_vec_len(Eterm obj, Uint* vsize, Uint* csize, return 1; } -/* write data to a port */ -int erts_write_to_port(Eterm caller_id, Port *p, Eterm list) -{ - char *buf; - erts_driver_t *drv = p->drv_ptr; - Uint size; +typedef enum { + ERTS_TRY_IMM_DRV_CALL_OK, + ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK, + ERTS_TRY_IMM_DRV_CALL_INVALID_PORT, + ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS +} ErtsTryImmDrvCallResult; + +typedef struct { + Process *c_p; /* Currently executing process (unlocked) */ + Port *port; /* Port to operate on */ + Eterm port_op; /* port operation as an atom */ + erts_aint32_t state; /* in: invalid state; out: read state (if read) */ + erts_aint32_t sched_flags; /* in: invalid flags; out: read flags (if read) */ + int async; /* Asynchronous operation */ + int pre_chk_sched_flags; /* Check sched flags before lock? */ int fpe_was_unmasked; - - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); +} ErtsTryImmDrvCallState; + +#define ERTS_INIT_TRY_IMM_DRV_CALL_STATE(C_P, PRT, SFLGS, PTS_FLGS, A, PRT_OP) \ + {(C_P), (PRT), (PRT_OP), (SFLGS), (PTS_FLGS), (A), 1, 0} + +/* + * Try doing an immediate driver callback call from a process. If + * this fail, the operation should be scheduled in the normal case... + * + */ +static ERTS_INLINE ErtsTryImmDrvCallResult +try_imm_drv_call(ErtsTryImmDrvCallState *sp) +{ + ErtsTryImmDrvCallResult res; + erts_aint32_t invalid_state, invalid_sched_flags; + Port *prt = sp->port; + Process *c_p = sp->c_p; + + ASSERT(is_atom(sp->port_op)); + + invalid_sched_flags = ERTS_PTS_FLGS_FORCE_SCHEDULE_OP; + invalid_sched_flags |= sp->sched_flags; + if (sp->async) + invalid_sched_flags |= ERTS_PTS_FLG_PARALLELISM; + + if (sp->pre_chk_sched_flags) { + sp->sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); + if (sp->sched_flags & invalid_sched_flags) + return ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS; + } + + if (erts_smp_port_trylock(prt) == EBUSY) + return ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK; + + invalid_state = sp->state; + sp->state = erts_atomic32_read_nob(&prt->state); + if (sp->state & invalid_state) { + res = ERTS_TRY_IMM_DRV_CALL_INVALID_PORT; + goto locked_fail; + } + + sp->sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); + if (sp->sched_flags & invalid_sched_flags) { + res = ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS; + goto locked_fail; + } + + if (c_p) { + if (IS_TRACED_FL(c_p, F_TRACE_SCHED_PROCS)) + trace_virtual_sched(c_p, am_out); + if (erts_system_profile_flags.runnable_procs + && erts_system_profile_flags.exclusive) + profile_runnable_proc(c_p, am_inactive); + + erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN); + } + ERTS_SMP_CHK_NO_PROC_LOCKS; - p->caller = caller_id; - if (drv->outputv != NULL) { - Uint vsize; - Uint csize; - Uint pvsize; - Uint pcsize; - ErlDrvSizeT blimit; - SysIOVec iv[SMALL_WRITE_VEC]; - ErlDrvBinary* bv[SMALL_WRITE_VEC]; - SysIOVec* ivp; - ErlDrvBinary** bvp; - ErlDrvBinary* cbin; - ErlIOVec ev; + if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) + trace_sched_ports_where(prt, am_in, sp->port_op); + if (erts_system_profile_flags.runnable_ports + && !erts_port_is_scheduled(prt)) + profile_runnable_port(prt, am_active); - if (io_list_vec_len(list, &vsize, &csize, - &pvsize, &pcsize, &size)) { - goto bad_value; - } - /* To pack or not to pack (small binaries) ...? */ - vsize++; - if (vsize <= SMALL_WRITE_VEC) { - /* Do NOT pack */ - blimit = 0; - } else { - /* Do pack */ - vsize = pvsize + 1; - csize = pcsize; - blimit = ERL_SMALL_IO_BIN_LIMIT; - } - /* Use vsize and csize from now on */ - if (vsize <= SMALL_WRITE_VEC) { - ivp = iv; - bvp = bv; - } else { - ivp = (SysIOVec *) erts_alloc(ERTS_ALC_T_TMP, - vsize * sizeof(SysIOVec)); - bvp = (ErlDrvBinary**) erts_alloc(ERTS_ALC_T_TMP, - vsize * sizeof(ErlDrvBinary*)); - } - cbin = driver_alloc_binary(csize); - if (!cbin) - erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize)); + sp->fpe_was_unmasked = erts_block_fpe(); - /* Element 0 is for driver usage to add header block */ - ivp[0].iov_base = NULL; - ivp[0].iov_len = 0; - bvp[0] = NULL; - ev.vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit); - if (ev.vsize < 0) { - if (ivp != iv) { - erts_free(ERTS_ALC_T_TMP, (void *) ivp); - } - if (bvp != bv) { - erts_free(ERTS_ALC_T_TMP, (void *) bvp); - } - driver_free_binary(cbin); - goto bad_value; - } - ev.vsize++; -#if 0 - /* This assertion may say something useful, but it can - be falsified during the emulator test suites. */ - ASSERT(ev.vsize == vsize); -#endif - ev.size = size; /* total size */ - ev.iov = ivp; - ev.binv = bvp; -#ifdef USE_VM_PROBES - if (DTRACE_ENABLED(driver_outputv)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(caller_id, p) - DTRACE4(driver_outputv, process_str, port_str, p->name, size); - } -#endif - fpe_was_unmasked = erts_block_fpe(); - (*drv->outputv)((ErlDrvData)p->drv_data, &ev); - erts_unblock_fpe(fpe_was_unmasked); - if (ivp != iv) { - erts_free(ERTS_ALC_T_TMP, (void *) ivp); - } - if (bvp != bv) { - erts_free(ERTS_ALC_T_TMP, (void *) bvp); - } - driver_free_binary(cbin); - } else { - int r; - - /* Try with an 8KB buffer first (will often be enough I guess). */ - size = 8*1024; - /* See below why the extra byte is added. */ - buf = erts_alloc(ERTS_ALC_T_TMP, size+1); - r = io_list_to_buf(list, buf, size); + return ERTS_TRY_IMM_DRV_CALL_OK; -#ifdef USE_VM_PROBES - if(DTRACE_ENABLED(port_command)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(caller_id, p) - DTRACE4(port_command, process_str, port_str, p->name, "command"); - } -#endif +locked_fail: + erts_port_release(prt); + return res; +} - if (r >= 0) { - size -= r; -#ifdef USE_VM_PROBES - if (DTRACE_ENABLED(driver_output)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(caller_id, p) - DTRACE4(driver_output, process_str, port_str, p->name, size); - } +static ERTS_INLINE void +finalize_imm_drv_call(ErtsTryImmDrvCallState *sp) +{ + Port *prt = sp->port; + Process *c_p = sp->c_p; + + erts_unblock_fpe(sp->fpe_was_unmasked); + +#ifdef ERTS_SMP + if (prt->xports) + erts_smp_xports_unlock(prt); + ASSERT(!prt->xports); #endif - fpe_was_unmasked = erts_block_fpe(); - (*drv->output)((ErlDrvData)p->drv_data, buf, size); - erts_unblock_fpe(fpe_was_unmasked); - erts_free(ERTS_ALC_T_TMP, buf); - } - else if (r == -2) { - erts_free(ERTS_ALC_T_TMP, buf); - goto bad_value; - } - else { - ASSERT(r == -1); /* Overflow */ - erts_free(ERTS_ALC_T_TMP, buf); - if (erts_iolist_size(list, &size)) { - goto bad_value; - } - /* - * I know drivers that pad space with '\0' this is clearly - * incorrect but I don't feel like fixing them now, insted - * add ONE extra byte. - */ - buf = erts_alloc(ERTS_ALC_T_TMP, size+1); - r = io_list_to_buf(list, buf, size); + if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) + trace_sched_ports_where(prt, am_out, sp->port_op); + if (erts_system_profile_flags.runnable_ports + && !erts_port_is_scheduled(prt)) + profile_runnable_port(prt, am_inactive); + + erts_port_release(prt); + + if (c_p) { + erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MAIN); + + if (IS_TRACED_FL(c_p, F_TRACE_SCHED_PROCS)) + trace_virtual_sched(c_p, am_in); + if (erts_system_profile_flags.runnable_procs + && erts_system_profile_flags.exclusive) + profile_runnable_proc(c_p, am_active); + } +} + +#define ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE (REF_THING_SIZE + 3) + +static ERTS_INLINE void +queue_port_sched_op_reply(Process *rp, + ErtsProcLocks *rp_locksp, + Eterm *hp_start, + Eterm *hp, + Uint h_size, + ErlHeapFragment* bp, + Uint32 *ref_num, + Eterm msg) +{ + Eterm ref = make_internal_ref(hp); + write_ref_thing(hp, ref_num[0], ref_num[1], ref_num[2]); + hp += REF_THING_SIZE; + + msg = TUPLE2(hp, ref, msg); + hp += 3; + + if (!bp) { + HRelease(rp, hp_start + h_size, hp); + } + else { + Uint used_h_size = hp - hp_start; + ASSERT(h_size >= used_h_size); + if (h_size > used_h_size) + bp = erts_resize_message_buffer(bp, used_h_size, &msg, 1); + } + + erts_queue_message(rp, + rp_locksp, + bp, + msg, + NIL #ifdef USE_VM_PROBES - if (DTRACE_ENABLED(driver_output)) { - DTRACE_FORMAT_COMMON_PID_AND_PORT(caller_id, p) - DTRACE4(driver_output, process_str, port_str, p->name, size); - } + , NIL #endif - fpe_was_unmasked = erts_block_fpe(); - (*drv->output)((ErlDrvData)p->drv_data, buf, size); - erts_unblock_fpe(fpe_was_unmasked); - erts_free(ERTS_ALC_T_TMP, buf); + ); +} + +static void +port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg) +{ + Process *rp = erts_proc_lookup_raw(to); + if (rp) { + ErlOffHeap *ohp; + ErlHeapFragment* bp; + Eterm msg_copy; + Uint hsz, msg_sz; + Eterm *hp, *hp_start; + ErtsProcLocks rp_locks = 0; + + hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; + if (is_immed(msg)) + msg_sz = 0; + else { + msg_sz = size_object(msg); + hsz += msg_sz; } - } - p->bytes_out += size; - erts_smp_atomic_add_nob(&erts_bytes_out, size); -#ifdef ERTS_SMP - if (p->xports) - erts_smp_xports_unlock(p); - ASSERT(!p->xports); -#endif - p->caller = NIL; - return 0; + hp_start = hp = erts_alloc_message_heap(hsz, + &bp, + &ohp, + rp, + &rp_locks); + if (is_immed(msg)) + msg_copy = msg; + else + msg_copy = copy_struct(msg, msg_sz, &hp, ohp); - bad_value: - p->caller = NIL; - { - erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); - erts_dsprintf(dsbufp, "Bad value on output port '%s'\n", p->name); - erts_send_error_to_logger_nogl(dsbufp); - return 1; + queue_port_sched_op_reply(rp, + &rp_locks, + hp_start, + hp, + hsz, + bp, + ref_num, + msg_copy); + + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); } } -void erts_init_io(int port_tab_size, - int port_tab_size_ignore_files) + +ErtsPortOpResult +erts_schedule_proc2port_signal(Process *c_p, + Port *prt, + Eterm caller, + Eterm *refp, + ErtsProc2PortSigData *sigdp, + int task_flags, + ErtsProc2PortSigCallback callback) { - ErlDrvEntry** dp; - erts_smp_rwmtx_opt_t drv_list_rwmtx_opts = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER; - drv_list_rwmtx_opts.type = ERTS_SMP_RWMTX_TYPE_EXTREMELY_FREQUENT_READ; - drv_list_rwmtx_opts.lived = ERTS_SMP_RWMTX_LONG_LIVED; + int sched_res; + if (!refp) { + if (c_p) + erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN); + } + else { + ASSERT(c_p); + sigdp->flags |= ERTS_P2P_SIG_DATA_FLG_REPLY; + erts_make_ref_in_array(sigdp->ref); + *refp = erts_proc_store_ref(c_p, sigdp->ref); -#ifdef ERTS_SMP - init_xports_list_alloc(); -#endif + /* + * Caller needs to wait for a message containing + * the ref that we just created. No such message + * can exist in callers message queue at this time. + * We therefore move the save pointer of the + * callers message queue to the end of the queue. + * + * NOTE: It is of vital importance that the caller + * immediately do a receive unconditionaly + * waiting for the message with the reference; + * otherwise, next receive will *not* work + * as expected! + */ + erts_smp_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); + + if (ERTS_PROC_PENDING_EXIT(c_p)) { + /* need to exit caller instead */ + erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); + KILL_CATCHES(c_p); + c_p->freason = EXC_EXIT; + return ERTS_PORT_OP_CALLER_EXIT; + } - pdl_init(); + ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p); + c_p->msg.save = c_p->msg.last; - if (!port_tab_size_ignore_files) { - int max_files = sys_max_files(); - if (port_tab_size < max_files) - port_tab_size = max_files; + erts_smp_proc_unlock(c_p, + (ERTS_PROC_LOCK_MAIN + | ERTS_PROC_LOCKS_MSG_RECEIVE)); } - if (port_tab_size > ERTS_MAX_PORTS) - port_tab_size = ERTS_MAX_PORTS; - else if (port_tab_size < ERTS_MIN_PORTS) - port_tab_size = ERTS_MIN_PORTS; - erts_smp_rwmtx_init_opt(&erts_driver_list_lock, - &drv_list_rwmtx_opts, - "driver_list"); - driver_list = NULL; - erts_smp_tsd_key_create(&driver_list_lock_status_key); - erts_smp_tsd_key_create(&driver_list_last_error_key); + sigdp->caller = caller; - erts_ptab_init_table(&erts_port, - ERTS_ALC_T_PORT_TABLE, - NULL, - (ErtsPTabElementCommon *) &erts_invalid_port.common, - port_tab_size, - "port_table"); + /* Schedule port close call for later execution... */ + sched_res = erts_port_task_schedule(prt->common.id, + NULL, + ERTS_PORT_TASK_PROC_SIG, + sigdp, + callback, + task_flags); - erts_smp_atomic_init_nob(&erts_bytes_out, 0); - erts_smp_atomic_init_nob(&erts_bytes_in, 0); + if (c_p) + erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MAIN); - sys_init_io(); + if (sched_res != 0) { + if (refp) + *refp = NIL; + return ERTS_PORT_OP_DROPPED; + } + return ERTS_PORT_OP_SCHEDULED; +} - erts_smp_tsd_set(driver_list_lock_status_key, (void *) 1); - erts_smp_rwmtx_rwlock(&erts_driver_list_lock); +static ERTS_INLINE void +send_badsig(Port *prt) +{ + ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; + Process* rp; + Eterm connected = ERTS_PORT_GET_CONNECTED(prt); - init_driver(&fd_driver, &fd_driver_entry, NULL); - init_driver(&vanilla_driver, &vanilla_driver_entry, NULL); - init_driver(&spawn_driver, &spawn_driver_entry, NULL); - for (dp = driver_tab; *dp != NULL; dp++) - erts_add_driver_entry(*dp, NULL, 1); + ERTS_SMP_CHK_NO_PROC_LOCKS; + ERTS_LC_ASSERT(erts_get_scheduler_id()); - erts_smp_tsd_set(driver_list_lock_status_key, NULL); - erts_smp_rwmtx_rwunlock(&erts_driver_list_lock); + ASSERT(is_internal_pid(connected)); + + rp = erts_proc_lookup_raw(connected); + if (rp) { + erts_smp_proc_lock(rp, rp_locks); + if (!ERTS_PROC_IS_EXITING(rp)) + (void) erts_send_exit_signal(NULL, + prt->common.id, + rp, + &rp_locks, + am_badsig, + NIL, + NULL, + 0); + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); + } +} + +static void +badsig_received(int bang_op, + Port *prt, + erts_aint32_t state, + int bad_output_value) +{ + /* + * if (bang_op) + * we are part of a "Prt ! Something" operation + * else + * we are part of a call to a port BIF + * behave accordingly... + */ + if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) { + if (bad_output_value) { + erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); + erts_dsprintf(dsbufp, "Bad value on output port '%s'\n", prt->name); + erts_send_error_to_logger_nogl(dsbufp); + } + if (bang_op) + send_badsig(prt); + } +} + +static int +port_badsig(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) +{ + if (op == ERTS_PROC2PORT_SIG_EXEC) + badsig_received(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BANG_OP, + prt, + state, + sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT); + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) + port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); + return ERTS_PORT_REDS_BADSIG; } + /* - * Buffering of data when using line oriented I/O on ports + * bad_port_signal() will + * - preserve signal order of signals. + * - send a 'badsig' exit signal to connected process if 'from' is an + * internal pid and the port is alive when the bad signal reaches + * it. */ +static ErtsPortOpResult +bad_port_signal(Process *c_p, + int flags, + Port *prt, + Eterm from, + Eterm *refp, + Eterm port_op) +{ + ErtsProc2PortSigData *sigdp; + ErtsTryImmDrvCallResult try_call_res; + ErtsTryImmDrvCallState try_call_state + = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( + c_p, + prt, + ERTS_PORT_SFLGS_INVALID_LOOKUP, + 0, + !refp, + port_op); + + try_call_res = try_imm_drv_call(&try_call_state); + switch (try_call_res) { + case ERTS_TRY_IMM_DRV_CALL_OK: + badsig_received(flags & ERTS_PORT_SIG_FLG_BANG_OP, + prt, + try_call_state.state, + flags & ERTS_PORT_SIG_FLG_BAD_OUTPUT); + finalize_imm_drv_call(&try_call_state); + return ERTS_PORT_OP_BADARG; + case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: + return ERTS_PORT_OP_DROPPED; + case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS: + case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK: + /* Schedule badsig() call instead... */ + break; + } + + sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = flags; + + return erts_schedule_proc2port_signal(c_p, + prt, + c_p->common.id, + refp, + sigdp, + 0, + port_badsig); +} -/* - * Buffer states - */ -#define LINEBUF_MAIN 0 -#define LINEBUF_FULL 1 -#define LINEBUF_CR_INSIDE 2 -#define LINEBUF_CR_AFTER 3 /* - * Creates a LineBuf to be added to the port structure, - * Returns: Pointer to a newly allocated and initialized LineBuf. - * Parameters: - * bufsiz - The (maximum) size of the line buffer. + * Driver outputv() callback */ -LineBuf *allocate_linebuf(bufsiz) -int bufsiz; + +static ERTS_INLINE void +call_driver_outputv(int bang_op, + Eterm caller, + Eterm from, + Port *prt, + erts_driver_t *drv, + ErlIOVec *evp) { - int ovsiz = (bufsiz < LINEBUF_INITIAL) ? bufsiz : LINEBUF_INITIAL; - LineBuf *lb = (LineBuf *) erts_alloc(ERTS_ALC_T_LINEBUF, - sizeof(LineBuf)+ovsiz); - lb->ovsiz = ovsiz; - lb->bufsiz = bufsiz; - lb->ovlen = 0; - lb->data[0] = LINEBUF_MAIN; /* state */ - return lb; -} + /* + * if (bang_op) + * we are part of a "Prt ! {From, {command, Data}}" operation + * else + * we are part of a call to port_command/[2,3] + * behave accordingly... + */ + if (bang_op && from != ERTS_PORT_GET_CONNECTED(prt)) + send_badsig(prt); + else { + ErlDrvSizeT size = evp->size; + + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + +#ifdef USE_VM_PROBES + if (DTRACE_ENABLED(driver_outputv)) { + DTRACE_FORMAT_COMMON_PID_AND_PORT(caller, prt); + DTRACE4(driver_outputv, process_str, port_str, prt->name, size); + } +#endif + + prt->caller = caller; + (*drv->outputv)((ErlDrvData) prt->drv_data, evp); + prt->caller = NIL; + + prt->bytes_out += size; + erts_smp_atomic_add_nob(&erts_bytes_out, size); + } +} + +static ERTS_INLINE void +cleanup_scheduled_outputv(ErlIOVec *ev, ErlDrvBinary *cbinp) +{ + int i; + /* Need to free all binaries */ + for (i = 1; i < ev->vsize; i++) + if (ev->binv[i]) + driver_free_binary(ev->binv[i]); + if (cbinp) + driver_free_binary(cbinp); + erts_free(ERTS_ALC_T_DRV_CMD_DATA, ev); +} + +static int +port_sig_outputv(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) +{ + Eterm reply; + + switch (op) { + case ERTS_PROC2PORT_SIG_EXEC: + /* Execution of a scheduled outputv() call */ + + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) + reply = am_badarg; + else { + call_driver_outputv(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BANG_OP, + sigdp->caller, + sigdp->u.outputv.from, + prt, + prt->drv_ptr, + sigdp->u.outputv.evp); + reply = am_true; + } + break; + case ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND: + reply = am_false; + break; + default: + reply = am_badarg; + break; + } + + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) + port_sched_op_reply(sigdp->caller, sigdp->ref, reply); + + cleanup_scheduled_outputv(sigdp->u.outputv.evp, + sigdp->u.outputv.cbinp); + + return ERTS_PORT_REDS_CMD_OUTPUTV; +} + +/* + * Driver output() callback + */ + +static ERTS_INLINE void +call_driver_output(int bang_op, + Eterm caller, + Eterm from, + Port *prt, + erts_driver_t *drv, + char *bufp, + ErlDrvSizeT size) +{ + /* + * if (bang_op) + * we are part of a "Prt ! {From, {command, Data}}" operation + * else + * we are part of a call to port_command/[2,3] + * behave accordingly... + */ + if (bang_op && from != ERTS_PORT_GET_CONNECTED(prt)) + send_badsig(prt); + else { + +#ifdef USE_VM_PROBES + if (DTRACE_ENABLED(driver_output)) { + DTRACE_FORMAT_COMMON_PID_AND_PORT(caller, prt); + DTRACE4(driver_output, process_str, port_str, prt->name, size); + } +#endif + + prt->caller = caller; + (*drv->output)((ErlDrvData) prt->drv_data, bufp, size); + prt->caller = NIL; + + prt->bytes_out += size; + erts_smp_atomic_add_nob(&erts_bytes_out, size); + } +} + +static ERTS_INLINE void +cleanup_scheduled_output(char *bufp) +{ + erts_free(ERTS_ALC_T_DRV_CMD_DATA, bufp); +} + +static int +port_sig_output(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) +{ + Eterm reply; + + switch (op) { + case ERTS_PROC2PORT_SIG_EXEC: + /* Execution of a scheduled output() call */ + + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) + reply = am_badarg; + else { + call_driver_output(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BANG_OP, + sigdp->caller, + sigdp->u.output.from, + prt, + prt->drv_ptr, + sigdp->u.output.bufp, + sigdp->u.output.size); + reply = am_true; + } + break; + case ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND: + reply = am_false; + break; + default: + reply = am_badarg; + break; + } + + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) + port_sched_op_reply(sigdp->caller, sigdp->ref, reply); + + cleanup_scheduled_output(sigdp->u.output.bufp); + + return ERTS_PORT_REDS_CMD_OUTPUT; +} + +ErtsPortOpResult +erts_port_output(Process *c_p, + int flags, + Port *prt, + Eterm from, + Eterm list, + Eterm *refp) +{ + ErtsPortOpResult res; + ErtsProc2PortSigData *sigdp; + erts_driver_t *drv = prt->drv_ptr; + size_t size; + int try_call; + erts_aint32_t sched_flags, busy_flag, invalid_flags; + int task_flags; + ErtsProc2PortSigCallback port_sig_callback; + ErlDrvBinary *cbin = NULL; + ErlIOVec *evp = NULL; + char *buf = NULL; + + ASSERT((flags & ~(ERTS_PORT_SIG_FLG_BANG_OP + | ERTS_PORT_SIG_FLG_NOSUSPEND + | ERTS_PORT_SIG_FLG_FORCE)) == 0); + + busy_flag = ((flags & ERTS_PORT_SIG_FLG_FORCE) + ? ((erts_aint32_t) 0) + : ERTS_PTS_FLG_BUSY); + invalid_flags = busy_flag; + if (!refp) + invalid_flags |= ERTS_PTS_FLG_PARALLELISM; + + /* + * Assumes caller have checked that port is valid... + */ + + sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); + if (sched_flags & (busy_flag|ERTS_PTS_FLG_EXIT)) + return ((sched_flags & busy_flag) + ? ERTS_PORT_OP_BUSY + : ERTS_PORT_OP_DROPPED); + + try_call = !(sched_flags & (invalid_flags|ERTS_PTS_FLGS_FORCE_SCHEDULE_OP)); + +#ifdef USE_VM_PROBES + if(DTRACE_ENABLED(port_command)) { + DTRACE_FORMAT_COMMON_PID_AND_PORT(c_p->common.id, prt); + DTRACE4(port_command, process_str, port_str, prt->name, "command"); + } +#endif + + if (drv->outputv) { + ErlIOVec ev; + SysIOVec iv[SMALL_WRITE_VEC]; + ErlDrvBinary* bv[SMALL_WRITE_VEC]; + SysIOVec* ivp; + ErlDrvBinary** bvp; + int vsize; + Uint csize; + Uint pvsize; + Uint pcsize; + Uint blimit; + size_t iov_offset, binv_offset, alloc_size; + + if (io_list_vec_len(list, &vsize, &csize, &pvsize, &pcsize, &size)) + goto bad_value; + + iov_offset = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErlIOVec)); + binv_offset = iov_offset; + binv_offset += ERTS_ALC_DATA_ALIGN_SIZE((vsize+1)*sizeof(SysIOVec)); + alloc_size = binv_offset; + alloc_size += (vsize+1)*sizeof(ErlDrvBinary *); + + if (try_call && vsize < SMALL_WRITE_VEC) { + ivp = ev.iov = iv; + bvp = ev.binv = bv; + evp = &ev; + } + else { + char *ptr = erts_alloc((try_call + ? ERTS_ALC_T_TMP + : ERTS_ALC_T_DRV_CMD_DATA), alloc_size); + + evp = (ErlIOVec *) ptr; + ivp = evp->iov = (SysIOVec *) (ptr + iov_offset); + bvp = evp->binv = (ErlDrvBinary **) (ptr + binv_offset); + } + + /* To pack or not to pack (small binaries) ...? */ + if (vsize < SMALL_WRITE_VEC) { + /* Do NOT pack */ + blimit = 0; + } + else { + /* Do pack */ + vsize = pvsize + 1; + csize = pcsize; + blimit = ERL_SMALL_IO_BIN_LIMIT; + } + /* Use vsize and csize from now on */ + + cbin = driver_alloc_binary(csize); + if (!cbin) + erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize)); + + /* Element 0 is for driver usage to add header block */ + ivp[0].iov_base = NULL; + ivp[0].iov_len = 0; + bvp[0] = NULL; + evp->vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit); + if (evp->vsize < 0) { + if (evp != &ev) + erts_free(try_call ? ERTS_ALC_T_TMP : ERTS_ALC_T_DRV_CMD_DATA, + evp); + driver_free_binary(cbin); + goto bad_value; + } +#if 0 + /* This assertion may say something useful, but it can + be falsified during the emulator test suites. */ + ASSERT(evp->vsize == vsize); +#endif + evp->vsize++; + evp->size = size; /* total size */ + + if (!try_call) { + int i; + /* Need to increase refc on all binaries */ + for (i = 1; i < evp->vsize; i++) + if (bvp[i]) + driver_binary_inc_refc(bvp[i]); + } + else { + int i; + ErlIOVec *new_evp; + ErtsTryImmDrvCallResult try_call_res; + ErtsTryImmDrvCallState try_call_state + = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( + c_p, + prt, + ERTS_PORT_SFLGS_INVALID_LOOKUP, + invalid_flags, + !refp, + am_command); + try_call_state.pre_chk_sched_flags = 0; /* already checked */ + try_call_res = try_imm_drv_call(&try_call_state); + switch (try_call_res) { + case ERTS_TRY_IMM_DRV_CALL_OK: + call_driver_outputv(flags & ERTS_PORT_SIG_FLG_BANG_OP, + c_p->common.id, + from, + prt, + drv, + evp); + finalize_imm_drv_call(&try_call_state); + /* Fall through... */ + case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: + driver_free_binary(cbin); + if (evp != &ev) + erts_free(ERTS_ALC_T_TMP, evp); + if (try_call_res == ERTS_TRY_IMM_DRV_CALL_OK) + return ERTS_PORT_OP_DONE; + else + return ERTS_PORT_OP_DROPPED; + case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS: + sched_flags = try_call_state.sched_flags; + case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK: + /* Schedule outputv() call instead... */ + break; + } + + /* Need to increase refc on all binaries */ + for (i = 1; i < evp->vsize; i++) + if (bvp[i]) + driver_binary_inc_refc(bvp[i]); + + new_evp = erts_alloc(ERTS_ALC_T_DRV_CMD_DATA, alloc_size); + + if (evp != &ev) { + sys_memcpy((void *) new_evp, (void *) evp, alloc_size); + new_evp->iov = (SysIOVec *) (((char *) new_evp) + + iov_offset); + bvp = new_evp->binv = (ErlDrvBinary **) (((char *) new_evp) + + binv_offset); + +#ifdef DEBUG + ASSERT(new_evp->vsize == evp->vsize); + ASSERT(new_evp->size == evp->size); + for (i = 0; i < evp->vsize; i++) { + ASSERT(new_evp->iov[i].iov_len == evp->iov[i].iov_len); + ASSERT(new_evp->iov[i].iov_base == evp->iov[i].iov_base); + ASSERT(new_evp->binv[i] == evp->binv[i]); + } +#endif + + erts_free(ERTS_ALC_T_TMP, evp); + } + else { /* from stack allocated structure; offsets may differ */ + + sys_memcpy((void *) new_evp, (void *) evp, sizeof(ErlIOVec)); + new_evp->iov = (SysIOVec *) (((char *) new_evp) + + iov_offset); + sys_memcpy((void *) new_evp->iov, + (void *) evp->iov, + evp->vsize * sizeof(SysIOVec)); + new_evp->binv = (ErlDrvBinary **) (((char *) new_evp) + + binv_offset); + sys_memcpy((void *) new_evp->binv, + (void *) evp->binv, + evp->vsize * sizeof(ErlDrvBinary *)); + +#ifdef DEBUG + ASSERT(new_evp->vsize == evp->vsize); + ASSERT(new_evp->size == evp->size); + for (i = 0; i < evp->vsize; i++) { + ASSERT(new_evp->iov[i].iov_len == evp->iov[i].iov_len); + ASSERT(new_evp->iov[i].iov_base == evp->iov[i].iov_base); + ASSERT(new_evp->binv[i] == evp->binv[i]); + } +#endif + + } + + evp = new_evp; + } + + sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->u.outputv.from = from; + sigdp->u.outputv.evp = evp; + sigdp->u.outputv.cbinp = cbin; + port_sig_callback = port_sig_outputv; + } + else { + ErlDrvSizeT r; + + /* + * Apperently there exist code that write 1 byte to + * much in buffer. Where it resides I don't know, but + * we can live with one byte extra allocated... + */ + + if (!try_call) { + if (erts_iolist_size(list, &size)) + goto bad_value; + + buf = erts_alloc(ERTS_ALC_T_DRV_CMD_DATA, size + 1); + + r = erts_iolist_to_buf(list, buf, size); + ASSERT(ERTS_IOLIST_TO_BUF_SUCCEEDED(r)); + } + else { + char *new_buf; + ErtsTryImmDrvCallResult try_call_res; + ErtsTryImmDrvCallState try_call_state + = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( + c_p, + prt, + ERTS_PORT_SFLGS_INVALID_LOOKUP, + invalid_flags, + !refp, + am_command); + + /* Try with an 8KB buffer first (will often be enough I guess). */ + size = 8*1024; + + buf = erts_alloc(ERTS_ALC_T_TMP, size + 1); + r = erts_iolist_to_buf(list, buf, size); + + if (ERTS_IOLIST_TO_BUF_SUCCEEDED(r)) { + ASSERT(r <= size); + size -= r; + } + else { + erts_free(ERTS_ALC_T_TMP, buf); + if (r == ERTS_IOLIST_TO_BUF_TYPE_ERROR) + goto bad_value; + ASSERT(r == ERTS_IOLIST_TO_BUF_OVERFLOW); + if (erts_iolist_size(list, &size)) + goto bad_value; + buf = erts_alloc(ERTS_ALC_T_TMP, size + 1); + r = erts_iolist_to_buf(list, buf, size); + ASSERT(ERTS_IOLIST_TO_BUF_SUCCEEDED(r)); + } + try_call_state.pre_chk_sched_flags = 0; /* already checked */ + try_call_res = try_imm_drv_call(&try_call_state); + switch (try_call_res) { + case ERTS_TRY_IMM_DRV_CALL_OK: + call_driver_output(flags & ERTS_PORT_SIG_FLG_BANG_OP, + c_p->common.id, + from, + prt, + drv, + buf, + size); + finalize_imm_drv_call(&try_call_state); + /* Fall through... */ + case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: + erts_free(ERTS_ALC_T_TMP, buf); + if (try_call_res == ERTS_TRY_IMM_DRV_CALL_OK) + return ERTS_PORT_OP_DONE; + else + return ERTS_PORT_OP_DROPPED; + case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS: + sched_flags = try_call_state.sched_flags; + case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK: + /* Schedule outputv() call instead... */ + break; + } + + new_buf = erts_alloc(ERTS_ALC_T_DRV_CMD_DATA, size + 1); + sys_memcpy(new_buf, buf, size); + erts_free(ERTS_ALC_T_TMP, buf); + buf = new_buf; + } + + sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->u.output.from = from; + sigdp->u.output.bufp = buf; + sigdp->u.output.size = size; + port_sig_callback = port_sig_output; + } + + task_flags = ERTS_PT_FLG_WAIT_BUSY; + sigdp->flags = flags; + if (flags & (ERTS_P2P_SIG_DATA_FLG_FORCE|ERTS_P2P_SIG_DATA_FLG_NOSUSPEND)) { + task_flags = 0; + if (flags & ERTS_P2P_SIG_DATA_FLG_FORCE) + sigdp->flags &= ~ERTS_P2P_SIG_DATA_FLG_NOSUSPEND; + else if (flags & ERTS_P2P_SIG_DATA_FLG_NOSUSPEND) + task_flags = ERTS_PT_FLG_NOSUSPEND; + } + + res = erts_schedule_proc2port_signal(c_p, + prt, + c_p->common.id, + refp, + sigdp, + task_flags, + port_sig_callback); + + if (res != ERTS_PORT_OP_SCHEDULED) { + if (drv->outputv) + cleanup_scheduled_outputv(evp, cbin); + else + cleanup_scheduled_output(buf); + return res; + } + + if ((sched_flags & (busy_flag|ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY) + return ERTS_PORT_OP_BUSY_SCHEDULED; + + return res; + +bad_value: + + flags |= ERTS_PORT_SIG_FLG_BAD_OUTPUT; + return bad_port_signal(c_p, flags, prt, from, refp, am_command); +} + +static ERTS_INLINE ErtsPortOpResult +call_deliver_port_exit(int bang_op, + Eterm from, + Port *prt, + erts_aint32_t state, + Eterm reason, + int broken_link) +{ + /* + * if (bang_op) + * we are part of a "Prt ! {From, close}" operation + * else + * we are part of a call to port_close(Port) + * behave accordingly... + */ + + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) + return ERTS_PORT_OP_DROPPED; + + if (bang_op && from != ERTS_PORT_GET_CONNECTED(prt)) { + send_badsig(prt); + return ERTS_PORT_OP_DROPPED; + } + + if (broken_link) { + ErtsLink *lnk = erts_remove_link(&ERTS_P_LINKS(prt), from); + if (lnk) + erts_destroy_link(lnk); + else + return ERTS_PORT_OP_DROPPED; + } + + if (!erts_deliver_port_exit(prt, from, reason, bang_op)) + return ERTS_PORT_OP_DROPPED; + +#ifdef USE_VM_PROBES + if(DTRACE_ENABLED(port_command) && bang_op) { + DTRACE_FORMAT_COMMON_PID_AND_PORT(from, prt); + DTRACE4(port_command, process_str, port_str, prt->name, "close"); + } +#endif + + return ERTS_PORT_OP_DONE; +} + +static int +port_sig_exit(Port *prt, + erts_aint32_t state, + int op, + ErtsProc2PortSigData *sigdp) +{ + Eterm msg = am_badarg; + if (op == ERTS_PROC2PORT_SIG_EXEC) { + ErtsPortOpResult res; + int bang_op = sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BANG_OP; + int broken_link = sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK; + res = call_deliver_port_exit(bang_op, + sigdp->u.exit.from, + prt, + state, + sigdp->u.exit.reason, + broken_link); + + if (res == ERTS_PORT_OP_DONE) + msg = am_true; + } + if (sigdp->u.exit.bp) + free_message_buffer(sigdp->u.exit.bp); + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) + port_sched_op_reply(sigdp->caller, sigdp->ref, msg); + + return ERTS_PORT_REDS_EXIT; +} + +ErtsPortOpResult +erts_port_exit(Process *c_p, + int flags, + Port *prt, + Eterm from, + Eterm reason, + Eterm *refp) +{ + ErtsPortOpResult res; + ErtsProc2PortSigData *sigdp; + ErlHeapFragment *bp = NULL; + + ASSERT((flags & ~(ERTS_PORT_SIG_FLG_BANG_OP + | ERTS_PORT_SIG_FLG_BROKEN_LINK + | ERTS_PORT_SIG_FLG_FORCE_SCHED)) == 0); + + if (!(flags & ERTS_PORT_SIG_FLG_FORCE_SCHED)) { + ErtsTryImmDrvCallState try_call_state + = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(c_p, + prt, + ERTS_PORT_SFLGS_INVALID_LOOKUP, + 0, + !refp, + am_exit); + + + switch (try_imm_drv_call(&try_call_state)) { + case ERTS_TRY_IMM_DRV_CALL_OK: { + res = call_deliver_port_exit(flags & ERTS_PORT_SIG_FLG_BANG_OP, + from, + prt, + try_call_state.state, + reason, + flags & ERTS_PORT_SIG_FLG_BROKEN_LINK); + finalize_imm_drv_call(&try_call_state); + return res; + } + case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: + return ERTS_PORT_OP_DROPPED; + default: + /* Schedule call instead... */ + break; + } + } + + sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = flags; + sigdp->u.exit.from = from; + + if (is_immed(reason)) { + sigdp->u.exit.reason = reason; + sigdp->u.exit.bp = NULL; + } + else { + Eterm *hp; + Uint hsz = size_object(reason); + bp = new_message_buffer(hsz); + sigdp->u.exit.bp = bp; + hp = bp->mem; + sigdp->u.exit.reason = copy_struct(reason, + hsz, + &hp, + &bp->off_heap); + } + + res = erts_schedule_proc2port_signal(c_p, + prt, + c_p ? c_p->common.id : from, + refp, + sigdp, + 0, + port_sig_exit); + + if (res == ERTS_PORT_OP_DROPPED) { + if (bp) + free_message_buffer(bp); + } + + return res; +} + +static ErtsPortOpResult +set_port_connected(int bang_op, + Eterm from, + Port *prt, + erts_aint32_t state, + Eterm connect) +{ + /* + * if (bang_op) + * we are part of a "Prt ! {From, {connect, Connect}}" operation + * else + * we are part of a call to port_connect(Port, Connect) + * behave accordingly... + */ + + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) + return ERTS_PORT_OP_DROPPED; + + if (bang_op) { /* Bang operation */ + if (is_not_internal_pid(connect) || ERTS_PORT_GET_CONNECTED(prt) != from) { + send_badsig(prt); + return ERTS_PORT_OP_DROPPED; + } + + ERTS_PORT_SET_CONNECTED(prt, connect); + deliver_result(prt->common.id, from, am_connected); + +#ifdef USE_VM_PROBES + if(DTRACE_ENABLED(port_command)) { + DTRACE_FORMAT_COMMON_PID_AND_PORT(from, prt); + DTRACE4(port_command, process_str, port_str, prt->name, "connect"); + } +#endif + } + else { /* Port BIF operation */ + Process *rp = erts_proc_lookup_raw(connect); + if (!rp) + return ERTS_PORT_OP_DROPPED; + erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK); + if (ERTS_PROC_IS_EXITING(rp)) { + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + return ERTS_PORT_OP_DROPPED; + } + + erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, prt->common.id); + erts_add_link(&ERTS_P_LINKS(prt), LINK_PID, connect); + + ERTS_PORT_SET_CONNECTED(prt, connect); + + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + +#ifdef USE_VM_PROBES + if (DTRACE_ENABLED(port_connect)) { + DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); + DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); + DTRACE_CHARBUF(newprocess_str, DTRACE_TERM_BUF_SIZE); + + dtrace_pid_str(connect, process_str); + erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id); + dtrace_proc_str(rp, newprocess_str); + DTRACE4(port_connect, process_str, port_str, prt->name, newprocess_str); + } +#endif + } + + return ERTS_PORT_OP_DONE; +} + +static int +port_sig_connect(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) +{ + Eterm msg = am_badarg; + if (op == ERTS_PROC2PORT_SIG_EXEC) { + ErtsPortOpResult res; + res = set_port_connected(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BANG_OP, + sigdp->u.connect.from, + prt, + state, + sigdp->u.connect.connected); + if (res == ERTS_PORT_OP_DONE) + msg = am_true; + } + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) + port_sched_op_reply(sigdp->caller, sigdp->ref, msg); + return ERTS_PORT_REDS_CONNECT; +} + +ErtsPortOpResult +erts_port_connect(Process *c_p, + int flags, + Port *prt, + Eterm from, + Eterm connect, + Eterm *refp) +{ + ErtsProc2PortSigData *sigdp; + Eterm connect_id; + ErtsTryImmDrvCallState try_call_state + = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(c_p, + prt, + ERTS_PORT_SFLGS_INVALID_LOOKUP, + 0, + !refp, + am_connect); + + ASSERT((flags & ~ERTS_PORT_SIG_FLG_BANG_OP) == 0); + + if (is_not_internal_pid(connect)) + connect_id = NIL; /* Fail in op (for signal order) */ + else + connect_id = connect; + + switch (try_imm_drv_call(&try_call_state)) { + case ERTS_TRY_IMM_DRV_CALL_OK: { + ErtsPortOpResult res; + res = set_port_connected(flags & ERTS_PORT_SIG_FLG_BANG_OP, + from, + prt, + try_call_state.state, + connect_id); + finalize_imm_drv_call(&try_call_state); + return res; + } + case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: + return ERTS_PORT_OP_DROPPED; + default: + /* Schedule call instead... */ + break; + } + + sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = flags; + + sigdp->u.connect.from = from; + sigdp->u.connect.connected = connect_id; + + return erts_schedule_proc2port_signal(c_p, + prt, + c_p->common.id, + refp, + sigdp, + 0, + port_sig_connect); +} + +static void +port_unlink(Port *prt, Eterm from) +{ + ErtsLink *lnk = erts_remove_link(&ERTS_P_LINKS(prt), from); + if (lnk) + erts_destroy_link(lnk); +} + +static int +port_sig_unlink(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) +{ + if (op == ERTS_PROC2PORT_SIG_EXEC) + port_unlink(prt, sigdp->u.unlink.from); + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) + port_sched_op_reply(sigdp->caller, sigdp->ref, am_true); + return ERTS_PORT_REDS_UNLINK; +} + +ErtsPortOpResult +erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp) +{ + ErtsProc2PortSigData *sigdp; + ErtsTryImmDrvCallState try_call_state + = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(c_p, + prt, + ERTS_PORT_SFLGS_DEAD, + 0, + !refp, + am_unlink); + + switch (try_imm_drv_call(&try_call_state)) { + case ERTS_TRY_IMM_DRV_CALL_OK: + port_unlink(prt, from); + finalize_imm_drv_call(&try_call_state); + return ERTS_PORT_OP_DONE; + case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: + return ERTS_PORT_OP_DROPPED; + default: + /* Schedule call instead... */ + break; + } + + sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = 0; + sigdp->u.unlink.from = from; + + return erts_schedule_proc2port_signal(c_p, + prt, + c_p ? c_p->common.id : from, + refp, + sigdp, + 0, + port_sig_unlink); +} + +static void +port_link_failure(Eterm port_id, Eterm linker) +{ + Process *rp; + ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND; + ASSERT(is_internal_pid(linker)); + rp = erts_pid2proc(NULL, 0, linker, rp_locks); + if (rp) { + ErtsLink *rlnk = erts_remove_link(&ERTS_P_LINKS(rp), port_id); + if (rlnk) { + int xres = erts_send_exit_signal(NULL, + port_id, + rp, + &rp_locks, + am_noproc, + NIL, + NULL, + 0); + if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) { + /* We didn't exit the process and it is traced */ + if (IS_TRACED_FL(rp, F_TRACE_PROCS)) + trace_proc(NULL, rp, am_getting_unlinked, port_id); + } + } + } +} + +static void +port_link(Port *prt, erts_aint32_t state, Eterm to) +{ + if (!(state & ERTS_PORT_SFLGS_INVALID_LOOKUP)) + erts_add_link(&ERTS_P_LINKS(prt), LINK_PID, to); + else + port_link_failure(prt->common.id, to); +} + +static int +port_sig_link(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) +{ + if (op == ERTS_PROC2PORT_SIG_EXEC) + port_link(prt, state, sigdp->u.link.to); + else + port_link_failure(sigdp->u.link.port, sigdp->u.link.to); + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) + port_sched_op_reply(sigdp->caller, sigdp->ref, am_true); + return ERTS_PORT_REDS_LINK; +} + +ErtsPortOpResult +erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp) +{ + ErtsProc2PortSigData *sigdp; + ErtsTryImmDrvCallState try_call_state + = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(c_p, + prt, + ERTS_PORT_SFLGS_INVALID_LOOKUP, + 0, + !refp, + am_link); + + switch (try_imm_drv_call(&try_call_state)) { + case ERTS_TRY_IMM_DRV_CALL_OK: + port_link(prt, try_call_state.state, to); + finalize_imm_drv_call(&try_call_state); + return ERTS_PORT_OP_DONE; + case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: + return ERTS_PORT_OP_BADARG; + default: + /* Schedule call instead... */ + break; + } + + sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = 0; + sigdp->u.link.port = prt->common.id; + sigdp->u.link.to = to; + + return erts_schedule_proc2port_signal(c_p, + prt, + c_p ? c_p->common.id : to, + refp, + sigdp, + 0, + port_sig_link); +} + +void erts_init_io(int port_tab_size, + int port_tab_size_ignore_files) +{ + ErlDrvEntry** dp; + erts_smp_rwmtx_opt_t drv_list_rwmtx_opts = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER; + drv_list_rwmtx_opts.type = ERTS_SMP_RWMTX_TYPE_EXTREMELY_FREQUENT_READ; + drv_list_rwmtx_opts.lived = ERTS_SMP_RWMTX_LONG_LIVED; + +#ifdef ERTS_SMP + init_xports_list_alloc(); +#endif + + pdl_init(); + + if (!port_tab_size_ignore_files) { + int max_files = sys_max_files(); + if (port_tab_size < max_files) + port_tab_size = max_files; + } + + if (port_tab_size > ERTS_MAX_PORTS) + port_tab_size = ERTS_MAX_PORTS; + else if (port_tab_size < ERTS_MIN_PORTS) + port_tab_size = ERTS_MIN_PORTS; + + erts_smp_rwmtx_init_opt(&erts_driver_list_lock, + &drv_list_rwmtx_opts, + "driver_list"); + driver_list = NULL; + erts_smp_tsd_key_create(&driver_list_lock_status_key); + erts_smp_tsd_key_create(&driver_list_last_error_key); + + erts_ptab_init_table(&erts_port, + ERTS_ALC_T_PORT_TABLE, + NULL, + (ErtsPTabElementCommon *) &erts_invalid_port.common, + port_tab_size, + "port_table"); + + erts_smp_atomic_init_nob(&erts_bytes_out, 0); + erts_smp_atomic_init_nob(&erts_bytes_in, 0); + + sys_init_io(); + + erts_smp_tsd_set(driver_list_lock_status_key, (void *) 1); + erts_smp_rwmtx_rwlock(&erts_driver_list_lock); + + init_driver(&fd_driver, &fd_driver_entry, NULL); + init_driver(&vanilla_driver, &vanilla_driver_entry, NULL); + init_driver(&spawn_driver, &spawn_driver_entry, NULL); + for (dp = driver_tab; *dp != NULL; dp++) + erts_add_driver_entry(*dp, NULL, 1); + + erts_smp_tsd_set(driver_list_lock_status_key, NULL); + erts_smp_rwmtx_rwunlock(&erts_driver_list_lock); +} + +/* + * Buffering of data when using line oriented I/O on ports + */ + +/* + * Buffer states + */ +#define LINEBUF_MAIN 0 +#define LINEBUF_FULL 1 +#define LINEBUF_CR_INSIDE 2 +#define LINEBUF_CR_AFTER 3 + +/* + * Creates a LineBuf to be added to the port structure, + * Returns: Pointer to a newly allocated and initialized LineBuf. + * Parameters: + * bufsiz - The (maximum) size of the line buffer. + */ +LineBuf *allocate_linebuf(bufsiz) +int bufsiz; +{ + int ovsiz = (bufsiz < LINEBUF_INITIAL) ? bufsiz : LINEBUF_INITIAL; + LineBuf *lb = (LineBuf *) erts_alloc(ERTS_ALC_T_LINEBUF, + sizeof(LineBuf)+ovsiz); + lb->ovsiz = ovsiz; + lb->bufsiz = bufsiz; + lb->ovlen = 0; + lb->data[0] = LINEBUF_MAIN; /* state */ + return lb; +} /* * Initializes a LineBufContext to be used in calls to read_linebuf @@ -1526,16 +2771,19 @@ deliver_result(Eterm sender, Eterm pid, Eterm res) ErlOffHeap *ohp; Eterm* hp; Uint sz_res; - sz_res = size_object(res); - hp = erts_alloc_message_heap(sz_res + 3, &bp, &ohp, rp, &rp_locks); - res = copy_struct(res, sz_res, &hp, ohp); - tuple = TUPLE2(hp, sender, res); + + sz_res = size_object(res); + hp = erts_alloc_message_heap(sz_res + 3, &bp, &ohp, rp, &rp_locks); + res = copy_struct(res, sz_res, &hp, ohp); + tuple = TUPLE2(hp, sender, res); erts_queue_message(rp, &rp_locks, bp, tuple, NIL #ifdef USE_VM_PROBES , NIL #endif ); - erts_smp_proc_unlock(rp, rp_locks); + + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); if (!scheduler) erts_smp_proc_dec_refc(rp); @@ -2055,344 +3303,1228 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc) erts_destroy_link(lnk); } -/* 'from' is sending 'this_port' an exit signal, (this_port must be internal). - * If reason is normal we don't do anything, *unless* from is our connected - * process in which case we close the port. Any other reason kills the port. - * If 'from' is ourself we always die. - * When a driver has data in ioq then driver will be set to closing - * and become inaccessible to the processes. One exception exists and - * that is to kill a port till reason kill. Then the port is stopped. - * - */ -void -erts_do_exit_port(Port *p, Eterm from, Eterm reason) +/* 'from' is sending 'this_port' an exit signal, (this_port must be internal). + * If reason is normal we don't do anything, *unless* from is our connected + * process in which case we close the port. Any other reason kills the port. + * If 'from' is ourself we always die. + * When a driver has data in ioq then driver will be set to closing + * and become inaccessible to the processes. One exception exists and + * that is to kill a port till reason kill. Then the port is stopped. + * + */ + +int +erts_deliver_port_exit(Port *p, Eterm from, Eterm reason, int send_closed) +{ + ErtsLink *lnk; + Eterm rreason; + erts_aint32_t state; + + ERTS_SMP_CHK_NO_PROC_LOCKS; + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); + + rreason = (reason == am_kill) ? am_killed : reason; + +#ifdef USE_VM_PROBES + if (DTRACE_ENABLED(port_exit)) { + DTRACE_CHARBUF(from_str, DTRACE_TERM_BUF_SIZE); + DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); + DTRACE_CHARBUF(rreason_str, 64); + + erts_snprintf(from_str, sizeof(from_str), "%T", from); + dtrace_port_str(p, port_str); + erts_snprintf(rreason_str, sizeof(rreason_str), "%T", rreason); + DTRACE4(port_exit, from_str, port_str, p->name, rreason_str); + } +#endif + + state = erts_atomic32_read_nob(&p->state); + if (state & (ERTS_PORT_SFLGS_DEAD + | ERTS_PORT_SFLG_EXITING + | ERTS_PORT_SFLG_CLOSING)) + return 0; + + if (reason == am_normal && from != ERTS_PORT_GET_CONNECTED(p) && from != p->common.id) + return 0; + + if (send_closed) + erts_atomic32_read_bor_relb(&p->state, + ERTS_PORT_SFLG_SEND_CLOSED); + + if (IS_TRACED_FL(p, F_TRACE_PORTS)) { + trace_port(p, am_closed, reason); + } + + erts_trace_check_exiting(p->common.id); + + /* + * Setting the port to not busy here, frees the list of pending + * processes and makes them runnable. + */ + set_busy_port((ErlDrvPort) p, 0); + + if (p->common.u.alive.reg != NULL) + (void) erts_unregister_name(NULL, 0, p, p->common.u.alive.reg->name); + + state = erts_atomic32_read_bor_relb(&p->state, ERTS_PORT_SFLG_EXITING); + + { + SweepContext sc = {p->common.id, rreason}; + lnk = ERTS_P_LINKS(p); + ERTS_P_LINKS(p) = NULL; + erts_sweep_links(lnk, &sweep_one_link, &sc); + } + DRV_MONITOR_LOCK_PDL(p); + { + ErtsMonitor *moni = ERTS_P_MONITORS(p); + ERTS_P_MONITORS(p) = NULL; + erts_sweep_monitors(moni, &sweep_one_monitor, NULL); + } + DRV_MONITOR_UNLOCK_PDL(p); + + if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && p->dist_entry) { + erts_do_net_exits(p->dist_entry, rreason); + erts_deref_dist_entry(p->dist_entry); + p->dist_entry = NULL; + erts_atomic32_read_band_relb(&p->state, + ~ERTS_PORT_SFLG_DISTRIBUTION); + } + + if ((reason != am_kill) && !is_port_ioq_empty(p)) { + /* must turn exiting flag off */ + erts_atomic32_read_bset_relb(&p->state, + (ERTS_PORT_SFLG_EXITING + | ERTS_PORT_SFLG_CLOSING), + ERTS_PORT_SFLG_CLOSING); + flush_port(p); + } + else { + terminate_port(p); + } + + return 1; +} + +/* About the states ERTS_PORT_SFLG_EXITING and ERTS_PORT_SFLG_CLOSING used above. +** +** ERTS_PORT_SFLG_EXITING is a recursion protection for erts_deliver_port_exit(). +** It is unclear whether this state is necessary or not, it might be possible +** to merge it with ERTS_PORT_SFLG_CLOSING. ERTS_PORT_SFLG_EXITING only persists +** over a section of sequential (but highly recursive) code. +** +** ERTS_PORT_SFLG_CLOSING is a state where the port is in Limbo, waiting to +** pass on. All links are removed, and the port receives in/out-put events so +** as soon as the port queue gets empty terminate_port() is called. +*/ + + + +/* Command should be of the form +** {PID, close} +** {PID, {command, io-list}} +** {PID, {connect, New_PID}} +*/ +ErtsPortOpResult +erts_port_command(Process *c_p, + int flags, + Port *port, + Eterm command, + Eterm *refp) +{ + Eterm *tp; + + ASSERT(port); + + flags |= ERTS_PORT_SIG_FLG_BANG_OP; + + if (is_tuple_arity(command, 2)) { + Eterm cntd; + tp = tuple_val(command); + cntd = tp[1]; + if (is_internal_pid(cntd)) { + if (tp[2] == am_close) { + if (!erts_port_synchronous_ops) + refp = NULL; + flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND; + return erts_port_exit(c_p, flags, port, cntd, am_normal, refp); + } else if (is_tuple_arity(tp[2], 2)) { + tp = tuple_val(tp[2]); + if (tp[1] == am_command) { + if (!(flags & ERTS_PORT_SIG_FLG_NOSUSPEND) + && !erts_port_synchronous_ops) + refp = NULL; + return erts_port_output(c_p, flags, port, cntd, tp[2], refp); + } + else if (tp[1] == am_connect) { + if (!erts_port_synchronous_ops) + refp = NULL; + flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND; + return erts_port_connect(c_p, flags, port, cntd, tp[2], refp); + } + } + } + } + + /* badsig */ + if (!erts_port_synchronous_ops) + refp = NULL; + flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND; + return bad_port_signal(c_p, flags, port, c_p->common.id, refp, am_command); +} + +static ERTS_INLINE ErtsPortOpResult +call_driver_control(Eterm caller, + Port *prt, + unsigned int command, + char *bufp, + ErlDrvSizeT size, + char **resp_bufp, + ErlDrvSizeT *from_size) +{ + ErlDrvSSizeT cres; + + if (!prt->drv_ptr->control) + return ERTS_PORT_OP_BADARG; + + +#ifdef USE_VM_PROBES + if (DTRACE_ENABLED(port_control) || DTRACE_ENABLED(driver_control)) { + DTRACE_FORMAT_COMMON_PID_AND_PORT(caller, prt); + DTRACE4(port_control, process_str, port_str, prt->name, command); + DTRACE5(driver_control, process_str, port_str, prt->name, + command, size); + } +#endif + + prt->caller = caller; + cres = prt->drv_ptr->control((ErlDrvData) prt->drv_data, + command, + bufp, + size, + resp_bufp, + *from_size); + prt->caller = NIL; + + if (cres < 0) + return ERTS_PORT_OP_BADARG; + + *from_size = (ErlDrvSizeT) cres; + + return ERTS_PORT_OP_DONE; +} + +static void +cleanup_scheduled_control(Binary *binp, char *bufp) +{ + if (binp) { + if (erts_refc_dectest(&binp->refc, 0) == 0) + erts_bin_free(binp); + } + else { + if (bufp) + erts_free(ERTS_ALC_T_DRV_CTRL_DATA, bufp); + } +} + + +static ERTS_INLINE Uint +port_control_result_size(int control_flags, + char *resp_bufp, + ErlDrvSizeT *resp_size, + char *pre_alloc_buf) +{ + if (!resp_bufp) + return (Uint) 0; + + if (control_flags & PORT_CONTROL_FLAG_BINARY) { + if (resp_bufp != pre_alloc_buf) { + ErlDrvBinary *dbin = (ErlDrvBinary *) resp_bufp; + *resp_size = dbin->orig_size; + if (*resp_size > ERL_ONHEAP_BIN_LIMIT) + return PROC_BIN_SIZE; + } + ASSERT(*resp_size <= ERL_ONHEAP_BIN_LIMIT); + return (Uint) heap_bin_size((*resp_size)); + } + + return (Uint) 2*(*resp_size); +} + +static ERTS_INLINE Eterm +write_port_control_result(int control_flags, + char *resp_bufp, + ErlDrvSizeT resp_size, + char *pre_alloc_buf, + Eterm **hpp, + ErlHeapFragment *bp, + ErlOffHeap *ohp) +{ + Eterm res; + if (!resp_bufp) + return NIL; + if (control_flags & PORT_CONTROL_FLAG_BINARY) { + /* Binary result */ + ErlDrvBinary *dbin; + ErlHeapBin *hbin; + + if (resp_bufp == pre_alloc_buf) + dbin = NULL; + else { + dbin = (ErlDrvBinary *) resp_bufp; + if (dbin->orig_size > ERL_ONHEAP_BIN_LIMIT) { + ProcBin* pb = (ProcBin *) *hpp; + *hpp += PROC_BIN_SIZE; + pb->thing_word = HEADER_PROC_BIN; + pb->size = dbin->orig_size; + pb->next = ohp->first; + ohp->first = (struct erl_off_heap_header *) pb; + pb->val = ErlDrvBinary2Binary(dbin); + pb->bytes = (byte*) dbin->orig_bytes; + pb->flags = 0; + OH_OVERHEAD(ohp, dbin->orig_size / sizeof(Eterm)); + return make_binary(pb); + } + resp_bufp = dbin->orig_bytes; + resp_size = dbin->orig_size; + } + + hbin = (ErlHeapBin *) *hpp; + *hpp += heap_bin_size(resp_size); + ASSERT(resp_size <= ERL_ONHEAP_BIN_LIMIT); + hbin->thing_word = header_heap_bin(resp_size); + hbin->size = resp_size; + sys_memcpy(hbin->data, resp_bufp, resp_size); + if (dbin) + driver_free_binary(dbin); + return make_binary(hbin); + } + + /* List result */ + res = buf_to_intlist(hpp, resp_bufp, resp_size, NIL); + if (resp_bufp != pre_alloc_buf) + driver_free(resp_bufp); + return res; +} + +static int +port_sig_control(Port *prt, + erts_aint32_t state, + int op, + ErtsProc2PortSigData *sigdp) +{ + ASSERT(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY); + + if (op == ERTS_PROC2PORT_SIG_EXEC) { + char resp_buf[ERL_ONHEAP_BIN_LIMIT]; + ErlDrvSizeT resp_size = sizeof(resp_buf); + char *resp_bufp = &resp_buf[0]; + ErtsPortOpResult res; + + res = call_driver_control(sigdp->caller, + prt, + sigdp->u.control.command, + sigdp->u.control.bufp, + sigdp->u.control.size, + &resp_bufp, + &resp_size); + + if (res == ERTS_PORT_OP_DONE) { + Eterm msg; + Eterm *hp, *hp_start; + ErlHeapFragment *bp; + ErlOffHeap *ohp; + Process *rp; + ErtsProcLocks rp_locks = 0; + Uint hsz; + int control_flags; + + rp = erts_proc_lookup_raw(sigdp->caller); + if (!rp) + goto done; + + control_flags = prt->control_flags; + + hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; + hsz += port_control_result_size(control_flags, + resp_bufp, + &resp_size, + &resp_buf[0]); + + hp_start = hp = erts_alloc_message_heap(hsz, + &bp, + &ohp, + rp, + &rp_locks); + + msg = write_port_control_result(control_flags, + resp_bufp, + resp_size, + &resp_buf[0], + &hp, + bp, + ohp); + + queue_port_sched_op_reply(rp, + &rp_locks, + hp_start, + hp, + hsz, + bp, + sigdp->ref, + msg); + + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); + goto done; + } + } + + /* failure */ + + port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); + +done: + + cleanup_scheduled_control(sigdp->u.control.binp, + sigdp->u.control.bufp); + + return ERTS_PORT_REDS_CONTROL; +} + + +ErtsPortOpResult +erts_port_control(Process* c_p, + Port *prt, + unsigned int command, + Eterm data, + Eterm *retvalp) +{ + ErtsPortOpResult res; + char *bufp = NULL; + ErlDrvSizeT size = 0; + int try_call; + int tmp_alloced = 0; + erts_aint32_t sched_flags; + Binary *binp; + int copy; + ErtsProc2PortSigData *sigdp; + + sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); + if (sched_flags & ERTS_PTS_FLG_EXIT) + return ERTS_PORT_OP_BADARG; + + try_call = !(sched_flags & ERTS_PTS_FLGS_FORCE_SCHEDULE_OP); + + if (is_binary(data) && binary_bitoffset(data) == 0) { + byte *bytep; + ERTS_DECLARE_DUMMY(Uint bitoffs); + ERTS_DECLARE_DUMMY(Uint bitsize); + ERTS_GET_BINARY_BYTES(data, bytep, bitoffs, bitsize); + bufp = (char *) bytep; + size = binary_size(data); + } else { + int r; + + if (!try_call) { + if (erts_iolist_size(data, &size)) + return ERTS_PORT_OP_BADARG; + bufp = erts_alloc(ERTS_ALC_T_DRV_CTRL_DATA, size); + r = erts_iolist_to_buf(data, bufp, size); + ASSERT(r == 0); + } + else { + /* Try with an 8KB buffer first (will often be enough I guess). */ + size = 8*1024; + bufp = erts_alloc(ERTS_ALC_T_TMP, size); + tmp_alloced = 1; + + r = erts_iolist_to_buf(data, bufp, size); + if (ERTS_IOLIST_TO_BUF_SUCCEEDED(r)) { + size -= r; + } else { + if (r == ERTS_IOLIST_TO_BUF_TYPE_ERROR) { /* Type error */ + erts_free(ERTS_ALC_T_TMP, bufp); + return ERTS_PORT_OP_BADARG; + } + else { + ASSERT(r == ERTS_IOLIST_TO_BUF_OVERFLOW); /* Overflow */ + erts_free(ERTS_ALC_T_TMP, bufp); + if (erts_iolist_size(data, &size)) + return ERTS_PORT_OP_BADARG; /* Type error */ + } + bufp = erts_alloc(ERTS_ALC_T_TMP, size); + r = erts_iolist_to_buf(data, bufp, size); + ASSERT(r == 0); + } + } + } + + if (try_call) { + char resp_buf[ERL_ONHEAP_BIN_LIMIT]; + char* resp_bufp = &resp_buf[0]; + ErlDrvSizeT resp_size = sizeof(resp_buf); + ErtsTryImmDrvCallResult try_call_res; + ErtsTryImmDrvCallState try_call_state + = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( + c_p, + prt, + ERTS_PORT_SFLGS_INVALID_LOOKUP, + 0, + 0, + am_control); + + try_call_res = try_imm_drv_call(&try_call_state); + switch (try_call_res) { + case ERTS_TRY_IMM_DRV_CALL_OK: { + Eterm *hp; + Uint hsz; + int control_flags; + + res = call_driver_control(c_p->common.id, + prt, + command, + bufp, + size, + &resp_bufp, + &resp_size); + finalize_imm_drv_call(&try_call_state); + if (tmp_alloced) + erts_free(ERTS_ALC_T_TMP, bufp); + if (res == ERTS_PORT_OP_BADARG) { + return ERTS_PORT_OP_BADARG; + } + + control_flags = prt->control_flags; + + hsz = port_control_result_size(control_flags, + resp_bufp, + &resp_size, + &resp_buf[0]); + hp = HAlloc(c_p, hsz); + *retvalp = write_port_control_result(control_flags, + resp_bufp, + resp_size, + &resp_buf[0], + &hp, + NULL, + &c_p->off_heap); + return ERTS_PORT_OP_DONE; + } + case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: + if (tmp_alloced) + erts_free(ERTS_ALC_T_TMP, bufp); + return ERTS_PORT_OP_BADARG; + default: + /* Schedule control() call instead... */ + break; + } + } + + /* Convert data into something that can be scheduled */ + + copy = tmp_alloced; + + binp = NULL; + + if (is_binary(data) && binary_bitoffset(data) == 0) { + Eterm *ebinp = binary_val_rel(data, NULL); + ASSERT(!tmp_alloced); + if (*ebinp == HEADER_SUB_BIN) + ebinp = binary_val_rel(((ErlSubBin *) ebinp)->orig, NULL); + if (*ebinp != HEADER_PROC_BIN) + copy = 1; + else { + binp = ((ProcBin *) ebinp)->val; + ASSERT(bufp < bufp + size); + ASSERT(binp->orig_bytes <= bufp + && bufp + size <= binp->orig_bytes + binp->orig_size); + erts_refc_inc(&binp->refc, 1); + } + } + + if (copy) { + char *old_bufp = bufp; + bufp = erts_alloc(ERTS_ALC_T_DRV_CTRL_DATA, size); + sys_memcpy(bufp, old_bufp, size); + if (tmp_alloced) + erts_free(ERTS_ALC_T_TMP, old_bufp); + } + + sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = 0; + sigdp->u.control.binp = binp; + sigdp->u.control.command = command; + sigdp->u.control.bufp = bufp; + sigdp->u.control.size = size; + + res = erts_schedule_proc2port_signal(c_p, + prt, + c_p->common.id, + retvalp, + sigdp, + 0, + port_sig_control); + if (res != ERTS_PORT_OP_SCHEDULED) { + cleanup_scheduled_control(binp, bufp); + return ERTS_PORT_OP_BADARG; + } + return res; +} + +static ERTS_INLINE ErtsPortOpResult +call_driver_call(Eterm caller, + Port *prt, + unsigned int command, + char *bufp, + ErlDrvSizeT size, + char **resp_bufp, + ErlDrvSizeT *from_size, + unsigned *ret_flagsp) { - ErtsLink *lnk; - Eterm rreason; - erts_aint32_t state; - - ERTS_SMP_CHK_NO_PROC_LOCKS; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); + ErlDrvSSizeT cres; - rreason = (reason == am_kill) ? am_killed : reason; + if (!prt->drv_ptr->call) + return ERTS_PORT_OP_BADARG; #ifdef USE_VM_PROBES - if (DTRACE_ENABLED(port_exit)) { - DTRACE_CHARBUF(from_str, DTRACE_TERM_BUF_SIZE); - DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); - DTRACE_CHARBUF(rreason_str, 64); + if (DTRACE_ENABLED(driver_call)) { + DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); + DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); - erts_snprintf(from_str, sizeof(from_str), "%T", from); - dtrace_port_str(p, port_str); - erts_snprintf(rreason_str, sizeof(rreason_str), "%T", rreason); - DTRACE4(port_exit, from_str, port_str, p->name, rreason_str); - } + dtrace_pid_str(caller, process_str); + dtrace_port_str(prt, port_str); + DTRACE5(driver_call, process_str, port_str, prt->name, command, size); + } #endif - state = erts_atomic32_read_nob(&p->state); - if ((state & (ERTS_PORT_SFLGS_DEAD|ERTS_PORT_SFLG_EXITING)) - || ((reason == am_normal) && - ((from != ERTS_PORT_GET_CONNECTED(p)) && (from != p->common.id)))) { - return; - } + prt->caller = caller; + cres = prt->drv_ptr->call((ErlDrvData) prt->drv_data, + command, + bufp, + size, + resp_bufp, + *from_size, + ret_flagsp); + prt->caller = NIL; - if (IS_TRACED_FL(p, F_TRACE_PORTS)) { - trace_port(p, am_closed, reason); - } + if (cres <= 0 + || ((byte) (*resp_bufp)[0]) != VERSION_MAGIC) + return ERTS_PORT_OP_BADARG; - erts_trace_check_exiting(p->common.id); + *from_size = (ErlDrvSizeT) cres; - /* - * Setting the port to not busy here, frees the list of pending - * processes and makes them runnable. - */ - set_busy_port((ErlDrvPort) p, 0); + return ERTS_PORT_OP_DONE; +} - if (p->common.u.alive.reg != NULL) - (void) erts_unregister_name(NULL, 0, p, p->common.u.alive.reg->name); - state = erts_atomic32_read_bor_relb(&p->state, ERTS_PORT_SFLG_EXITING); +static +void cleanup_scheduled_call(char *bufp) +{ + if (bufp) + erts_free(ERTS_ALC_T_DRV_CALL_DATA, bufp); +} - { - SweepContext sc = {p->common.id, rreason}; - lnk = ERTS_P_LINKS(p); - ERTS_P_LINKS(p) = NULL; - erts_sweep_links(lnk, &sweep_one_link, &sc); - } - DRV_MONITOR_LOCK_PDL(p); - { - ErtsMonitor *moni = ERTS_P_MONITORS(p); - ERTS_P_MONITORS(p) = NULL; - erts_sweep_monitors(moni, &sweep_one_monitor, NULL); - } - DRV_MONITOR_UNLOCK_PDL(p); +static int +port_sig_call(Port *prt, + erts_aint32_t state, + int op, + ErtsProc2PortSigData *sigdp) +{ + char resp_buf[256]; + ErlDrvSizeT resp_size = sizeof(resp_buf); + char *resp_bufp = &resp_buf[0]; + unsigned ret_flags = 0U; + + + ASSERT(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY); + + if (op == ERTS_PROC2PORT_SIG_EXEC) { + ErtsPortOpResult res; + + res = call_driver_call(sigdp->caller, + prt, + sigdp->u.call.command, + sigdp->u.call.bufp, + sigdp->u.call.size, + &resp_bufp, + &resp_size, + &ret_flags); + + if (res == ERTS_PORT_OP_DONE) { + Eterm msg; + Eterm *hp; + ErlHeapFragment *bp; + ErlOffHeap *ohp; + Process *rp; + ErtsProcLocks rp_locks = 0; + Uint hsz; + + rp = erts_proc_lookup_raw(sigdp->caller); + if (!rp) + goto done; - if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && p->dist_entry) { - erts_do_net_exits(p->dist_entry, rreason); - erts_deref_dist_entry(p->dist_entry); - p->dist_entry = NULL; - erts_atomic32_read_band_relb(&p->state, - ~ERTS_PORT_SFLG_DISTRIBUTION); - } - - if ((reason != am_kill) && !is_port_ioq_empty(p)) { - /* must turn exiting flag off */ - erts_atomic32_read_bset_relb(&p->state, - (ERTS_PORT_SFLG_EXITING - | ERTS_PORT_SFLG_CLOSING), - ERTS_PORT_SFLG_CLOSING); - flush_port(p); - } - else { - terminate_port(p); - } -} + hsz = erts_decode_ext_size((byte *) resp_bufp, resp_size); + if (hsz >= 0) { + Eterm *hp_start; + byte *endp; + + hsz += 3; /* ok tuple */ + hsz += ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; + + hp_start = hp = erts_alloc_message_heap(hsz, + &bp, + &ohp, + rp, + &rp_locks); + endp = (byte *) resp_bufp; + msg = erts_decode_ext(&hp, ohp, &endp); + if (is_value(msg)) { + msg = TUPLE2(hp, am_ok, msg); + hp += 3; + + queue_port_sched_op_reply(rp, + &rp_locks, + hp_start, + hp, + hsz, + bp, + sigdp->ref, + msg); + + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); + goto done; + } + if (bp) + free_message_buffer(bp); + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); + } + } + } -/* About the states ERTS_PORT_SFLG_EXITING and ERTS_PORT_SFLG_CLOSING used above. -** -** ERTS_PORT_SFLG_EXITING is a recursion protection for erts_do_exit_port(). -** It is unclear whether this state is necessary or not, it might be possible -** to merge it with ERTS_PORT_SFLG_CLOSING. ERTS_PORT_SFLG_EXITING only persists -** over a section of sequential (but highly recursive) code. -** -** ERTS_PORT_SFLG_CLOSING is a state where the port is in Limbo, waiting to -** pass on. All links are removed, and the port receives in/out-put events so -** as soon as the port queue gets empty terminate_port() is called. -*/ + port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); +done: + if (resp_bufp != &resp_buf[0] && !(ret_flags & DRIVER_CALL_KEEP_BUFFER)) + driver_free(resp_bufp); -/* Command should be of the form -** {PID, close} -** {PID, {command, io-list}} -** {PID, {connect, New_PID}} -** -** -*/ -void erts_port_command(Process *proc, - Eterm caller_id, - Port *port, - Eterm command) + cleanup_scheduled_call(sigdp->u.call.bufp); + + return ERTS_PORT_REDS_CALL; +} + + +ErtsPortOpResult +erts_port_call(Process* c_p, + Port *prt, + unsigned int command, + Eterm data, + Eterm *retvalp) { - Eterm *tp; - Eterm pid; + ErtsPortOpResult res; + char input_buf[256]; + char *bufp; + byte *endp; + ErlDrvSizeT size; + int try_call; + erts_aint32_t sched_flags; + ErtsProc2PortSigData *sigdp; - if (!port) - return; + sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); + if (sched_flags & ERTS_PTS_FLG_EXIT) { + return ERTS_PORT_OP_BADARG; + } - erts_smp_proc_unlock(proc, ERTS_PROC_LOCK_MAIN); - ERTS_SMP_CHK_NO_PROC_LOCKS; - ASSERT(!INVALID_PORT(port, port->common.id)); + try_call = !(sched_flags & ERTS_PTS_FLGS_FORCE_SCHEDULE_OP); - if (is_tuple_arity(command, 2)) { - tp = tuple_val(command); - if ((pid = ERTS_PORT_GET_CONNECTED(port)) == tp[1]) { - /* PID must be connected */ - if (tp[2] == am_close) { - erts_atomic32_read_bor_relb(&port->state, - ERTS_PORT_SFLG_SEND_CLOSED); - erts_do_exit_port(port, pid, am_normal); + size = erts_encode_ext_size(data); -#ifdef USE_VM_PROBES - if(DTRACE_ENABLED(port_command)) { - DTRACE_FORMAT_COMMON_PROC_AND_PORT(proc, port) - DTRACE4(port_command, process_str, port_str, port->name, "close"); - } -#endif - goto done; - } else if (is_tuple_arity(tp[2], 2)) { - tp = tuple_val(tp[2]); - if (tp[1] == am_command) { - if (erts_write_to_port(caller_id, port, tp[2]) == 0) - goto done; - } else if ((tp[1] == am_connect) && is_internal_pid(tp[2])) { -#ifdef USE_VM_PROBES - if(DTRACE_ENABLED(port_command)) { - DTRACE_FORMAT_COMMON_PROC_AND_PORT(proc, port) - DTRACE4(port_command, process_str, port_str, port->name, "connect"); - } -#endif - ERTS_PORT_SET_CONNECTED_RELB(port, tp[2]); - deliver_result(port->common.id, pid, am_connected); - goto done; - } - } + if (!try_call) + bufp = erts_alloc(ERTS_ALC_T_DRV_CALL_DATA, size); + else if (size <= sizeof(input_buf)) + bufp = &input_buf[0]; + else + bufp = erts_alloc(ERTS_ALC_T_TMP, size); + + endp = (byte *) bufp; + erts_encode_ext(data, &endp); + + if (endp - (byte *) bufp > size) + ERTS_INTERNAL_ERROR("erts_internal:port_call() - Buffer overflow"); + + size = endp - (byte *) bufp; + + if (try_call) { + char resp_buf[255]; + char* resp_bufp = &resp_buf[0]; + ErlDrvSizeT resp_size = sizeof(resp_buf); + ErtsTryImmDrvCallResult try_call_res; + ErtsTryImmDrvCallState try_call_state + = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( + c_p, + prt, + ERTS_PORT_SFLGS_INVALID_LOOKUP, + 0, + 0, + am_call); + + try_call_res = try_imm_drv_call(&try_call_state); + switch (try_call_res) { + case ERTS_TRY_IMM_DRV_CALL_OK: { + Eterm *hp, *hp_end; + Uint hsz; + unsigned ret_flags = 0U; + Eterm term; + + res = call_driver_call(c_p->common.id, + prt, + command, + bufp, + size, + &resp_bufp, + &resp_size, + &ret_flags); + + finalize_imm_drv_call(&try_call_state); + if (bufp != &input_buf[0]) + erts_free(ERTS_ALC_T_TMP, bufp); + if (res == ERTS_PORT_OP_BADARG) + return ERTS_PORT_OP_BADARG; + hsz = erts_decode_ext_size((byte *) resp_bufp, resp_size); + if (hsz < 0) + return ERTS_PORT_OP_BADARG; + hsz += 3; + hp = HAlloc(c_p, hsz); + hp_end = hp + hsz; + endp = (byte *) resp_bufp; + term = erts_decode_ext(&hp, &MSO(c_p), &endp); + if (term == THE_NON_VALUE) + return ERTS_PORT_OP_BADARG; + *retvalp = TUPLE2(hp, am_ok, term); + hp += 3; + HRelease(c_p, hp_end, hp); + if (resp_buf != &resp_buf[0] + && !(ret_flags & DRIVER_CALL_KEEP_BUFFER)) + driver_free(resp_buf); + return ERTS_PORT_OP_DONE; + } + case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: + if (bufp != &input_buf[0]) + erts_free(ERTS_ALC_T_TMP, bufp); + return ERTS_PORT_OP_BADARG; + default: + /* Schedule call() call instead... */ + break; } } - { - ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; - Process* rp = erts_pid2proc(NULL, 0, - ERTS_PORT_GET_CONNECTED(port), rp_locks); - if (rp) { - (void) erts_send_exit_signal(NULL, - port->common.id, - rp, - &rp_locks, - am_badsig, - NIL, - NULL, - 0); - erts_smp_proc_unlock(rp, rp_locks); - } + /* Convert data into something that can be scheduled */ + if (bufp == &input_buf[0] || try_call) { + char *new_bufp = erts_alloc(ERTS_ALC_T_DRV_CALL_DATA, size); + sys_memcpy(new_bufp, bufp, size); + if (bufp != &input_buf[0]) + erts_free(ERTS_ALC_T_TMP, bufp); + bufp = new_bufp; } - done: - erts_smp_proc_lock(proc, ERTS_PROC_LOCK_MAIN); -} -/* - * Control a port synchronously. - * Returns either a list or a binary. - */ -Eterm -erts_port_control(Process* p, Port* prt, Uint command, Eterm iolist) -{ - byte* to_port = NULL; /* Buffer to write to port. */ - /* Initialization is for shutting up - warning about use before set. */ - Uint to_len = 0; /* Length of buffer. */ - int must_free = 0; /* True if the buffer should be freed. */ - char port_result[ERL_ONHEAP_BIN_LIMIT]; /* Default buffer for result from port. */ - char* port_resp; /* Pointer to result buffer. */ - ErlDrvSSizeT n; - ErlDrvSSizeT (*control) - (ErlDrvData, unsigned, char*, ErlDrvSizeT, char**, ErlDrvSizeT); - int fpe_was_unmasked; + sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = 0; + sigdp->u.call.command = command; + sigdp->u.call.bufp = bufp; + sigdp->u.call.size = size; + + res = erts_schedule_proc2port_signal(c_p, + prt, + c_p->common.id, + retvalp, + sigdp, + 0, + port_sig_call); + if (res != ERTS_PORT_OP_SCHEDULED) { + cleanup_scheduled_call(bufp); + return ERTS_PORT_OP_BADARG; + } + return res; +} - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); +static Eterm +make_port_info_term(Eterm **hpp_start, + Eterm **hpp, + Uint *hszp, + ErlHeapFragment **bpp, + Port *prt, + Eterm item) +{ + ErlOffHeap *ohp; - if ((control = prt->drv_ptr->control) == NULL) { - return THE_NON_VALUE; + if (is_value(item)) { + if (erts_bld_port_info(NULL, NULL, hszp, prt, item) == am_false) + return THE_NON_VALUE; + if (*hszp) { + *bpp = new_message_buffer(*hszp); + *hpp_start = *hpp = (*bpp)->mem; + ohp = &(*bpp)->off_heap; + } + else { + *bpp = NULL; + *hpp_start = *hpp = NULL; + ohp = NULL; + } + return erts_bld_port_info(hpp, ohp, NULL, prt, item); } + else { + int i; + int len; + int start; + static Eterm item[] = ERTS_PORT_INFO_1_ITEMS; + static Eterm value[sizeof(item)/sizeof(item[0])]; + + start = 0; + len = sizeof(item)/sizeof(item[0]); + + for (i = start; i < sizeof(item)/sizeof(item[0]); i++) { + ASSERT(is_atom(item[i])); + value[i] = erts_bld_port_info(NULL, NULL, hszp, prt, item[i]); + } - /* - * Convert the iolist to a buffer, pointed to by to_port, - * and with its length in to_len. - */ - if (is_binary(iolist) && binary_bitoffset(iolist) == 0) { - ERTS_DECLARE_DUMMY(Uint bitoffs); - ERTS_DECLARE_DUMMY(Uint bitsize); - ERTS_GET_BINARY_BYTES(iolist, to_port, bitoffs, bitsize); - to_len = binary_size(iolist); - } else { - int r; + if (value[0] == am_undefined) { + start++; + len--; + } - /* Try with an 8KB buffer first (will often be enough I guess). */ - to_len = 8*1024; - to_port = erts_alloc(ERTS_ALC_T_TMP, to_len); - must_free = 1; + erts_bld_list(NULL, hszp, len, &value[start]); - /* - * In versions before R10B, we used to reserve random - * amounts of extra memory. From R10B, we allocate the - * exact amount. - */ - r = io_list_to_buf(iolist, (char*) to_port, to_len); - if (r >= 0) { - to_len -= r; - } else if (r == -2) { /* Type error */ - erts_free(ERTS_ALC_T_TMP, (void *) to_port); - return THE_NON_VALUE; - } else { - ASSERT(r == -1); /* Overflow */ - erts_free(ERTS_ALC_T_TMP, (void *) to_port); - if (erts_iolist_size(iolist, &to_len)) { /* Type error */ - return THE_NON_VALUE; - } - must_free = 1; - to_port = erts_alloc(ERTS_ALC_T_TMP, to_len); - r = io_list_to_buf(iolist, (char*) to_port, to_len); - ASSERT(r == 0); + *bpp = new_message_buffer(*hszp); + *hpp_start = *hpp = (*bpp)->mem; + ohp = &(*bpp)->off_heap; + + for (i = start; i < sizeof(item)/sizeof(item[0]); i++) + value[i] = erts_bld_port_info(hpp, ohp, NULL, prt, item[i]); + + return erts_bld_list(hpp, NULL, len, &value[start]); + } +} + +static int +port_sig_info(Port *prt, + erts_aint32_t state, + int op, + ErtsProc2PortSigData *sigdp) +{ + ASSERT(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY); + if (op != ERTS_PROC2PORT_SIG_EXEC) + port_sched_op_reply(sigdp->caller, sigdp->ref, am_undefined); + else { + Eterm *hp, *hp_start; + Uint hsz; + ErlHeapFragment *bp; + Eterm value; + Process *rp; + ErtsProcLocks rp_locks = 0; + + rp = erts_proc_lookup_raw(sigdp->caller); + if (!rp) + return ERTS_PORT_REDS_INFO; + + hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; + value = make_port_info_term(&hp_start, + &hp, + &hsz, + &bp, + prt, + sigdp->u.info.item); + if (is_value(value)) { + queue_port_sched_op_reply(rp, + &rp_locks, + hp_start, + hp, + hsz, + bp, + sigdp->ref, + value); + } + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); + } + return ERTS_PORT_REDS_INFO; +} + +ErtsPortOpResult +erts_port_info(Process* c_p, + Port *prt, + Eterm item, + Eterm *retvalp) +{ + ErtsProc2PortSigData *sigdp; + ErtsTryImmDrvCallResult try_call_res; + ErtsTryImmDrvCallState try_call_state + = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( + c_p, + prt, + ERTS_PORT_SFLGS_INVALID_LOOKUP, + 0, + 0, + am_info); + + try_call_res = try_imm_drv_call(&try_call_state); + switch (try_call_res) { + case ERTS_TRY_IMM_DRV_CALL_OK: { + Eterm *hp, *hp_start; + ErlHeapFragment *bp; + Uint hsz = 0; + Eterm value = make_port_info_term(&hp_start, &hp, &hsz, &bp, prt, item); + finalize_imm_drv_call(&try_call_state); + if (is_non_value(value)) + return ERTS_PORT_OP_BADARG; + else if (is_immed(value)) + *retvalp = value; + else { + Uint used_h_size = hp - hp_start; + hp = HAlloc(c_p, used_h_size); + *retvalp = copy_struct(value, used_h_size, &hp, &MSO(c_p)); + free_message_buffer(bp); } + return ERTS_PORT_OP_DONE; + } + case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: + return ERTS_PORT_OP_DROPPED; + case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS: + case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK: + /* Schedule call instead... */ + break; } - prt->caller = p->common.id; /* Internal pid */ + sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = 0; + sigdp->u.info.item = item; - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN); - ERTS_SMP_CHK_NO_PROC_LOCKS; + return erts_schedule_proc2port_signal(c_p, + prt, + c_p->common.id, + retvalp, + sigdp, + 0, + port_sig_info); +} -#ifdef USE_VM_PROBES - if (DTRACE_ENABLED(port_control) || DTRACE_ENABLED(driver_control)) { - DTRACE_FORMAT_COMMON_PROC_AND_PORT(p, prt); - DTRACE4(port_control, process_str, port_str, prt->name, command); - DTRACE5(driver_control, process_str, port_str, prt->name, - command, to_len); - } -#endif +static int +port_sig_set_data(Port *prt, + erts_aint32_t state, + int op, + ErtsProc2PortSigData *sigdp) +{ + ASSERT(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY); - /* - * Call the port's control routine. - */ + if (op == ERTS_PROC2PORT_SIG_EXEC) { + if (prt->bp) + free_message_buffer(prt->bp); + prt->bp = sigdp->u.set_data.bp; + prt->data = sigdp->u.set_data.data; + port_sched_op_reply(sigdp->caller, sigdp->ref, am_true); + } + else { + if (sigdp->u.set_data.bp) + free_message_buffer(sigdp->u.set_data.bp); + port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); + } + return ERTS_PORT_REDS_SET_DATA; +} - port_resp = port_result; - fpe_was_unmasked = erts_block_fpe(); - n = control((ErlDrvData)prt->drv_data, command, (char*)to_port, to_len, - &port_resp, sizeof(port_result)); - erts_unblock_fpe(fpe_was_unmasked); - if (must_free) { - erts_free(ERTS_ALC_T_TMP, (void *) to_port); +ErtsPortOpResult +erts_port_set_data(Process* c_p, + Port *prt, + Eterm data, + Eterm *refp) +{ + ErtsPortOpResult res; + Eterm set_data; + ErlHeapFragment *bp; + ErtsProc2PortSigData *sigdp; + ErtsTryImmDrvCallResult try_call_res; + ErtsTryImmDrvCallState try_call_state + = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( + c_p, + prt, + ERTS_PORT_SFLGS_INVALID_LOOKUP, + 0, + !refp, + am_set_data); + + if (is_immed(data)) { + set_data = data; + bp = NULL; + } + else { + Eterm *hp; + Uint sz = size_object(data); + bp = new_message_buffer(sz); + hp = bp->mem; + set_data = copy_struct(data, sz, &hp, &bp->off_heap); + } + + try_call_res = try_imm_drv_call(&try_call_state); + switch (try_call_res) { + case ERTS_TRY_IMM_DRV_CALL_OK: + if (prt->bp) + free_message_buffer(prt->bp); + prt->bp = bp; + prt->data = set_data; + finalize_imm_drv_call(&try_call_state); + return ERTS_PORT_OP_DONE; + case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: + return ERTS_PORT_OP_DROPPED; + case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS: + case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK: + /* Schedule call instead... */ + break; } - prt->caller = NIL; -#ifdef ERTS_SMP - if (prt->xports) - erts_smp_xports_unlock(prt); - ASSERT(!prt->xports); -#endif - erts_smp_proc_lock(p, ERTS_PROC_LOCK_MAIN); - /* - * Handle the result. - */ + sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = 0; + sigdp->u.set_data.data = set_data; + sigdp->u.set_data.bp = bp; + + res = erts_schedule_proc2port_signal(c_p, + prt, + c_p->common.id, + refp, + sigdp, + 0, + port_sig_set_data); + if (res != ERTS_PORT_OP_SCHEDULED && bp) + free_message_buffer(bp); + return res; +} - if (n < 0) { - return THE_NON_VALUE; - } +static int +port_sig_get_data(Port *prt, + erts_aint32_t state, + int op, + ErtsProc2PortSigData *sigdp) +{ + ASSERT(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY); + if (op != ERTS_PROC2PORT_SIG_EXEC) + port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); + else { + Process *rp; + ErtsProcLocks rp_locks = 0; - if ((prt->control_flags & PORT_CONTROL_FLAG_BINARY) == 0) { /* List result */ - Eterm ret; - Eterm* hp = HAlloc(p, 2*n); - ret = buf_to_intlist(&hp, port_resp, n, NIL); - if (port_resp != port_result) { - driver_free(port_resp); + rp = erts_proc_lookup_raw(sigdp->caller); + if (rp) { + Uint hsz; + Eterm *hp, *hp_start; + Eterm data, msg; + ErlHeapFragment *bp; + ErlOffHeap *ohp; + + hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; + hsz += 3; + if (prt->bp) + hsz += prt->bp->used_size; + + hp_start = hp = erts_alloc_message_heap(hsz, + &bp, + &ohp, + rp, + &rp_locks); + + if (is_immed(prt->data)) + data = prt->data; + else + data = copy_struct(prt->data, + prt->bp->used_size, + &hp, + &bp->off_heap); + + + + msg = TUPLE2(hp, am_ok, data); + hp += 3; + + queue_port_sched_op_reply(rp, + &rp_locks, + hp_start, + hp, + hsz, + bp, + sigdp->ref, + msg); + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); } - return ret; - } - else if (port_resp == NULL) { - return NIL; - } - else { /* Binary result */ - ErlDrvBinary *dbin; - ErlHeapBin *hbin; - if (port_resp != port_result) { - dbin = (ErlDrvBinary *) port_resp; - if (dbin->orig_size > ERL_ONHEAP_BIN_LIMIT) { - ProcBin* pb = (ProcBin *) HAlloc(p, PROC_BIN_SIZE); - pb->thing_word = HEADER_PROC_BIN; - pb->size = dbin->orig_size; - pb->next = MSO(p).first; - MSO(p).first = (struct erl_off_heap_header*)pb; - pb->val = ErlDrvBinary2Binary(dbin); - pb->bytes = (byte*) dbin->orig_bytes; - pb->flags = 0; - OH_OVERHEAD(&(MSO(p)), dbin->orig_size / sizeof(Eterm)); - return make_binary(pb); - } - port_resp = dbin->orig_bytes; - n = dbin->orig_size; - } else { - dbin = NULL; + } + return ERTS_PORT_REDS_GET_DATA; +} + +ErtsPortOpResult +erts_port_get_data(Process* c_p, + Port *prt, + Eterm *retvalp) +{ + ErtsProc2PortSigData *sigdp; + ErtsTryImmDrvCallResult try_call_res; + ErtsTryImmDrvCallState try_call_state + = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( + c_p, + prt, + ERTS_PORT_SFLGS_INVALID_LOOKUP, + 0, + 0, + am_get_data); + + try_call_res = try_imm_drv_call(&try_call_state); + switch (try_call_res) { + case ERTS_TRY_IMM_DRV_CALL_OK: { + Eterm *hp; + Eterm data; + ErlHeapFragment *bp; + Uint sz; + if (is_immed(prt->data)) { + bp = NULL; + data = prt->data; } - hbin = (ErlHeapBin*) HAlloc(p, heap_bin_size(n)); - ASSERT(n <= ERL_ONHEAP_BIN_LIMIT); - hbin->thing_word = header_heap_bin(n); - hbin->size = n; - sys_memcpy(hbin->data, port_resp, n); - if (dbin != NULL) { - driver_free_binary(dbin); + else { + bp = new_message_buffer(prt->bp->used_size); + data = copy_struct(prt->data, + prt->bp->used_size, + &hp, + &bp->off_heap); + } + finalize_imm_drv_call(&try_call_state); + if (is_immed(data)) + sz = 0; + else + sz = bp->used_size; + + hp = HAlloc(c_p, sz + 3); + if (is_not_immed(data)) { + data = copy_struct(data, bp->used_size, &hp, &MSO(c_p)); + free_message_buffer(bp); } - return make_binary(hbin); + *retvalp = TUPLE2(hp, am_ok, data); + return ERTS_PORT_OP_DONE; + } + case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: + return ERTS_PORT_OP_DROPPED; + case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS: + case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK: + /* Schedule call instead... */ + break; } + + sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = 0; + + return erts_schedule_proc2port_signal(c_p, + prt, + c_p->common.id, + retvalp, + sigdp, + 0, + port_sig_get_data); } typedef struct { @@ -2459,7 +4591,7 @@ print_port_info(Port *p, int to, void *arg) } void -set_busy_port(ErlDrvPort port_num, int on) +set_busy_port(ErlDrvPort dprt, int on) { Port *prt; erts_aint32_t flags; @@ -2470,34 +4602,38 @@ set_busy_port(ErlDrvPort port_num, int on) ERTS_SMP_CHK_NO_PROC_LOCKS; - prt = erts_drvport2port_raw(port_num); + prt = erts_drvport2port_raw(dprt); if (!prt) return; if (on) { - flags = erts_smp_atomic32_read_bor_nob(&prt->sched.flags, - ERTS_PTS_FLG_BUSY); + flags = erts_smp_atomic32_read_bor_acqb(&prt->sched.flags, + ERTS_PTS_FLG_BUSY); if (flags & ERTS_PTS_FLG_BUSY) return; /* Already busy */ + + if (flags & ERTS_PTS_FLG_HAVE_NS_TASKS) + erts_port_task_abort_nosuspend_tasks(prt); + #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_busy)) { erts_snprintf(port_str, sizeof(port_str), - "%T", prt->id); + "%T", prt->common.id); DTRACE1(port_busy, port_str); } #endif } else { ErtsProcList *plp; - flags = erts_smp_atomic32_read_band_nob(&prt->sched.flags, - ~ERTS_PTS_FLG_BUSY); + flags = erts_smp_atomic32_read_band_acqb(&prt->sched.flags, + ~ERTS_PTS_FLG_BUSY); if (!(flags & ERTS_PTS_FLG_BUSY)) return; /* Already non-busy */ #ifdef USE_VM_PROBES if (DTRACE_ENABLED(port_not_busy)) { erts_snprintf(port_str, sizeof(port_str), - "%T", prt->id); + "%T", prt->common.id); DTRACE1(port_not_busy, port_str); } #endif @@ -2539,7 +4675,7 @@ set_busy_port(ErlDrvPort port_num, int on) ErtsProcList* plp2 = plp; erts_snprintf(port_str, sizeof(port_str), - "%T", erts_port[port_num]); + "%T", prt->common.id); while (plp2 != NULL) { erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid); DTRACE2(process_port_unblocked, pid_str, port_str); @@ -3748,8 +5884,7 @@ ErlDrvBinary* driver_realloc_binary(ErlDrvBinary* bin, ErlDrvSizeT size) } -void driver_free_binary(dbin) -ErlDrvBinary* dbin; +void driver_free_binary(ErlDrvBinary* dbin) { Binary *bin; if (!dbin) { @@ -4705,13 +6840,13 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof) } else if (eof && (state & ERTS_PORT_SFLG_SOFT_EOF)) { deliver_result(prt->common.id, ERTS_PORT_GET_CONNECTED(prt), am_eof); } else { - /* XXX UGLY WORK AROUND, Let do_exit_port terminate the port */ + /* XXX UGLY WORK AROUND, Let erts_deliver_port_exit() terminate the port */ if (prt->port_data_lock) driver_pdl_lock(prt->port_data_lock); prt->ioq.size = 0; if (prt->port_data_lock) driver_pdl_unlock(prt->port_data_lock); - erts_do_exit_port(prt, prt->common.id, eof ? am_normal : term); + erts_deliver_port_exit(prt, prt->common.id, eof ? am_normal : term, 0); } return 0; } -- cgit v1.2.3 From 43ebafb5fb40aee326b951d18c1880e6e5fdef6b Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Sun, 11 Nov 2012 21:13:44 +0100 Subject: Add driver callback epilogue --- erts/emulator/beam/io.c | 38 +++++++++++++++----------------------- 1 file changed, 15 insertions(+), 23 deletions(-) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 2a55a7c09f..9024e9ab52 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -680,7 +680,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ } #ifdef ERTS_SMP if (port->xports) - erts_smp_xports_unlock(port); + erts_port_handle_xports(port); ASSERT(!port->xports); #endif } @@ -826,9 +826,9 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ } #ifdef ERTS_SMP -void -erts_smp_xports_unlock(Port *prt) +int erts_port_handle_xports(Port *prt) { + int reds = 0; ErtsXPortsList *xplp; ASSERT(prt); @@ -839,16 +839,20 @@ erts_smp_xports_unlock(Port *prt) ErtsXPortsList *free_xplp; erts_aint32_t state; if (rprt->xports) - erts_smp_xports_unlock(rprt); + reds += erts_port_handle_xports(rprt); state = erts_atomic32_read_nob(&rprt->state); - if ((state & ERTS_PORT_SFLG_CLOSING) && erts_is_port_ioq_empty(rprt)) + if ((state & ERTS_PORT_SFLG_CLOSING) && erts_is_port_ioq_empty(rprt)) { terminate_port(rprt); + reds += ERTS_PORT_REDS_TERMINATE; + } erts_port_release(rprt); free_xplp = xplp; xplp = xplp->next; xports_list_free(free_xplp); + reds++; } prt->xports = NULL; + return reds; } #endif @@ -1234,13 +1238,9 @@ finalize_imm_drv_call(ErtsTryImmDrvCallState *sp) Port *prt = sp->port; Process *c_p = sp->c_p; - erts_unblock_fpe(sp->fpe_was_unmasked); + erts_port_driver_callback_epilogue(prt, NULL); -#ifdef ERTS_SMP - if (prt->xports) - erts_smp_xports_unlock(prt); - ASSERT(!prt->xports); -#endif + erts_unblock_fpe(sp->fpe_was_unmasked); if (IS_TRACED_FL(prt, F_TRACE_SCHED_PORTS)) trace_sched_ports_where(prt, am_out, sp->port_op); @@ -3110,7 +3110,7 @@ static void flush_port(Port *p) } #ifdef ERTS_SMP if (p->xports) - erts_smp_xports_unlock(p); + erts_port_handle_xports(p); ASSERT(!p->xports); #endif } @@ -3165,7 +3165,7 @@ terminate_port(Port *prt) erts_unblock_fpe(fpe_was_unmasked); #ifdef ERTS_SMP if (prt->xports) - erts_smp_xports_unlock(prt); + erts_port_handle_xports(prt); ASSERT(!prt->xports); #endif } @@ -4757,9 +4757,7 @@ int async_ready(Port *p, void* data) ERTS_SMP_CHK_NO_PROC_LOCKS; if (p) { - erts_aint32_t state = erts_atomic32_read_nob(&p->state); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); - ASSERT(!(state & ERTS_PORT_SFLGS_DEAD)); if (p->drv_ptr->ready_async != NULL) { #ifdef USE_VM_PROBES if (DTRACE_ENABLED(driver_ready_async)) { @@ -4769,15 +4767,9 @@ int async_ready(Port *p, void* data) #endif (*p->drv_ptr->ready_async)((ErlDrvData)p->drv_data, data); need_free = 0; -#ifdef ERTS_SMP - if (p->xports) - erts_smp_xports_unlock(p); - ASSERT(!p->xports); -#endif - } - if ((state & ERTS_PORT_SFLG_CLOSING) && is_port_ioq_empty(p)) { - terminate_port(p); + } + erts_port_driver_callback_epilogue(p, NULL); } return need_free; } -- cgit v1.2.3 From 9e4895da833b7777e69efc173f5dc777aaea3201 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Thu, 29 Nov 2012 01:24:43 +0100 Subject: Add support for busy port message queue --- erts/emulator/beam/io.c | 163 +++++++++++++++++++++++++++--------------------- 1 file changed, 93 insertions(+), 70 deletions(-) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 9024e9ab52..32f3a675fa 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -270,9 +270,10 @@ static Port *create_port(char *name, Eterm pid, int *enop) { + ErtsPortTaskBusyPortQ *busy_port_queue; Port *prt; char *p; - size_t port_size, size; + size_t port_size, busy_port_queue_size, size; erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED; erts_aint32_t x_pts_flgs = 0; #ifdef DEBUG @@ -288,7 +289,13 @@ static Port *create_port(char *name, } else #endif - port_size = size = sizeof(Port); + port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); + + busy_port_queue_size + = ((driver->flags & ERL_DRV_FLAG_NO_BUSY_MSGQ) + ? 0 + : ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErtsPortTaskBusyPortQ))); + size += busy_port_queue_size; size += sys_strlen(name) + 1; @@ -302,6 +309,13 @@ static Port *create_port(char *name, prt = (Port *) p; p += port_size; + if (!busy_port_queue_size) + busy_port_queue = NULL; + else { + busy_port_queue = (ErtsPortTaskBusyPortQ *) p; + p += busy_port_queue_size; + } + #ifdef ERTS_SMP if (driver_lock) { prt->lock = driver_lock; @@ -320,6 +334,8 @@ static Port *create_port(char *name, prt->cleanup = 0; #endif + erts_port_task_pre_init_sched(&prt->sched, busy_port_queue); + prt->name = p; sys_strcpy(p, name); prt->drv_ptr = driver; @@ -503,8 +519,7 @@ erts_save_suspend_process_on_port(Port *prt, Process *process) erts_aint32_t flags; erts_port_task_sched_lock(&prt->sched); flags = erts_smp_atomic32_read_nob(&prt->sched.flags); - saved = ((flags & (ERTS_PTS_FLG_BUSY - | ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY); + saved = (flags & ERTS_PTS_FLGS_BUSY) && !(flags & ERTS_PTS_FLG_EXIT); if (saved) erts_proclist_store_last(&prt->suspended, erts_proclist_create(process)); erts_port_task_sched_unlock(&prt->sched); @@ -1530,7 +1545,7 @@ bad_port_signal(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = flags; + sigdp->flags = (flags & ~ERTS_P2P_SIG_TYPE_MASK) | ERTS_P2P_SIG_TYPE_BAD; return erts_schedule_proc2port_signal(c_p, prt, @@ -1736,7 +1751,7 @@ erts_port_output(Process *c_p, erts_driver_t *drv = prt->drv_ptr; size_t size; int try_call; - erts_aint32_t sched_flags, busy_flag, invalid_flags; + erts_aint32_t sched_flags, busy_flgs, invalid_flags; int task_flags; ErtsProc2PortSigCallback port_sig_callback; ErlDrvBinary *cbin = NULL; @@ -1747,10 +1762,10 @@ erts_port_output(Process *c_p, | ERTS_PORT_SIG_FLG_NOSUSPEND | ERTS_PORT_SIG_FLG_FORCE)) == 0); - busy_flag = ((flags & ERTS_PORT_SIG_FLG_FORCE) + busy_flgs = ((flags & ERTS_PORT_SIG_FLG_FORCE) ? ((erts_aint32_t) 0) - : ERTS_PTS_FLG_BUSY); - invalid_flags = busy_flag; + : ERTS_PTS_FLGS_BUSY); + invalid_flags = busy_flgs; if (!refp) invalid_flags |= ERTS_PTS_FLG_PARALLELISM; @@ -1759,10 +1774,10 @@ erts_port_output(Process *c_p, */ sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); - if (sched_flags & (busy_flag|ERTS_PTS_FLG_EXIT)) - return ((sched_flags & busy_flag) - ? ERTS_PORT_OP_BUSY - : ERTS_PORT_OP_DROPPED); + if (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT)) + return ((sched_flags & ERTS_PTS_FLG_EXIT) + ? ERTS_PORT_OP_DROPPED + : ERTS_PORT_OP_BUSY); try_call = !(sched_flags & (invalid_flags|ERTS_PTS_FLGS_FORCE_SCHEDULE_OP)); @@ -1949,6 +1964,7 @@ erts_port_output(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV; sigdp->u.outputv.from = from; sigdp->u.outputv.evp = evp; sigdp->u.outputv.cbinp = cbin; @@ -2038,6 +2054,7 @@ erts_port_output(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); + sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUT; sigdp->u.output.from = from; sigdp->u.output.bufp = buf; sigdp->u.output.size = size; @@ -2045,7 +2062,7 @@ erts_port_output(Process *c_p, } task_flags = ERTS_PT_FLG_WAIT_BUSY; - sigdp->flags = flags; + sigdp->flags |= flags; if (flags & (ERTS_P2P_SIG_DATA_FLG_FORCE|ERTS_P2P_SIG_DATA_FLG_NOSUSPEND)) { task_flags = 0; if (flags & ERTS_P2P_SIG_DATA_FLG_FORCE) @@ -2070,7 +2087,7 @@ erts_port_output(Process *c_p, return res; } - if ((sched_flags & (busy_flag|ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY) + if (!(sched_flags & ERTS_PTS_FLG_EXIT) && (sched_flags & busy_flgs)) return ERTS_PORT_OP_BUSY_SCHEDULED; return res; @@ -2201,7 +2218,7 @@ erts_port_exit(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = flags; + sigdp->flags = ERTS_P2P_SIG_TYPE_EXIT | flags; sigdp->u.exit.from = from; if (is_immed(reason)) { @@ -2367,7 +2384,7 @@ erts_port_connect(Process *c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = flags; + sigdp->flags = ERTS_P2P_SIG_TYPE_CONNECT | flags; sigdp->u.connect.from = from; sigdp->u.connect.connected = connect_id; @@ -2424,7 +2441,7 @@ erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp) } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_UNLINK; sigdp->u.unlink.from = from; return erts_schedule_proc2port_signal(c_p, @@ -2509,7 +2526,7 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp) } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_LINK; sigdp->u.link.port = prt->common.id; sigdp->u.link.to = to; @@ -3851,7 +3868,7 @@ erts_port_control(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_CONTROL; sigdp->u.control.binp = binp; sigdp->u.control.command = command; sigdp->u.control.bufp = bufp; @@ -4131,7 +4148,7 @@ erts_port_call(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_CALL; sigdp->u.call.command = command; sigdp->u.call.bufp = bufp; sigdp->u.call.size = size; @@ -4298,7 +4315,7 @@ erts_port_info(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_INFO; sigdp->u.info.item = item; return erts_schedule_proc2port_signal(c_p, @@ -4383,7 +4400,7 @@ erts_port_set_data(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_SET_DATA; sigdp->u.set_data.data = set_data; sigdp->u.set_data.bp = bp; @@ -4516,7 +4533,7 @@ erts_port_get_data(Process* c_p, } sigdp = erts_port_task_alloc_p2p_sig_data(); - sigdp->flags = 0; + sigdp->flags = ERTS_P2P_SIG_TYPE_GET_DATA; return erts_schedule_proc2port_signal(c_p, prt, @@ -4608,8 +4625,8 @@ set_busy_port(ErlDrvPort dprt, int on) if (on) { flags = erts_smp_atomic32_read_bor_acqb(&prt->sched.flags, - ERTS_PTS_FLG_BUSY); - if (flags & ERTS_PTS_FLG_BUSY) + ERTS_PTS_FLG_BUSY_PORT); + if (flags & ERTS_PTS_FLG_BUSY_PORT) return; /* Already busy */ if (flags & ERTS_PTS_FLG_HAVE_NS_TASKS) @@ -4623,11 +4640,9 @@ set_busy_port(ErlDrvPort dprt, int on) } #endif } else { - ErtsProcList *plp; - flags = erts_smp_atomic32_read_band_acqb(&prt->sched.flags, - ~ERTS_PTS_FLG_BUSY); - if (!(flags & ERTS_PTS_FLG_BUSY)) + ~ERTS_PTS_FLG_BUSY_PORT); + if (!(flags & ERTS_PTS_FLG_BUSY_PORT)) return; /* Already non-busy */ #ifdef USE_VM_PROBES @@ -4645,52 +4660,60 @@ set_busy_port(ErlDrvPort dprt, int on) erts_dist_port_not_busy(prt); } - /* - * Resume, in a round-robin fashion, all processes waiting on the port. - * - * This version submitted by Tony Rogvall. The earlier version used - * to resume the processes in order, which caused starvation of all but - * the first process. - */ + if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q)) + erts_port_resume_procs(prt); + } +} - erts_port_task_sched_lock(&prt->sched); - plp = prt->suspended; - prt->suspended = NULL; - erts_port_task_sched_unlock(&prt->sched); +void +erts_port_resume_procs(Port *prt) +{ + /* + * Resume, in a round-robin fashion, all processes waiting on the port. + * + * This version submitted by Tony Rogvall. The earlier version used + * to resume the processes in order, which caused starvation of all but + * the first process. + */ + ErtsProcList *plp; - if (erts_proclist_fetch(&plp, NULL)) { + erts_port_task_sched_lock(&prt->sched); + plp = prt->suspended; + prt->suspended = NULL; + erts_port_task_sched_unlock(&prt->sched); + + if (erts_proclist_fetch(&plp, NULL)) { #ifdef USE_VM_PROBES - /* - * Hrm, for blocked dist ports, plp always seems to be NULL. - * That's not so fun. - * Well, another way to get the same info is using a D - * script to correlate an earlier process-port_blocked+pid - * event with a later process-scheduled event. That's - * subject to the multi-CPU races with how events are - * handled, but hey, that way works most of the time. - */ - if (DTRACE_ENABLED(process_port_unblocked)) { - DTRACE_CHARBUF(pid_str, 16); - ErtsProcList* plp2 = plp; - - erts_snprintf(port_str, sizeof(port_str), - "%T", prt->common.id); - while (plp2 != NULL) { - erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid); - DTRACE2(process_port_unblocked, pid_str, port_str); - } - } -#endif + /* + * Hrm, for blocked dist ports, plp always seems to be NULL. + * That's not so fun. + * Well, another way to get the same info is using a D + * script to correlate an earlier process-port_blocked+pid + * event with a later process-scheduled event. That's + * subject to the multi-CPU races with how events are + * handled, but hey, that way works most of the time. + */ + if (DTRACE_ENABLED(process_port_unblocked)) { + DTRACE_CHARBUF(port_str, 16); + DTRACE_CHARBUF(pid_str, 16); + ErtsProcList* plp2 = plp; - /* First proc should be resumed last */ - if (plp->next) { - plp->next->prev = NULL; - erts_resume_processes(plp->next); - plp->next = NULL; + erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id); + while (plp2 != NULL) { + erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid); + DTRACE2(process_port_unblocked, pid_str, port_str); } - erts_resume_processes(plp); - } + } +#endif + + /* First proc should be resumed last */ + if (plp->next) { + plp->next->prev = NULL; + erts_resume_processes(plp->next); + plp->next = NULL; + } + erts_resume_processes(plp); } } -- cgit v1.2.3