diff options
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r-- | erts/emulator/beam/io.c | 190 |
1 files changed, 140 insertions, 50 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 85013af3ad..e4a5f2b6b6 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -298,7 +298,6 @@ static Port *create_port(char *name, erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED; erts_aint32_t x_pts_flgs = 0; - ErtsRunQueue *runq; if (!driver_lock) { /* Align size for mutex following port struct */ port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); @@ -347,11 +346,16 @@ static Port *create_port(char *name, p += sizeof(erts_mtx_t); state |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; } - if (erts_get_scheduler_data()) - runq = erts_get_runq_current(NULL); - else - runq = ERTS_RUNQ_IX(0); - erts_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq); + + { + ErtsRunQueue *runq; + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + if (esdp) + runq = erts_get_runq_current(esdp); + else + runq = ERTS_RUNQ_IX(0); + erts_init_runq_port(prt, runq); + } prt->xports = NULL; @@ -584,7 +588,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ */ for (d = driver_list; d; d = d->next) { - if (strcmp(d->name, name) == 0 && + if (sys_strcmp(d->name, name) == 0 && erts_ddll_driver_ok(d->handle)) { driver = d; break; @@ -634,7 +638,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ trace_port_open(port, pid, erts_atom_put((byte *) port->name, - strlen(port->name), + sys_strlen(port->name), ERTS_ATOM_ENC_LATIN1, 1)); } @@ -2925,7 +2929,7 @@ erl_drv_init_ack(ErlDrvPort ix, ErlDrvData res) { break; case -2: { char *str = erl_errno_id(errno); - resp = erts_atom_put((byte *) str, strlen(str), + resp = erts_atom_put((byte *) str, sys_strlen(str), ERTS_ATOM_ENC_LATIN1, 1); break; } @@ -3384,24 +3388,11 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to, if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) { listp = buf_to_intlist(&hp, buf, len, listp); } else if (buf != NULL) { - ProcBin* pb; - Binary* bptr; - - bptr = erts_bin_nrml_alloc(len); + Binary* bptr = erts_bin_nrml_alloc(len); sys_memcpy(bptr->orig_bytes, buf, len); - pb = (ProcBin *) hp; - pb->thing_word = HEADER_PROC_BIN; - pb->size = len; - pb->next = ohp->first; - ohp->first = (struct erl_off_heap_header*)pb; - pb->val = bptr; - pb->bytes = (byte*) bptr->orig_bytes; - pb->flags = 0; + listp = erts_build_proc_bin(ohp, hp, bptr); hp += PROC_BIN_SIZE; - - OH_OVERHEAD(ohp, pb->size / sizeof(Eterm)); - listp = make_binary(pb); } /* Prepend the header */ @@ -3829,11 +3820,12 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc) ErtsDistLinkData dld; ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: erts_remove_dist_link(&dld, port_id, lnk->pid, dep); erts_destroy_dist_link(&dld); @@ -4203,17 +4195,9 @@ write_port_control_result(int control_flags, else { dbin = (ErlDrvBinary *) resp_bufp; if (dbin->orig_size > ERL_ONHEAP_BIN_LIMIT) { - ProcBin* pb = (ProcBin *) *hpp; + res = erts_build_proc_bin(ohp, *hpp, ErlDrvBinary2Binary(dbin)); *hpp += PROC_BIN_SIZE; - pb->thing_word = HEADER_PROC_BIN; - pb->size = dbin->orig_size; - pb->next = ohp->first; - ohp->first = (struct erl_off_heap_header *) pb; - pb->val = ErlDrvBinary2Binary(dbin); - pb->bytes = (byte*) dbin->orig_bytes; - pb->flags = 0; - OH_OVERHEAD(ohp, dbin->orig_size / sizeof(Eterm)); - return make_binary(pb); + return res; } resp_bufp = dbin->orig_bytes; resp_size = dbin->orig_size; @@ -5095,6 +5079,93 @@ static void prt_one_lnk(ErtsLink *lnk, void *vprtd) erts_print(prtd->to, prtd->arg, "%T", lnk->pid); } +static void dump_port_state(fmtfn_t to, void *arg, erts_aint32_t state) +{ + erts_aint32_t rest; + int unknown = 0; + char delim = ' '; + + erts_print(to, arg, "State:"); + + rest = state; + while (rest) { + erts_aint32_t chk = (rest ^ (rest-1)) & rest; /* lowest set bit */ + char* s; + + rest &= ~chk; + switch (chk) { + case ERTS_PORT_SFLG_CONNECTED: s = "CONNECTED"; break; + case ERTS_PORT_SFLG_EXITING: s = "EXITING"; break; + case ERTS_PORT_SFLG_DISTRIBUTION: s = "DISTR"; break; + case ERTS_PORT_SFLG_BINARY_IO: s = "BINARY_IO"; break; + case ERTS_PORT_SFLG_SOFT_EOF: s = "SOFT_EOF"; break; + case ERTS_PORT_SFLG_CLOSING: s = "CLOSING"; break; + case ERTS_PORT_SFLG_SEND_CLOSED: s = "SEND_CLOSED"; break; + case ERTS_PORT_SFLG_LINEBUF_IO: s = "LINEBUF_IO"; break; + case ERTS_PORT_SFLG_FREE: s = "FREE"; break; + case ERTS_PORT_SFLG_INITIALIZING: s = "INITIALIZING"; break; + case ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK: s = "PORT_LOCK"; break; + case ERTS_PORT_SFLG_INVALID: s = "INVALID"; break; + case ERTS_PORT_SFLG_HALT: s = "HALT"; break; +#ifdef DEBUG + case ERTS_PORT_SFLG_PORT_DEBUG: s = "DEBUG"; break; +#endif + default: + unknown = 1; + continue; + } + erts_print(to, arg, "%c%s", delim, s); + delim = '|'; + } + if (unknown || !state) + erts_print(to, arg, "%c0x%x\n", delim, state); + else + erts_print(to, arg, "\n"); +} + +static void dump_port_task_flags(fmtfn_t to, void *arg, Port* p) +{ + erts_aint32_t flags = erts_atomic32_read_nob(&p->sched.flags); + erts_aint32_t unknown = 0; + char delim = ' '; + + if (!flags) + return; + + erts_print(to, arg, "Task Flags:"); + + while (flags) { + erts_aint32_t chk = (flags ^ (flags-1)) & flags; /* lowest set bit */ + char* s; + + flags &= ~chk; + switch (chk) { + case ERTS_PTS_FLG_IN_RUNQ: s = "IN_RUNQ"; break; + case ERTS_PTS_FLG_EXEC: s = "EXEC"; break; + case ERTS_PTS_FLG_HAVE_TASKS: s = "HAVE_TASKS"; break; + case ERTS_PTS_FLG_EXIT: s = "EXIT"; break; + case ERTS_PTS_FLG_BUSY_PORT: s = "BUSY_PORT"; break; + case ERTS_PTS_FLG_BUSY_PORT_Q: s = "BUSY_Q"; break; + case ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q: s = "CHK_UNSET_BUSY_Q"; break; + case ERTS_PTS_FLG_HAVE_BUSY_TASKS: s = "BUSY_TASKS"; break; + case ERTS_PTS_FLG_HAVE_NS_TASKS: s = "NS_TASKS"; break; + case ERTS_PTS_FLG_PARALLELISM: s = "PARALLELISM"; break; + case ERTS_PTS_FLG_FORCE_SCHED: s = "FORCE_SCHED"; break; + case ERTS_PTS_FLG_EXITING: s = "EXITING"; break; + case ERTS_PTS_FLG_EXEC_IMM: s = "EXEC_IMM"; break; + default: + unknown |= chk; + continue; + } + erts_print(to, arg, "%c%s", delim, s); + delim = '|'; + } + if (unknown) + erts_print(to, arg, "%cUNKNOWN(0x%x)\n", delim, unknown); + else + erts_print(to, arg, "\n"); +} + void print_port_info(Port *p, fmtfn_t to, void *arg) { @@ -5104,6 +5175,8 @@ print_port_info(Port *p, fmtfn_t to, void *arg) return; erts_print(to, arg, "=port:%T\n", p->common.id); + dump_port_state(to, arg, state); + dump_port_task_flags(to, arg, p); erts_print(to, arg, "Slot: %d\n", internal_port_index(p->common.id)); if (state & ERTS_PORT_SFLG_CONNECTED) { erts_print(to, arg, "Connected: %T", ERTS_PORT_GET_CONNECTED(p)); @@ -5126,6 +5199,10 @@ print_port_info(Port *p, fmtfn_t to, void *arg) erts_doforall_monitors(ERTS_P_MONITORS(p), &prt_one_monitor, &prtd); erts_print(to, arg, "\n"); } + if (p->suspended) { + erts_print(to, arg, "Suspended: "); + erts_proclist_dump(to, arg, p->suspended); + } if (p->common.u.alive.reg != NULL) erts_print(to, arg, "Registered as: %T\n", p->common.u.alive.reg->name); @@ -5143,6 +5220,14 @@ print_port_info(Port *p, fmtfn_t to, void *arg) } else { erts_print(to, arg, "Port controls linked-in driver: %s\n",p->name); } + erts_print(to, arg, "Input: %beu\n", p->bytes_in); + erts_print(to, arg, "Output: %beu\n", p->bytes_out); + erts_print(to, arg, "Queue: %beu\n", erts_ioq_size(&p->ioq)); + { + Eterm port_data = erts_port_data_read(p); + if (port_data != am_undefined) + erts_print(to, arg, "Port Data: %T\n", port_data); + } } void @@ -5977,21 +6062,12 @@ driver_deliver_term(Port *prt, Eterm to, ErlDrvTermData* data, int len) mess = make_binary(hbp); } else { - ProcBin* pbp; + Eterm* hp; Binary* bp = erts_bin_nrml_alloc(size); ASSERT(bufp); sys_memcpy((void *) bp->orig_bytes, (void *) bufp, size); - pbp = (ProcBin *) erts_produce_heap(&factory, - PROC_BIN_SIZE, HEAP_EXTRA); - pbp->thing_word = HEADER_PROC_BIN; - pbp->size = size; - pbp->next = factory.off_heap->first; - factory.off_heap->first = (struct erl_off_heap_header*)pbp; - pbp->val = bp; - pbp->bytes = (byte*) bp->orig_bytes; - pbp->flags = 0; - OH_OVERHEAD(factory.off_heap, pbp->size / sizeof(Eterm)); - mess = make_binary(pbp); + hp = erts_produce_heap(&factory, PROC_BIN_SIZE, HEAP_EXTRA); + mess = erts_build_proc_bin(factory.off_heap, hp, bp); } ptr += 2; break; @@ -7248,7 +7324,7 @@ int driver_failure_atom(ErlDrvPort ix, char* string) { return driver_failure_term(ix, erts_atom_put((byte *) string, - strlen(string), + sys_strlen(string), ERTS_ATOM_ENC_LATIN1, 1), 0); @@ -7714,13 +7790,27 @@ int null_func(void) int erl_drv_putenv(const char *key, char *value) { - return erts_sys_putenv_raw((char*)key, value); + switch (erts_sys_explicit_8bit_putenv((char*)key, value)) { + case -1: /* Insufficient buffer space */ + return 1; + case 1: /* Success */ + return 0; + default: /* Not found */ + return -1; + } } int erl_drv_getenv(const char *key, char *value, size_t *value_size) { - return erts_sys_getenv_raw((char*)key, value, value_size); + switch (erts_sys_explicit_8bit_getenv((char*)key, value, value_size)) { + case -1: /* Insufficient buffer space */ + return 1; + case 1: /* Success */ + return 0; + default: /* Not found */ + return -1; + } } /* get heart_port |