Diffstat (limited to 'erts/emulator/beam/erl_nif.c')
-rw-r--r-- | erts/emulator/beam/erl_nif.c | 512
1 file changed, 236 insertions(+), 276 deletions(-)
diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c
index b96db5d054..d2000aa71e 100644
--- a/erts/emulator/beam/erl_nif.c
+++ b/erts/emulator/beam/erl_nif.c
@@ -138,7 +138,7 @@ execution_state(ErlNifEnv *env, Process **c_pp, int *schedp)
     Process *c_p = env->proc;
 
     if (!(c_p->static_flags & ERTS_STC_FLG_SHADOW_PROC)) {
-        ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p)
+        ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p)
                            & ERTS_PROC_LOCK_MAIN);
     }
     else {
@@ -220,7 +220,7 @@ void erts_pre_nif(ErlNifEnv* env, Process* p, struct erl_module_nif* mod_nif,
     ASSERT(esdp);
 
     if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) {
-        erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state);
+        erts_aint32_t state = erts_atomic32_read_nob(&p->state);
 
         ASSERT(p->scheduler_data == esdp);
         ASSERT((state & (ERTS_PSFLG_RUNNING
@@ -237,9 +237,11 @@ static void cache_env(ErlNifEnv* env);
 static void full_flush_env(ErlNifEnv *env);
 static void flush_env(ErlNifEnv* env);
 
-/* Temporary object header, auto-deallocated when NIF returns
- * or when independent environment is cleared.
- */
+/* Temporary object header, auto-deallocated when NIF returns or when
+ * independent environment is cleared.
+ *
+ * The payload can be accessed with &tmp_obj_ptr[1] but keep in mind that its
+ * first element must not require greater alignment than `next`. */
 struct enif_tmp_obj_t {
     struct enif_tmp_obj_t* next;
     void (*dtor)(struct enif_tmp_obj_t*);
@@ -256,6 +258,46 @@ static ERTS_INLINE void free_tmp_objs(ErlNifEnv* env)
     }
 }
 
+/* Whether the given environment is bound to a process and will be cleaned up
+ * when the NIF returns. It's safe to use temp_alloc for objects in
+ * env->tmp_obj_list when this is true. */
+static ERTS_INLINE int is_proc_bound(ErlNifEnv *env)
+{
+    return env->mod_nif != NULL;
+}
+
+/* Allocates and attaches an object to the given environment, running its
+ * destructor when the environment is cleared. To avoid temporary variables the
+ * address of the allocated object is returned instead of the enif_tmp_obj_t.
+ *
+ * The destructor *must* call `erts_free(tmp_obj->allocator, tmp_obj)` to free
+ * the object. If the destructor needs to refer to the allocated object its
+ * address will be &tmp_obj[1]. */
+static ERTS_INLINE void *alloc_tmp_obj(ErlNifEnv *env, size_t size,
+                                       void (*dtor)(struct enif_tmp_obj_t*)) {
+    struct enif_tmp_obj_t *tmp_obj;
+    ErtsAlcType_t allocator;
+
+    allocator = is_proc_bound(env) ? ERTS_ALC_T_TMP : ERTS_ALC_T_NIF;
+
+    tmp_obj = erts_alloc(allocator, sizeof(struct enif_tmp_obj_t) + MAX(1, size));
+
+    tmp_obj->next = env->tmp_obj_list;
+    tmp_obj->allocator = allocator;
+    tmp_obj->dtor = dtor;
+
+    env->tmp_obj_list = tmp_obj;
+
+    return (void*)&tmp_obj[1];
+}
+
+/* Generic destructor for objects allocated through alloc_tmp_obj that don't
+ * care about their payload. */
+static void tmp_alloc_dtor(struct enif_tmp_obj_t *tmp_obj)
+{
+    erts_free(tmp_obj->allocator, tmp_obj);
+}
+
 void erts_post_nif(ErlNifEnv* env)
 {
     erts_unblock_fpe(env->fpe_was_unmasked);
@@ -287,12 +329,12 @@ schedule(ErlNifEnv* env, NativeFunPtr direct_fp, NativeFunPtr indirect_fp,
     else
         dirty_shadow_proc = env->proc;
 
-    ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN & erts_proc_lc_my_proc_locks(c_p));
+    ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN & erts_proc_lc_my_proc_locks(c_p));
 
     ep = erts_nif_export_schedule(c_p, dirty_shadow_proc,
                                   c_p->current,
                                   c_p->cp,
-                                  (BeamInstr) em_call_nif,
+                                  BeamOpCodeAddr(op_call_nif),
                                   direct_fp, indirect_fp,
                                   mod, func_name,
                                   argc, (const Eterm *) argv);
@@ -304,7 +346,6 @@ schedule(ErlNifEnv* env, NativeFunPtr direct_fp, NativeFunPtr indirect_fp,
     return (ERL_NIF_TERM) THE_NON_VALUE;
 }
 
-#ifdef ERTS_DIRTY_SCHEDULERS
 static ERL_NIF_TERM dirty_nif_finalizer(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
 static ERL_NIF_TERM dirty_nif_exception(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
 
@@ -320,7 +361,7 @@ erts_call_dirty_nif(ErtsSchedulerData *esdp, Process *c_p, BeamInstr *I, Eterm *
     ErlNifEnv env;
     ERL_NIF_TERM result;
 #ifdef DEBUG
-    erts_aint32_t state = erts_smp_atomic32_read_nob(&c_p->state);
+    erts_aint32_t state = erts_atomic32_read_nob(&c_p->state);
 
     ASSERT(nep == ERTS_PROC_GET_NIF_TRAP_EXPORT(c_p));
 
@@ -343,14 +384,14 @@ erts_call_dirty_nif(ErtsSchedulerData *esdp, Process *c_p, BeamInstr *I, Eterm *
 
     ASSERT(ERTS_SCHEDULER_IS_DIRTY(erts_proc_sched_data(c_p)));
 
-    erts_smp_atomic32_read_band_mb(&c_p->state, ~(ERTS_PSFLG_DIRTY_CPU_PROC
+    erts_atomic32_read_band_mb(&c_p->state, ~(ERTS_PSFLG_DIRTY_CPU_PROC
                                                   | ERTS_PSFLG_DIRTY_IO_PROC));
 
-    erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN);
+    erts_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN);
 
     result = (*dirty_nif)(&env, codemfa->arity, argv); /* Call dirty NIF */
 
-    erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MAIN);
+    erts_proc_lock(c_p, ERTS_PROC_LOCK_MAIN);
 
     ASSERT(env.proc->static_flags & ERTS_STC_FLG_SHADOW_PROC);
     ASSERT(env.proc->next == c_p);
@@ -394,24 +435,19 @@ erts_call_dirty_nif(ErtsSchedulerData *esdp, Process *c_p, BeamInstr *I, Eterm *
 
     return exiting;
 }
-#endif
 
 static void full_flush_env(ErlNifEnv* env)
 {
     flush_env(env);
-#ifdef ERTS_DIRTY_SCHEDULERS
     if (env->proc->static_flags & ERTS_STC_FLG_SHADOW_PROC)
         /* Dirty nif call using shadow process struct */
         erts_flush_dirty_shadow_proc(env->proc);
-#endif
 }
 
 static void full_cache_env(ErlNifEnv* env)
 {
-#ifdef ERTS_DIRTY_SCHEDULERS
     if (env->proc->static_flags & ERTS_STC_FLG_SHADOW_PROC)
         erts_cache_dirty_shadow_proc(env->proc);
-#endif
     cache_env(env);
 }
@@ -452,6 +488,7 @@ static void cache_env(ErlNifEnv* env)
         env->hp_end = env->heap_frag->mem + env->heap_frag->alloc_size;
     }
 }
+
 void* enif_priv_data(ErlNifEnv* env)
 {
     return env->mod_nif->priv_data;
@@ -566,10 +603,9 @@ void enif_clear_env(ErlNifEnv* env)
     ASSERT(!is_offheap(&MSO(p)));
 }
 
-#ifdef ERTS_SMP
 #ifdef DEBUG
 static int enif_send_delay = 0;
-#define ERTS_FORCE_ENIF_SEND_DELAY() (enif_send_delay++ % 2 == 0)
+#define ERTS_FORCE_ENIF_SEND_DELAY() (enif_send_delay++ % 32 == 0)
 #else
 #ifdef ERTS_PROC_LOCK_OWN_IMPL
 #define ERTS_FORCE_ENIF_SEND_DELAY() 0
@@ -592,9 +628,9 @@ int erts_flush_trace_messages(Process *c_p, ErtsProcLocks c_p_locks)
 
     /* Only one thread at a time is allowed to flush trace messages,
        so we require the main lock to be held when doing the flush */
-    ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(c_p);
+    ERTS_CHK_HAVE_ONLY_MAIN_PROC_LOCK(c_p);
 
-    erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_TRACE);
+    erts_proc_lock(c_p, ERTS_PROC_LOCK_TRACE);
 
     msgq = c_p->trace_msg_q;
 
@@ -613,7 +649,7 @@ int erts_flush_trace_messages(Process *c_p, ErtsProcLocks c_p_locks)
             msgq->first = NULL;
             msgq->last = &msgq->first;
             msgq->len = 0;
-            erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_TRACE);
+            erts_proc_unlock(c_p, ERTS_PROC_LOCK_TRACE);
 
             ASSERT(len != 0);
 
@@ -626,13 +662,13 @@ int erts_flush_trace_messages(Process *c_p, ErtsProcLocks c_p_locks)
                 if (rp->common.id == c_p->common.id)
                     rp_locks &= ~c_p_locks;
                 if (rp_locks)
-                    erts_smp_proc_unlock(rp, rp_locks);
+                    erts_proc_unlock(rp, rp_locks);
                 reds += len;
             } else {
                 erts_cleanup_messages(first);
             }
             reds += 1;
-            erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_TRACE);
+            erts_proc_lock(c_p, ERTS_PROC_LOCK_TRACE);
             msgq = msgq->next;
         } while (msgq);
@@ -649,21 +685,18 @@ int erts_flush_trace_messages(Process *c_p, ErtsProcLocks c_p_locks)
     }
 
 error:
-    erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_TRACE);
+    erts_proc_unlock(c_p, ERTS_PROC_LOCK_TRACE);
 
     return reds;
 }
-#endif
 
 int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
               ErlNifEnv* msg_env, ERL_NIF_TERM msg)
 {
     struct enif_msg_environment_t* menv = (struct enif_msg_environment_t*)msg_env;
     ErtsProcLocks rp_locks = 0;
-#ifdef ERTS_SMP
     ErtsProcLocks lc_locks = 0;
-#endif
     Process* rp;
     Process* c_p;
     ErtsMessage *mp;
@@ -672,13 +705,6 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
 
     execution_state(env, &c_p, &scheduler);
 
-#ifndef ERTS_SMP
-    if (!scheduler) {
-        erts_exit(ERTS_ABORT_EXIT,
-                  "enif_send: called from non-scheduler thread on non-SMP VM");
-        return 0;
-    }
-#endif
 
     if (scheduler > 0) { /* Normal scheduler */
         rp = erts_proc_lookup(receiver);
@@ -692,7 +718,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
             return 0;
 
         if (env->proc->static_flags & ERTS_STC_FLG_SHADOW_PROC) {
-            erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MAIN);
+            erts_proc_lock(c_p, ERTS_PROC_LOCK_MAIN);
         }
     }
@@ -701,7 +727,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
                                ERTS_P2P_FLG_INC_REFC);
         if (!rp) {
             if (c_p && (env->proc->static_flags & ERTS_STC_FLG_SHADOW_PROC))
-                erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN);
+                erts_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN);
             return 0;
         }
     }
@@ -736,7 +762,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
             full_cache_env(env);
         }
         else {
-            erts_aint_t state = erts_smp_atomic32_read_nob(&rp->state);
+            erts_aint_t state = erts_atomic32_read_nob(&rp->state);
             if (state & ERTS_PSFLG_OFF_HEAP_MSGQ) {
                 mp = erts_alloc_message(sz, &hp);
                 ohp = sz == 0 ? NULL : &mp->hfrag.off_heap;
@@ -762,7 +788,6 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
             full_cache_env(env);
         }
     }
-#ifdef ERTS_SMP
     else {
         /* This clause is taken when the nif is called in the context
            of a traced process. We do not know which locks we have
@@ -773,7 +798,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
 
         Process *t_p = env->tracee;
 
-        erts_smp_proc_lock(t_p, ERTS_PROC_LOCK_TRACE);
+        erts_proc_lock(t_p, ERTS_PROC_LOCK_TRACE);
 
         msgq = t_p->trace_msg_q;
 
@@ -790,7 +815,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
 #endif
         if (ERTS_FORCE_ENIF_SEND_DELAY() || msgq ||
             rp_locks & ERTS_PROC_LOCK_MSGQ ||
-            erts_smp_proc_trylock(rp, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
+            erts_proc_trylock(rp, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
 
             if (!msgq) {
                 msgq = erts_alloc(ERTS_ALC_T_TRACE_MSG_QUEUE,
@@ -804,36 +829,33 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
                 msgq->next = t_p->trace_msg_q;
                 t_p->trace_msg_q = msgq;
 
-                erts_smp_proc_unlock(t_p, ERTS_PROC_LOCK_TRACE);
+                erts_proc_unlock(t_p, ERTS_PROC_LOCK_TRACE);
 
                 erts_schedule_flush_trace_messages(t_p, 0);
             } else {
                 msgq->len++;
                 *msgq->last = mp;
                 msgq->last = &mp->next;
-                erts_smp_proc_unlock(t_p, ERTS_PROC_LOCK_TRACE);
+                erts_proc_unlock(t_p, ERTS_PROC_LOCK_TRACE);
             }
             goto done;
         } else {
-            erts_smp_proc_unlock(t_p, ERTS_PROC_LOCK_TRACE);
+            erts_proc_unlock(t_p, ERTS_PROC_LOCK_TRACE);
             rp_locks &= ~ERTS_PROC_LOCK_TRACE;
             rp_locks |= ERTS_PROC_LOCK_MSGQ;
         }
     }
-#endif /* ERTS_SMP */
 
     erts_queue_message(rp, rp_locks, mp, msg,
                        c_p ? c_p->common.id : am_undefined);
 
-#ifdef ERTS_SMP
 done:
     if (c_p == rp)
         rp_locks &= ~ERTS_PROC_LOCK_MAIN;
     if (rp_locks & ~lc_locks)
-        erts_smp_proc_unlock(rp, rp_locks & ~lc_locks);
+        erts_proc_unlock(rp, rp_locks & ~lc_locks);
     if (c_p && (env->proc->static_flags & ERTS_STC_FLG_SHADOW_PROC))
-        erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN);
-#endif
+        erts_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN);
 
     if (scheduler <= 0)
         erts_proc_dec_refc(rp);
@@ -863,15 +885,9 @@ enif_port_command(ErlNifEnv *env, const ErlNifPort* to_port,
     if (scheduler > 0)
         prt = erts_port_lookup(to_port->port_id, iflags);
     else {
-#ifdef ERTS_SMP
         if (ERTS_PROC_IS_EXITING(c_p))
             return 0;
         prt = erts_thr_port_lookup(to_port->port_id, iflags);
-#else
-        erts_exit(ERTS_ABORT_EXIT,
-                  "enif_port_command: called from non-scheduler "
-                  "thread on non-SMP VM");
-#endif
     }
 
     if (!prt)
@@ -1046,11 +1062,6 @@ int enif_is_number(ErlNifEnv* env, ERL_NIF_TERM term)
     return is_number(term);
 }
 
-static ERTS_INLINE int is_proc_bound(ErlNifEnv* env)
-{
-    return env->mod_nif != NULL;
-}
-
 static void aligned_binary_dtor(struct enif_tmp_obj_t* obj)
 {
     erts_free_aligned_binary_bytes_extra((byte*)obj, obj->allocator);
@@ -1085,22 +1096,14 @@ int enif_inspect_binary(ErlNifEnv* env, Eterm bin_term, ErlNifBinary* bin)
         u.tmp->dtor = &aligned_binary_dtor;
         env->tmp_obj_list = u.tmp;
     }
-    bin->bin_term = bin_term;
     bin->size = binary_size(bin_term);
     bin->ref_bin = NULL;
     ADD_READONLY_CHECK(env, bin->data, bin->size);
     return 1;
 }
 
-static void tmp_alloc_dtor(struct enif_tmp_obj_t* obj)
-{
-    erts_free(obj->allocator, obj);
-}
-
 int enif_inspect_iolist_as_binary(ErlNifEnv* env, Eterm term, ErlNifBinary* bin)
 {
-    struct enif_tmp_obj_t* tobj;
-    ErtsAlcType_t allocator;
     ErlDrvSizeT sz;
     if (is_binary(term)) {
         return enif_inspect_binary(env,term,bin);
@@ -1108,7 +1111,6 @@ int enif_inspect_iolist_as_binary(ErlNifEnv* env, Eterm term, ErlNifBinary* bin)
     if (is_nil(term)) {
         bin->data = (unsigned char*) &bin->data; /* dummy non-NULL */
         bin->size = 0;
-        bin->bin_term = THE_NON_VALUE;
        bin->ref_bin = NULL;
         return 1;
     }
@@ -1116,16 +1118,8 @@ int enif_inspect_iolist_as_binary(ErlNifEnv* env, Eterm term, ErlNifBinary* bin)
         return 0;
     }
 
-    allocator = is_proc_bound(env) ? ERTS_ALC_T_TMP : ERTS_ALC_T_NIF;
-    tobj = erts_alloc(allocator, sz + sizeof(struct enif_tmp_obj_t));
-    tobj->allocator = allocator;
-    tobj->next = env->tmp_obj_list;
-    tobj->dtor = &tmp_alloc_dtor;
-    env->tmp_obj_list = tobj;
-
-    bin->data = (unsigned char*) &tobj[1];
+    bin->data = alloc_tmp_obj(env, sz, &tmp_alloc_dtor);
     bin->size = sz;
-    bin->bin_term = THE_NON_VALUE;
     bin->ref_bin = NULL;
     erts_iolist_to_buf(term, (char*) bin->data, sz);
     ADD_READONLY_CHECK(env, bin->data, bin->size);
@@ -1143,7 +1137,6 @@ int enif_alloc_binary(size_t size, ErlNifBinary* bin)
 
     bin->size = size;
     bin->data = (unsigned char*) refbin->orig_bytes;
-    bin->bin_term = THE_NON_VALUE;
     bin->ref_bin = refbin;
     return 1;
 }
@@ -1177,12 +1170,10 @@ void enif_release_binary(ErlNifBinary* bin)
 {
     if (bin->ref_bin != NULL) {
         Binary* refbin = bin->ref_bin;
-        ASSERT(bin->bin_term == THE_NON_VALUE);
         erts_bin_release(refbin);
     }
 #ifdef DEBUG
     bin->data = NULL;
-    bin->bin_term = THE_NON_VALUE;
     bin->ref_bin = NULL;
 #endif
 }
@@ -1230,11 +1221,12 @@ size_t enif_binary_to_term(ErlNifEnv *dst_env,
     Sint size;
     ErtsHeapFactory factory;
     byte *bp = (byte*) data;
+    Uint32 flags = 0;
 
-    ERTS_CT_ASSERT(ERL_NIF_BIN2TERM_SAFE == ERTS_DIST_EXT_BTT_SAFE);
-
-    if (opts & ~ERL_NIF_BIN2TERM_SAFE) {
-        return 0;
+    switch ((Uint32)opts) {
+    case 0: break;
+    case ERL_NIF_BIN2TERM_SAFE: flags = ERTS_DIST_EXT_BTT_SAFE; break;
+    default: return 0;
     }
     if ((size = erts_decode_ext_size(bp, data_sz)) < 0)
         return 0;
@@ -1246,7 +1238,7 @@ size_t enif_binary_to_term(ErlNifEnv *dst_env,
         erts_factory_dummy_init(&factory);
     }
 
-    *term = erts_decode_ext(&factory, &bp, (Uint32)opts);
+    *term = erts_decode_ext(&factory, &bp, flags);
 
     if (is_non_value(*term)) {
         return 0;
@@ -1338,39 +1330,51 @@ int enif_get_string(ErlNifEnv *env, ERL_NIF_TERM list, char* buf, unsigned len,
 
 Eterm enif_make_binary(ErlNifEnv* env, ErlNifBinary* bin)
 {
-    if (bin->bin_term != THE_NON_VALUE) {
-        return bin->bin_term;
-    }
-    else if (bin->ref_bin != NULL) {
-        Binary* bptr = bin->ref_bin;
-        ProcBin* pb;
-        Eterm bin_term;
-
-        /* !! Copy-paste from new_binary() !! */
-        pb = (ProcBin *) alloc_heap(env, PROC_BIN_SIZE);
-        pb->thing_word = HEADER_PROC_BIN;
-        pb->size = bptr->orig_size;
-        pb->next = MSO(env->proc).first;
-        MSO(env->proc).first = (struct erl_off_heap_header*) pb;
-        pb->val = bptr;
-        pb->bytes = (byte*) bptr->orig_bytes;
-        pb->flags = 0;
-
-        OH_OVERHEAD(&(MSO(env->proc)), pb->size / sizeof(Eterm));
-        bin_term = make_binary(pb);
-        if (erts_refc_read(&bptr->intern.refc, 1) == 1) {
-            /* Total ownership transfer */
-            bin->ref_bin = NULL;
-            bin->bin_term = bin_term;
-        }
-        return bin_term;
-    }
-    else {
-        flush_env(env);
-        bin->bin_term = new_binary(env->proc, bin->data, bin->size);
-        cache_env(env);
-        return bin->bin_term;
+    Eterm bin_term;
+
+    if (bin->ref_bin != NULL) {
+        Binary* binary = bin->ref_bin;
+
+        /* If the binary is smaller than the heap binary limit we'll return a
+         * heap binary to reduce the number of small refc binaries in the
+         * system. We can't simply release the refc binary right away however;
+         * the documentation states that the binary should be considered
+         * read-only from this point on, which implies that it should still be
+         * readable.
+         *
+         * We could keep it alive until we return by adding it to the temporary
+         * object list, but that requires an off-heap allocation which is
+         * potentially quite slow, so we create a dummy ProcBin instead and
+         * rely on the next minor GC to get rid of it. */
+        if (bin->size <= ERL_ONHEAP_BIN_LIMIT) {
+            ErlHeapBin* hb;
+
+            hb = (ErlHeapBin*)alloc_heap(env, heap_bin_size(bin->size));
+            hb->thing_word = header_heap_bin(bin->size);
+            hb->size = bin->size;
+
+            sys_memcpy(hb->data, bin->data, bin->size);
+
+            erts_build_proc_bin(&MSO(env->proc),
+                                alloc_heap(env, PROC_BIN_SIZE),
+                                binary);
+
+            bin_term = make_binary(hb);
+        } else {
+            bin_term = erts_build_proc_bin(&MSO(env->proc),
+                                           alloc_heap(env, PROC_BIN_SIZE),
+                                           binary);
+        }
+
+        /* Our (possibly shared) ownership has been transferred to the term. */
+        bin->ref_bin = NULL;
+    } else {
+        flush_env(env);
+        bin_term = new_binary(env->proc, bin->data, bin->size);
+        cache_env(env);
     }
+
+    return bin_term;
 }
 
 Eterm enif_make_sub_binary(ErlNifEnv* env, ERL_NIF_TERM bin_term,
@@ -1854,18 +1858,11 @@ int enif_is_process_alive(ErlNifEnv* env, ErlNifPid *proc)
     if (scheduler > 0)
         return !!erts_proc_lookup(proc->pid);
     else {
-#ifdef ERTS_SMP
         Process* rp = erts_pid2proc_opt(NULL, 0, proc->pid, 0,
                                         ERTS_P2P_FLG_INC_REFC);
         if (rp)
             erts_proc_dec_refc(rp);
         return !!rp;
-#else
-        erts_exit(ERTS_ABORT_EXIT, "enif_is_process_alive: "
-                  "called from non-scheduler thread "
-                  "in non-smp emulator");
-        return 0;
-#endif
     }
 }
 
@@ -1881,17 +1878,10 @@ int enif_is_port_alive(ErlNifEnv *env, ErlNifPort *port)
     if (scheduler > 0)
         return !!erts_port_lookup(port->port_id, iflags);
     else {
-#ifdef ERTS_SMP
         Port *prt = erts_thr_port_lookup(port->port_id, iflags);
         if (prt)
             erts_port_dec_refc(prt);
         return !!prt;
-#else
-        erts_exit(ERTS_ABORT_EXIT, "enif_is_port_alive: "
-                  "called from non-scheduler thread "
-                  "in non-smp emulator");
-        return 0;
-#endif
     }
 }
 
@@ -2099,7 +2089,7 @@ ErlNifResourceType* open_resource_type(ErlNifEnv* env,
     ErlNifResourceFlags op = flags;
     Eterm module_am, name_am;
 
-    ASSERT(erts_smp_thr_progress_is_blocking());
+    ASSERT(erts_thr_progress_is_blocking());
     module_am = make_atom(env->mod_nif->mod->module);
     name_am = enif_make_atom(env, name_str);
@@ -2238,19 +2228,14 @@ static void destroy_one_monitor(ErtsMonitor* mon, void* context)
         rp = erts_proc_lookup(mon->u.pid);
     }
     else {
-#ifdef ERTS_SMP
         rp = erts_proc_lookup_inc_refc(mon->u.pid);
-#else
-        ASSERT(!"nif monitor destruction in non-scheduler thread");
-        rp = NULL;
-#endif
     }
 
     if (!rp) {
         is_exiting = 1;
    }
     if (rp) {
-        erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
+        erts_proc_lock(rp, ERTS_PROC_LOCK_LINK);
         if (ERTS_PROC_IS_EXITING(rp)) {
             is_exiting = 1;
         } else {
@@ -2258,11 +2243,9 @@ static void destroy_one_monitor(ErtsMonitor* mon, void* context)
             ASSERT(rmon);
             is_exiting = 0;
         }
-        erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
-#ifdef ERTS_SMP
+        erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
         if (ctx->scheduler <= 0)
             erts_proc_dec_refc(rp);
-#endif
     }
     if (is_exiting) {
         ctx->resource->monitors->pending_failed_fire++;
@@ -2288,34 +2271,7 @@ static void destroy_all_monitors(ErtsMonitor* monitors, ErtsResource* resource)
 }
 
 
-#ifdef ERTS_SMP
 #  define NIF_RESOURCE_DTOR &nif_resource_dtor
-#else
-#  define NIF_RESOURCE_DTOR &nosmp_nif_resource_dtor_prologue
-
-/*
- * NO-SMP: Always run resource destructor on scheduler thread
- *         as we may have to remove process monitors.
- */
-static int nif_resource_dtor(Binary*);
-
-static void nosmp_nif_resource_dtor_scheduled(void* vbin)
-{
-    erts_bin_free((Binary*)vbin);
-}
-
-static int nosmp_nif_resource_dtor_prologue(Binary* bin)
-{
-    if (is_scheduler()) {
-        return nif_resource_dtor(bin);
-    }
-    else {
-        erts_schedule_misc_aux_work(1, nosmp_nif_resource_dtor_scheduled, bin);
-        return 0; /* do not free */
-    }
-}
-
-#endif /* !ERTS_SMP */
 
 static int nif_resource_dtor(Binary* bin)
 {
@@ -2327,7 +2283,7 @@ static int nif_resource_dtor(Binary* bin)
         ErtsResourceMonitors* rm = resource->monitors;
 
         ASSERT(type->down);
-        erts_smp_mtx_lock(&rm->lock);
+        erts_mtx_lock(&rm->lock);
         ASSERT(erts_refc_read(&bin->intern.refc, 0) == 0);
         if (rm->root) {
             ASSERT(!rm->is_dying);
@@ -2349,11 +2305,11 @@ static int nif_resource_dtor(Binary* bin)
              */
            ASSERT(!rm->is_dying);
            rm->is_dying = 1;
-            erts_smp_mtx_unlock(&rm->lock);
+            erts_mtx_unlock(&rm->lock);
            return 0;
        }
-        erts_smp_mtx_unlock(&rm->lock);
-        erts_smp_mtx_destroy(&rm->lock);
+        erts_mtx_unlock(&rm->lock);
+        erts_mtx_destroy(&rm->lock);
     }
 
     if (type->dtor != NULL) {
@@ -2394,12 +2350,12 @@ void erts_fire_nif_monitor(ErtsResource* resource, Eterm pid, Eterm ref)
     ASSERT(rmp);
     ASSERT(resource->type->down);
 
-    erts_smp_mtx_lock(&rmp->lock);
+    erts_mtx_lock(&rmp->lock);
     rmon = erts_remove_monitor(&rmp->root, ref);
     if (!rmon) {
         int free_me = (--rmp->pending_failed_fire == 0) && rmp->is_dying;
         ASSERT(rmp->pending_failed_fire >= 0);
-        erts_smp_mtx_unlock(&rmp->lock);
+        erts_mtx_unlock(&rmp->lock);
 
         if (free_me) {
             ASSERT(erts_refc_read(&bin->binary.intern.refc, 0) == 0);
@@ -2415,10 +2371,10 @@ void erts_fire_nif_monitor(ErtsResource* resource, Eterm pid, Eterm ref)
          * we avoid calling 'down' and just silently remove the monitor.
         * This can happen even for non smp as destructor calls may be scheduled.
         */
-        erts_smp_mtx_unlock(&rmp->lock);
+        erts_mtx_unlock(&rmp->lock);
     }
     else {
-        erts_smp_mtx_unlock(&rmp->lock);
+        erts_mtx_unlock(&rmp->lock);
 
         ASSERT(rmon->u.pid == pid);
         erts_ref_to_driver_monitor(ref, &nif_monitor);
@@ -2463,7 +2419,7 @@ void* enif_alloc_resource(ErlNifResourceType* type, size_t data_sz)
     erts_refc_inc(&resource->type->refc, 2);
     if (type->down) {
         resource->monitors = (ErtsResourceMonitors*) (resource->data + monitors_offs);
-        erts_smp_mtx_init(&resource->monitors->lock, "resource_monitors", NIL,
+        erts_mtx_init(&resource->monitors->lock, "resource_monitors", NIL,
                           ERTS_LOCK_FLAGS_CATEGORY_GENERIC);
         resource->monitors->root = NULL;
         resource->monitors->pending_failed_fire = 0;
@@ -2662,7 +2618,6 @@ nif_export_restore(Process *c_p, NifExport *ep, Eterm res)
 }
 
 
-#ifdef ERTS_DIRTY_SCHEDULERS
 /*
  * Finalize a dirty NIF call. This function is scheduled to cause the VM to
@@ -2732,7 +2687,7 @@ schedule_dirty_nif(ErlNifEnv* env, int flags, NativeFunPtr fp,
 
     execution_state(env, &proc, NULL);
 
-    (void) erts_smp_atomic32_read_bset_nob(&proc->state,
+    (void) erts_atomic32_read_bset_nob(&proc->state,
                                            (ERTS_PSFLG_DIRTY_CPU_PROC
                                             | ERTS_PSFLG_DIRTY_IO_PROC),
                                            (flags == ERL_NIF_DIRTY_JOB_CPU_BOUND
@@ -2770,7 +2725,7 @@ static_schedule_dirty_nif(ErlNifEnv* env, erts_aint32_t dirty_psflg,
     ASSERT(is_atom(mod) && is_atom(func));
     ASSERT(fp);
 
-    (void) erts_smp_atomic32_read_bset_nob(&proc->state,
+    (void) erts_atomic32_read_bset_nob(&proc->state,
                                            (ERTS_PSFLG_DIRTY_CPU_PROC
                                             | ERTS_PSFLG_DIRTY_IO_PROC),
                                            dirty_psflg);
@@ -2790,7 +2745,6 @@ static_schedule_dirty_cpu_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[
 {
     return static_schedule_dirty_nif(env, ERTS_PSFLG_DIRTY_CPU_PROC, argc, argv);
 }
-#endif /* ERTS_DIRTY_SCHEDULERS */
 
 /*
  * NIF execution wrapper used by enif_schedule_nif() for regular NIFs. It
@@ -2864,24 +2818,20 @@ enif_schedule_nif(ErlNifEnv* env, const char* fun_name, int flags,
     if (scheduler <= 0) {
         if (scheduler == 0)
             enif_make_badarg(env);
-        erts_smp_proc_lock(proc, ERTS_PROC_LOCK_MAIN);
+        erts_proc_lock(proc, ERTS_PROC_LOCK_MAIN);
     }
 
     if (flags == 0)
         result = schedule(env, execute_nif, fp, proc->current->module,
                           fun_name_atom, argc, argv);
     else if (!(flags & ~(ERL_NIF_DIRTY_JOB_IO_BOUND|ERL_NIF_DIRTY_JOB_CPU_BOUND))) {
-#ifdef ERTS_DIRTY_SCHEDULERS
         result = schedule_dirty_nif(env, flags, fp, fun_name_atom, argc, argv);
-#else
-        result = enif_raise_exception(env, am_notsup);
-#endif
     }
     else
         result = enif_make_badarg(env);
 
     if (scheduler < 0)
-        erts_smp_proc_unlock(proc, ERTS_PROC_LOCK_MAIN);
+        erts_proc_unlock(proc, ERTS_PROC_LOCK_MAIN);
 
     return result;
 }
@@ -2897,12 +2847,10 @@ enif_thread_type(void)
     switch (esdp->type) {
     case ERTS_SCHED_NORMAL:
         return ERL_NIF_THR_NORMAL_SCHEDULER;
-#ifdef ERTS_DIRTY_SCHEDULERS
     case ERTS_SCHED_DIRTY_CPU:
         return ERL_NIF_THR_DIRTY_CPU_SCHEDULER;
     case ERTS_SCHED_DIRTY_IO:
         return ERL_NIF_THR_DIRTY_IO_SCHEDULER;
-#endif
     default:
         ERTS_INTERNAL_ERROR("Invalid scheduler type");
         return -1;
@@ -3223,27 +3171,19 @@ int enif_monitor_process(ErlNifEnv* env, void* obj, const ErlNifPid* target_pid,
 
     execution_state(env, NULL, &scheduler);
 
-#ifdef ERTS_SMP
     if (scheduler > 0) /* Normal scheduler */
         rp = erts_proc_lookup_raw(target_pid->pid);
     else
         rp = erts_proc_lookup_raw_inc_refc(target_pid->pid);
-#else
-    if (scheduler <= 0) {
-        erts_exit(ERTS_ABORT_EXIT, "enif_monitor_process: called from "
-                  "non-scheduler thread on non-SMP VM");
-    }
-    rp = erts_proc_lookup(target_pid->pid);
-#endif
 
     if (!rp)
         return 1;
 
     ref = erts_make_ref_in_buffer(tmp);
 
-    erts_smp_mtx_lock(&rsrc->monitors->lock);
-    erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
-    if (ERTS_PSFLG_FREE & erts_smp_atomic32_read_nob(&rp->state)) {
+    erts_mtx_lock(&rsrc->monitors->lock);
+    erts_proc_lock(rp, ERTS_PROC_LOCK_LINK);
+    if (ERTS_PSFLG_FREE & erts_atomic32_read_nob(&rp->state)) {
         retval = 1;
     }
     else {
@@ -3251,13 +3191,11 @@ int enif_monitor_process(ErlNifEnv* env, void* obj, const ErlNifPid* target_pid,
         erts_add_monitor(&ERTS_P_MONITORS(rp), MON_NIF_TARGET, ref, (UWord)rsrc, NIL);
         retval = 0;
     }
-    erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
-    erts_smp_mtx_unlock(&rsrc->monitors->lock);
+    erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+    erts_mtx_unlock(&rsrc->monitors->lock);
 
-#ifdef ERTS_SMP
     if (scheduler <= 0)
         erts_proc_dec_refc(rp);
-#endif
 
     if (monitor)
         erts_ref_to_driver_monitor(ref,monitor);
@@ -3285,35 +3223,27 @@ int enif_demonitor_process(ErlNifEnv* env, void* obj, const ErlNifMonitor* monit
 
     ref = erts_driver_monitor_to_ref(ref_heap, monitor);
 
-    erts_smp_mtx_lock(&rsrc->monitors->lock);
+    erts_mtx_lock(&rsrc->monitors->lock);
     mon = erts_remove_monitor(&rsrc->monitors->root, ref);
 
     if (mon == NULL) {
-        erts_smp_mtx_unlock(&rsrc->monitors->lock);
+        erts_mtx_unlock(&rsrc->monitors->lock);
         return 1;
     }
 
     ASSERT(mon->type == MON_ORIGIN);
     ASSERT(is_internal_pid(mon->u.pid));
 
-#ifdef ERTS_SMP
     if (scheduler > 0) /* Normal scheduler */
         rp = erts_proc_lookup(mon->u.pid);
     else
         rp = erts_proc_lookup_inc_refc(mon->u.pid);
-#else
-    if (scheduler <= 0) {
-        erts_exit(ERTS_ABORT_EXIT, "enif_demonitor_process: called from "
-                  "non-scheduler thread on non-SMP VM");
-    }
-    rp = erts_proc_lookup(mon->u.pid);
-#endif
 
     if (!rp) {
         is_exiting = 1;
     }
     else {
-        erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
+        erts_proc_lock(rp, ERTS_PROC_LOCK_LINK);
         if (ERTS_PROC_IS_EXITING(rp)) {
             is_exiting = 1;
         } else {
@@ -3321,17 +3251,15 @@ int enif_demonitor_process(ErlNifEnv* env, void* obj, const ErlNifMonitor* monit
             ASSERT(rmon);
             is_exiting = 0;
         }
-        erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+        erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
 
-#ifdef ERTS_SMP
         if (scheduler <= 0)
             erts_proc_dec_refc(rp);
-#endif
     }
     if (is_exiting) {
         rsrc->monitors->pending_failed_fire++;
     }
-    erts_smp_mtx_unlock(&rsrc->monitors->lock);
+    erts_mtx_unlock(&rsrc->monitors->lock);
 
     if (rmon) {
         ASSERT(rmon->type == MON_NIF_TARGET);
@@ -3492,7 +3420,6 @@ static void marshal_iovec_binary(Eterm binary, ErlNifBinary *copy_buffer,
     parent_header = binary_val(parent_binary);
 
     result->size = binary_size(binary);
-    result->bin_term = binary;
 
     if (thing_subtag(*parent_header) == REFC_BINARY_SUBTAG) {
         ProcBin *pb = (ProcBin*)parent_header;
@@ -3520,6 +3447,7 @@ static void marshal_iovec_binary(Eterm binary, ErlNifBinary *copy_buffer,
          * and reference that instead. */
 
         if (result->ref_bin == NULL || bit_offset != 0) {
+            ASSERT(copy_buffer->ref_bin != NULL && copy_buffer->data != NULL);
             ASSERT(result->size <= (copy_buffer->size - *copy_offset));
 
             if (bit_offset == 0) {
@@ -3541,8 +3469,8 @@ static int fill_iovec_with_slice(ErlNifEnv *env,
                                  iovec_slice_t *slice,
                                  ErlNifIOVec *iovec) {
+    ErlNifBinary copy_buffer = {0};
     UWord copy_offset, iovec_idx;
-    ErlNifBinary copy_buffer;
     Eterm sublist_iterator;
 
     /* Set up a common refc binary for all on-heap and unaligned binaries. */
@@ -3550,11 +3478,8 @@ static int fill_iovec_with_slice(ErlNifEnv *env,
         if (!enif_alloc_binary(slice->copied_size, &copy_buffer)) {
             return 0;
         }
-    } else {
-#ifdef DEBUG
-        copy_buffer.data = NULL;
-        copy_buffer.size = 0;
-#endif
+
+        ASSERT(copy_buffer.ref_bin != NULL);
     }
 
     sublist_iterator = slice->sublist_start;
@@ -3604,9 +3529,11 @@ static int fill_iovec_with_slice(ErlNifEnv *env,
         }
     } else {
         if (slice->copied_size > 0) {
-            /* Attach the binary to our environment and let the GC take care of
-             * it after returning. */
-            enif_make_binary(env, &copy_buffer);
+            /* Attach the binary to our environment and let the next minor GC
+             * get rid of it. This is slightly faster than using the tmp object
+             * list since it avoids off-heap allocations. */
+            erts_build_proc_bin(&MSO(env->proc),
+                                alloc_heap(env, PROC_BIN_SIZE), copy_buffer.ref_bin);
         }
     }
 
@@ -3632,19 +3559,14 @@ static int create_iovec_from_slice(ErlNifEnv *env,
         alloc_size = binv_offset;
         alloc_size += slice->iovec_len * sizeof(Binary*);
 
-        /* If we have an environment we'll attach the allocated data to it. The
-         * GC will take care of releasing it later on. */
+        /* When the user passes an environment, we attach the iovec to it so
+         * the user won't have to bother managing it (similar to
+         * enif_inspect_binary). It'll disappear once the environment is
+         * cleaned up. */
         if (env != NULL) {
-            ErlNifBinary gc_bin;
-
-            if (!enif_alloc_binary(alloc_size, &gc_bin)) {
-                return 0;
-            }
-
-            alloc_base = (char*)gc_bin.data;
-            enif_make_binary(env, &gc_bin);
+            alloc_base = alloc_tmp_obj(env, alloc_size, &tmp_alloc_dtor);
        } else {
-            alloc_base = enif_alloc(alloc_size);
+            alloc_base = erts_alloc(ERTS_ALC_T_NIF, alloc_size);
        }
 
         iovec = (ErlNifIOVec*)alloc_base;
@@ -3658,7 +3580,7 @@ static int create_iovec_from_slice(ErlNifEnv *env,
 
     if(!fill_iovec_with_slice(env, slice, iovec)) {
         if (env == NULL && !(iovec->flags & ERL_NIF_IOVEC_FLAGS_PREALLOC)) {
-            enif_free(iovec);
+            erts_free(ERTS_ALC_T_NIF, iovec);
        }
 
         return 0;
@@ -3725,6 +3647,55 @@ int enif_ioq_deq(ErlNifIOQueue *q, size_t elems, size_t *size)
     return 1;
 }
 
+int enif_ioq_peek_head(ErlNifEnv *env, ErlNifIOQueue *q, size_t *size, ERL_NIF_TERM *bin_term) {
+    SysIOVec *iov_entry;
+    Binary *ref_bin;
+
+    if (q->size == 0) {
+        return 0;
+    }
+
+    ASSERT(q->b_head != q->b_tail && q->v_head != q->v_tail);
+
+    ref_bin = &q->b_head[0]->nif;
+    iov_entry = &q->v_head[0];
+
+    if (size != NULL) {
+        *size = iov_entry->iov_len;
+    }
+
+    if (iov_entry->iov_len > ERL_ONHEAP_BIN_LIMIT) {
+        ProcBin *pb = (ProcBin*)alloc_heap(env, PROC_BIN_SIZE);
+
+        pb->thing_word = HEADER_PROC_BIN;
+        pb->next = MSO(env->proc).first;
+        pb->val = ref_bin;
+        pb->flags = 0;
+
+        ASSERT((byte*)iov_entry->iov_base >= (byte*)ref_bin->orig_bytes);
+        ASSERT(iov_entry->iov_len <= ref_bin->orig_size);
+
+        pb->bytes = (byte*)iov_entry->iov_base;
+        pb->size = iov_entry->iov_len;
+
+        MSO(env->proc).first = (struct erl_off_heap_header*) pb;
+        OH_OVERHEAD(&(MSO(env->proc)), pb->size / sizeof(Eterm));
+
+        erts_refc_inc(&ref_bin->intern.refc, 2);
+        *bin_term = make_binary(pb);
+    } else {
+        ErlHeapBin* hb = (ErlHeapBin*)alloc_heap(env, heap_bin_size(iov_entry->iov_len));
+
+        hb->thing_word = header_heap_bin(iov_entry->iov_len);
+        hb->size = iov_entry->iov_len;
+
+        sys_memcpy(hb->data, iov_entry->iov_base, iov_entry->iov_len);
+        *bin_term = make_binary(hb);
+    }
+
+    return 1;
+}
+
 SysIOVec *enif_ioq_peek(ErlNifIOQueue *q, int *iovlen)
 {
     return erts_ioq_peekq(q, iovlen);
@@ -3741,7 +3712,7 @@ static ErtsCodeInfo** get_func_pp(BeamCodeHeader* mod_code, Eterm f_atom, unsign
     int j;
     for (j = 0; j < n; ++j) {
         ErtsCodeInfo* ci = mod_code->functions[j];
-        ASSERT(ci->op == (BeamInstr) BeamOp(op_i_func_info_IaaI));
+        ASSERT(BeamIsOpCode(ci->op, op_i_func_info_IaaI));
         if (f_atom == ci->mfa.function
             && arity == ci->mfa.arity) {
             return mod_code->functions+j;
@@ -3937,8 +3908,8 @@ BIF_RETTYPE load_nif_2(BIF_ALIST_2)
     }
 
     /* Block system (is this the right place to do it?) */
-    erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
-    erts_smp_thr_progress_block();
+    erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
+    erts_thr_progress_block();
 
     /* Find calling module */
     ASSERT(BIF_P->current != NULL);
@@ -4043,14 +4014,9 @@ BIF_RETTYPE load_nif_2(BIF_ALIST_2)
                  * dirty scheduler support, treat a non-zero flags field as
                  * a load error.
                  */
-#ifdef ERTS_DIRTY_SCHEDULERS
                 if (f->flags != ERL_NIF_DIRTY_JOB_IO_BOUND && f->flags != ERL_NIF_DIRTY_JOB_CPU_BOUND)
                     ret = load_nif_error(BIF_P, bad_lib, "Illegal flags field value %d for NIF %T:%s/%u",
                                          f->flags, mod_atom, f->name, f->arity);
-#else
-                ret = load_nif_error(BIF_P, bad_lib, "NIF %T:%s/%u requires a runtime with dirty scheduler support.",
-                                     mod_atom, f->name, f->arity);
-#endif
             }
             else if (erts_codeinfo_to_code(ci_pp[1]) - erts_codeinfo_to_code(ci_pp[0])
                      < BEAM_NIF_MIN_FUNC_SZ)
@@ -4115,15 +4081,13 @@ BIF_RETTYPE load_nif_2(BIF_ALIST_2)
             code_ptr = erts_codeinfo_to_code(ci);
 
             if (ci->u.gen_bp == NULL) {
-                code_ptr[0] = (BeamInstr) BeamOp(op_call_nif);
+                code_ptr[0] = BeamOpCodeAddr(op_call_nif);
             }
             else { /* Function traced, patch the original instruction word */
                 GenericBp* g = ci->u.gen_bp;
-                ASSERT(code_ptr[0] ==
-                       (BeamInstr) BeamOp(op_i_generic_breakpoint));
-                g->orig_instr = (BeamInstr) BeamOp(op_call_nif);
+                ASSERT(BeamIsOpCode(code_ptr[0], op_i_generic_breakpoint));
+                g->orig_instr = BeamOpCodeAddr(op_call_nif);
             }
-#ifdef ERTS_DIRTY_SCHEDULERS
             if (f->flags) {
                 code_ptr[3] = (BeamInstr) f->fptr;
                 code_ptr[1] = (f->flags == ERL_NIF_DIRTY_JOB_IO_BOUND) ?
@@ -4131,7 +4095,6 @@ BIF_RETTYPE load_nif_2(BIF_ALIST_2)
                               (BeamInstr) static_schedule_dirty_cpu_nif;
             }
             else
-#endif
                 code_ptr[1] = (BeamInstr) f->fptr;
             code_ptr[2] = (BeamInstr) lib;
         }
@@ -4149,8 +4112,8 @@ BIF_RETTYPE load_nif_2(BIF_ALIST_2)
         erts_sys_ddll_free_error(&errdesc);
     }
 
-    erts_smp_thr_progress_unblock();
-    erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
+    erts_thr_progress_unblock();
+    erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
     erts_release_code_write_permission();
     erts_free(ERTS_ALC_T_TMP, lib_name);
@@ -4163,7 +4126,7 @@ erts_unload_nif(struct erl_module_nif* lib)
 {
     ErlNifResourceType* rt;
     ErlNifResourceType* next;
-    ASSERT(erts_smp_thr_progress_is_blocking());
+    ASSERT(erts_thr_progress_is_blocking());
     ASSERT(lib != NULL);
     ASSERT(lib->mod != NULL);
@@ -4235,8 +4198,8 @@ Eterm erts_nif_call_function(Process *p, Process *tracee,
             break;
     ASSERT(i < mod->entry.num_of_funcs);
     if (p)
-        ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(p) & ERTS_PROC_LOCK_MAIN
-                           || erts_smp_thr_progress_is_blocking());
+        ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(p) & ERTS_PROC_LOCK_MAIN
+                       || erts_thr_progress_is_blocking());
 #endif
     if (p) {
         /* This is almost a normal nif call like in beam_emu,
@@ -4307,34 +4270,31 @@ static unsigned calc_checksum(unsigned char* ptr, unsigned size);
 
 struct readonly_check_t
 {
-    struct enif_tmp_obj_t hdr;
     unsigned char* ptr;
     unsigned size;
     unsigned checksum;
};
 static void add_readonly_check(ErlNifEnv* env, unsigned char* ptr, unsigned sz)
 {
-    ErtsAlcType_t allocator = is_proc_bound(env) ? ERTS_ALC_T_TMP : ERTS_ALC_T_NIF;
-    struct readonly_check_t* obj = erts_alloc(allocator,
-                                              sizeof(struct readonly_check_t));
-    obj->hdr.allocator = allocator;
-    obj->hdr.next = env->tmp_obj_list;
-    env->tmp_obj_list = &obj->hdr;
-    obj->hdr.dtor = &readonly_check_dtor;
+    struct readonly_check_t* obj;
+
+    obj = alloc_tmp_obj(env, sizeof(struct readonly_check_t),
+                        &readonly_check_dtor);
+
     obj->ptr = ptr;
     obj->size = sz;
-    obj->checksum = calc_checksum(ptr, sz);
+    obj->checksum = calc_checksum(ptr, sz);
 }
-static void readonly_check_dtor(struct enif_tmp_obj_t* o)
+static void readonly_check_dtor(struct enif_tmp_obj_t* tmp_obj)
 {
-    struct readonly_check_t* obj = (struct readonly_check_t*) o;
-    unsigned chksum = calc_checksum(obj->ptr, obj->size);
-    if (chksum != obj->checksum) {
+    struct readonly_check_t* ro_check = (struct readonly_check_t*)&tmp_obj[1];
+    unsigned chksum = calc_checksum(ro_check->ptr, ro_check->size);
+    if (chksum != ro_check->checksum) {
         fprintf(stderr, "\r\nReadonly data written by NIF, checksums differ"
-                " %x != %x\r\nABORTING\r\n", chksum, obj->checksum);
+                " %x != %x\r\nABORTING\r\n", chksum, ro_check->checksum);
         abort();
     }
-    erts_free(obj->hdr.allocator, obj);
+    erts_free(tmp_obj->allocator, tmp_obj);
 }
 static unsigned calc_checksum(unsigned char* ptr, unsigned size)
 {
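
Usage sketches (illustrative, not part of the diff above; module and function names are invented, but the enif_* calls are the documented NIF API):

The tmp-object rework backs enif_inspect_iolist_as_binary() with alloc_tmp_obj(), and enif_make_binary() now returns a heap binary for payloads at or below ERL_ONHEAP_BIN_LIMIT. A minimal NIF exercising both paths:

    #include <erl_nif.h>
    #include <string.h>

    /* Flattens an iolist and returns a copy of it as a binary term. The
     * inspection buffer lands on env->tmp_obj_list and is freed when the
     * NIF returns; the returned term is independent of it. */
    static ERL_NIF_TERM
    flatten_copy(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
    {
        ErlNifBinary in, out;

        /* Backed by alloc_tmp_obj() after this change. */
        if (!enif_inspect_iolist_as_binary(env, argv[0], &in))
            return enif_make_badarg(env);

        if (!enif_alloc_binary(in.size, &out))
            return enif_make_badarg(env);

        memcpy(out.data, in.data, in.size);

        /* Small results now come back as heap binaries; either way
         * ownership moves to the term and 'out' must be treated as
         * read-only from here on. */
        return enif_make_binary(env, &out);
    }

    static ErlNifFunc nif_funcs[] = {
        {"flatten_copy", 1, flatten_copy}
    };

    ERL_NIF_INIT(example, nif_funcs, NULL, NULL, NULL, NULL)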
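The new enif_ioq_peek_head() returns the first queued chunk as a term, building a heap binary below ERL_ONHEAP_BIN_LIMIT and a ProcBin sharing the queued refc binary above it. A sketch of the intended use (a real NIF would keep the queue in a resource rather than create and destroy one per call):

    #include <erl_nif.h>

    static ERL_NIF_TERM
    peek_head(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
    {
        ErlNifIOVec vec, *iovec = &vec;
        ERL_NIF_TERM head, tail;
        ErlNifIOQueue *q;
        size_t size;

        q = enif_ioq_create(ERL_NIF_IOQ_NORMAL);

        if (!enif_inspect_iovec(env, 64, argv[0], &tail, &iovec)
            || !enif_ioq_enqv(q, iovec, 0)
            || !enif_ioq_peek_head(env, q, &size, &head)) {
            enif_ioq_destroy(q);
            return enif_make_badarg(env);
        }

        /* The head term holds its own reference to the data, so it
         * survives the destruction of the queue. */
        enif_ioq_destroy(q);
        return head;
    }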
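enif_binary_to_term() is also stricter above: opts must be exactly 0 or ERL_NIF_BIN2TERM_SAFE instead of being passed through to the decoder. A sketch of decoding untrusted external data with the safe flag:

    #include <erl_nif.h>

    static ERL_NIF_TERM
    safe_decode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
    {
        ErlNifBinary bin;
        ERL_NIF_TERM term;

        if (!enif_inspect_binary(env, argv[0], &bin))
            return enif_make_badarg(env);

        /* Returns the number of bytes read, or 0 on failure; any opts
         * value other than 0 or ERL_NIF_BIN2TERM_SAFE now fails. */
        if (!enif_binary_to_term(env, bin.data, bin.size, &term,
                                 ERL_NIF_BIN2TERM_SAFE))
            return enif_make_badarg(env);

        return term;
    }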