aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_nif.c
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2017-02-06 17:15:52 +0100
committerSverker Eriksson <[email protected]>2017-02-09 15:51:26 +0100
commit75fdce43ef567668bb89508b9b8ce0df7efaa569 (patch)
tree4b71d35042235a8a847683fe877890e22c7e874f /erts/emulator/beam/erl_nif.c
parent7111434c0eb1fdd6576a99ca94cdc2b20be9b9af (diff)
downloadotp-75fdce43ef567668bb89508b9b8ce0df7efaa569.tar.gz
otp-75fdce43ef567668bb89508b9b8ce0df7efaa569.tar.bz2
otp-75fdce43ef567668bb89508b9b8ce0df7efaa569.zip
erts: Add enif_monitor_process and enif_demonitor_process
Diffstat (limited to 'erts/emulator/beam/erl_nif.c')
-rw-r--r--erts/emulator/beam/erl_nif.c402
1 files changed, 388 insertions, 14 deletions
diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c
index b7425b9e45..62191c4abf 100644
--- a/erts/emulator/beam/erl_nif.c
+++ b/erts/emulator/beam/erl_nif.c
@@ -1913,8 +1913,6 @@ int enif_snprintf(char *buffer, size_t size, const char* format, ...)
/* dummy node in circular list */
struct enif_resource_type_t resource_type_list;
-#define SIZEOF_ErlNifResource(SIZE) (offsetof(ErtsResource,data) + (SIZE))
-
static ErlNifResourceType* find_resource_type(Eterm module, Eterm name)
{
ErlNifResourceType* type;
@@ -2083,6 +2081,7 @@ static void commit_opened_resource_types(struct erl_module_nif* lib)
type->owner = lib;
type->dtor = ort->new_callbacks.dtor;
type->stop = ort->new_callbacks.stop;
+ type->down = ort->new_callbacks.down;
if (type->dtor != NULL) {
erts_refc_inc(&lib->rt_dtor_cnt, 1);
@@ -2108,12 +2107,148 @@ static void rollback_opened_resource_types(void)
}
}
+struct destroy_monitor_ctx
+{
+ Binary* resource_bin;
+ int exiting_procs;
+ int scheduler;
+};
+
+static void destroy_one_monitor(ErtsMonitor* mon, void* context)
+{
+ struct destroy_monitor_ctx* ctx = (struct destroy_monitor_ctx*) context;
+ Process* rp;
+ ErtsMonitor *rmon = NULL;
+ int is_exiting;
+
+ ASSERT(mon->type == MON_ORIGIN);
+ ASSERT(is_internal_pid(mon->u.pid));
+ ASSERT(is_internal_ref(mon->ref));
+
+ if (ctx->scheduler > 0) { /* Normal scheduler */
+ rp = erts_proc_lookup(mon->u.pid);
+ }
+ else {
+#ifdef ERTS_SMP
+ rp = erts_proc_lookup_inc_refc(mon->u.pid);
+#else
+ ASSERT(!"nif monitor destruction in non-scheduler thread");
+ rp = NULL;
+#endif
+ }
+
+ if (!rp) {
+ is_exiting = 1;
+ }
+ if (rp) {
+ erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
+ if (ERTS_PROC_IS_EXITING(rp)) {
+ is_exiting = 1;
+ } else {
+ rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref);
+ ASSERT(rmon);
+ is_exiting = 0;
+ }
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+#ifdef ERTS_SMP
+ if (ctx->scheduler <= 0)
+ erts_proc_dec_refc(rp);
+#endif
+ }
+ if (is_exiting) {
+ ctx->exiting_procs++;
+ /* +1 for exiting process */
+ erts_refc_inc(&ctx->resource_bin->refc, ctx->exiting_procs);
+ }
+
+ /* ToDo: Delay destruction after monitor_locks */
+ if (rmon) {
+ ASSERT(rmon->type == MON_NIF_TARGET);
+ ASSERT(&ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(rmon->u.pid)->binary == ctx->resource_bin);
+ erts_destroy_monitor(rmon);
+ }
+ erts_destroy_monitor(mon);
+}
+
+static int destroy_all_monitors(ErtsMonitor* monitors, Binary* bin)
+{
+ struct destroy_monitor_ctx ctx;
+
+ execution_state(NULL, NULL, &ctx.scheduler);
+
+ ctx.resource_bin = bin;
+ ctx.exiting_procs = 0;
+ erts_sweep_monitors(monitors, &destroy_one_monitor, &ctx);
+ return ctx.exiting_procs == 0;
+}
+
+
+#ifdef ERTS_SMP
+# define NIF_RESOURCE_DTOR &nif_resource_dtor
+#else
+# define NIF_RESOURCE_DTOR &nosmp_nif_resource_dtor_prologue
+
+/*
+ * NO-SMP: Always run resource destructor on scheduler thread
+ * as we may have to remove process monitors.
+ */
+static int nif_resource_dtor(Binary*);
+
+static void nosmp_nif_resource_dtor_scheduled(void* vbin)
+{
+ erts_bin_free((Binary*)vbin);
+}
+
+static int nosmp_nif_resource_dtor_prologue(Binary* bin)
+{
+ if (is_scheduler()) {
+ return nif_resource_dtor(bin);
+ }
+ else {
+ erts_schedule_misc_aux_work(1, nosmp_nif_resource_dtor_scheduled, bin);
+ return 0; /* do not free */
+ }
+}
+
+#endif /* !ERTS_SMP */
static int nif_resource_dtor(Binary* bin)
{
ErtsResource* resource = (ErtsResource*) ERTS_MAGIC_BIN_UNALIGNED_DATA(bin);
ErlNifResourceType* type = resource->type;
- ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == &nif_resource_dtor);
+ ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == NIF_RESOURCE_DTOR);
+
+ if (resource->monitors) {
+ ErtsResourceMonitors* rm = resource->monitors;
+ ErtsMonitor* root;
+
+ ASSERT(type->down);
+ erts_smp_mtx_lock(&rm->lock);
+ ASSERT(erts_refc_read(&bin->refc, 0) == 0);
+ root = rm->root;
+ if (root) {
+ rm->root = NULL;
+ if (!destroy_all_monitors(root, bin)) {
+ /*
+ * Resource death struggle prolonged to serve exiting process(es).
+ * Destructor will be called again when last exiting process
+ * tries to fire its MON_NIF_TARGET monitor (and fails).
+ *
+ * This resource is doomed. It has no "real" references and
+ * should get not get called upon to do anything except the
+ * final destructor call.
+ */
+ ASSERT(erts_refc_read(&bin->refc, 1));
+#ifdef DEBUG
+ resource->dbg_is_dying = 1;
+#endif
+ erts_smp_mtx_unlock(&rm->lock);
+ return 0;
+ }
+ }
+ erts_smp_mtx_unlock(&rm->lock);
+ erts_smp_mtx_destroy(&rm->lock);
+ }
if (type->dtor != NULL) {
struct enif_msg_environment_t msg_env;
@@ -2141,20 +2276,99 @@ void erts_resource_stop(ErtsResource* resource, ErlNifEvent e,
post_nif_noproc(&msg_env);
}
-void* enif_alloc_resource(ErlNifResourceType* type, size_t size)
+/* SVERK SVERK: Move to ?.h */
+void erts_ref_to_driver_monitor(Eterm ref, ErlDrvMonitor*);
+
+void erts_fire_nif_monitor(ErtsResource* resource, Eterm pid, Eterm ref)
{
- Binary* bin = erts_create_magic_binary_x(SIZEOF_ErlNifResource(size),
- &nif_resource_dtor,
- 1); /* unaligned */
- ErtsResource* resource = ERTS_MAGIC_BIN_UNALIGNED_DATA(bin);
+ ErtsMonitor* rmon;
+ ErtsBinary* bin = ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(resource);
+ struct enif_msg_environment_t msg_env;
+ ErlNifPid nif_pid;
+ ErlNifMonitor nif_monitor;
+ ErtsResourceMonitors* rmp = resource->monitors;
+
+ ASSERT(rmp);
+ ASSERT(resource->type->down);
+
+ erts_smp_mtx_lock(&rmp->lock);
+ rmon = erts_remove_monitor(&rmp->root, ref);
+ if (!rmon) {
+ erts_smp_mtx_unlock(&rmp->lock);
+ /* -1 for exiting process */
+ if (erts_refc_dectest(&bin->binary.refc, 0) == 0) {
+ erts_bin_free(&bin->binary);
+ }
+ return;
+ }
+ ASSERT(!resource->dbg_is_dying);
+ if (erts_refc_inctest(&bin->binary.refc, 1) < 2) {
+ /*
+ * Racing resource destruction.
+ * To avoid a more complex refc-dance with destructing thread
+ * we avoid calling 'down' and just silently remove the monitor.
+ * There are no real references left to this resource and we have
+ * monitors_lock, so it's safe to reset refc back to zero.
+ * This can happen even for non smp as destructor calls may be scheduled.
+ */
+ erts_refc_init(&bin->binary.refc, 0);
+ erts_smp_mtx_unlock(&rmp->lock);
+ }
+ else {
+ erts_smp_mtx_unlock(&rmp->lock);
+
+ ASSERT(rmon->u.pid == pid);
+ erts_ref_to_driver_monitor(ref, &nif_monitor);
+ nif_pid.pid = pid;
+ pre_nif_noproc(&msg_env, resource->type->owner, NULL);
+ resource->type->down(&msg_env.env, resource->data, &nif_pid, &nif_monitor);
+ post_nif_noproc(&msg_env);
+
+ if (erts_refc_dectest(&bin->binary.refc, 0) == 0) {
+ erts_bin_free(&bin->binary);
+ }
+ }
+ erts_destroy_monitor(rmon);
+}
+
+void* enif_alloc_resource(ErlNifResourceType* type, size_t data_sz)
+{
+ size_t magic_sz = offsetof(ErtsResource,data);
+ Binary* bin;
+ ErtsResource* resource;
+ size_t monitors_offs;
+
+ if (type->down) {
+ /* Put ErtsResourceMonitors after user data and properly aligned */
+ monitors_offs = ((data_sz + ERTS_ALLOC_ALIGN_BYTES - 1)
+ & ~((size_t)ERTS_ALLOC_ALIGN_BYTES - 1));
+ magic_sz += monitors_offs + sizeof(ErtsResourceMonitors);
+ }
+ else {
+ ERTS_UNDEF(monitors_offs, 0);
+ magic_sz += data_sz;
+ }
+ bin = erts_create_magic_binary_x(magic_sz, NIF_RESOURCE_DTOR,
+ 1); /* unaligned */
+ resource = ERTS_MAGIC_BIN_UNALIGNED_DATA(bin);
ASSERT(type->owner && type->next && type->prev); /* not allowed in load/upgrade */
resource->type = type;
erts_refc_inc(&bin->refc, 1);
#ifdef DEBUG
erts_refc_init(&resource->nif_refc, 1);
+ resource->dbg_is_dying = 0;
#endif
erts_refc_inc(&resource->type->refc, 2);
+ if (type->down) {
+ resource->monitors = (ErtsResourceMonitors*) (resource->data + monitors_offs);
+ erts_smp_mtx_init(&resource->monitors->lock, "resource_monitors");
+ resource->monitors->root = NULL;
+ resource->monitors->user_data_sz = data_sz;
+ }
+ else {
+ resource->monitors = NULL;
+ }
return resource->data;
}
@@ -2163,7 +2377,8 @@ void enif_release_resource(void* obj)
ErtsResource* resource = DATA_TO_RESOURCE(obj);
ErtsBinary* bin = ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(resource);
- ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == &nif_resource_dtor);
+ ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == NIF_RESOURCE_DTOR);
+ ASSERT(!resource->dbg_is_dying);
#ifdef DEBUG
erts_refc_dec(&resource->nif_refc, 0);
#endif
@@ -2177,18 +2392,27 @@ void enif_keep_resource(void* obj)
ErtsResource* resource = DATA_TO_RESOURCE(obj);
ErtsBinary* bin = ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(resource);
- ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == &nif_resource_dtor);
+ ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == NIF_RESOURCE_DTOR);
+ ASSERT(!resource->dbg_is_dying);
#ifdef DEBUG
erts_refc_inc(&resource->nif_refc, 1);
#endif
erts_refc_inc(&bin->binary.refc, 2);
}
+Eterm erts_bld_resource_ref(Eterm** hpp, ErlOffHeap* oh, ErtsResource* resource)
+{
+ ErtsBinary* bin = ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(resource);
+ ASSERT(!resource->dbg_is_dying);
+ return erts_mk_magic_binary_term(hpp, oh, &bin->binary);
+}
+
ERL_NIF_TERM enif_make_resource(ErlNifEnv* env, void* obj)
{
ErtsResource* resource = DATA_TO_RESOURCE(obj);
ErtsBinary* bin = ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(resource);
Eterm* hp = alloc_heap(env,PROC_BIN_SIZE);
+ ASSERT(!resource->dbg_is_dying);
return erts_mk_magic_binary_term(&hp, &MSO(env->proc), &bin->binary);
}
@@ -2217,7 +2441,7 @@ int enif_get_resource(ErlNifEnv* env, ERL_NIF_TERM term, ErlNifResourceType* typ
}*/
mbin = pb->val;
resource = (ErtsResource*) ERTS_MAGIC_BIN_UNALIGNED_DATA(mbin);
- if (ERTS_MAGIC_BIN_DESTRUCTOR(mbin) != &nif_resource_dtor
+ if (ERTS_MAGIC_BIN_DESTRUCTOR(mbin) != NIF_RESOURCE_DTOR
|| resource->type != type) {
return 0;
}
@@ -2228,8 +2452,13 @@ int enif_get_resource(ErlNifEnv* env, ERL_NIF_TERM term, ErlNifResourceType* typ
size_t enif_sizeof_resource(void* obj)
{
ErtsResource* resource = DATA_TO_RESOURCE(obj);
- Binary* bin = &ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(resource)->binary;
- return ERTS_MAGIC_BIN_UNALIGNED_DATA_SIZE(bin) - offsetof(ErtsResource,data);
+ if (resource->monitors) {
+ return resource->monitors->user_data_sz;
+ }
+ else {
+ Binary* bin = &ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(resource)->binary;
+ return ERTS_MAGIC_BIN_UNALIGNED_DATA_SIZE(bin) - offsetof(ErtsResource,data);
+ }
}
@@ -2830,6 +3059,150 @@ int enif_map_iterator_get_pair(ErlNifEnv *env,
return 0;
}
+int enif_monitor_process(ErlNifEnv* env, void* obj, const ErlNifPid* target_pid,
+ ErlNifMonitor* monitor)
+{
+ int scheduler;
+ ErtsResource* rsrc = DATA_TO_RESOURCE(obj);
+ Process *rp;
+ Eterm tmp[REF_THING_SIZE];
+ Eterm ref;
+ int retval;
+
+ ASSERT(ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(rsrc)->magic_binary.destructor
+ == NIF_RESOURCE_DTOR);
+ ASSERT(!rsrc->dbg_is_dying);
+ ASSERT(!rsrc->monitors == !rsrc->type->down);
+
+
+ if (!rsrc->monitors) {
+ ASSERT(!rsrc->type->down);
+ return -1;
+ }
+ ASSERT(rsrc->type->down);
+
+ execution_state(env, NULL, &scheduler);
+
+#ifdef ERTS_SMP
+ if (scheduler > 0) /* Normal scheduler */
+ rp = erts_proc_lookup_raw(target_pid->pid);
+ else
+ rp = erts_proc_lookup_raw_inc_refc(target_pid->pid);
+#else
+ if (scheduler <= 0) {
+ erts_exit(ERTS_ABORT_EXIT, "enif_monitor_process: called from "
+ "non-scheduler thread on non-SMP VM");
+ }
+ rp = erts_proc_lookup(target_pid->pid);
+#endif
+
+ if (!rp)
+ return 1;
+
+ ref = erts_make_ref_in_buffer(tmp);
+
+ erts_smp_mtx_lock(&rsrc->monitors->lock);
+ erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
+ if (ERTS_PSFLG_FREE & erts_smp_atomic32_read_nob(&rp->state)) {
+ retval = 1;
+ }
+ else {
+ erts_add_monitor(&rsrc->monitors->root, MON_ORIGIN, ref, rp->common.id, NIL);
+ erts_add_monitor(&ERTS_P_MONITORS(rp), MON_NIF_TARGET, ref, (UWord)rsrc, NIL);
+ retval = 0;
+ }
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ erts_smp_mtx_unlock(&rsrc->monitors->lock);
+
+#ifdef ERTS_SMP
+ if (scheduler <= 0)
+ erts_proc_dec_refc(rp);
+#endif
+ if (monitor)
+ erts_ref_to_driver_monitor(ref,monitor);
+
+ return retval;
+}
+
+int enif_demonitor_process(ErlNifEnv* env, void* obj, const ErlNifMonitor* monitor)
+{
+ int scheduler;
+ ErtsResource* rsrc = DATA_TO_RESOURCE(obj);
+ ErtsBinary* bin = ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(rsrc);
+ Process *rp;
+ ErtsMonitor *mon;
+ ErtsMonitor *rmon = NULL;
+ Eterm ref_heap[REF_THING_SIZE];
+ Eterm ref;
+ int is_exiting;
+
+ ASSERT(bin->magic_binary.destructor == NIF_RESOURCE_DTOR);
+ ASSERT(!rsrc->dbg_is_dying);
+
+ execution_state(env, NULL, &scheduler);
+
+ memcpy(ref_heap, monitor, sizeof(Eterm)*REF_THING_SIZE);
+ ref = make_internal_ref(ref_heap);
+ erts_smp_mtx_lock(&rsrc->monitors->lock);
+ mon = erts_remove_monitor(&rsrc->monitors->root, ref);
+
+ if (mon == NULL) {
+ erts_smp_mtx_unlock(&rsrc->monitors->lock);
+ return 1;
+ }
+
+ ASSERT(mon->type == MON_ORIGIN);
+ ASSERT(is_internal_pid(mon->u.pid));
+
+#ifdef ERTS_SMP
+ if (scheduler > 0) /* Normal scheduler */
+ rp = erts_proc_lookup(mon->u.pid);
+ else
+ rp = erts_proc_lookup_inc_refc(mon->u.pid);
+#else
+ if (scheduler <= 0) {
+ erts_exit(ERTS_ABORT_EXIT, "enif_demonitor_process: called from "
+ "non-scheduler thread on non-SMP VM");
+ }
+ rp = erts_proc_lookup(mon->u.pid);
+#endif
+
+ if (!rp) {
+ is_exiting = 1;
+ }
+ else {
+ erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
+ if (ERTS_PROC_IS_EXITING(rp)) {
+ is_exiting = 1;
+ } else {
+ rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref);
+ ASSERT(rmon);
+ is_exiting = 0;
+ }
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+
+#ifdef ERTS_SMP
+ if (scheduler <= 0)
+ erts_proc_dec_refc(rp);
+#endif
+ }
+ if (is_exiting) {
+ /* +1 for exiting process */
+ erts_refc_inc(&bin->binary.refc, 2);
+ }
+ erts_smp_mtx_unlock(&rsrc->monitors->lock);
+
+ if (rmon) {
+ ASSERT(rmon->type == MON_NIF_TARGET);
+ ASSERT(rmon->u.resource == rsrc);
+ erts_destroy_monitor(rmon);
+ }
+ erts_destroy_monitor(mon);
+
+ return 0;
+}
+
+
/***************************************************************************
** load_nif/2 **
***************************************************************************/
@@ -3303,7 +3676,8 @@ erts_unload_nif(struct erl_module_nif* lib)
void erl_nif_init()
{
- ERTS_CT_ASSERT((offsetof(ErtsResource,data) % 8) == ERTS_MAGIC_BIN_BYTES_TO_ALIGN);
+ ERTS_CT_ASSERT((offsetof(ErtsResource,data) % 8)
+ == ERTS_MAGIC_BIN_BYTES_TO_ALIGN);
resource_type_list.next = &resource_type_list;
resource_type_list.prev = &resource_type_list;