aboutsummaryrefslogtreecommitdiffstats
path: root/erts
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2016-11-23 15:58:15 +0100
committerSverker Eriksson <[email protected]>2016-11-30 20:26:47 +0100
commit0763a36867a702e3075b682973a079e0390144ce (patch)
tree2fffe305a52e44ed0df4b445a691d4a78380738e /erts
parent16b9292ff0914f77ee7ab7e169def914a190f79b (diff)
downloadotp-0763a36867a702e3075b682973a079e0390144ce.tar.gz
otp-0763a36867a702e3075b682973a079e0390144ce.tar.bz2
otp-0763a36867a702e3075b682973a079e0390144ce.zip
erts: Add enif_select & enif_open_resource_type_x
Diffstat (limited to 'erts')
-rw-r--r--erts/doc/src/erl_nif.xml84
-rw-r--r--erts/emulator/beam/erl_alloc.c2
-rw-r--r--erts/emulator/beam/erl_alloc.types1
-rw-r--r--erts/emulator/beam/erl_driver.h9
-rw-r--r--erts/emulator/beam/erl_drv_nif.h7
-rw-r--r--erts/emulator/beam/erl_nif.c90
-rw-r--r--erts/emulator/beam/erl_nif.h20
-rw-r--r--erts/emulator/beam/erl_nif_api_funcs.h4
-rw-r--r--erts/emulator/beam/global.h29
-rw-r--r--erts/emulator/sys/common/erl_check_io.c759
-rw-r--r--erts/emulator/sys/common/erl_check_io.h18
-rw-r--r--erts/emulator/sys/common/erl_poll.c1
-rw-r--r--erts/emulator/sys/unix/sys.c10
-rw-r--r--erts/emulator/test/nif_SUITE.erl43
-rw-r--r--erts/emulator/test/nif_SUITE_data/nif_SUITE.c190
15 files changed, 1159 insertions, 108 deletions
diff --git a/erts/doc/src/erl_nif.xml b/erts/doc/src/erl_nif.xml
index 906c1be17b..e7073a962f 100644
--- a/erts/doc/src/erl_nif.xml
+++ b/erts/doc/src/erl_nif.xml
@@ -699,12 +699,30 @@ typedef struct {
Each resource type has a unique name and a destructor function that
is called when objects of its type are released.</p>
</item>
+ <tag><marker id="ErlNifResourceTypeInit"/><c>ErlNifResourceTypeInit</c></tag>
+ <item>
+ <code type="none">
+typedef struct {
+ ErlNifResourceDtor* dtor;
+ ErlNifResourceStop* stop;
+} ErlNifResourceTypeInit;</code>
+ <p>Initialization structure read by <seealso marker="#enif_open_resource_type_x">
+ enif_open_resource_type_x</seealso>.</p>
+ </item>
<tag><marker id="ErlNifResourceDtor"/><c>ErlNifResourceDtor</c></tag>
<item>
<code type="none">
typedef void ErlNifResourceDtor(ErlNifEnv* env, void* obj);</code>
<p>The function prototype of a resource destructor function.</p>
</item>
+ <tag><marker id="ErlNifResourceStop"/><c>ErlNifResourceStop</c></tag>
+ <item>
+ <code type="none">
+typedef void ErlNifResourceStop(ErlNifEnv* env, void* obj);</code>
+ <p>The function prototype of a resource stop function,
+ called on the behalf of <seealso marker="#enif_select">
+ enif_select</seealso>.</p>
+ </item>
<tag><marker id="ErlNifCharEncoding"/><c>ErlNifCharEncoding</c></tag>
<item>
<code type="none">
@@ -2239,6 +2257,24 @@ enif_map_iterator_destroy(env, &amp;iter);</code>
</func>
<func>
+ <name><ret>ErlNifResourceType *</ret>
+ <nametext>enif_open_resource_type_x(ErlNifEnv* env, const char* name,
+ ErlNifResourceTypeInit* init,
+ ErlNifResourceFlags flags, ErlNifResourceFlags* tried)</nametext>
+ </name>
+ <fsummary>Create or takeover a resource type.</fsummary>
+ <desc>
+ <p>Same as <seealso marker="#enif_open_resource_type"><c>enif_open_resource_type</c></seealso>
+ except is also accept a <c>stop</c> callback for resource types that are
+ used together with <seealso marker="#enif_select"><c>enif_select</c></seealso>.</p>
+ <p>Argument <c>init</c> is a pointer to an
+ <seealso marker="#ErlNifResourceTypeInit"><c>ErlNifResourceTypeInit</c></seealso>
+ structure that contains the function pointers for the destructor and the stop callback
+ of the resource type.</p>
+ </desc>
+ </func>
+
+ <func>
<name><ret>int</ret><nametext>enif_port_command(ErlNifEnv* env, const
ErlNifPort* to_port, ErlNifEnv *msg_env, ERL_NIF_TERM msg)</nametext>
</name>
@@ -2345,7 +2381,7 @@ enif_map_iterator_destroy(env, &amp;iter);</code>
<nametext>enif_release_resource(void* obj)</nametext></name>
<fsummary>Release a resource object.</fsummary>
<desc>
- <p>Removes a reference to resource object <c>obj</c>obtained from
+ <p>Removes a reference to resource object <c>obj</c> obtained from
<seealso marker="#enif_alloc_resource">
<c>enif_alloc_resource</c></seealso>.
The resource object is destructed when the last reference is removed.
@@ -2488,6 +2524,52 @@ enif_map_iterator_destroy(env, &amp;iter);</code>
</func>
<func>
+ <name><ret>int</ret>
+ <nametext>enif_select(ErlNifEnv* env, ErlNifEvent event,
+ enum ErlNifSelectFlags mode, void* obj, Eterm ref)</nametext>
+ </name>
+ <fsummary>Manage subscription on IO event.</fsummary>
+ <desc>
+ <p>This function can be used to receive asynchronous notifications
+ when OS-specific event objects become ready for either read or write operations.</p>
+ <p>Argument <c>event</c> identifies the event object. On Unix
+ systems, the functions <c>select</c>/<c>poll</c> are used. The event
+ object must be a socket, pipe or other file descriptor object that
+ <c>select</c>/<c>poll</c> can use.</p>
+ <p>Argument <c>mode</c> describes the type of events to wait for. It can be
+ <c>ERL_NIF_SELECT_READ</c>, <c>ERL_NIF_SELECT_WRITE</c> or a bitwise
+ OR combination to wait for both. It can also be <c>ERL_NIF_SELECT_STOP</c>
+ which is described further below. When a read or write event is triggerred,
+ a notification message like this is sent to the Erlang process that called
+ <c>enif_select</c>:</p>
+ <code type="none"><c>{select, Obj, Ref, ready_input | ready_output}</c></code>
+ <p><c>ready_input</c> or <c>ready_output</c> indicates if the event object
+ is ready for reading or writing.</p>
+ <p>Argument <c>obj</c> is a resource object obtained from
+ <seealso marker="#enif_alloc_resource"><c>enif_alloc_resource</c></seealso>.
+ The purpose of the resource objects is as a container of the event object
+ to manage its state and lifetime. A handle to the resource is received
+ in the notification message as <c>Obj</c>.</p>
+ <p>Argument <c>ref</c> must be either a reference obtained from
+ <seealso marker="erlang#make_ref-0"><c>erlang:make_ref/0</c></seealso>
+ or the atom <c>undefined</c>. It will be passed as <c>Ref</c> in the notifications.
+ If a selective <c>receive</c> statement is used to wait for the notification
+ then a reference created just before the <c>receive</c> will exploit a runtime
+ optimization that bypasses all earlier received messages in the queue.</p>
+ <p>The notifications are one-shot only. To receive further notifications of the same
+ type (read or write), repeated calls to <c>enif_select</c> must be made.</p>
+ <p>Use <c>ERL_NIF_SELECT_STOP</c> as <c>mode</c> in order to safely
+ close an event object that has been passed to <c>enif_select</c>. The
+ <seealso marker="#ErlNifResourceStop"><c>stop</c></seealso> callback
+ of the resource <c>obj</c> will be called when it is safe to close
+ the event object. This safe way of closing event objects must be used
+ even if all notifications have been received and no further calls to
+ <c>enif_select</c> have been made.</p>
+ <p>Returns 0 on success, or -1 if invalid arguments.</p>
+ </desc>
+ </func>
+
+ <func>
<name><ret>ErlNifPid *</ret>
<nametext>enif_self(ErlNifEnv* caller_env, ErlNifPid* pid)</nametext>
</name>
diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c
index 40a45c961f..3ddf7a53e2 100644
--- a/erts/emulator/beam/erl_alloc.c
+++ b/erts/emulator/beam/erl_alloc.c
@@ -663,6 +663,8 @@ erts_alloc_init(int *argc, char **argv, ErtsAllocInitOpts *eaiop)
= sizeof(ErtsDrvEventDataState);
fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_DRV_SEL_D_STATE)]
= sizeof(ErtsDrvSelectDataState);
+ fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_NIF_SEL_D_STATE)]
+ = sizeof(ErtsNifSelectDataState);
fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_MSG_REF)]
= sizeof(ErtsMessageRef);
#ifdef ERTS_SMP
diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types
index 6e8710eb8a..7ea8c98008 100644
--- a/erts/emulator/beam/erl_alloc.types
+++ b/erts/emulator/beam/erl_alloc.types
@@ -396,6 +396,7 @@ type DRV_TAB LONG_LIVED SYSTEM drv_tab
type DRV_EV_STATE LONG_LIVED SYSTEM driver_event_state
type DRV_EV_D_STATE FIXED_SIZE SYSTEM driver_event_data_state
type DRV_SEL_D_STATE FIXED_SIZE SYSTEM driver_select_data_state
+type NIF_SEL_D_STATE FIXED_SIZE SYSTEM enif_select_data_state
type FD_LIST SHORT_LIVED SYSTEM fd_list
type ACTIVE_FD_ARR SHORT_LIVED SYSTEM active_fd_array
type POLLSET LONG_LIVED SYSTEM pollset
diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h
index 97a69140c3..5bea92e198 100644
--- a/erts/emulator/beam/erl_driver.h
+++ b/erts/emulator/beam/erl_driver.h
@@ -76,11 +76,10 @@ typedef struct {
# endif
#endif
-/* Values for mode arg to driver_select() */
-#define ERL_DRV_READ (1 << 0)
-#define ERL_DRV_WRITE (1 << 1)
-#define ERL_DRV_USE (1 << 2)
-#define ERL_DRV_USE_NO_CALLBACK (ERL_DRV_USE | (1 << 3))
+#define ERL_DRV_READ ((int)ERL_NIF_SELECT_READ)
+#define ERL_DRV_WRITE ((int)ERL_NIF_SELECT_WRITE)
+#define ERL_DRV_USE ((int)ERL_NIF_SELECT_STOP)
+#define ERL_DRV_USE_NO_CALLBACK (ERL_DRV_USE | (ERL_DRV_USE << 1))
/* Old deprecated */
#define DO_READ ERL_DRV_READ
diff --git a/erts/emulator/beam/erl_drv_nif.h b/erts/emulator/beam/erl_drv_nif.h
index 2489099b5c..e4ebcdb1d4 100644
--- a/erts/emulator/beam/erl_drv_nif.h
+++ b/erts/emulator/beam/erl_drv_nif.h
@@ -49,6 +49,13 @@ typedef enum {
ERL_DIRTY_JOB_IO_BOUND = 2
} ErlDirtyJobFlags;
+/* Values for enif_select AND mode arg for driver_select() */
+enum ErlNifSelectFlags {
+ ERL_NIF_SELECT_READ = (1 << 0),
+ ERL_NIF_SELECT_WRITE = (1 << 1),
+ ERL_NIF_SELECT_STOP = (1 << 2)
+};
+
#ifdef SIZEOF_CHAR
# define SIZEOF_CHAR_SAVED__ SIZEOF_CHAR
# undef SIZEOF_CHAR
diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c
index 202f713f67..27abba7cfd 100644
--- a/erts/emulator/beam/erl_nif.c
+++ b/erts/emulator/beam/erl_nif.c
@@ -1904,37 +1904,10 @@ int enif_snprintf(char *buffer, size_t size, const char* format, ...)
** Memory managed (GC'ed) "resource" objects **
***********************************************************/
-
-struct enif_resource_type_t
-{
- struct enif_resource_type_t* next; /* list of all resource types */
- struct enif_resource_type_t* prev;
- struct erl_module_nif* owner; /* that created this type and thus implements the destructor*/
- ErlNifResourceDtor* dtor; /* user destructor function */
- erts_refc_t refc; /* num of resources of this type (HOTSPOT warning)
- +1 for active erl_module_nif */
- Eterm module;
- Eterm name;
-};
-
/* dummy node in circular list */
struct enif_resource_type_t resource_type_list;
-typedef struct enif_resource_t
-{
- struct enif_resource_type_t* type;
-#ifdef DEBUG
- erts_refc_t nif_refc;
-# ifdef ARCH_32
- byte align__[4];
-# endif
-#endif
-
- char data[1];
-}ErlNifResource;
-
#define SIZEOF_ErlNifResource(SIZE) (offsetof(ErlNifResource,data) + (SIZE))
-#define DATA_TO_RESOURCE(PTR) ((ErlNifResource*)((char*)(PTR) - offsetof(ErlNifResource,data)))
static ErlNifResourceType* find_resource_type(Eterm module, Eterm name)
{
@@ -1997,24 +1970,23 @@ struct opened_resource_type
ErlNifResourceFlags op;
ErlNifResourceType* type;
- ErlNifResourceDtor* new_dtor;
+ ErlNifResourceTypeInit new_callbacks;
};
static struct opened_resource_type* opened_rt_list = NULL;
-ErlNifResourceType*
-enif_open_resource_type(ErlNifEnv* env,
- const char* module_str,
- const char* name_str,
- ErlNifResourceDtor* dtor,
- ErlNifResourceFlags flags,
- ErlNifResourceFlags* tried)
+static
+ErlNifResourceType* open_resource_type(ErlNifEnv* env,
+ const char* name_str,
+ const ErlNifResourceTypeInit* init,
+ ErlNifResourceFlags flags,
+ ErlNifResourceFlags* tried,
+ size_t sizeof_init)
{
ErlNifResourceType* type = NULL;
ErlNifResourceFlags op = flags;
Eterm module_am, name_am;
ASSERT(erts_smp_thr_progress_is_blocking());
- ASSERT(module_str == NULL); /* for now... */
module_am = make_atom(env->mod_nif->mod->module);
name_am = enif_make_atom(env, name_str);
@@ -2048,7 +2020,9 @@ enif_open_resource_type(ErlNifEnv* env,
sizeof(struct opened_resource_type));
ort->op = op;
ort->type = type;
- ort->new_dtor = dtor;
+ sys_memzero(&ort->new_callbacks, sizeof(ErlNifResourceTypeInit));
+ ASSERT(sizeof_init > 0 && sizeof_init <= sizeof(ErlNifResourceTypeInit));
+ sys_memcpy(&ort->new_callbacks, init, sizeof_init);
ort->next = opened_rt_list;
opened_rt_list = ort;
}
@@ -2058,6 +2032,31 @@ enif_open_resource_type(ErlNifEnv* env,
return type;
}
+ErlNifResourceType*
+enif_open_resource_type(ErlNifEnv* env,
+ const char* module_str,
+ const char* name_str,
+ ErlNifResourceDtor* dtor,
+ ErlNifResourceFlags flags,
+ ErlNifResourceFlags* tried)
+{
+ ErlNifResourceTypeInit init = {dtor, NULL};
+ ASSERT(module_str == NULL); /* for now... */
+ return open_resource_type(env, name_str, &init, flags, tried,
+ sizeof(init));
+}
+
+ErlNifResourceType*
+enif_open_resource_type_x(ErlNifEnv* env,
+ const char* name_str,
+ const ErlNifResourceTypeInit* init,
+ ErlNifResourceFlags flags,
+ ErlNifResourceFlags* tried)
+{
+ return open_resource_type(env, name_str, init, flags, tried,
+ env->mod_nif->entry.sizeof_ErlNifResourceTypeInit);
+}
+
static void commit_opened_resource_types(struct erl_module_nif* lib)
{
while (opened_rt_list) {
@@ -2076,7 +2075,8 @@ static void commit_opened_resource_types(struct erl_module_nif* lib)
}
type->owner = lib;
- type->dtor = ort->new_dtor;
+ type->dtor = ort->new_callbacks.dtor;
+ type->stop = ort->new_callbacks.stop;
if (type->dtor != NULL) {
erts_refc_inc(&lib->rt_dtor_cnt, 1);
@@ -2124,6 +2124,15 @@ static void nif_resource_dtor(Binary* bin)
}
}
+void erts_resource_stop(ErlNifResource* resource)
+{
+ struct enif_msg_environment_t msg_env;
+ ASSERT(resource->type->stop);
+ pre_nif_noproc(&msg_env, resource->type->owner, NULL);
+ resource->type->stop(&msg_env.env, resource->data);
+ post_nif_noproc(&msg_env);
+}
+
void* enif_alloc_resource(ErlNifResourceType* type, size_t size)
{
Binary* bin = erts_create_magic_binary_x(SIZEOF_ErlNifResource(size),
@@ -3174,6 +3183,11 @@ static struct erl_module_nif* create_lib(const ErlNifEntry* src)
dst->funcs = lib->_funcs_copy_;
dst->options = 0;
}
+ if (AT_LEAST_VERSION(src, 2, 12)) {
+ dst->sizeof_ErlNifResourceTypeInit = src->sizeof_ErlNifResourceTypeInit;
+ } else {
+ dst->sizeof_ErlNifResourceTypeInit = 0;
+ }
return lib;
};
diff --git a/erts/emulator/beam/erl_nif.h b/erts/emulator/beam/erl_nif.h
index 413b4f7343..ce8caaf729 100644
--- a/erts/emulator/beam/erl_nif.h
+++ b/erts/emulator/beam/erl_nif.h
@@ -52,7 +52,7 @@
** 2.11: 19.0 enif_snprintf
*/
#define ERL_NIF_MAJOR_VERSION 2
-#define ERL_NIF_MINOR_VERSION 11
+#define ERL_NIF_MINOR_VERSION 12
/*
* The emulator will refuse to load a nif-lib with a major version
@@ -122,6 +122,9 @@ typedef struct enif_entry_t
/* Added in 2.7 */
unsigned options; /* Unused. Can be set to 0 or 1 (dirty sched config) */
+
+ /* Added in 2.12 */
+ size_t sizeof_ErlNifResourceTypeInit;
}ErlNifEntry;
@@ -135,8 +138,20 @@ typedef struct
void* ref_bin;
}ErlNifBinary;
+typedef struct {
+ void (*dtor)(ErlNifEnv* env, void* obj);
+ void (*stop)(ErlNifEnv* env, void* obj); /* at ERL_NIF_SELECT_STOP event */
+} ErlNifResourceTypeInit;
+
typedef struct enif_resource_type_t ErlNifResourceType;
typedef void ErlNifResourceDtor(ErlNifEnv*, void*);
+typedef void ErlNifResourceStop(ErlNifEnv*, void*);
+typedef void ErlNifResourceExit(ErlNifEnv*, void*);
+
+//#ifndef ERL_SYS_DRV
+typedef int ErlNifEvent; /* An event to be selected on. */
+//#endif
+
typedef enum
{
ERL_NIF_RT_CREATE = 1,
@@ -292,7 +307,8 @@ ERL_NIF_INIT_DECL(NAME) \
FUNCS, \
LOAD, RELOAD, UPGRADE, UNLOAD, \
ERL_NIF_VM_VARIANT, \
- 1 \
+ 1, \
+ sizeof(ErlNifResourceTypeInit) \
}; \
ERL_NIF_INIT_BODY; \
return &entry; \
diff --git a/erts/emulator/beam/erl_nif_api_funcs.h b/erts/emulator/beam/erl_nif_api_funcs.h
index 9a8f216773..9163ce25eb 100644
--- a/erts/emulator/beam/erl_nif_api_funcs.h
+++ b/erts/emulator/beam/erl_nif_api_funcs.h
@@ -175,6 +175,8 @@ ERL_NIF_API_FUNC_DECL(size_t, enif_binary_to_term, (ErlNifEnv *env, const unsign
ERL_NIF_API_FUNC_DECL(int, enif_port_command, (ErlNifEnv *env, const ErlNifPort* to_port, ErlNifEnv *msg_env, ERL_NIF_TERM msg));
ERL_NIF_API_FUNC_DECL(int,enif_thread_type,(void));
ERL_NIF_API_FUNC_DECL(int,enif_snprintf,(char * buffer, size_t size, const char *format, ...));
+ERL_NIF_API_FUNC_DECL(int,enif_select,(ErlNifEnv* env, ErlNifEvent e, enum ErlNifSelectFlags flags, void* obj, ERL_NIF_TERM ref));
+ERL_NIF_API_FUNC_DECL(ErlNifResourceType*,enif_open_resource_type_x,(ErlNifEnv*, const char* name_str, const ErlNifResourceTypeInit*, ErlNifResourceFlags flags, ErlNifResourceFlags* tried));
/*
** ADD NEW ENTRIES HERE (before this comment) !!!
@@ -332,6 +334,8 @@ ERL_NIF_API_FUNC_DECL(int,enif_snprintf,(char * buffer, size_t size, const char
# define enif_port_command ERL_NIF_API_FUNC_MACRO(enif_port_command)
# define enif_thread_type ERL_NIF_API_FUNC_MACRO(enif_thread_type)
# define enif_snprintf ERL_NIF_API_FUNC_MACRO(enif_snprintf)
+# define enif_select ERL_NIF_API_FUNC_MACRO(enif_select)
+# define enif_open_resource_type_x ERL_NIF_API_FUNC_MACRO(enif_open_resource_type_x)
/*
** ADD NEW ENTRIES HERE (before this comment)
diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h
index 2b2f3c5cdc..5c5693a315 100644
--- a/erts/emulator/beam/global.h
+++ b/erts/emulator/beam/global.h
@@ -42,6 +42,7 @@
#include "erl_utils.h"
#include "erl_port.h"
#include "erl_gc.h"
+#include "erl_nif.h"
struct enif_func_t;
@@ -58,6 +59,33 @@ struct enif_environment_t /* ErlNifEnv */
Process *tracee;
int exiting; /* boolean (dirty nifs might return in exiting state) */
};
+struct enif_resource_type_t
+{
+ struct enif_resource_type_t* next; /* list of all resource types */
+ struct enif_resource_type_t* prev;
+ struct erl_module_nif* owner; /* that created this type and thus implements the destructor*/
+ ErlNifResourceDtor* dtor; /* user destructor function */
+ ErlNifResourceStop* stop;
+ erts_refc_t refc; /* num of resources of this type (HOTSPOT warning)
+ +1 for active erl_module_nif */
+ Eterm module;
+ Eterm name;
+};
+typedef struct enif_resource_t
+{
+ struct enif_resource_type_t* type;
+#ifdef DEBUG
+ erts_refc_t nif_refc;
+# ifdef ARCH_32
+ byte align__[4];
+# endif
+#endif
+
+ char data[1];
+}ErlNifResource;
+
+#define DATA_TO_RESOURCE(PTR) ((ErlNifResource*)((char*)(PTR) - offsetof(ErlNifResource,data)))
+
extern void erts_pre_nif(struct enif_environment_t*, Process*,
struct erl_module_nif*, Process* tracee);
extern void erts_post_nif(struct enif_environment_t* env);
@@ -67,6 +95,7 @@ extern void erts_pre_dirty_nif(ErtsSchedulerData *,
struct erl_module_nif*);
extern void erts_post_dirty_nif(struct enif_environment_t* env);
#endif
+extern void erts_resource_stop(ErlNifResource* resource);
extern Eterm erts_nif_taints(Process* p);
extern void erts_print_nif_taints(fmtfn_t to, void* to_arg);
void erts_unload_nif(struct erl_module_nif* nif);
diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c
index 2c138e1746..6f61fc8a28 100644
--- a/erts/emulator/sys/common/erl_check_io.c
+++ b/erts/emulator/sys/common/erl_check_io.c
@@ -53,6 +53,8 @@ typedef char EventStateType;
#define ERTS_EV_TYPE_DRV_SEL ((EventStateType) 1) /* driver_select */
#define ERTS_EV_TYPE_DRV_EV ((EventStateType) 2) /* driver_event */
#define ERTS_EV_TYPE_STOP_USE ((EventStateType) 3) /* pending stop_select */
+#define ERTS_EV_TYPE_NIF ((EventStateType) 4) /* enif_select */
+#define ERTS_EV_TYPE_STOP_NIF ((EventStateType) 5) /* pending nif stop */
typedef char EventStateFlags;
#define ERTS_EV_FLAG_USED ((EventStateFlags) 1) /* ERL_DRV_USE has been turned on */
@@ -122,7 +124,11 @@ typedef struct {
#if ERTS_CIO_HAVE_DRV_EVENT
ErtsDrvEventDataState *event; /* ERTS_EV_TYPE_DRV_EV */
#endif
- erts_driver_t* drv_ptr; /* ERTS_EV_TYPE_STOP_USE */
+ ErtsNifSelectDataState *nif; /* ERTS_EV_TYPE_NIF */
+ union {
+ erts_driver_t* drv_ptr; /* ERTS_EV_TYPE_STOP_USE */
+ ErlNifResource* resource; /* ERTS_EV_TYPE_STOP_NIF */
+ }stop;
} driver;
ErtsPollEvents events;
unsigned short remove_cnt; /* number of removed_fd's referring to this fd */
@@ -212,11 +218,18 @@ static ERTS_INLINE void hash_erase_drv_ev_state(ErtsDrvEventState *state)
static void stale_drv_select(Eterm id, ErtsDrvEventState *state, int mode);
static void drv_select_steal(ErlDrvPort ix, ErtsDrvEventState *state,
- int mode, int on);
+ int mode, int on);
+static void nif_select_steal(ErtsDrvEventState *state, int mode,
+ ErlNifResource* resource, Eterm ref);
+
static void print_drv_select_op(erts_dsprintf_buf_t *dsbufp,
ErlDrvPort ix, ErtsSysFdType fd, int mode, int on);
+static void print_nif_select_op(erts_dsprintf_buf_t*, ErtsSysFdType,
+ int mode, ErlNifResource*, Eterm ref);
+
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
-static void select_large_fd_error(ErlDrvPort, ErtsSysFdType, int, int);
+static void drv_select_large_fd_error(ErlDrvPort, ErtsSysFdType, int, int);
+static void nif_select_large_fd_error(ErtsSysFdType, int, ErlNifResource*,Eterm ref);
#endif
#if ERTS_CIO_HAVE_DRV_EVENT
static void drv_event_steal(ErlDrvPort ix, ErtsDrvEventState *state,
@@ -227,8 +240,12 @@ static void print_drv_event_op(erts_dsprintf_buf_t *dsbufp,
static void event_large_fd_error(ErlDrvPort, ErtsSysFdType, ErlDrvEventData);
#endif
#endif
-static void steal_pending_stop_use(erts_dsprintf_buf_t*, ErlDrvPort,
- ErtsDrvEventState*, int mode, int on);
+static void
+steal_pending_stop_use(erts_dsprintf_buf_t*, ErlDrvPort, ErtsDrvEventState*,
+ int mode, int on);
+static void
+steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErlNifResource*,
+ ErtsDrvEventState *state, int mode, int on);
#ifdef ERTS_SMP
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(removed_fd, struct removed_fd, 64, ERTS_ALC_T_FD_LIST)
@@ -263,6 +280,18 @@ alloc_drv_select_data(void)
return dsp;
}
+static ERTS_INLINE ErtsNifSelectDataState *
+alloc_nif_select_data(void)
+{
+ ErtsNifSelectDataState *dsp = erts_alloc(ERTS_ALC_T_NIF_SEL_D_STATE,
+ sizeof(ErtsNifSelectDataState));
+ dsp->in.pid = NIL;
+ dsp->out.pid = NIL;
+ dsp->in.ddeselect_cnt = 0;
+ dsp->out.ddeselect_cnt = 0;
+ return dsp;
+}
+
static ERTS_INLINE void
free_drv_select_data(ErtsDrvSelectDataState *dsp)
{
@@ -271,6 +300,12 @@ free_drv_select_data(ErtsDrvSelectDataState *dsp)
erts_free(ERTS_ALC_T_DRV_SEL_D_STATE, dsp);
}
+static ERTS_INLINE void
+free_nif_select_data(ErtsNifSelectDataState *dsp)
+{
+ erts_free(ERTS_ALC_T_NIF_SEL_D_STATE, dsp);
+}
+
#if ERTS_CIO_HAVE_DRV_EVENT
static ERTS_INLINE ErtsDrvEventDataState *
@@ -352,6 +387,7 @@ forget_removed(struct pollset_info* psi)
erts_smp_spin_unlock(&psi->removed_list_lock);
while (fdlp) {
+ ErlNifResource* resource = NULL;
erts_driver_t* drv_ptr = NULL;
erts_smp_mtx_t* mtx;
ErtsSysFdType fd;
@@ -372,15 +408,25 @@ forget_removed(struct pollset_info* psi)
ASSERT(state->remove_cnt > 0);
if (--state->remove_cnt == 0) {
switch (state->type) {
+ case ERTS_EV_TYPE_STOP_NIF:
+ /* Now we can call stop */
+ resource = state->driver.stop.resource;
+ state->driver.stop.resource = NULL;
+ ASSERT(resource);
+ state->type = ERTS_EV_TYPE_NONE;
+ state->flags &= ~ERTS_EV_FLAG_USED;
+ goto case_ERTS_EV_TYPE_NONE;
+
case ERTS_EV_TYPE_STOP_USE:
/* Now we can call stop_select */
- drv_ptr = state->driver.drv_ptr;
+ drv_ptr = state->driver.stop.drv_ptr;
ASSERT(drv_ptr);
state->type = ERTS_EV_TYPE_NONE;
state->flags &= ~ERTS_EV_FLAG_USED;
- state->driver.drv_ptr = NULL;
+ state->driver.stop.drv_ptr = NULL;
/* Fall through */
- case ERTS_EV_TYPE_NONE:
+ case ERTS_EV_TYPE_NONE:
+ case_ERTS_EV_TYPE_NONE:
#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
hash_erase_drv_ev_state(state);
#endif
@@ -403,6 +449,11 @@ forget_removed(struct pollset_info* psi)
erts_ddll_dereference_driver(drv_ptr->handle);
}
}
+ if (resource) {
+ erts_resource_stop(resource);
+ enif_release_resource(resource->data);
+ }
+
tofree = fdlp;
fdlp = fdlp->next;
removed_fd_free(tofree);
@@ -440,7 +491,8 @@ grow_drv_ev_state(int min_ix)
#if ERTS_CIO_HAVE_DRV_EVENT
drv_ev_state[i].driver.event = NULL;
#endif
- drv_ev_state[i].driver.drv_ptr = NULL;
+ drv_ev_state[i].driver.stop.drv_ptr = NULL;
+ drv_ev_state[i].driver.nif = NULL;
drv_ev_state[i].events = 0;
drv_ev_state[i].remove_cnt = 0;
drv_ev_state[i].type = ERTS_EV_TYPE_NONE;
@@ -480,6 +532,7 @@ abort_tasks(ErtsDrvEventState *state, int mode)
ERTS_EV_TYPE_DRV_EV);
return;
#endif
+ case ERTS_EV_TYPE_NIF:
case ERTS_EV_TYPE_NONE:
return;
default:
@@ -534,6 +587,14 @@ deselect(ErtsDrvEventState *state, int mode)
if (!(state->events)) {
switch (state->type) {
+ case ERTS_EV_TYPE_NIF:
+ state->driver.nif->in.pid = NIL;
+ state->driver.nif->out.pid = NIL;
+ state->driver.nif->in.ddeselect_cnt = 0;
+ state->driver.nif->out.ddeselect_cnt = 0;
+ enif_release_resource(state->driver.stop.resource);
+ state->driver.stop.resource = NULL;
+ break;
case ERTS_EV_TYPE_DRV_SEL:
state->driver.select->inport = NIL;
state->driver.select->outport = NIL;
@@ -569,7 +630,8 @@ check_fd_cleanup(ErtsDrvEventState *state,
#if ERTS_CIO_HAVE_DRV_EVENT
ErtsDrvEventDataState **free_event,
#endif
- ErtsDrvSelectDataState **free_select)
+ ErtsDrvSelectDataState **free_select,
+ ErtsNifSelectDataState **free_nif)
{
erts_aint_t current_cio_time;
@@ -586,6 +648,12 @@ check_fd_cleanup(ErtsDrvEventState *state,
state->driver.select = NULL;
}
+ *free_nif = NULL;
+ if (state->driver.nif && (state->type != ERTS_EV_TYPE_NIF)) {
+ *free_nif = state->driver.nif;
+ state->driver.nif = NULL;
+ }
+
#if ERTS_CIO_HAVE_DRV_EVENT
*free_event = NULL;
if (state->driver.event
@@ -617,12 +685,14 @@ check_cleanup_active_fd(ErtsSysFdType fd,
ErtsPollControlEntry *pce,
int *pce_ix,
#endif
- erts_aint_t current_cio_time)
+ erts_aint_t current_cio_time,
+ int may_sleep)
{
ErtsDrvEventState *state;
int active = 0;
erts_smp_mtx_t *mtx = fd_mtx(fd);
void *free_select = NULL;
+ void *free_nif = NULL;
#if ERTS_CIO_HAVE_DRV_EVENT
void *free_event = NULL;
#endif
@@ -682,6 +752,41 @@ check_cleanup_active_fd(ErtsSysFdType fd,
}
}
+ if (state->driver.nif) {
+#if ERTS_CIO_DEFER_ACTIVE_EVENTS
+# error Windows
+#endif
+ int do_wake = 0;
+ ErtsPollEvents rm_events = 0;
+ if (state->driver.nif->in.ddeselect_cnt) {
+ ASSERT(state->type == ERTS_EV_TYPE_NIF);
+ ASSERT(state->events & ERTS_POLL_EV_IN);
+ ASSERT(is_nil(state->driver.nif->in.pid));
+ if (may_sleep || state->driver.nif->in.ddeselect_cnt == 1) {
+ rm_events = ERTS_POLL_EV_IN;
+ state->driver.nif->in.ddeselect_cnt = 0;
+ }
+ }
+ if (state->driver.nif->out.ddeselect_cnt) {
+ ASSERT(state->type == ERTS_EV_TYPE_NIF);
+ ASSERT(state->events & ERTS_POLL_EV_OUT);
+ ASSERT(is_nil(state->driver.nif->out.pid));
+ if (may_sleep || state->driver.nif->out.ddeselect_cnt == 1) {
+ rm_events |= ERTS_POLL_EV_OUT;
+ state->driver.nif->out.ddeselect_cnt = 0;
+ }
+ }
+ if (rm_events) {
+ state->events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, rm_events, 0, &do_wake);
+ }
+ if (state->events)
+ active = 1;
+ else if (state->type != ERTS_EV_TYPE_NIF) {
+ free_nif = state->driver.nif;
+ state->driver.nif = NULL;
+ }
+ }
+
#if ERTS_CIO_HAVE_DRV_EVENT
if (state->driver.event) {
if (is_iotask_active(&state->driver.event->iotask, current_cio_time)) {
@@ -722,6 +827,8 @@ check_cleanup_active_fd(ErtsSysFdType fd,
if (free_select)
free_drv_select_data(free_select);
+ if (free_nif)
+ free_nif_select_data(free_nif);
#if ERTS_CIO_HAVE_DRV_EVENT
if (free_event)
free_drv_event_data(free_event);
@@ -746,7 +853,7 @@ check_cleanup_active_fd(ErtsSysFdType fd,
}
static void
-check_cleanup_active_fds(erts_aint_t current_cio_time)
+check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep)
{
int six = pollset.active_fd.six;
int eix = pollset.active_fd.eix;
@@ -773,7 +880,8 @@ check_cleanup_active_fds(erts_aint_t current_cio_time)
pctrl_entries,
&pctrl_ix,
#endif
- current_cio_time)) {
+ current_cio_time,
+ may_sleep)) {
no--;
if (ix == six) {
#ifdef DEBUG
@@ -831,7 +939,6 @@ add_active_fd(ErtsSysFdType fd)
int eix = pollset.active_fd.eix;
int size = pollset.active_fd.size;
-
pollset.active_fd.array[eix] = fd;
erts_smp_atomic32_set_relb(&pollset.active_fd.no,
@@ -867,6 +974,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
ErtsDrvEventDataState *free_event = NULL;
#endif
ErtsDrvSelectDataState *free_select = NULL;
+ ErtsNifSelectDataState *free_nif = NULL;
#ifdef USE_VM_PROBES
DTRACE_CHARBUF(name, 64);
#endif
@@ -882,7 +990,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
return -1;
}
if (fd >= max_fds) {
- select_large_fd_error(ix, fd, mode, on);
+ drv_select_large_fd_error(ix, fd, mode, on);
return -1;
}
grow_drv_ev_state(fd);
@@ -920,20 +1028,32 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
}
#endif
+ switch (state->type) {
#if ERTS_CIO_HAVE_DRV_EVENT
- if (state->type == ERTS_EV_TYPE_DRV_EV)
- drv_select_steal(ix, state, mode, on);
+ case ERTS_EV_TYPE_DRV_EV:
#endif
- if (state->type == ERTS_EV_TYPE_STOP_USE) {
- erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
- print_drv_select_op(dsbufp, ix, state->fd, mode, on);
- steal_pending_stop_use(dsbufp, ix, state, mode, on);
- if (state->type == ERTS_EV_TYPE_STOP_USE) {
- ret = 0;
- goto done; /* stop_select still pending */
- }
- ASSERT(state->type == ERTS_EV_TYPE_NONE);
- }
+ case ERTS_EV_TYPE_NIF:
+ drv_select_steal(ix, state, mode, on);
+ break;
+ case ERTS_EV_TYPE_STOP_USE: {
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ print_drv_select_op(dsbufp, ix, state->fd, mode, on);
+ steal_pending_stop_use(dsbufp, ix, state, mode, on);
+ if (state->type == ERTS_EV_TYPE_STOP_USE) {
+ ret = 0;
+ goto done; /* stop_select still pending */
+ }
+ ASSERT(state->type == ERTS_EV_TYPE_NONE);
+ break;
+ }
+ case ERTS_EV_TYPE_STOP_NIF: {
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ print_drv_select_op(dsbufp, ix, state->fd, mode, on);
+ steal_pending_stop_nif(dsbufp, NULL, state, mode, on);
+ ASSERT(state->type == ERTS_EV_TYPE_NONE);
+ break;
+
+ }}
if (mode & ERL_DRV_READ) {
if (state->type == ERTS_EV_TYPE_DRV_SEL) {
@@ -1037,7 +1157,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
else {
/* Not safe to close fd, postpone stop_select callback. */
state->type = ERTS_EV_TYPE_STOP_USE;
- state->driver.drv_ptr = drv_ptr;
+ state->driver.stop.drv_ptr = drv_ptr;
if (drv_ptr->handle) {
erts_ddll_reference_referenced_driver(drv_ptr->handle);
}
@@ -1054,7 +1174,8 @@ done:
#if ERTS_CIO_HAVE_DRV_EVENT
&free_event,
#endif
- &free_select);
+ &free_select,
+ &free_nif);
done_unknown:
erts_smp_mtx_unlock(fd_mtx(fd));
@@ -1067,6 +1188,9 @@ done_unknown:
}
if (free_select)
free_drv_select_data(free_select);
+ if (free_nif)
+ free_nif_select_data(free_nif);
+
#if ERTS_CIO_HAVE_DRV_EVENT
if (free_event)
free_drv_event_data(free_event);
@@ -1075,6 +1199,269 @@ done_unknown:
}
int
+ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
+ ErlNifEvent e,
+ enum ErlNifSelectFlags mode,
+ void* obj,
+ Eterm ref)
+{
+ int on;
+ const Eterm id = env->proc->common.id;
+ ErlNifResource* resource = DATA_TO_RESOURCE(obj);
+ ErtsSysFdType fd = (ErtsSysFdType) e;
+ ErtsPollEvents ctl_events = (ErtsPollEvents) 0;
+ ErtsPollEvents new_events, old_events;
+ ErtsDrvEventState *state;
+ int wake_poller;
+ int ret;
+ enum { NO_STOP=0, CALL_STOP, CALL_STOP_AND_RELEASE } call_stop = NO_STOP;
+#if ERTS_CIO_HAVE_DRV_EVENT
+ ErtsDrvEventDataState *free_event = NULL;
+#endif
+ ErtsDrvSelectDataState *free_select = NULL;
+ ErtsNifSelectDataState *free_nif = NULL;
+#ifdef USE_VM_PROBES
+ DTRACE_CHARBUF(name, 64);
+#endif
+
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ if ((unsigned)fd >= (unsigned)erts_smp_atomic_read_nob(&drv_ev_state_len)) {
+ if (fd < 0) {
+ return -1;
+ }
+ if (fd >= max_fds) {
+ nif_select_large_fd_error(fd, mode, resource, ref);
+ return -1;
+ }
+ grow_drv_ev_state(fd);
+ }
+#endif
+
+ erts_smp_mtx_lock(fd_mtx(fd));
+
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ state = &drv_ev_state[(int) fd];
+#else
+ state = hash_get_drv_ev_state(fd); /* may be NULL! */
+#endif
+
+ if (mode & ERL_NIF_SELECT_STOP) {
+ ASSERT(resource->type->stop);
+ if (IS_FD_UNKNOWN(state)) {
+ /* fast track to stop callback */
+ call_stop = CALL_STOP;
+ ret = 0;
+ goto done_unknown;
+ }
+ on = 0;
+ mode = ERL_DRV_READ | ERL_DRV_WRITE | ERL_DRV_USE;
+ wake_poller = 1; /* to eject fd from pollset (if needed) */
+ }
+ else {
+ on = 1;
+ ASSERT(mode);
+ wake_poller = 0;
+ }
+
+#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ if (state == NULL) {
+ state = hash_new_drv_ev_state(fd);
+ }
+#endif
+
+ switch (state->type) {
+ case ERTS_EV_TYPE_NIF:
+ /*
+ * Changing resource is considered stealing.
+ * Changing process and/or ref is ok (I think?).
+ */
+ if (state->driver.stop.resource != resource)
+ nif_select_steal(state, ERL_DRV_READ | ERL_DRV_WRITE, resource, ref);
+ break;
+#if ERTS_CIO_HAVE_DRV_EVENT
+ case ERTS_EV_TYPE_DRV_EV:
+#endif
+ case ERTS_EV_TYPE_DRV_SEL:
+ nif_select_steal(state, mode, resource, ref);
+ break;
+ case ERTS_EV_TYPE_STOP_USE: {
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ print_nif_select_op(dsbufp, fd, mode, resource, ref);
+ steal_pending_stop_use(dsbufp, ERTS_INVALID_ERL_DRV_PORT, state, mode, on);
+ ASSERT(state->type == ERTS_EV_TYPE_NONE);
+ break;
+ }
+ case ERTS_EV_TYPE_STOP_NIF: {
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ print_nif_select_op(dsbufp, fd, mode, resource, ref);
+ steal_pending_stop_nif(dsbufp, resource, state, mode, on);
+ if (state->type == ERTS_EV_TYPE_STOP_NIF) {
+ ret = 0;
+ goto done;
+ }
+ ASSERT(state->type == ERTS_EV_TYPE_NONE);
+ break;
+ }}
+
+ ASSERT(state->type == ERTS_EV_TYPE_NONE ||
+ state->type == ERTS_EV_TYPE_NIF);
+
+ if (mode & ERL_DRV_READ) {
+ ctl_events |= ERTS_POLL_EV_IN;
+ }
+ if (mode & ERL_DRV_WRITE) {
+ ctl_events |= ERTS_POLL_EV_OUT;
+ }
+
+ ASSERT((state->type == ERTS_EV_TYPE_NIF) ||
+ (state->type == ERTS_EV_TYPE_NONE && !state->events));
+
+ new_events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, ctl_events, on, &wake_poller);
+
+ if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
+ if (state->type == ERTS_EV_TYPE_NIF && !state->events) {
+ state->type = ERTS_EV_TYPE_NONE;
+ state->flags &= ~ERTS_EV_FLAG_USED;
+ state->driver.nif->in.pid = NIL;
+ state->driver.nif->out.pid = NIL;
+ state->driver.nif->in.ddeselect_cnt = 0;
+ state->driver.nif->out.ddeselect_cnt = 0;
+ state->driver.stop.resource = NULL;
+ }
+ ret = -1;
+ goto done;
+ }
+
+ old_events = state->events;
+
+ ASSERT(on
+ ? (new_events == (state->events | ctl_events))
+ : (new_events == (state->events & ~ctl_events)));
+
+ ASSERT(state->type == ERTS_EV_TYPE_NIF
+ || state->type == ERTS_EV_TYPE_NONE);
+
+ state->events = new_events;
+ if (ctl_events) {
+ if (on) {
+ Uint32* refn;
+ if (!state->driver.nif)
+ state->driver.nif = alloc_nif_select_data();
+ if (state->type == ERTS_EV_TYPE_NONE) {
+ state->type = ERTS_EV_TYPE_NIF;
+ state->driver.stop.resource = resource;
+ enif_keep_resource(resource->data);
+ }
+ ASSERT(state->type == ERTS_EV_TYPE_NIF);
+ ASSERT(state->driver.stop.resource == resource);
+ if (ctl_events & ERTS_POLL_EV_IN) {
+ ASSERT(is_nil(state->driver.nif->in.pid));
+ state->driver.nif->in.pid = id;
+ if (is_immed(ref)) {
+ state->driver.nif->in.immed = ref;
+ } else {
+ ASSERT(is_internal_ref(ref));
+ refn = internal_ref_numbers(ref);
+ state->driver.nif->in.immed = THE_NON_VALUE;
+ state->driver.nif->in.refn[0] = refn[0];
+ state->driver.nif->in.refn[1] = refn[1];
+ state->driver.nif->in.refn[2] = refn[2];
+ }
+ state->driver.nif->in.ddeselect_cnt = 0;
+ }
+ if (ctl_events & ERTS_POLL_EV_OUT) {
+ ASSERT(is_nil(state->driver.nif->out.pid));
+ state->driver.nif->out.pid = id;
+ if (is_immed(ref)) {
+ state->driver.nif->out.immed = ref;
+ } else {
+ ASSERT(is_internal_ref(ref));
+ refn = internal_ref_numbers(ref);
+ state->driver.nif->out.immed = THE_NON_VALUE;
+ state->driver.nif->out.refn[0] = refn[0];
+ state->driver.nif->out.refn[1] = refn[1];
+ state->driver.nif->out.refn[2] = refn[2];
+ }
+ state->driver.nif->out.ddeselect_cnt = 0;
+ }
+ state->flags |= ERTS_EV_FLAG_USED;
+ }
+ else { /* off */
+ if (state->type == ERTS_EV_TYPE_NIF) {
+ //erts_fprintf(stderr, "SVERK: enif select clear fd=%d inpid=%T inrsrc=%p\n",
+ // state->fd, state->driver.nif->inpid,
+ // state->driver.nif->in.resource);
+ state->driver.nif->in.pid = NIL;
+ state->driver.nif->out.pid = NIL;
+ state->driver.nif->in.ddeselect_cnt = 0;
+ state->driver.nif->out.ddeselect_cnt = 0;
+ state->flags &= ~ERTS_EV_FLAG_USED;
+ if (old_events != 0) {
+ remember_removed(state, &pollset);
+ }
+ }
+ ASSERT(new_events==0);
+ if (state->remove_cnt == 0 || !wake_poller) {
+ /* Safe to close fd now as it is not in pollset
+ or there was no need to eject fd (kernel poll) */
+ //erts_fprintf(stderr, "SVERK : enif_select calling stop before return\n");
+ if (state->type == ERTS_EV_TYPE_NIF) {
+ ASSERT(state->driver.stop.resource == resource);
+ call_stop = CALL_STOP_AND_RELEASE;
+ state->driver.stop.resource = NULL;
+ }
+ else {
+ ASSERT(!state->driver.stop.resource);
+ call_stop = CALL_STOP;
+ }
+ state->type = ERTS_EV_TYPE_NONE;
+ }
+ else {
+ /* Not safe to close fd, postpone stop_select callback. */
+ //erts_fprintf(stderr, "SVERK: enif_select schedule stop\n");
+ if (state->type == ERTS_EV_TYPE_NONE) {
+ ASSERT(!state->driver.stop.resource);
+ state->driver.stop.resource = resource;
+ enif_keep_resource(resource);
+ }
+ state->type = ERTS_EV_TYPE_STOP_NIF;
+ }
+ }
+ }
+
+ ret = 0;
+
+done:
+
+ check_fd_cleanup(state,
+#if ERTS_CIO_HAVE_DRV_EVENT
+ &free_event,
+#endif
+ &free_select,
+ &free_nif);
+
+done_unknown:
+ erts_smp_mtx_unlock(fd_mtx(fd));
+ if (call_stop) {
+ erts_resource_stop(resource);
+ if (call_stop == CALL_STOP_AND_RELEASE) {
+ enif_release_resource(resource->data);
+ }
+ }
+ if (free_select)
+ free_drv_select_data(free_select);
+ if (free_nif)
+ free_nif_select_data(free_nif);
+
+#if ERTS_CIO_HAVE_DRV_EVENT
+ if (free_event)
+ free_drv_event_data(free_event);
+#endif
+ return ret;
+}
+
+
+int
ERTS_CIO_EXPORT(driver_event)(ErlDrvPort ix,
ErlDrvEvent e,
ErlDrvEventData event_data)
@@ -1094,6 +1481,7 @@ ERTS_CIO_EXPORT(driver_event)(ErlDrvPort ix,
ErtsDrvEventDataState *free_event;
#endif
ErtsDrvSelectDataState *free_select;
+ ErtsNifSelectDataState *free_nif;
Port *prt = erts_drvport2port(ix);
if (prt == ERTS_INVALID_ERL_DRV_PORT)
@@ -1203,12 +1591,15 @@ done:
#if ERTS_CIO_HAVE_DRV_EVENT
&free_event,
#endif
- &free_select);
+ &free_select,
+ &free_nif);
erts_smp_mtx_unlock(fd_mtx(fd));
if (free_select)
free_drv_select_data(free_select);
+ if (free_nif)
+ free_nif_select_data(free_nif);
#if ERTS_CIO_HAVE_DRV_EVENT
if (free_event)
free_drv_event_data(free_event);
@@ -1244,13 +1635,19 @@ need2steal(ErtsDrvEventState *state, int mode)
state,
ERL_DRV_WRITE);
break;
+ case ERTS_EV_TYPE_NIF:
+ ASSERT(state->driver.stop.resource);
+ do_steal = 1;
+ break;
+
#if ERTS_CIO_HAVE_DRV_EVENT
case ERTS_EV_TYPE_DRV_EV:
do_steal |= chk_stale(state->driver.event->port, state, 0);
break;
#endif
case ERTS_EV_TYPE_STOP_USE:
- ASSERT(0);
+ case ERTS_EV_TYPE_STOP_NIF:
+ ASSERT(0);
break;
default:
break;
@@ -1311,6 +1708,25 @@ steal(erts_dsprintf_buf_t *dsbufp, ErtsDrvEventState *state, int mode)
erts_dsprintf(dsbufp, "\n");
break;
}
+ case ERTS_EV_TYPE_NIF: {
+ Eterm iid = state->driver.nif->in.pid;
+ Eterm oid = state->driver.nif->out.pid;
+ const char* with = "with";
+ ErlNifResourceType* rt = state->driver.stop.resource->type;
+
+ erts_dsprintf(dsbufp, "resource %T:%T", rt->module, rt->name);
+
+ if (is_not_nil(iid)) {
+ erts_dsprintf(dsbufp, " %s in-pid %T", with, iid);
+ with = "and";
+ }
+ if (is_not_nil(oid)) {
+ erts_dsprintf(dsbufp, " %s out-pid %T", with, oid);
+ }
+ deselect(state, 0);
+ erts_dsprintf(dsbufp, "\n");
+ break;
+ }
#if ERTS_CIO_HAVE_DRV_EVENT
case ERTS_EV_TYPE_DRV_EV: {
Eterm eid = state->driver.event->port;
@@ -1328,7 +1744,8 @@ steal(erts_dsprintf_buf_t *dsbufp, ErtsDrvEventState *state, int mode)
break;
}
#endif
- case ERTS_EV_TYPE_STOP_USE: {
+ case ERTS_EV_TYPE_STOP_USE:
+ case ERTS_EV_TYPE_STOP_NIF: {
ASSERT(0);
break;
}
@@ -1358,6 +1775,23 @@ print_drv_select_op(erts_dsprintf_buf_t *dsbufp,
}
static void
+print_nif_select_op(erts_dsprintf_buf_t *dsbufp,
+ ErtsSysFdType fd, int mode,
+ ErlNifResource* resource, Eterm ref)
+{
+ erts_dsprintf(dsbufp,
+ "enif_select(_, %d,%s%s%s, %T:%T, %T) ",
+ (int) GET_FD(fd),
+ mode & ERL_NIF_SELECT_READ ? " READ" : "",
+ mode & ERL_NIF_SELECT_WRITE ? " WRITE" : "",
+ mode & ERL_NIF_SELECT_STOP ? " STOP" : "",
+ resource->type->module,
+ resource->type->name,
+ ref);
+}
+
+
+static void
drv_select_steal(ErlDrvPort ix, ErtsDrvEventState *state, int mode, int on)
{
if (need2steal(state, mode)) {
@@ -1368,6 +1802,18 @@ drv_select_steal(ErlDrvPort ix, ErtsDrvEventState *state, int mode, int on)
}
}
+static void
+nif_select_steal(ErtsDrvEventState *state, int mode,
+ ErlNifResource* resource, Eterm ref)
+{
+ if (need2steal(state, mode)) {
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ print_nif_select_op(dsbufp, state->fd, mode, resource, ref);
+ steal(dsbufp, state, mode);
+ erts_send_error_to_logger_nogl(dsbufp);
+ }
+}
+
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
static void
large_fd_error_common(erts_dsprintf_buf_t *dsbufp, ErtsSysFdType fd)
@@ -1378,7 +1824,7 @@ large_fd_error_common(erts_dsprintf_buf_t *dsbufp, ErtsSysFdType fd)
}
static void
-select_large_fd_error(ErlDrvPort ix, ErtsSysFdType fd, int mode, int on)
+drv_select_large_fd_error(ErlDrvPort ix, ErtsSysFdType fd, int mode, int on)
{
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
print_drv_select_op(dsbufp, ix, fd, mode, on);
@@ -1386,6 +1832,16 @@ select_large_fd_error(ErlDrvPort ix, ErtsSysFdType fd, int mode, int on)
large_fd_error_common(dsbufp, fd);
erts_send_error_to_logger_nogl(dsbufp);
}
+static void
+nif_select_large_fd_error(ErtsSysFdType fd, int mode,
+ ErlNifResource* resource, Eterm ref)
+{
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ print_nif_select_op(dsbufp, fd, mode, resource, ref);
+ erts_dsprintf(dsbufp, "failed: ");
+ large_fd_error_common(dsbufp, fd);
+ erts_send_error_to_logger_nogl(dsbufp);
+}
#endif /* ERTS_SYS_CONTINOUS_FD_NUMBERS */
@@ -1394,38 +1850,78 @@ static void
steal_pending_stop_use(erts_dsprintf_buf_t *dsbufp, ErlDrvPort ix,
ErtsDrvEventState *state, int mode, int on)
{
+ int cancel = 0;
ASSERT(state->type == ERTS_EV_TYPE_STOP_USE);
- erts_dsprintf(dsbufp, "failed: fd=%d (re)selected before stop_select "
- "was called for driver %s\n",
- (int) GET_FD(state->fd), state->driver.drv_ptr->name);
- erts_send_error_to_logger_nogl(dsbufp);
if (on) {
/* Either fd-owner changed its mind about closing
* or closed fd before stop_select callback and fd is now reused.
* In either case stop_select should not be called.
- */
- state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
- if (state->driver.drv_ptr->handle) {
- erts_ddll_dereference_driver(state->driver.drv_ptr->handle);
- }
- state->driver.drv_ptr = NULL;
+ */
+ cancel = 1;
}
else if ((mode & ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) {
Port *prt = erts_drvport2port(ix);
- erts_driver_t* drv_ptr = prt != ERTS_INVALID_ERL_DRV_PORT ? prt->drv_ptr : NULL;
- if (drv_ptr && drv_ptr != state->driver.drv_ptr) {
- /* Some other driver wants the stop_select callback */
- if (state->driver.drv_ptr->handle) {
- erts_ddll_dereference_driver(state->driver.drv_ptr->handle);
- }
- if (drv_ptr->handle) {
- erts_ddll_reference_referenced_driver(drv_ptr->handle);
- }
- state->driver.drv_ptr = drv_ptr;
- }
+ if (prt == ERTS_INVALID_ERL_DRV_PORT
+ || prt->drv_ptr != state->driver.stop.drv_ptr) {
+ /* Some other driver or nif wants the stop_select callback */
+ cancel = 1;
+ }
+ }
+
+ if (cancel) {
+ erts_dsprintf(dsbufp, "called before stop_select was called for driver '%s'\n",
+ state->driver.stop.drv_ptr->name);
+ if (state->driver.stop.drv_ptr->handle) {
+ erts_ddll_dereference_driver(state->driver.stop.drv_ptr->handle);
+ }
+ state->type = ERTS_EV_TYPE_NONE;
+ state->flags &= ~ERTS_EV_FLAG_USED;
+ state->driver.stop.drv_ptr = NULL;
+ }
+ else {
+ erts_dsprintf(dsbufp, "ignored repeated call\n");
+ }
+ erts_send_error_to_logger_nogl(dsbufp);
+}
+
+static void
+steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErlNifResource* resource,
+ ErtsDrvEventState *state, int mode, int on)
+{
+ int cancel = 0;
+
+ ASSERT(state->type == ERTS_EV_TYPE_STOP_NIF);
+ ASSERT(state->driver.stop.resource);
+
+ if (on) {
+ ASSERT(mode & (ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE));
+ /* Either fd-owner changed its mind about closing
+ * or closed fd before stop callback and fd is now reused.
+ * In either case, stop should not be called.
+ */
+ cancel = 1;
}
+ else if ((mode & ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE
+ && resource != state->driver.stop.resource) {
+ /* Some driver or other resource wants the stop callback */
+ cancel = 1;
+ }
+
+ if (cancel) {
+ ErlNifResourceType* rt = state->driver.stop.resource->type;
+ erts_dsprintf(dsbufp, "called before stop was called for NIF resource %T:%T\n",
+ rt->module, rt->name);
+
+ enif_release_resource(state->driver.stop.resource);
+ state->type = ERTS_EV_TYPE_NONE;
+ state->flags &= ~ERTS_EV_FLAG_USED;
+ state->driver.stop.resource = NULL;
+ }
+ else {
+ erts_dsprintf(dsbufp, "ignored repeated call\n");
+ }
+ erts_send_error_to_logger_nogl(dsbufp);
}
@@ -1557,6 +2053,48 @@ oready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time)
}
}
+static ERTS_INLINE void
+send_event_tuple(struct erts_nif_select_event* e, ErlNifResource* resource,
+ Eterm event_atom)
+{
+ Process* rp = erts_proc_lookup(e->pid);
+ ErtsProcLocks rp_locks = 0;
+ ErtsMessage* mp;
+ ErlOffHeap* ohp;
+ ErtsBinary* bin;
+ Eterm* hp;
+ Uint hsz = 5 + PROC_BIN_SIZE + REF_THING_SIZE; /* {select, Resource, Ref, EventAtom} */
+ Eterm resource_term, ref_term, tuple;
+
+ if (!rp) {
+ erts_fprintf(stderr, "SVERK: Process %T not alive for msg %T\n", e->pid, event_atom);
+ return;
+ }
+
+ bin = ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(resource);
+
+ mp = erts_alloc_message_heap(rp, &rp_locks, hsz, &hp, &ohp);
+
+ resource_term = erts_mk_magic_binary_term(&hp, ohp, &bin->binary);
+ if (is_value(e->immed)) {
+ ASSERT(is_immed(e->immed));
+ ref_term = e->immed;
+ }
+ else {
+ write_ref_thing(hp, e->refn[0], e->refn[1], e->refn[2]);
+ ref_term = make_internal_ref(hp);
+ }
+ hp += REF_THING_SIZE;
+ tuple = TUPLE4(hp, am_select, resource_term, ref_term, event_atom);
+
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
+ erts_queue_message(rp, rp_locks, mp, tuple, am_system);
+
+ if (rp_locks)
+ erts_smp_proc_unlock(rp, rp_locks);
+}
+
+
#if ERTS_CIO_HAVE_DRV_EVENT
static ERTS_INLINE void
eready(Eterm id, ErtsDrvEventState *state, ErlDrvEventData event_data,
@@ -1602,6 +2140,8 @@ ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(int set,
ERTS_CIO_POLL_INTR_TMD(pollset.ps, set, timeout_time);
}
+#define ERTS_NIF_DELAYED_DESELECT 20
+
void
ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
{
@@ -1635,7 +2175,8 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
current_cio_time++;
erts_smp_atomic_set_relb(&erts_check_io_time, current_cio_time);
- check_cleanup_active_fds(current_cio_time);
+ check_cleanup_active_fds(current_cio_time,
+ timeout_time != ERTS_POLL_NO_TIMEOUT);
#ifdef ERTS_ENABLE_LOCK_CHECK
erts_lc_check_exact(NULL, 0); /* No locks should be locked */
@@ -1741,6 +2282,69 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
break;
}
+ case ERTS_EV_TYPE_NIF: { /* Requested via enif_select()... */
+ struct erts_nif_select_event in = {NIL};
+ struct erts_nif_select_event out = {NIL};
+ ErlNifResource* resource;
+ ErtsPollEvents revents = pollres[i].events;
+
+ if (revents & ERTS_POLL_EV_ERR) {
+ /*
+ * Handle error events by triggering all in/out events
+ * that the NIF has selected.
+ * We *do not* want to send a message that corresponds
+ * to an event not selected.
+ */
+ revents = state->events;
+ }
+ else {
+ revents &= (state->events | ERTS_POLL_EV_NVAL);
+ }
+
+ if (revents & (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) {
+ if (revents & ERTS_POLL_EV_OUT) {
+ if (is_not_nil(state->driver.nif->out.pid)) {
+ out = state->driver.nif->out;
+ resource = state->driver.stop.resource;
+ state->driver.nif->out.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT;
+ state->driver.nif->out.pid = NIL;
+ add_active_fd(state->fd);
+ }
+ else {
+ ASSERT(state->driver.nif->out.ddeselect_cnt >= 2);
+ state->driver.nif->out.ddeselect_cnt--;
+ }
+ }
+ if (revents & ERTS_POLL_EV_IN) {
+ if (is_not_nil(state->driver.nif->in.pid)) {
+ in = state->driver.nif->in;
+ resource = state->driver.stop.resource;
+ state->driver.nif->in.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT;
+ state->driver.nif->in.pid = NIL;
+ add_active_fd(state->fd);
+ }
+ else {
+ ASSERT(state->driver.nif->in.ddeselect_cnt >= 2);
+ state->driver.nif->in.ddeselect_cnt--;
+ }
+ }
+ }
+ else if (revents & ERTS_POLL_EV_NVAL) {
+ abort();
+ }
+
+#ifdef ERTS_SMP
+ erts_smp_mtx_unlock(fd_mtx(fd));
+#endif
+ if (is_not_nil(in.pid)) {
+ send_event_tuple(&in, resource, am_ready_input);
+ }
+ if (is_not_nil(out.pid)) {
+ send_event_tuple(&out, resource, am_ready_output);
+ }
+ goto next_pollres_unlocked;
+ }
+
#if ERTS_CIO_HAVE_DRV_EVENT
case ERTS_EV_TYPE_DRV_EV: { /* Requested via driver_event()... */
ErlDrvEventData event_data;
@@ -1781,6 +2385,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
#ifdef ERTS_SMP
erts_smp_mtx_unlock(fd_mtx(fd));
#endif
+ next_pollres_unlocked:;
}
erts_smp_atomic_set_nob(&pollset.in_poll_wait, 0);
@@ -2306,6 +2911,39 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters)
}
}
}
+ else if (state->type == ERTS_EV_TYPE_NIF) {
+ ErlNifResource* r;
+ erts_printf("enif_select ");
+
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ if (internal) {
+ erts_printf("internal ");
+ err = 1;
+ }
+
+ if (cio_events == ep_events) {
+ erts_printf("ev=");
+ if (print_events(cio_events) != 0)
+ err = 1;
+ }
+ else {
+ err = 1;
+ erts_printf("cio_ev=");
+ print_events(cio_events);
+ erts_printf(" ep_ev=");
+ print_events(ep_events);
+ }
+#else
+ if (print_events(cio_events) != 0)
+ err = 1;
+#endif
+ erts_printf(" inpid=%T dd_cnt=%b32d", state->driver.nif->in.pid,
+ state->driver.nif->in.ddeselect_cnt);
+ erts_printf(" outpid=%T dd_cnt=%b32d", state->driver.nif->out.pid,
+ state->driver.nif->out.ddeselect_cnt);
+ r = state->driver.stop.resource;
+ erts_printf(" resource=%p(%T:%T)", r, r->type->module, r->type->name);
+ }
#if ERTS_CIO_HAVE_DRV_EVENT
else if (state->type == ERTS_EV_TYPE_DRV_EV) {
Eterm id;
@@ -2362,11 +3000,12 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters)
erts_printf("control_type=%d ", (int)state->type);
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
if (cio_events == ep_events) {
- erts_printf("ev=0x%b32x", (Uint32) cio_events);
+ erts_printf("ev=");
+ print_events(cio_events);
}
else {
- erts_printf("cio_ev=0x%b32x", (Uint32) cio_events);
- erts_printf(" ep_ev=0x%b32x", (Uint32) ep_events);
+ erts_printf("cio_ev="); print_events(cio_events);
+ erts_printf(" ep_ev="); print_events(ep_events);
}
#else
erts_printf("ev=0x%b32x", (Uint32) cio_events);
@@ -2395,7 +3034,7 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip)
#if ERTS_CIO_HAVE_DRV_EVENT
null_des.driver.event = NULL;
#endif
- null_des.driver.drv_ptr = NULL;
+ null_des.driver.stop.drv_ptr = NULL;
null_des.events = 0;
null_des.remove_cnt = 0;
null_des.type = ERTS_EV_TYPE_NONE;
diff --git a/erts/emulator/sys/common/erl_check_io.h b/erts/emulator/sys/common/erl_check_io.h
index 14f1ea3f43..242e7cfcf5 100644
--- a/erts/emulator/sys/common/erl_check_io.h
+++ b/erts/emulator/sys/common/erl_check_io.h
@@ -34,6 +34,8 @@
int driver_select_kp(ErlDrvPort, ErlDrvEvent, int, int);
int driver_select_nkp(ErlDrvPort, ErlDrvEvent, int, int);
+int enif_select_kp(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, Eterm);
+int enif_select_nkp(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, Eterm);
int driver_event_kp(ErlDrvPort, ErlDrvEvent, ErlDrvEventData);
int driver_event_nkp(ErlDrvPort, ErlDrvEvent, ErlDrvEventData);
Uint erts_check_io_size_kp(void);
@@ -136,4 +138,20 @@ typedef struct {
ErtsIoTask iniotask;
ErtsIoTask outiotask;
} ErtsDrvSelectDataState;
+
+struct erts_nif_select_event {
+ Eterm pid;
+ Eterm immed;
+ Uint32 refn[ERTS_REF_NUMBERS];
+ Sint32 ddeselect_cnt; /* 0: No delayed deselect in progress
+ * 1: Do deselect before next poll
+ * >1: Countdown of ignored events
+ */
+};
+
+typedef struct {
+ struct erts_nif_select_event in;
+ struct erts_nif_select_event out;
+} ErtsNifSelectDataState;
+
#endif /* #ifndef ERL_CHECK_IO_INTERNAL__ */
diff --git a/erts/emulator/sys/common/erl_poll.c b/erts/emulator/sys/common/erl_poll.c
index b8a28bcc18..0c4df3e13a 100644
--- a/erts/emulator/sys/common/erl_poll.c
+++ b/erts/emulator/sys/common/erl_poll.c
@@ -3030,6 +3030,7 @@ ERTS_POLL_EXPORT(erts_poll_get_selected_events)(ErtsPollSet ps,
ev[fd] = 0;
else {
ev[fd] = ps->fds_status[fd].events;
+ ASSERT(ps->fds_status[fd].used_events == ev[fd]);
if (
#if ERTS_POLL_USE_WAKEUP_PIPE
fd == ps->wake_fds[0] || fd == ps->wake_fds[1] ||
diff --git a/erts/emulator/sys/unix/sys.c b/erts/emulator/sys/unix/sys.c
index 4b2edace0a..789b455f2d 100644
--- a/erts/emulator/sys/unix/sys.c
+++ b/erts/emulator/sys/unix/sys.c
@@ -161,6 +161,7 @@ int erts_use_kernel_poll = 0;
struct {
int (*select)(ErlDrvPort, ErlDrvEvent, int, int);
+ int (*enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, Eterm);
int (*event)(ErlDrvPort, ErlDrvEvent, ErlDrvEventData);
void (*check_io_as_interrupt)(void);
void (*check_io_interrupt)(int);
@@ -184,6 +185,13 @@ driver_event(ErlDrvPort port, ErlDrvEvent event, ErlDrvEventData event_data)
return (*io_func.event)(port, event, event_data);
}
+int enif_select(ErlNifEnv* env, ErlNifEvent event,
+ enum ErlNifSelectFlags flags, void* obj, Eterm ref)
+{
+ return (*io_func.enif_select)(env, event, flags, obj, ref);
+}
+
+
Eterm erts_check_io_info(void *p)
{
return (*io_func.info)(p);
@@ -201,6 +209,7 @@ init_check_io(void)
{
if (erts_use_kernel_poll) {
io_func.select = driver_select_kp;
+ io_func.enif_select = enif_select_kp;
io_func.event = driver_event_kp;
#ifdef ERTS_POLL_NEED_ASYNC_INTERRUPT_SUPPORT
io_func.check_io_as_interrupt = erts_check_io_async_sig_interrupt_kp;
@@ -216,6 +225,7 @@ init_check_io(void)
}
else {
io_func.select = driver_select_nkp;
+ io_func.enif_select = enif_select_nkp;
io_func.event = driver_event_nkp;
#ifdef ERTS_POLL_NEED_ASYNC_INTERRUPT_SUPPORT
io_func.check_io_as_interrupt = erts_check_io_async_sig_interrupt_nkp;
diff --git a/erts/emulator/test/nif_SUITE.erl b/erts/emulator/test/nif_SUITE.erl
index 0d3910b2e2..8c761202d6 100644
--- a/erts/emulator/test/nif_SUITE.erl
+++ b/erts/emulator/test/nif_SUITE.erl
@@ -31,6 +31,7 @@
init_per_testcase/2, end_per_testcase/2,
basic/1, reload_error/1, upgrade/1, heap_frag/1,
t_on_load/1,
+ select/1,
hipe/1,
types/1, many_args/1, binaries/1, get_string/1, get_atom/1,
maps/1,
@@ -64,6 +65,7 @@ all() ->
[{group, G} || G <- api_groups()]
++
[reload_error, heap_frag, types, many_args,
+ select,
hipe,
binaries, get_string, get_atom, maps, api_macros, from_array,
iolist_as_binary, resource, resource_binary,
@@ -434,6 +436,33 @@ t_on_load(Config) when is_list(Config) ->
verify_tmpmem(TmpMem),
ok.
+-define(ERL_NIF_SELECT_READ, (1 bsl 0)).
+-define(ERL_NIF_SELECT_WRITE, (1 bsl 1)).
+-define(ERL_NIF_SELECT_STOP, (1 bsl 2)).
+
+select(Config) when is_list(Config) ->
+ ensure_lib_loaded(Config),
+
+ Ref = make_ref(),
+ {R,W} = pipe_nif(),
+ ok = write_nif(W, <<"hej">>),
+ <<"hej">> = read_nif(R, 3),
+ eagain = read_nif(R, 3),
+ 0 = select_nif(R,?ERL_NIF_SELECT_READ,R,Ref),
+ [] = flush(),
+ ok = write_nif(W, <<"hej">>),
+ [{select, R, Ref, ready_input}] = flush(),
+ <<"hej">> = read_nif(R, 3),
+
+ %% To be extended...
+
+ 0 = select_nif(R,?ERL_NIF_SELECT_STOP,R,Ref),
+ 0 = select_nif(W,?ERL_NIF_SELECT_STOP,W,Ref),
+ timer:sleep(10),
+ true = is_closed_nif(R),
+ true = is_closed_nif(W),
+ ok.
+
hipe(Config) when is_list(Config) ->
Data = proplists:get_value(data_dir, Config),
Priv = proplists:get_value(priv_dir, Config),
@@ -1910,6 +1939,15 @@ call(Pid,Cmd) ->
receive_any() ->
receive M -> M end.
+flush() ->
+ flush(10).
+flush(Timeout) ->
+ receive M ->
+ [M | flush(Timeout)]
+ after Timeout ->
+ []
+ end.
+
repeat(0, _, Arg) ->
Arg;
repeat(N, Fun, Arg0) ->
@@ -2273,6 +2311,11 @@ term_to_binary_nif(_, _) -> ?nif_stub.
binary_to_term_nif(_, _, _) -> ?nif_stub.
port_command_nif(_, _) -> ?nif_stub.
format_term_nif(_,_) -> ?nif_stub.
+select_nif(_,_,_,_) -> ?nif_stub.
+pipe_nif() -> ?nif_stub.
+write_nif(_,_) -> ?nif_stub.
+read_nif(_,_) -> ?nif_stub.
+is_closed_nif(_) -> ?nif_stub.
%% maps
is_map_nif(_) -> ?nif_stub.
diff --git a/erts/emulator/test/nif_SUITE_data/nif_SUITE.c b/erts/emulator/test/nif_SUITE_data/nif_SUITE.c
index 2c93891852..a4915b13c4 100644
--- a/erts/emulator/test/nif_SUITE_data/nif_SUITE.c
+++ b/erts/emulator/test/nif_SUITE_data/nif_SUITE.c
@@ -25,6 +25,8 @@
#include <limits.h>
#ifndef __WIN32__
#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
#endif
#include "nif_mod.h"
@@ -42,7 +44,9 @@ static ERL_NIF_TERM atom_second;
static ERL_NIF_TERM atom_millisecond;
static ERL_NIF_TERM atom_microsecond;
static ERL_NIF_TERM atom_nanosecond;
-
+static ERL_NIF_TERM atom_eagain;
+static ERL_NIF_TERM atom_eof;
+static ERL_NIF_TERM atom_error;
typedef struct
{
@@ -102,6 +106,18 @@ struct binary_resource {
unsigned size;
};
+static ErlNifResourceType* fd_resource_type;
+static void fd_resource_dtor(ErlNifEnv* env, void* obj);
+static void fd_resource_stop(ErlNifEnv* env, void* obj);
+static ErlNifResourceTypeInit fd_rt_init = {
+ fd_resource_dtor,
+ fd_resource_stop
+};
+struct fd_resource {
+ int fd;
+};
+
+
static int get_pointer(ErlNifEnv* env, ERL_NIF_TERM term, void** pp)
{
ErlNifBinary bin;
@@ -144,6 +160,9 @@ static int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
msgenv_resource_type = enif_open_resource_type(env,NULL,"nif_SUITE.msgenv",
msgenv_dtor,
ERL_NIF_RT_CREATE, NULL);
+ fd_resource_type = enif_open_resource_type_x(env, "nif_SUITE.fd",
+ &fd_rt_init,
+ ERL_NIF_RT_CREATE, NULL);
atom_false = enif_make_atom(env,"false");
atom_true = enif_make_atom(env,"true");
atom_self = enif_make_atom(env,"self");
@@ -154,6 +173,9 @@ static int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
atom_millisecond = enif_make_atom(env,"millisecond");
atom_microsecond = enif_make_atom(env,"microsecond");
atom_nanosecond = enif_make_atom(env,"nanosecond");
+ atom_eagain = enif_make_atom(env, "eagain");
+ atom_eof = enif_make_atom(env, "eof");
+ atom_error = enif_make_atom(env, "error");
*priv_data = data;
return 0;
@@ -2010,6 +2032,165 @@ static ERL_NIF_TERM format_term(ErlNifEnv* env, int argc, const ERL_NIF_TERM arg
}
+static int get_fd(ErlNifEnv* env, ERL_NIF_TERM term, ErlNifEvent* fd)
+{
+ struct fd_resource* rsrc;
+
+ if (!enif_get_resource(env, term, fd_resource_type, (void**)&rsrc)) {
+ return 0;
+ }
+ *fd = rsrc->fd;
+ return 1;
+}
+
+static ERL_NIF_TERM select_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
+{
+ ErlNifEvent e;
+ enum ErlNifSelectFlags mode;
+ void* obj;
+ ERL_NIF_TERM ref;
+ int retval;
+
+ if (!get_fd(env, argv[0], &e)
+ || !enif_get_uint(env, argv[1], &mode)
+ || !enif_get_resource(env, argv[2], fd_resource_type, &obj))
+ {
+ return enif_make_badarg(env);
+ }
+
+ ref = argv[3];
+
+ retval = enif_select(env, e, mode, obj, ref);
+
+ return enif_make_int(env, retval);
+}
+
+static ERL_NIF_TERM pipe_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
+{
+ struct fd_resource* read_rsrc;
+ struct fd_resource* write_rsrc;
+ ERL_NIF_TERM read_fd, write_fd;
+ unsigned char *inp, *outp;
+ int fds[2], flags;
+
+ if (pipe(fds) < 0)
+ return enif_make_string(env, "pipe failed", ERL_NIF_LATIN1);
+
+ if ((flags = fcntl(fds[0], F_GETFL, 0)) < 0
+ || fcntl(fds[0], F_SETFL, flags|O_NONBLOCK) < 0
+ || (flags = fcntl(fds[1], F_GETFL, 0)) < 0
+ || fcntl(fds[1], F_SETFL, flags|O_NONBLOCK) < 0) {
+ close(fds[0]);
+ close(fds[1]);
+ return enif_make_string(env, "fcntl failed on pipe", ERL_NIF_LATIN1);
+ }
+
+ read_rsrc = enif_alloc_resource(fd_resource_type, sizeof(struct fd_resource));
+ write_rsrc = enif_alloc_resource(fd_resource_type, sizeof(struct fd_resource));
+ read_rsrc->fd = fds[0];
+ write_rsrc->fd = fds[1];
+ read_fd = enif_make_resource(env, read_rsrc);
+ write_fd = enif_make_resource(env, write_rsrc);
+ enif_release_resource(read_rsrc);
+ enif_release_resource(write_rsrc);
+
+ return enif_make_tuple2(env, read_fd, write_fd);
+}
+
+static ERL_NIF_TERM write_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
+{
+ ErlNifEvent fd;
+ ErlNifBinary bin;
+ int n, written = 0;
+
+ if (!get_fd(env, argv[0], &fd)
+ || !enif_inspect_binary(env, argv[1], &bin))
+ return enif_make_badarg(env);
+
+ for (;;) {
+ n = write(fd, bin.data + written, bin.size - written);
+ if (n >= 0) {
+ written += n;
+ if (written == bin.size) {
+ return atom_ok;
+ }
+ }
+ else if (errno == EAGAIN) {
+ return enif_make_tuple2(env, atom_eagain, enif_make_int(env, written));
+ }
+ else if (errno == EINTR) {
+ continue;
+ }
+ else {
+ return enif_make_tuple2(env, atom_error, enif_make_int(env, errno));
+ }
+ }
+}
+
+static ERL_NIF_TERM read_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
+{
+ ErlNifEvent fd;
+ unsigned char* buf;
+ int n, count, nread = 0;
+ ERL_NIF_TERM res;
+
+ if (!get_fd(env, argv[0], &fd)
+ || !enif_get_int(env, argv[1], &count) || count < 1)
+ return enif_make_badarg(env);
+
+ buf = enif_make_new_binary(env, count, &res);
+
+ for (;;) {
+ n = read(fd, buf, count);
+ if (n > 0) {
+ if (n < count) {
+ res = enif_make_sub_binary(env, res, 0, n);
+ }
+ return res;
+ }
+ else if (n == 0) {
+ return atom_eof;
+ }
+ else if (errno == EAGAIN) {
+ return atom_eagain;
+ }
+ else if (errno == EINTR) {
+ continue;
+ }
+ else {
+ return enif_make_tuple2(env, atom_error, enif_make_int(env, errno));
+ }
+ }
+}
+
+static ERL_NIF_TERM is_closed_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
+{
+ ErlNifEvent fd;
+
+ if (!get_fd(env, argv[0], &fd))
+ return enif_make_badarg(env);
+
+ return fd < 0 ? atom_true : atom_false;
+}
+
+static void fd_resource_dtor(ErlNifEnv* env, void* obj)
+{
+ struct fd_resource* rsrc = (struct fd_resource*)obj;
+ if (rsrc->fd >= 0)
+ close(rsrc->fd);
+}
+
+static void fd_resource_stop(ErlNifEnv* env, void* obj)
+{
+ struct fd_resource* rsrc = (struct fd_resource*)obj;
+ if (rsrc->fd >= 0) {
+ close(rsrc->fd);
+ rsrc->fd = -1; /* thread safety ? */
+ }
+}
+
+
+
static ErlNifFunc nif_funcs[] =
{
{"lib_version", 0, lib_version},
@@ -2086,7 +2267,12 @@ static ErlNifFunc nif_funcs[] =
{"term_to_binary_nif", 2, term_to_binary},
{"binary_to_term_nif", 3, binary_to_term},
{"port_command_nif", 2, port_command},
- {"format_term_nif", 2, format_term}
+ {"format_term_nif", 2, format_term},
+ {"select_nif", 4, select_nif},
+ {"pipe_nif", 0, pipe_nif},
+ {"write_nif", 2, write_nif},
+ {"read_nif", 2, read_nif},
+ {"is_closed_nif", 1, is_closed_nif}
};
ERL_NIF_INIT(nif_SUITE,nif_funcs,load,NULL,upgrade,unload)