aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c281
1 files changed, 122 insertions, 159 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 9ae973e108..ccc7da265e 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -47,6 +47,8 @@
#include "external.h"
#include "dtrace-wrapper.h"
#include "erl_map.h"
+#include "erl_bif_unique.h"
+#include "erl_hl_timer.h"
extern ErlDrvEntry fd_driver_entry;
#ifndef __OSE__
@@ -378,11 +380,7 @@ static Port *create_port(char *name,
prt->dist_entry = NULL;
ERTS_PORT_INIT_CONNECTED(prt, pid);
prt->common.u.alive.reg = NULL;
-#ifdef ERTS_SMP
- prt->common.u.alive.ptimer = NULL;
-#else
- sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer));
-#endif
+ ERTS_PTMR_INIT(prt);
erts_port_task_handle_init(&prt->timeout_task);
prt->psd = NULL;
prt->drv_data = (SWord) 0;
@@ -391,7 +389,7 @@ static Port *create_port(char *name,
/* Set default tracing */
erts_get_default_tracing(&ERTS_TRACE_FLAGS(prt), &ERTS_TRACER_PROC(prt));
- ASSERT(((char *) prt) == ((char *) &prt->common));
+ ERTS_CT_ASSERT(offsetof(Port,common) == 0);
#if !ERTS_PORT_INIT_INSTR_NEED_ID
/*
@@ -462,11 +460,7 @@ erts_port_free(Port *prt)
| ERTS_PORT_SFLG_FREE));
ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG);
-#ifdef ERTS_SMP
- ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->common.refc) == 0);
-#else
- ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->refc) == 0);
-#endif
+ ERTS_LC_ASSERT(erts_atomic_read_nob(&prt->common.refc.atmc) == 0);
erts_port_task_fini_sched(&prt->sched);
@@ -735,11 +729,7 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
/*
* Must clean up the port.
*/
-#ifdef ERTS_SMP
- erts_cancel_smp_ptimer(port->common.u.alive.ptimer);
-#else
- erts_cancel_timer(&(port->common.u.alive.tm));
-#endif
+ erts_cancel_port_timer(port);
stopq(port);
if (port->linebuf != NULL) {
erts_free(ERTS_ALC_T_LINEBUF,
@@ -1429,15 +1419,7 @@ queue_port_sched_op_reply(Process *rp,
bp = erts_resize_message_buffer(bp, used_h_size, &msg, 1);
}
- erts_queue_message(rp,
- rp_locksp,
- bp,
- msg,
- NIL
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, rp_locksp, bp, msg, NIL);
}
static void
@@ -2805,7 +2787,8 @@ void erts_init_io(int port_tab_size,
port_tab_size,
common_element_size, /* Doesn't need to be excact */
"port_table",
- legacy_port_tab);
+ legacy_port_tab,
+ 1);
erts_smp_atomic_init_nob(&erts_bytes_out, 0);
erts_smp_atomic_init_nob(&erts_bytes_in, 0);
@@ -3072,7 +3055,7 @@ deliver_result(Eterm sender, Eterm pid, Eterm res)
rp = (scheduler
? erts_proc_lookup(pid)
- : erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_SMP_INC_REFC));
+ : erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_INC_REFC));
if (rp) {
Eterm tuple;
@@ -3085,16 +3068,12 @@ deliver_result(Eterm sender, Eterm pid, Eterm res)
hp = erts_alloc_message_heap(sz_res + 3, &bp, &ohp, rp, &rp_locks);
res = copy_struct(res, sz_res, &hp, ohp);
tuple = TUPLE2(hp, sender, res);
- erts_queue_message(rp, &rp_locks, bp, tuple, NIL
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, &rp_locks, bp, tuple, NIL);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
- erts_smp_proc_dec_refc(rp);
+ erts_proc_dec_refc(rp);
}
}
@@ -3138,7 +3117,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
rp = (scheduler
? erts_proc_lookup(to)
- : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC));
+ : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_INC_REFC));
if (!rp)
return;
@@ -3153,8 +3132,6 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
Binary* bptr;
bptr = erts_bin_nrml_alloc(len);
- bptr->flags = 0;
- bptr->orig_size = len;
erts_refc_init(&bptr->refc, 1);
sys_memcpy(bptr->orig_bytes, buf, len);
@@ -3187,15 +3164,11 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
tuple = TUPLE2(hp, prt->common.id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
- erts_smp_proc_dec_refc(rp);
+ erts_proc_dec_refc(rp);
}
/*
@@ -3281,7 +3254,7 @@ deliver_vec_message(Port* prt, /* Port */
rp = (scheduler
? erts_proc_lookup(to)
- : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC));
+ : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_INC_REFC));
if (!rp)
return;
@@ -3358,14 +3331,10 @@ deliver_vec_message(Port* prt, /* Port */
tuple = TUPLE2(hp, prt->common.id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
- erts_smp_proc_dec_refc(rp);
+ erts_proc_dec_refc(rp);
}
@@ -3455,11 +3424,8 @@ terminate_port(Port *prt)
send_closed_port_id = NIL;
}
-#ifdef ERTS_SMP
- erts_cancel_smp_ptimer(prt->common.u.alive.ptimer);
-#else
- erts_cancel_timer(&prt->common.u.alive.tm);
-#endif
+ if (ERTS_PTMR_IS_SET(prt))
+ erts_cancel_port_timer(prt);
drv = prt->drv_ptr;
if ((drv != NULL) && (drv->stop != NULL)) {
@@ -4087,6 +4053,9 @@ erts_port_control(Process* c_p,
size,
&resp_bufp,
&resp_size);
+
+ control_flags = prt->control_flags;
+
finalize_imm_drv_call(&try_call_state);
if (tmp_alloced)
erts_free(ERTS_ALC_T_TMP, bufp);
@@ -4094,8 +4063,6 @@ erts_port_control(Process* c_p,
return ERTS_PORT_OP_BADARG;
}
- control_flags = prt->control_flags;
-
hsz = port_control_result_size(control_flags,
resp_bufp,
&resp_size,
@@ -4484,7 +4451,7 @@ make_port_info_term(Eterm **hpp_start,
int len;
int start;
static Eterm item[] = ERTS_PORT_INFO_1_ITEMS;
- static Eterm value[sizeof(item)/sizeof(item[0])];
+ Eterm value[sizeof(item)/sizeof(item[0])];
start = 0;
len = sizeof(item)/sizeof(item[0]);
@@ -5005,24 +4972,6 @@ erts_free_port_names(ErtsPortNames *pnp)
erts_free(ERTS_ALC_T_PORT_NAMES, pnp);
}
-static void schedule_port_timeout(Port *p)
-{
- /*
- * Scheduling of port timeouts can be done without port locking, but
- * since the task handle is stored in the port structure and the ptimer
- * structure is protected by the port lock we require the port to be
- * locked for now...
- *
- * TODO: Implement scheduling of port timeouts without locking
- * the port.
- * /Rickard
- */
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
- erts_port_task_schedule(p->common.id,
- &p->timeout_task,
- ERTS_PORT_TASK_TIMEOUT);
-}
-
ErlDrvTermData driver_mk_term_nil(void)
{
return driver_term_nil;
@@ -5051,7 +5000,7 @@ void driver_report_exit(ErlDrvPort ix, int status)
rp = (scheduler
? erts_proc_lookup(pid)
- : erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_SMP_INC_REFC));
+ : erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_INC_REFC));
if (!rp)
return;
@@ -5061,15 +5010,11 @@ void driver_report_exit(ErlDrvPort ix, int status)
hp += 3;
tuple = TUPLE2(hp, prt->common.id, tuple);
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
- erts_smp_proc_dec_refc(rp);
+ erts_proc_dec_refc(rp);
}
#define ERTS_B2T_STATES_DEF_STATES_SZ 5
@@ -5351,7 +5296,11 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
case ERL_DRV_MAP: { /* int */
ERTS_DDT_CHK_ENOUGH_ARGS(1);
if ((int) ptr[0] < 0) ERTS_DDT_FAIL;
- need += MAP_HEADER_SIZE + 1 + 2*ptr[0];
+ if (ptr[0] > MAP_SMALL_MAP_LIMIT) {
+ need += hashmap_over_estimated_heap_size(ptr[0]);
+ } else {
+ need += MAP_HEADER_FLATMAP_SZ + 1 + 2*ptr[0];
+ }
depth -= 2*ptr[0];
if (depth < 0) ERTS_DDT_FAIL;
ptr++;
@@ -5381,7 +5330,7 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
scheduler = erts_get_scheduler_id() != 0;
rp = (scheduler
? erts_proc_lookup(to)
- : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC));
+ : erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_INC_REFC));
if (!rp) {
res = 0;
goto done;
@@ -5506,8 +5455,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
ProcBin* pbp;
Binary* bp = erts_bin_nrml_alloc(size);
ASSERT(bufp);
- bp->flags = 0;
- bp->orig_size = (SWord) size;
erts_refc_init(&bp->refc, 1);
sys_memcpy((void *) bp->orig_bytes, (void *) bufp, size);
pbp = (ProcBin *) hp;
@@ -5581,7 +5528,9 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
mess = make_float(hp);
f.fd = *((double *) ptr[0]);
- PUT_DOUBLE(f, hp);
+ if (!erts_isfinite(f.fd))
+ ERTS_DDT_FAIL;
+ PUT_DOUBLE(f, hp);
hp += FLOAT_SIZE_OBJECT;
ptr++;
break;
@@ -5597,31 +5546,52 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
case ERL_DRV_MAP: { /* int */
int size = (int)ptr[0];
- Eterm* tp = hp;
- Eterm* vp;
- map_t *mp;
-
- *tp = make_arityval(size);
-
- hp += 1 + size;
- mp = (map_t*)hp;
- mp->thing_word = MAP_HEADER;
- mp->size = size;
- mp->keys = make_tuple(tp);
- mess = make_map(mp);
-
- hp += MAP_HEADER_SIZE + size; /* advance "heap" pointer */
-
- tp += size; /* point at last key */
- vp = hp - 1; /* point at last value */
-
- while(size--) {
- *vp-- = ESTACK_POP(stack);
- *tp-- = ESTACK_POP(stack);
- }
- if (!erts_validate_and_sort_map(mp))
- ERTS_DDT_FAIL;
- ptr++;
+ if (size > MAP_SMALL_MAP_LIMIT) {
+ int ix = 2*size;
+ ErtsHeapFactory factory;
+ Eterm* leafs = hp;
+
+ hp += 2*size;
+ while(ix--) { *--hp = ESTACK_POP(stack); }
+
+ hp += 2*size;
+ factory.p = NULL;
+ factory.hp = hp;
+ /* We assume heap will suffice (see hashmap_over_estimated_heap_size) */
+
+ mess = erts_hashmap_from_array(&factory, leafs, size, 1);
+
+ if (is_non_value(mess))
+ ERTS_DDT_FAIL;
+
+ hp = factory.hp;
+ } else {
+ Eterm* tp = hp;
+ Eterm* vp;
+ flatmap_t *mp;
+
+ *tp = make_arityval(size);
+
+ hp += 1 + size;
+ mp = (flatmap_t*)hp;
+ mp->thing_word = MAP_HEADER_FLATMAP;
+ mp->size = size;
+ mp->keys = make_tuple(tp);
+ mess = make_flatmap(mp);
+
+ hp += MAP_HEADER_FLATMAP_SZ + size;
+
+ tp += size; /* point at last key */
+ vp = hp - 1; /* point at last value */
+
+ while(size--) {
+ *vp-- = ESTACK_POP(stack);
+ *tp-- = ESTACK_POP(stack);
+ }
+ if (!erts_validate_and_sort_flatmap(mp))
+ ERTS_DDT_FAIL;
+ }
+ ptr++;
break;
}
@@ -5642,11 +5612,7 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
HRelease(rp, hp_end, hp);
}
/* send message */
- erts_queue_message(rp, &rp_locks, bp, mess, am_undefined
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, &rp_locks, bp, mess, am_undefined);
}
else {
if (b2t.ix > b2t.used)
@@ -5659,14 +5625,12 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
HRelease(rp, hp_end, hp);
}
}
-#ifdef ERTS_SMP
if (rp) {
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
- erts_smp_proc_dec_refc(rp);
+ erts_proc_dec_refc(rp);
}
-#endif
cleanup_b2t_states(&b2t);
DESTROY_ESTACK(stack);
return res;
@@ -5999,9 +5963,7 @@ driver_alloc_binary(ErlDrvSizeT size)
bin = erts_bin_drv_alloc_fnf((Uint) size);
if (!bin)
return NULL; /* The driver write must take action */
- bin->flags = BIN_FLAG_DRV;
erts_refc_init(&bin->refc, 1);
- bin->orig_size = (SWord) size;
return Binary2ErlDrvBinary(bin);
}
@@ -6031,7 +5993,6 @@ ErlDrvBinary* driver_realloc_binary(ErlDrvBinary* bin, ErlDrvSizeT size)
if (!newbin)
return NULL;
- newbin->orig_size = size;
return Binary2ErlDrvBinary(newbin);
}
@@ -6615,18 +6576,6 @@ int driver_pushq(ErlDrvPort ix, char* buffer, ErlDrvSizeT len)
return code;
}
-static ERTS_INLINE void
-drv_cancel_timer(Port *prt)
-{
-#ifdef ERTS_SMP
- erts_cancel_smp_ptimer(prt->common.u.alive.ptimer);
-#else
- erts_cancel_timer(&prt->common.u.alive.tm);
-#endif
- if (erts_port_task_is_scheduled(&prt->timeout_task))
- erts_port_task_abort(&prt->timeout_task);
-}
-
int driver_set_timer(ErlDrvPort ix, unsigned long t)
{
Port* prt = erts_drvport2port(ix);
@@ -6638,19 +6587,8 @@ int driver_set_timer(ErlDrvPort ix, unsigned long t)
if (prt->drv_ptr->timeout == NULL)
return -1;
- drv_cancel_timer(prt);
-#ifdef ERTS_SMP
- erts_create_smp_ptimer(&prt->common.u.alive.ptimer,
- prt->common.id,
- (ErlTimeoutProc) schedule_port_timeout,
- t);
-#else
- erts_set_timer(&prt->common.u.alive.tm,
- (ErlTimeoutProc) schedule_port_timeout,
- NULL,
- prt,
- t);
-#endif
+
+ erts_set_port_timer(prt, (Sint64) t);
return 0;
}
@@ -6660,28 +6598,28 @@ int driver_cancel_timer(ErlDrvPort ix)
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- drv_cancel_timer(prt);
+ erts_cancel_port_timer(prt);
return 0;
}
-
int
driver_read_timer(ErlDrvPort ix, unsigned long* t)
{
Port* prt = erts_drvport2port(ix);
+ Sint64 left;
ERTS_SMP_CHK_NO_PROC_LOCKS;
if (prt == ERTS_INVALID_ERL_DRV_PORT)
return -1;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
-#ifdef ERTS_SMP
- *t = (prt->common.u.alive.ptimer
- ? erts_time_left(&prt->common.u.alive.ptimer->timer.tm)
- : 0);
-#else
- *t = erts_time_left(&prt->common.u.alive.tm);
-#endif
+
+ left = erts_read_port_timer(prt);
+ if (left < 0)
+ left = 0;
+
+ *t = (unsigned long) left;
+
return 0;
}
@@ -6705,7 +6643,7 @@ static void ref_to_driver_monitor(Eterm ref, ErlDrvMonitor *mon)
{
RefThing *refp;
ASSERT(is_internal_ref(ref));
- ASSERT(sizeof(RefThing) <= sizeof(ErlDrvMonitor));
+ ERTS_CT_ASSERT(sizeof(RefThing) <= sizeof(ErlDrvMonitor));
refp = ref_thing_ptr(ref);
memset(mon,0,sizeof(ErlDrvMonitor));
memcpy(mon,refp,sizeof(RefThing));
@@ -7349,6 +7287,8 @@ no_stop_select_callback(ErlDrvEvent event, void* private)
erts_send_error_to_logger_nogl(dsbufp);
}
+#define IS_DRIVER_VERSION_GE(DE,MAJOR,MINOR) \
+ ((DE)->major_version >= (MAJOR) && (DE)->minor_version >= (MINOR))
static int
init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
@@ -7396,6 +7336,7 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
drv->timeout = de->timeout ? de->timeout : no_timeout_callback;
drv->ready_async = de->ready_async;
drv->process_exit = de->process_exit;
+ drv->emergency_close = IS_DRIVER_VERSION_GE(de,3,2) ? de->emergency_close : NULL;
if (de->stop_select)
drv->stop_select = de->stop_select;
else
@@ -7414,6 +7355,8 @@ init_driver(erts_driver_t *drv, ErlDrvEntry *de, DE_Handle *handle)
}
}
+#undef IS_DRIVER_VERSION_GE
+
void
erts_destroy_driver(erts_driver_t *drv)
{
@@ -7557,7 +7500,7 @@ Port *erts_get_heart_port(void)
if (!port)
continue;
/* only examine undead or alive ports */
- if (erts_atomic32_read_nob(&port->state) & ERTS_PORT_SFLGS_DEAD)
+ if (erts_atomic32_read_nob(&port->state) & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
continue;
/* immediate atom compare */
reg = port->common.u.alive.reg;
@@ -7568,3 +7511,23 @@ Port *erts_get_heart_port(void)
return NULL;
}
+
+void erts_emergency_close_ports(void)
+{
+ int ix, max = erts_ptab_max(&erts_port);
+
+ for (ix = 0; ix < max; ix++) {
+ Port *port = erts_pix2port(ix);
+
+ if (!port)
+ continue;
+ /* only examine undead or alive ports */
+ if (erts_atomic32_read_nob(&port->state) & ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP)
+ continue;
+
+ /* emergency close socket */
+ if (port->drv_ptr->emergency_close) {
+ port->drv_ptr->emergency_close((ErlDrvData) port->drv_data);
+ }
+ }
+}