path: root/erts/emulator/sys/common/erl_check_io.c
author     Lukas Larsson <[email protected]>    2017-05-30 16:35:18 +0200
committer  Lukas Larsson <[email protected]>    2017-10-02 10:35:52 +0200
commit     988f5f5e8061ce2e135a314ca782788eda478a06 (patch)
tree       1609c42abb9ed03a176865ee32ad3de803c392d3 /erts/emulator/sys/common/erl_check_io.c
parent     1f9003a3dd9bbf9e3dcd84b8fdecef04a3b5e9c7 (diff)
erts: Move all I/O polling to a separate thread
Diffstat (limited to 'erts/emulator/sys/common/erl_check_io.c')
-rw-r--r--    erts/emulator/sys/common/erl_check_io.c    2304
1 file changed, 1043 insertions, 1261 deletions
diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c
index ff0f3ea121..3131b96536 100644
--- a/erts/emulator/sys/common/erl_check_io.c
+++ b/erts/emulator/sys/common/erl_check_io.c
@@ -29,7 +29,6 @@
#endif
#define ERL_CHECK_IO_C__
-#define ERTS_WANT_BREAK_HANDLING
#ifndef WANT_NONBLOCKING
# define WANT_NONBLOCKING
#endif
@@ -44,151 +43,108 @@
#define ERTS_WANT_TIMER_WHEEL_API
#include "erl_time.h"
+#if 0
+#define DEBUG_PRINT(FMT, ...) erts_printf(FMT "\r\n", ##__VA_ARGS__)
+#define DEBUG_PRINT_FD(FMT, STATE, ...) \
+ DEBUG_PRINT("%d: " FMT " (ev=%s, ac=%s, flg=%d)", \
+ (STATE) ? (STATE)->fd : (ErtsSysFdType)-1, ##__VA_ARGS__, \
+ ev2str((STATE) ? (STATE)->events : ERTS_POLL_EV_NONE), \
+ ev2str((STATE) ? (STATE)->active_events : ERTS_POLL_EV_NONE), \
+ (STATE) ? (STATE)->flags : ERTS_EV_FLAG_CLEAR)
+#define DEBUG_PRINT_MODE
+#else
+#define DEBUG_PRINT(...)
+#endif
+
+#ifndef DEBUG_PRINT_FD
+#define DEBUG_PRINT_FD(...)
+#endif
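
These debug macros are compiled out by default; flipping the "#if 0" above to "#if 1" enables them (and DEBUG_PRINT_MODE, which in turn enables the drvmode2str()/nifmode2str() helpers declared later in this file). As a sketch of how they are used further down, not part of the patch itself, a call like

    DEBUG_PRINT_FD("driver_select(%s, %d)", state, drvmode2str(mode), on);

prints the formatted message prefixed by the fd and suffixed with the state's selected events, active events and flags.
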
+
#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
# include "safe_hash.h"
# define DRV_EV_STATE_HTAB_SIZE 1024
#endif
-typedef char EventStateType;
-#define ERTS_EV_TYPE_NONE ((EventStateType) 0)
-#define ERTS_EV_TYPE_DRV_SEL ((EventStateType) 1) /* driver_select */
-#define ERTS_EV_TYPE_STOP_USE ((EventStateType) 2) /* pending stop_select */
-#define ERTS_EV_TYPE_NIF ((EventStateType) 3) /* enif_select */
-#define ERTS_EV_TYPE_STOP_NIF ((EventStateType) 4) /* pending nif stop */
-
-typedef char EventStateFlags;
-#define ERTS_EV_FLAG_USED ((EventStateFlags) 1) /* ERL_DRV_USE has been turned on */
-#define ERTS_EV_FLAG_DEFER_IN_EV ((EventStateFlags) 2)
-#define ERTS_EV_FLAG_DEFER_OUT_EV ((EventStateFlags) 4)
+typedef enum {
+ ERTS_EV_TYPE_NONE = 0,
+ ERTS_EV_TYPE_DRV_SEL = 1, /* driver_select */
+ ERTS_EV_TYPE_STOP_USE = 2, /* pending stop_select */
+ ERTS_EV_TYPE_NIF = 3, /* enif_select */
+ ERTS_EV_TYPE_STOP_NIF = 4 /* pending nif stop */
+} EventStateType;
-#ifdef DEBUG
-# define ERTS_ACTIVE_FD_INC 2
+typedef enum {
+ ERTS_EV_FLAG_CLEAR = 0,
+ ERTS_EV_FLAG_USED = 1, /* ERL_DRV_USE has been turned on */
+#ifdef ERTS_ENABLE_KERNEL_POLL
+ ERTS_EV_FLAG_FALLBACK = 2, /* Set when kernel poll rejected fd
+ and it was put in the nkp version */
#else
-# define ERTS_ACTIVE_FD_INC 128
+ ERTS_EV_FLAG_FALLBACK = ERTS_EV_FLAG_CLEAR,
#endif
-#define ERTS_CHECK_IO_POLL_RES_LEN 512
-
-#if defined(ERTS_KERNEL_POLL_VERSION)
-# define ERTS_CIO_EXPORT(FUNC) FUNC ## _kp
-#elif defined(ERTS_NO_KERNEL_POLL_VERSION)
-# define ERTS_CIO_EXPORT(FUNC) FUNC ## _nkp
-#else
-# define ERTS_CIO_EXPORT(FUNC) FUNC
-#endif
+ /* Combinations */
+ ERTS_EV_FLAG_USED_FALLBACK = ERTS_EV_FLAG_USED | ERTS_EV_FLAG_FALLBACK
+} EventStateFlags;
-#define ERTS_CIO_POLL_CTL ERTS_POLL_EXPORT(erts_poll_control)
-#define ERTS_CIO_POLL_CTLV ERTS_POLL_EXPORT(erts_poll_controlv)
-#define ERTS_CIO_POLL_WAIT ERTS_POLL_EXPORT(erts_poll_wait)
-#define ERTS_CIO_POLL_INTR ERTS_POLL_EXPORT(erts_poll_interrupt)
-#define ERTS_CIO_POLL_INTR_TMD ERTS_POLL_EXPORT(erts_poll_interrupt_timed)
-#define ERTS_CIO_NEW_POLLSET ERTS_POLL_EXPORT(erts_poll_create_pollset)
-#define ERTS_CIO_FREE_POLLSET ERTS_POLL_EXPORT(erts_poll_destroy_pollset)
-#define ERTS_CIO_POLL_MAX_FDS ERTS_POLL_EXPORT(erts_poll_max_fds)
-#define ERTS_CIO_POLL_INIT ERTS_POLL_EXPORT(erts_poll_init)
-#define ERTS_CIO_POLL_INFO ERTS_POLL_EXPORT(erts_poll_info)
+#define flag2str(flags) \
+ ((flags) == ERTS_EV_FLAG_CLEAR ? "CLEAR" : \
+ ((flags) == ERTS_EV_FLAG_USED ? "USED" : \
+ ((flags) == ERTS_EV_FLAG_FALLBACK ? "FLBK" : \
+ ((flags) == ERTS_EV_FLAG_USED_FALLBACK ? "USED|FLBK" : "ERROR"))))
-#define GET_FD(fd) fd
+/* How many events can be handled at once by one erts_poll_wait call */
+#define ERTS_CHECK_IO_POLL_RES_LEN 512
-static struct pollset_info
+/* Each I/O poll thread has one ErtsPollThread. The ps field
+   can point to either a private ErtsPollSet or a shared one.
+   At the moment only kqueue and epoll pollsets can be
+   shared across threads.
+*/
+typedef struct erts_poll_thread
{
- ErtsPollSet ps;
- erts_atomic_t in_poll_wait; /* set while doing poll */
- erts_atomic_t check_io_time;
- struct {
- int six; /* start index */
- int eix; /* end index */
- erts_atomic32_t no;
- int size;
- ErtsSysFdType *array;
- } active_fd;
- erts_atomic_t removed_list; /* struct removed_fd* */
-}*pollsetv;
-
-#define NUM_OF_POLLSETS 1
+ ErtsPollSet *ps;
+ ErtsPollResFd *pollres;
+ int pollres_len;
+} ErtsPollThread;
-#ifdef ERTS_ENABLE_KERNEL_POLL
-void erts_init_check_io_kp(void);
-void erts_init_check_io_nkp(void);
-int ERTS_CIO_EXPORT(driver_select)(ErlDrvPort, ErlDrvEvent, int, int);
-int ERTS_CIO_EXPORT(enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, const ErlNifPid*, Eterm);
-Uint ERTS_CIO_EXPORT(erts_check_io_size)(void);
-Eterm ERTS_CIO_EXPORT(erts_check_io_info)(void *);
-int ERTS_CIO_EXPORT(erts_check_io_max_files)(void);
-void ERTS_CIO_EXPORT(erts_check_io_interrupt)(struct pollset_info*, int);
-void ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(struct pollset_info*, int, ErtsMonotonicTime);
-void ERTS_CIO_EXPORT(erts_check_io)(int);
-struct pollset_info* ERTS_CIO_EXPORT(erts_get_pollset)(int);
-int ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *);
-void ERTS_CIO_EXPORT(erts_io_notify_port_task_executed)(ErtsPortTaskHandle *pthp);
-#ifdef ERTS_ENABLE_LOCK_COUNT
-void ERTS_CIO_EXPORT(erts_lcnt_update_cio_locks)(int enable);
-#endif
+/* pollsetv contains pointers to the ErtsPollSets that are in use.
+ * Which pollset to use is determined by hashing the fd.
+ */
+static ErtsPollSet **pollsetv;
+#if ERTS_POLL_USE_FALLBACK
+static ErtsPollSet *flbk_pollset;
#endif
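
In other words, with N pollsets an fd always ends up in pollsetv[hash(fd) % N]. A minimal sketch of that mapping, not part of the patch itself (pick_pollset() is a hypothetical name; the real helpers are fd_hash(), get_pollset_id() and get_pollset() further down, and fd_hash() adds extra bit mixing on systems without continuous fd numbers):

    static ErtsPollSet *pick_pollset(ErtsSysFdType fd)
    {
        /* erts_no_pollsets is declared a little further down in this diff */
        return pollsetv[(unsigned)(int)fd % (unsigned)erts_no_pollsets];
    }
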
-
-/* ToDo: Was inline in erl_check_io.h but now need struct pollset_info */
-void
-ERTS_CIO_EXPORT(erts_io_notify_port_task_executed)(ErtsPortTaskHandle *pthp)
-{
- ErtsIoTask *itp = ErtsContainerStruct(pthp, ErtsIoTask, task);
- erts_aint_t ci_time = erts_atomic_read_acqb(&itp->pollset->check_io_time);
- erts_atomic_set_relb(&itp->executed_time, ci_time);
-}
-
-
-struct pollset_info* ERTS_CIO_EXPORT(erts_get_pollset)(int sched_nr)
-{
- ASSERT(sched_nr > 0 && sched_nr <= erts_no_schedulers);
- return &pollsetv[sched_nr - 1];
-}
+static ErtsPollThread *psiv;
typedef struct {
#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
SafeHashBucket hb;
#endif
ErtsSysFdType fd;
- struct pollset_info *pollset;
struct {
ErtsDrvSelectDataState *select; /* ERTS_EV_TYPE_DRV_SEL */
ErtsNifSelectDataState *nif; /* ERTS_EV_TYPE_NIF */
union {
erts_driver_t* drv_ptr; /* ERTS_EV_TYPE_STOP_USE */
ErtsResource* resource; /* ERTS_EV_TYPE_STOP_NIF */
- }stop;
+ } stop;
} driver;
- ErtsPollEvents events;
- unsigned short remove_cnt; /* number of removed_fd's referring to this fd */
+ ErtsPollEvents events; /* The events that have been selected upon */
+ ErtsPollEvents active_events; /* The events currently active in the pollset */
EventStateType type;
EventStateFlags flags;
} ErtsDrvEventState;
-struct removed_fd {
- struct removed_fd *next;
-#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- ErtsSysFdType fd;
-#else
- ErtsDrvEventState* state;
- #ifdef DEBUG
- ErtsSysFdType fd;
- #endif
-#endif
-
-};
-
struct drv_ev_state_shared {
- /* The layout of this struct must be independent of kp/nkp compilation */
-#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- int max_fds;
-#endif
-
-#define DRV_EV_STATE_LOCK_CNT 128
union {
erts_mtx_t lck;
byte _cache_line_alignment[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_mtx_t))];
- }locks[DRV_EV_STATE_LOCK_CNT];
+ } locks[ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT];
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ int max_fds;
erts_atomic_t len;
ErtsDrvEventState *v;
erts_mtx_t grow_lock; /* prevent lock-hogging of racing growers */
@@ -200,51 +156,121 @@ struct drv_ev_state_shared {
#endif
};
-#ifndef ERTS_KERNEL_POLL_VERSION
+int ERTS_WRITE_UNLIKELY(erts_no_pollsets) = 1;
+int ERTS_WRITE_UNLIKELY(erts_no_poll_threads) = 1;
struct drv_ev_state_shared drv_ev_state;
-#else
-extern struct drv_ev_state_shared drv_ev_state;
-#endif
-static ERTS_INLINE erts_mtx_t* fd_mtx(ErtsSysFdType fd)
-{
+static ERTS_INLINE int fd_hash(ErtsSysFdType fd) {
int hash = (int)fd;
# ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
hash ^= (hash >> 9);
# endif
- return &drv_ev_state.locks[hash % DRV_EV_STATE_LOCK_CNT].lck;
+ return hash;
}
-#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
+static ERTS_INLINE erts_mtx_t* fd_mtx(ErtsSysFdType fd)
+{
+ return &drv_ev_state.locks[fd_hash(fd) % ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT].lck;
+}
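
fd_mtx() stripes the per-fd locking over ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT mutexes (the constant has moved to erl_check_io.h; the definition removed above was 128). With 128 stripes and continuous fd numbers, fd 5 and fd 133 share a mutex (5 % 128 == 133 % 128) while fd 5 and fd 6 do not, so operations on unrelated fds rarely contend for the same lock.
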
+
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
+
+static ERTS_INLINE ErtsDrvEventState *get_drv_ev_state(ErtsSysFdType fd)
+{
+ return &drv_ev_state.v[(int) fd];
+}
+
+#define new_drv_ev_state(State, fd) (State)
+#define erase_drv_ev_state(State)
+
+static ERTS_INLINE int grow_drv_ev_state(ErtsSysFdType fd) {
+ int i;
+ int old_len;
+ int new_len;
+
+ if ((unsigned)fd >= (unsigned)erts_atomic_read_nob(&drv_ev_state.len)) {
+
+ if (fd < 0 || fd >= drv_ev_state.max_fds)
+ return 0;
-static ERTS_INLINE ErtsDrvEventState *hash_get_drv_ev_state(ErtsSysFdType fd)
+ erts_mtx_lock(&drv_ev_state.grow_lock);
+ old_len = erts_atomic_read_nob(&drv_ev_state.len);
+ if (fd >= old_len) {
+ new_len = erts_poll_new_table_len(old_len, fd + 1);
+ if (new_len > drv_ev_state.max_fds)
+ new_len = drv_ev_state.max_fds;
+
+ for (i=0; i<ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT; i++) { /* lock all fd's */
+ erts_mtx_lock(&drv_ev_state.locks[i].lck);
+ }
+ drv_ev_state.v = (drv_ev_state.v
+ ? erts_realloc(ERTS_ALC_T_DRV_EV_STATE,
+ drv_ev_state.v,
+ sizeof(ErtsDrvEventState)*new_len)
+ : erts_alloc(ERTS_ALC_T_DRV_EV_STATE,
+ sizeof(ErtsDrvEventState)*new_len));
+ ERTS_CT_ASSERT(ERTS_EV_TYPE_NONE == 0);
+ sys_memzero(drv_ev_state.v+old_len,
+ sizeof(ErtsDrvEventState) * (new_len - old_len));
+ for (i = old_len; i < new_len; i++) {
+ drv_ev_state.v[i].fd = (ErtsSysFdType) i;
+ }
+ erts_atomic_set_nob(&drv_ev_state.len, new_len);
+ for (i=0; i<ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT; i++) {
+ erts_mtx_unlock(&drv_ev_state.locks[i].lck);
+ }
+ }
+ /*else already grown by racing thread */
+
+ erts_mtx_unlock(&drv_ev_state.grow_lock);
+ }
+ return 1;
+}
+
+static int drv_ev_state_len(void)
+{
+ return erts_atomic_read_nob(&drv_ev_state.len);
+}
+
+#else /* !ERTS_SYS_CONTINOUS_FD_NUMBERS */
+
+static ERTS_INLINE ErtsDrvEventState *get_drv_ev_state(ErtsSysFdType fd)
{
ErtsDrvEventState tmpl;
tmpl.fd = fd;
return (ErtsDrvEventState *) safe_hash_get(&drv_ev_state.tab, (void *) &tmpl);
}
-static ERTS_INLINE ErtsDrvEventState* hash_new_drv_ev_state(ErtsSysFdType fd)
+static ERTS_INLINE ErtsDrvEventState* new_drv_ev_state(ErtsDrvEventState *state,
+ ErtsSysFdType fd)
{
ErtsDrvEventState tmpl;
+
+ if (state)
+ return state;
+
tmpl.fd = fd;
- tmpl.pollset = NULL;
tmpl.driver.select = NULL;
tmpl.driver.nif = NULL;
tmpl.driver.stop.drv_ptr = NULL;
tmpl.events = 0;
- tmpl.remove_cnt = 0;
+ tmpl.active_events = 0;
tmpl.type = ERTS_EV_TYPE_NONE;
tmpl.flags = 0;
+
return (ErtsDrvEventState *) safe_hash_put(&drv_ev_state.tab, (void *) &tmpl);
}
-static ERTS_INLINE void hash_erase_drv_ev_state(ErtsDrvEventState *state)
+static ERTS_INLINE void erase_drv_ev_state(ErtsDrvEventState *state)
{
- ASSERT(state->remove_cnt == 0);
safe_hash_erase(&drv_ev_state.tab, (void *) state);
}
+static int drv_ev_state_len(void)
+{
+ return erts_atomic_read_nob(&drv_ev_state.tab.nitems);
+}
+
#endif /* !ERTS_SYS_CONTINOUS_FD_NUMBERS */
static void stale_drv_select(Eterm id, ErtsDrvEventState *state, int mode);
@@ -268,36 +294,39 @@ steal_pending_stop_use(erts_dsprintf_buf_t*, ErlDrvPort, ErtsDrvEventState*,
static void
steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource*,
ErtsDrvEventState *state, int mode, int on);
-
-ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(removed_fd, struct removed_fd, 64, ERTS_ALC_T_FD_LIST)
+static ERTS_INLINE void
+check_fd_cleanup(ErtsDrvEventState *state,
+ ErtsDrvSelectDataState **free_select,
+ ErtsNifSelectDataState **free_nif);
+#ifdef DEBUG_PRINT_MODE
+static char *drvmode2str(int mode);
+static char *nifmode2str(enum ErlNifSelectFlags mode);
+#endif
static ERTS_INLINE void
-init_iotask(ErtsIoTask *io_task, struct pollset_info* psi)
+init_iotask(ErtsIoTask *io_task, ErtsSysFdType fd)
{
erts_port_task_handle_init(&io_task->task);
- erts_atomic_init_nob(&io_task->executed_time, ~((erts_aint_t) 0));
- io_task->pollset = psi;
+ io_task->fd = fd;
}
static ERTS_INLINE int
-is_iotask_active(ErtsIoTask *io_task, erts_aint_t current_cio_time)
-{
+is_iotask_active(ErtsIoTask *io_task)
+{
if (erts_port_task_is_scheduled(&io_task->task))
return 1;
- if (erts_atomic_read_nob(&io_task->executed_time) == current_cio_time)
- return 1;
return 0;
}
static ERTS_INLINE ErtsDrvSelectDataState *
-alloc_drv_select_data(struct pollset_info* psi)
+alloc_drv_select_data(ErtsSysFdType fd)
{
ErtsDrvSelectDataState *dsp = erts_alloc(ERTS_ALC_T_DRV_SEL_D_STATE,
sizeof(ErtsDrvSelectDataState));
dsp->inport = NIL;
dsp->outport = NIL;
- init_iotask(&dsp->iniotask, psi);
- init_iotask(&dsp->outiotask, psi);
+ init_iotask(&dsp->iniotask, fd);
+ init_iotask(&dsp->outiotask, fd);
return dsp;
}
@@ -308,8 +337,6 @@ alloc_nif_select_data(void)
sizeof(ErtsNifSelectDataState));
dsp->in.pid = NIL;
dsp->out.pid = NIL;
- dsp->in.ddeselect_cnt = 0;
- dsp->out.ddeselect_cnt = 0;
return dsp;
}
@@ -327,176 +354,126 @@ free_nif_select_data(ErtsNifSelectDataState *dsp)
erts_free(ERTS_ALC_T_NIF_SEL_D_STATE, dsp);
}
-static ERTS_INLINE void
-remember_removed(ErtsDrvEventState *state)
+static ERTS_INLINE int
+get_pollset_id(ErtsSysFdType fd)
{
- struct removed_fd *fdlp;
- ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd)));
- if (erts_atomic_read_nob(&state->pollset->in_poll_wait)) {
- erts_aint_t was_next, exp_next;
- state->remove_cnt++;
- ASSERT(state->remove_cnt > 0);
- fdlp = removed_fd_alloc();
- #if defined(ERTS_SYS_CONTINOUS_FD_NUMBERS) || defined(DEBUG)
- fdlp->fd = state->fd;
- #endif
- #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
- fdlp->state = state;
- #endif
-
- /* Lockless atomic insertion in removed_list */
- was_next = erts_atomic_read_acqb(&state->pollset->removed_list);
- do {
- exp_next = was_next;
- fdlp->next = (struct removed_fd*) exp_next;
- was_next = erts_atomic_cmpxchg_mb(&state->pollset->removed_list,
- (erts_aint_t) fdlp,
- exp_next);
- }while (was_next != exp_next);
- }
+ return fd_hash(fd) % erts_no_pollsets;
}
+static ERTS_INLINE ErtsPollSet *
+get_pollset(ErtsSysFdType fd)
+{
+ return pollsetv[get_pollset_id(fd)];
+}
-static ERTS_INLINE int
-is_removed(ErtsDrvEventState *state)
+#if ERTS_POLL_USE_FALLBACK
+static ERTS_INLINE ErtsPollSet *
+get_fallback(void)
{
- /* Note that there is a possible race here, where an fd is removed
- (increasing remove_cnt) and then added again just before erts_poll_wait
- is called by erts_check_io. Any polled event on the re-added fd will then
- be falsely ignored. But that does not matter, as the event will trigger
- again next time erl_check_io is called. */
- return state->remove_cnt > 0;
+ return flbk_pollset;
}
+#endif
-static void
-forget_removed(struct pollset_info* psi)
+/*
+ * Place a fd within a pollset. This will automatically use
+ * the fallback ps if needed.
+ */
+static ERTS_INLINE ErtsPollEvents
+erts_io_control_wakeup(ErtsDrvEventState *state, ErtsPollOp op,
+ ErtsPollEvents pe, int *wake_poller)
{
- struct removed_fd* fdlp;
- struct removed_fd* tofree;
+ ErtsSysFdType fd = state->fd;
+ ErtsPollEvents res = 0;
+ EventStateFlags flags = state->flags;
- fdlp = (struct removed_fd*) erts_atomic_xchg_mb(&psi->removed_list,
- (erts_aint_t) NULL);
+ ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd)));
- while (fdlp) {
- ErtsResource* resource = NULL;
- erts_driver_t* drv_ptr = NULL;
- erts_mtx_t* mtx;
- ErtsSysFdType fd;
- ErtsDrvEventState *state;
+ if (!(flags & ERTS_EV_FLAG_FALLBACK)) {
+ res = erts_poll_control(get_pollset(fd), fd, op, pe, wake_poller);
-#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- fd = fdlp->fd;
- mtx = fd_mtx(fd);
- erts_mtx_lock(mtx);
- state = &drv_ev_state.v[(int) fd];
-#else
- state = fdlp->state;
- fd = state->fd;
- ASSERT(fd == fdlp->fd);
- mtx = fd_mtx(fd);
- erts_mtx_lock(mtx);
-#endif
- ASSERT(state->remove_cnt > 0);
- if (--state->remove_cnt == 0) {
- switch (state->type) {
- case ERTS_EV_TYPE_STOP_NIF:
- /* Now we can call stop */
- resource = state->driver.stop.resource;
- state->driver.stop.resource = NULL;
- ASSERT(resource);
- state->type = ERTS_EV_TYPE_NONE;
- state->flags = 0;
- goto case_ERTS_EV_TYPE_NONE;
-
- case ERTS_EV_TYPE_STOP_USE:
- /* Now we can call stop_select */
- drv_ptr = state->driver.stop.drv_ptr;
- ASSERT(drv_ptr);
- state->type = ERTS_EV_TYPE_NONE;
- state->flags = 0;
- state->driver.stop.drv_ptr = NULL;
- /* Fall through */
- case ERTS_EV_TYPE_NONE:
- case_ERTS_EV_TYPE_NONE:
- state->pollset = NULL;
-#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
- hash_erase_drv_ev_state(state);
-#endif
- break;
- case ERTS_EV_TYPE_DRV_SEL:
- break;
- default:
- ASSERT(0);
- }
- }
- erts_mtx_unlock(mtx);
- if (drv_ptr) {
- int was_unmasked = erts_block_fpe();
- DTRACE1(driver_stop_select, drv_ptr->name);
- LTTNG1(driver_stop_select, drv_ptr->name);
- (*drv_ptr->stop_select) ((ErlDrvEvent) fd, NULL);
- erts_unblock_fpe(was_unmasked);
- if (drv_ptr->handle) {
- erts_ddll_dereference_driver(drv_ptr->handle);
- }
- }
- if (resource) {
- erts_resource_stop(resource, (ErlNifEvent)fd, 0);
- enif_release_resource(resource->data);
+#if ERTS_POLL_USE_FALLBACK
+ if (op == ERTS_POLL_OP_ADD && res == ERTS_POLL_EV_NVAL) {
+ /* When an add fails with NVAL, the poll/kevent operation could not
+ put that fd in the pollset, so we instead put it into a fallback pollset */
+ state->flags |= ERTS_EV_FLAG_FALLBACK;
+ res = erts_poll_control_flbk(get_fallback(), fd, op, pe, wake_poller);
}
-
- tofree = fdlp;
- fdlp = fdlp->next;
- removed_fd_free(tofree);
+ } else {
+ ASSERT(op != ERTS_POLL_OP_ADD);
+ res = erts_poll_control_flbk(get_fallback(), fd, op, pe, wake_poller);
+#endif
}
+
+ return res;
}
-#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
-static void
-grow_drv_ev_state(int min_ix)
+static ERTS_INLINE ErtsPollEvents
+erts_io_control(ErtsDrvEventState *state, ErtsPollOp op, ErtsPollEvents pe)
{
- int i;
- int old_len;
- int new_len;
+ int wake_poller = 0;
+ return erts_io_control_wakeup(state, op, pe, &wake_poller);
+}
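
Callers never pick a pollset themselves. For example, deselect() further down simply issues erts_io_control(state, ERTS_POLL_OP_DEL, 0), and the helper routes the request either to the fd's hashed pollset or, when ERTS_EV_FLAG_FALLBACK records that the kernel poller rejected the fd earlier, to the fallback pollset.
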
- erts_mtx_lock(&drv_ev_state.grow_lock);
- old_len = erts_atomic_read_nob(&drv_ev_state.len);
- if (min_ix >= old_len) {
- new_len = erts_poll_new_table_len(old_len, min_ix + 1);
- if (new_len > drv_ev_state.max_fds)
- new_len = drv_ev_state.max_fds;
-
- for (i=0; i<DRV_EV_STATE_LOCK_CNT; i++) { /* lock all fd's */
- erts_mtx_lock(&drv_ev_state.locks[i].lck);
- }
- drv_ev_state.v = (drv_ev_state.v
- ? erts_realloc(ERTS_ALC_T_DRV_EV_STATE,
- drv_ev_state.v,
- sizeof(ErtsDrvEventState)*new_len)
- : erts_alloc(ERTS_ALC_T_DRV_EV_STATE,
- sizeof(ErtsDrvEventState)*new_len));
- for (i = old_len; i < new_len; i++) {
- drv_ev_state.v[i].fd = (ErtsSysFdType) i;
- drv_ev_state.v[i].pollset = NULL;
- drv_ev_state.v[i].driver.select = NULL;
- drv_ev_state.v[i].driver.stop.drv_ptr = NULL;
- drv_ev_state.v[i].driver.nif = NULL;
- drv_ev_state.v[i].events = 0;
- drv_ev_state.v[i].remove_cnt = 0;
- drv_ev_state.v[i].type = ERTS_EV_TYPE_NONE;
- drv_ev_state.v[i].flags = 0;
- }
- erts_atomic_set_nob(&drv_ev_state.len, new_len);
- for (i=0; i<DRV_EV_STATE_LOCK_CNT; i++) {
- erts_mtx_unlock(&drv_ev_state.locks[i].lck);
- }
+/* ToDo: Was inline in erl_check_io.h but now need struct erts_poll_thread */
+void
+erts_io_notify_port_task_executed(ErtsPortTaskType type,
+ ErtsPortTaskHandle *pthp,
+ void (*reset_handle)(ErtsPortTaskHandle *))
+{
+ ErtsIoTask *itp = ErtsContainerStruct(pthp, ErtsIoTask, task);
+ ErtsSysFdType fd = itp->fd;
+ erts_mtx_t *mtx = fd_mtx(fd);
+ int active_events;
+ ErtsDrvEventState *state;
+ ErtsDrvSelectDataState *free_select = NULL;
+ ErtsNifSelectDataState *free_nif = NULL;
+
+ erts_mtx_lock(mtx);
+ state = get_drv_ev_state(fd);
+
+ active_events = state->active_events;
+
+ switch (type) {
+ case ERTS_PORT_TASK_INPUT:
+
+ DEBUG_PRINT_FD("executed ready_input", state);
+
+ ASSERT(!(state->active_events & ERTS_POLL_EV_IN));
+ if (state->events & ERTS_POLL_EV_IN)
+ active_events |= ERTS_POLL_EV_IN;
+ break;
+ case ERTS_PORT_TASK_OUTPUT:
+
+ DEBUG_PRINT_FD("executed ready_output", state);
+
+ ASSERT(!(state->active_events & ERTS_POLL_EV_OUT));
+ if (state->events & ERTS_POLL_EV_OUT)
+ active_events |= ERTS_POLL_EV_OUT;
+ break;
+ default:
+ erts_exit(ERTS_ABORT_EXIT, "Invalid IO port task type");
+ break;
}
- /*else already grown by racing thread */
- erts_mtx_unlock(&drv_ev_state.grow_lock);
-}
-#endif /* ERTS_SYS_CONTINOUS_FD_NUMBERS */
+ reset_handle(pthp);
+
+ if (active_events) {
+ /* This is not needed if active_events has not changed */
+ if (state->active_events != active_events) {
+ state->active_events = active_events;
+ erts_io_control(state, ERTS_POLL_OP_MOD, active_events);
+ }
+ } else {
+ check_fd_cleanup(state, &free_select, &free_nif);
+ }
+ erts_mtx_unlock(mtx);
+
+ if (free_select)
+ free_drv_select_data(free_select);
+ if (free_nif)
+ free_nif_select_data(free_nif);
+}
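
Together with erts_check_io() further down, the function above forms a one-shot scheme for selected events. Roughly, for the input side:

    poll reports EV_IN       -> erts_check_io() clears EV_IN from active_events
                                and iready() schedules a ready_input port task
    the port task executes   -> erts_io_notify_port_task_executed() puts EV_IN
                                back into active_events and re-arms the fd with
                                ERTS_POLL_OP_MOD

so an fd is never reported for input again while its previous ready_input callback is still pending.
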
static ERTS_INLINE void
abort_task(Eterm id, ErtsPortTaskHandle *pthp, EventStateType type)
@@ -542,16 +519,14 @@ abort_tasks(ErtsDrvEventState *state, int mode)
static void
deselect(ErtsDrvEventState *state, int mode)
{
- int do_wake = 0;
ErtsPollEvents rm_events;
ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd)));
- ASSERT(state->events);
abort_tasks(state, mode);
- if (!mode)
+ if (!mode) {
rm_events = state->events;
- else {
+ } else {
rm_events = 0;
ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL);
if (mode & ERL_DRV_READ) {
@@ -564,18 +539,16 @@ deselect(ErtsDrvEventState *state, int mode)
}
}
- ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, rm_events,
- 0, &do_wake);
state->events &= ~rm_events;
+ state->active_events &= ~rm_events;
if (!(state->events)) {
+ erts_io_control(state, ERTS_POLL_OP_DEL, 0);
switch (state->type) {
case ERTS_EV_TYPE_NIF:
state->driver.nif->in.pid = NIL;
state->driver.nif->out.pid = NIL;
- state->driver.nif->in.ddeselect_cnt = 0;
- state->driver.nif->out.ddeselect_cnt = 0;
- enif_release_resource(state->driver.stop.resource);
+ enif_release_resource(state->driver.stop.resource->data);
state->driver.stop.resource = NULL;
break;
case ERTS_EV_TYPE_DRV_SEL:
@@ -588,15 +561,15 @@ deselect(ErtsDrvEventState *state, int mode)
ASSERT(0);
break;
}
-
state->type = ERTS_EV_TYPE_NONE;
state->flags = 0;
- remember_removed(state);
+ } else {
+ erts_io_control(state, ERTS_POLL_OP_MOD, state->active_events);
}
}
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
-# define IS_FD_UNKNOWN(state) ((state)->type == ERTS_EV_TYPE_NONE && (state)->remove_cnt == 0)
+# define IS_FD_UNKNOWN(state) ((state)->type == ERTS_EV_TYPE_NONE)
#else
# define IS_FD_UNKNOWN(state) ((state) == NULL)
#endif
@@ -606,17 +579,13 @@ check_fd_cleanup(ErtsDrvEventState *state,
ErtsDrvSelectDataState **free_select,
ErtsNifSelectDataState **free_nif)
{
- erts_aint_t current_cio_time;
-
ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd)));
-
- current_cio_time = erts_atomic_read_acqb(&state->pollset->check_io_time);
*free_select = NULL;
if (state->driver.select
&& (state->type != ERTS_EV_TYPE_DRV_SEL)
- && !is_iotask_active(&state->driver.select->iniotask, current_cio_time)
- && !is_iotask_active(&state->driver.select->outiotask, current_cio_time)) {
-
+ && !is_iotask_active(&state->driver.select->iniotask)
+ && !is_iotask_active(&state->driver.select->outiotask)) {
+
*free_select = state->driver.select;
state->driver.select = NULL;
}
@@ -628,14 +597,10 @@ check_fd_cleanup(ErtsDrvEventState *state,
}
if (((state->type != ERTS_EV_TYPE_NONE)
- | state->remove_cnt
| (state->driver.nif != NULL)
| (state->driver.select != NULL)) == 0) {
- state->pollset = NULL;
-#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
- hash_erase_drv_ev_state(state);
-#endif
+ erase_drv_ev_state(state);
}
}
@@ -645,253 +610,8 @@ check_fd_cleanup(ErtsDrvEventState *state,
# define MUST_DEFER(MAY_SLEEP) (MAY_SLEEP)
#endif
-static ERTS_INLINE int
-check_cleanup_active_fd(struct pollset_info* psi,
- ErtsSysFdType fd,
-#if ERTS_CIO_DEFER_ACTIVE_EVENTS
- ErtsPollControlEntry *pce,
- int *pce_ix,
-#endif
- erts_aint_t current_cio_time,
- int may_sleep)
-{
- ErtsDrvEventState *state;
- int active = 0;
- erts_mtx_t *mtx = fd_mtx(fd);
- void *free_select = NULL;
- void *free_nif = NULL;
-#if ERTS_CIO_DEFER_ACTIVE_EVENTS
- ErtsPollEvents evon = 0, evoff = 0;
-#endif
-
- erts_mtx_lock(mtx);
-
-#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- state = &drv_ev_state.v[(int) fd];
-#else
- state = hash_get_drv_ev_state(fd); /* may be NULL! */
-#endif
- if (state && state->pollset == psi)
- {
- if (state->driver.select) {
-#if ERTS_CIO_DEFER_ACTIVE_EVENTS
- if (is_iotask_active(&state->driver.select->iniotask, current_cio_time)) {
- active = 1;
- if (MUST_DEFER(may_sleep)
- && (state->events & ERTS_POLL_EV_IN)
- && !(state->flags & ERTS_EV_FLAG_DEFER_IN_EV)) {
- evoff |= ERTS_POLL_EV_IN;
- state->flags |= ERTS_EV_FLAG_DEFER_IN_EV;
- }
- }
- else if (state->flags & ERTS_EV_FLAG_DEFER_IN_EV) {
- if (state->events & ERTS_POLL_EV_IN)
- evon |= ERTS_POLL_EV_IN;
- state->flags &= ~ERTS_EV_FLAG_DEFER_IN_EV;
- }
- if (is_iotask_active(&state->driver.select->outiotask, current_cio_time)) {
- active = 1;
- if (MUST_DEFER(may_sleep)
- && (state->events & ERTS_POLL_EV_OUT)
- && !(state->flags & ERTS_EV_FLAG_DEFER_OUT_EV)) {
- evoff |= ERTS_POLL_EV_OUT;
- state->flags |= ERTS_EV_FLAG_DEFER_OUT_EV;
- }
- }
- else if (state->flags & ERTS_EV_FLAG_DEFER_OUT_EV) {
- if (state->events & ERTS_POLL_EV_OUT) {
- evon |= ERTS_POLL_EV_OUT;
- }
- state->flags &= ~ERTS_EV_FLAG_DEFER_OUT_EV;
- }
- if (active)
- (void) 0;
- else
-#else
- if (is_iotask_active(&state->driver.select->iniotask, current_cio_time)
- || is_iotask_active(&state->driver.select->outiotask, current_cio_time))
- active = 1;
- else
-#endif
- if (state->type != ERTS_EV_TYPE_DRV_SEL) {
- free_select = state->driver.select;
- state->driver.select = NULL;
- }
-#if ERTS_CIO_DEFER_ACTIVE_EVENTS
- if (evon) {
- int do_wake = 0;
- ERTS_CIO_POLL_CTL(psi->ps, state->fd, evon, 1, &do_wake);
- }
-#endif
- }
-
- if (state->driver.nif) {
- ErtsPollEvents rm_events = 0;
- if (state->driver.nif->in.ddeselect_cnt) {
- ASSERT(state->type == ERTS_EV_TYPE_NIF);
- ASSERT(state->events & ERTS_POLL_EV_IN);
- ASSERT(is_nil(state->driver.nif->in.pid));
- if (may_sleep || state->driver.nif->in.ddeselect_cnt == 1) {
- rm_events = ERTS_POLL_EV_IN;
- state->driver.nif->in.ddeselect_cnt = 0;
- }
- }
- if (state->driver.nif->out.ddeselect_cnt) {
- ASSERT(state->type == ERTS_EV_TYPE_NIF);
- ASSERT(state->events & ERTS_POLL_EV_OUT);
- ASSERT(is_nil(state->driver.nif->out.pid));
- if (may_sleep || state->driver.nif->out.ddeselect_cnt == 1) {
- rm_events |= ERTS_POLL_EV_OUT;
- state->driver.nif->out.ddeselect_cnt = 0;
- }
- }
- if (rm_events) {
- int do_wake = 0;
- state->events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd,
- rm_events, 0, &do_wake);
- }
- if (state->events)
- active = 1;
- else if (state->type != ERTS_EV_TYPE_NIF) {
- free_nif = state->driver.nif;
- state->driver.nif = NULL;
- }
- }
-
-#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
- if (((state->type != ERTS_EV_TYPE_NONE) | state->remove_cnt | active) == 0)
- hash_erase_drv_ev_state(state);
-#endif
- }
-
- erts_mtx_unlock(mtx);
-
- if (free_select)
- free_drv_select_data(free_select);
- if (free_nif)
- free_nif_select_data(free_nif);
-
-#if ERTS_CIO_DEFER_ACTIVE_EVENTS
- if (evoff) {
- ErtsPollControlEntry *pcep = &pce[(*pce_ix)++];
- pcep->fd = fd;
- pcep->events = evoff;
- pcep->on = 0;
- }
-#endif
-
- return active;
-}
-
-static void
-check_cleanup_active_fds(struct pollset_info* psi,
- erts_aint_t current_cio_time, int may_sleep)
-{
- int six = psi->active_fd.six;
- int eix = psi->active_fd.eix;
- erts_aint32_t no = erts_atomic32_read_dirty(&psi->active_fd.no);
- const int size = psi->active_fd.size;
- int ix = six;
-#if ERTS_CIO_DEFER_ACTIVE_EVENTS
- /* every fd might add one entry */
- Uint pce_sz = sizeof(ErtsPollControlEntry)*no;
- ErtsPollControlEntry *pctrl_entries = (pce_sz
- ? erts_alloc(ERTS_ALC_T_TMP, pce_sz)
- : NULL);
- int pctrl_ix = 0;
-#endif
-
- while (ix != eix) {
- ErtsSysFdType fd = psi->active_fd.array[ix];
- int nix = ix + 1;
- if (nix >= size)
- nix = 0;
- ASSERT(fd != ERTS_SYS_FD_INVALID);
- if (!check_cleanup_active_fd(psi, fd,
-#if ERTS_CIO_DEFER_ACTIVE_EVENTS
- pctrl_entries,
- &pctrl_ix,
-#endif
- current_cio_time,
- may_sleep)) {
- no--;
- if (ix == six) {
-#ifdef DEBUG
- psi->active_fd.array[ix] = ERTS_SYS_FD_INVALID;
-#endif
- six = nix;
- }
- else {
- psi->active_fd.array[ix] = psi->active_fd.array[six];
-#ifdef DEBUG
- psi->active_fd.array[six] = ERTS_SYS_FD_INVALID;
-#endif
- six++;
- if (six >= size)
- six = 0;
- }
- }
- ix = nix;
- }
-
-#if ERTS_CIO_DEFER_ACTIVE_EVENTS
- ASSERT(pctrl_ix <= pce_sz/sizeof(ErtsPollControlEntry));
- if (pctrl_ix)
- ERTS_CIO_POLL_CTLV(psi->ps, pctrl_entries, pctrl_ix);
- if (pctrl_entries)
- erts_free(ERTS_ALC_T_TMP, pctrl_entries);
-#endif
-
- psi->active_fd.six = six;
- psi->active_fd.eix = eix;
- erts_atomic32_set_relb(&psi->active_fd.no, no);
-}
-
-static void grow_active_fds(struct pollset_info *psi)
-{
- ASSERT(psi->active_fd.six == psi->active_fd.eix);
- psi->active_fd.six = 0;
- psi->active_fd.eix = psi->active_fd.size;
- psi->active_fd.size += ERTS_ACTIVE_FD_INC;
- psi->active_fd.array = erts_realloc(ERTS_ALC_T_ACTIVE_FD_ARR,
- psi->active_fd.array,
- psi->active_fd.size*sizeof(ErtsSysFdType));
-#ifdef DEBUG
- {
- int i;
- for (i = psi->active_fd.eix + 1; i < psi->active_fd.size; i++)
- psi->active_fd.array[i] = ERTS_SYS_FD_INVALID;
- }
-#endif
-}
-
-static ERTS_INLINE void
-add_active_fd(struct pollset_info *psi, ErtsSysFdType fd)
-{
- int eix = psi->active_fd.eix;
- const int size = psi->active_fd.size;
-
- psi->active_fd.array[eix] = fd;
-
- erts_atomic32_set_relb(&psi->active_fd.no,
- (erts_atomic32_read_dirty(&psi->active_fd.no)
- + 1));
-
- eix++;
- if (eix >= size)
- eix = 0;
- psi->active_fd.eix = eix;
-
- if (psi->active_fd.six == eix) {
- grow_active_fds(psi);
- }
-}
-
int
-ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
- ErlDrvEvent e,
- int mode,
- int on)
+driver_select(ErlDrvPort ix, ErlDrvEvent e, int mode, int on)
{
void (*stop_select_fn)(ErlDrvEvent, void*) = NULL;
Port *prt = erts_drvport2port(ix);
@@ -899,6 +619,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
ErtsSysFdType fd = (ErtsSysFdType) e;
ErtsPollEvents ctl_events = (ErtsPollEvents) 0;
ErtsPollEvents old_events;
+ ErtsPollOp ctl_op = ERTS_POLL_OP_MOD;
ErtsDrvEventState *state;
int wake_poller = 0;
int ret;
@@ -907,32 +628,29 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
#ifdef USE_VM_PROBES
DTRACE_CHARBUF(name, 64);
#endif
+ ERTS_MSACC_PUSH_AND_SET_STATE(ERTS_MSACC_STATE_CHECK_IO);
- if (prt == ERTS_INVALID_ERL_DRV_PORT)
+ if (prt == ERTS_INVALID_ERL_DRV_PORT) {
+ ERTS_MSACC_POP_STATE();
return -1;
+ }
ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- if ((unsigned)fd >= (unsigned)erts_atomic_read_nob(&drv_ev_state.len)) {
- if (fd < 0) {
- return -1;
- }
- if (fd >= drv_ev_state.max_fds) {
- drv_select_large_fd_error(ix, fd, mode, on);
- return -1;
- }
- grow_drv_ev_state(fd);
+ if (!grow_drv_ev_state(fd)) {
+ if (fd > 0) drv_select_large_fd_error(ix, fd, mode, on);
+ ERTS_MSACC_POP_STATE();
+ return -1;
}
#endif
erts_mtx_lock(fd_mtx(fd));
-#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- state = &drv_ev_state.v[(int) fd];
-#else
- state = hash_get_drv_ev_state(fd); /* may be NULL! */
-#endif
+ state = get_drv_ev_state(fd); /* may be NULL! */
+
+ DEBUG_PRINT_FD("driver_select(%T, %p, %s, %d)",
+ state, id, fd, drvmode2str(mode), on);
if (!on) {
if (IS_FD_UNKNOWN(state)) {
@@ -950,15 +668,10 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
}
else if ((mode&ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) {
mode |= (ERL_DRV_READ | ERL_DRV_WRITE);
- wake_poller = 1; /* to eject fd from pollset (if needed) */
}
}
-#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
- if (state == NULL) {
- state = hash_new_drv_ev_state(fd);
- }
-#endif
+ state = new_drv_ev_state(state, fd);
switch (state->type) {
case ERTS_EV_TYPE_NIF:
@@ -982,7 +695,9 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
ASSERT(state->type == ERTS_EV_TYPE_NONE);
break;
- }}
+ }
+ default: break;
+ }
if (mode & ERL_DRV_READ) {
if (state->type == ERTS_EV_TYPE_DRV_SEL) {
@@ -999,7 +714,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
drv_select_steal(ix, state, mode, on);
}
ctl_events |= ERTS_POLL_EV_OUT;
- }
+ }
ASSERT((state->type == ERTS_EV_TYPE_DRV_SEL) ||
@@ -1010,32 +725,31 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
if (on) {
ctl_events &= ~old_events;
state->events |= ctl_events;
+ if (ctl_events & ERTS_POLL_EV_IN && (!state->driver.select || !is_iotask_active(&state->driver.select->iniotask)))
+ state->active_events |= ERTS_POLL_EV_IN;
+ if (ctl_events & ERTS_POLL_EV_OUT && (!state->driver.select || !is_iotask_active(&state->driver.select->outiotask)))
+ state->active_events |= ERTS_POLL_EV_OUT;
+ if (old_events == 0 && !(state->flags & ERTS_EV_FLAG_USED)) {
+ ctl_op = ERTS_POLL_OP_ADD;
+ }
}
else {
ctl_events &= old_events;
state->events &= ~ctl_events;
+ state->active_events &= ~ctl_events;
- if (!(state->flags & ERTS_EV_FLAG_USED)
- && old_events && !state->events) {
- /*
- * Old driver removing all events. At least wake poller.
- * It will not make close() 100% safe but it will prevent
- * actions delayed by poll timeout.
- */
- wake_poller = 1;
+ if (!state->events) {
+ if (!(state->flags & ERTS_EV_FLAG_USED) || mode & ERL_DRV_USE)
+ ctl_op = ERTS_POLL_OP_DEL;
}
}
- if (ctl_events) {
+ if (ctl_events || ctl_op == ERTS_POLL_OP_DEL) {
ErtsPollEvents new_events;
- if (!state->pollset) {
- ErtsSchedulerData* esdp = erts_get_scheduler_data();
- ASSERT(esdp);
- state->pollset = esdp->pollset;
- }
-
- new_events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, ctl_events, on, &wake_poller);
+ new_events = erts_io_control_wakeup(state, ctl_op,
+ state->active_events,
+ &wake_poller);
if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
if (state->type == ERTS_EV_TYPE_DRV_SEL && !old_events) {
@@ -1048,14 +762,13 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
goto done;
}
- ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL
- || state->type == ERTS_EV_TYPE_NONE);
+ ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL || state->type == ERTS_EV_TYPE_NONE);
}
if (on) {
if (ctl_events) {
if (!state->driver.select)
- state->driver.select = alloc_drv_select_data(state->pollset);
+ state->driver.select = alloc_drv_select_data(state->fd);
if (state->type == ERTS_EV_TYPE_NONE)
state->type = ERTS_EV_TYPE_DRV_SEL;
ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL);
@@ -1069,47 +782,44 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
}
}
else { /* off */
- if (state->type == ERTS_EV_TYPE_DRV_SEL) {
- if (ctl_events & ERTS_POLL_EV_IN) {
- abort_tasks(state, ERL_DRV_READ);
- state->driver.select->inport = NIL;
- }
- if (ctl_events & ERTS_POLL_EV_OUT) {
- abort_tasks(state, ERL_DRV_WRITE);
- state->driver.select->outport = NIL;
- }
- if (state->events == 0) {
- if (old_events != 0) {
- remember_removed(state);
- }
- if ((mode & ERL_DRV_USE) || !(state->flags & ERTS_EV_FLAG_USED)) {
- state->type = ERTS_EV_TYPE_NONE;
- state->flags = 0;
- }
- /*else keep it, as fd will probably be selected upon again */
- }
- }
- if ((mode & ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) {
- erts_driver_t* drv_ptr = prt->drv_ptr;
- ASSERT(state->events==0);
- if (state->remove_cnt == 0 || !wake_poller) {
- /* Safe to close fd now as it is not in pollset
- or there was no need to eject fd (kernel poll) */
- stop_select_fn = drv_ptr->stop_select;
+ if (state->type == ERTS_EV_TYPE_DRV_SEL) {
+ if (ctl_events & ERTS_POLL_EV_IN) {
+ abort_tasks(state, ERL_DRV_READ);
+ state->driver.select->inport = NIL;
+ }
+ if (ctl_events & ERTS_POLL_EV_OUT) {
+ abort_tasks(state, ERL_DRV_WRITE);
+ state->driver.select->outport = NIL;
+ }
+ if (state->events == 0) {
+ if ((mode & ERL_DRV_USE) || !(state->flags & ERTS_EV_FLAG_USED)) {
+ state->type = ERTS_EV_TYPE_NONE;
+ state->flags = 0;
+ }
+ /*else keep it, as fd will probably be selected upon again */
+ }
+ }
+ if ((mode & ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) {
+ erts_driver_t* drv_ptr = prt->drv_ptr;
+ ASSERT(state->events==0);
+ if (!wake_poller) {
+ /* Safe to close fd now as it is not in pollset
+ or there was no need to eject fd (kernel poll) */
+ stop_select_fn = drv_ptr->stop_select;
#ifdef USE_VM_PROBES
- strncpy(name, prt->drv_ptr->name, sizeof(name)-1);
- name[sizeof(name)-1] = '\0';
+ strncpy(name, prt->drv_ptr->name, sizeof(name)-1);
+ name[sizeof(name)-1] = '\0';
#endif
- }
- else {
- /* Not safe to close fd, postpone stop_select callback. */
- state->type = ERTS_EV_TYPE_STOP_USE;
- state->driver.stop.drv_ptr = drv_ptr;
- if (drv_ptr->handle) {
- erts_ddll_reference_referenced_driver(drv_ptr->handle);
- }
- }
- }
+ }
+ else {
+ /* Not safe to close fd, postpone stop_select callback. */
+ state->type = ERTS_EV_TYPE_STOP_USE;
+ state->driver.stop.drv_ptr = drv_ptr;
+ if (drv_ptr->handle) {
+ erts_ddll_reference_referenced_driver(drv_ptr->handle);
+ }
+ }
+ }
}
ret = 0;
@@ -1134,54 +844,47 @@ done_unknown:
if (free_nif)
free_nif_select_data(free_nif);
+ ERTS_MSACC_POP_STATE();
+
return ret;
}
int
-ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
- ErlNifEvent e,
- enum ErlNifSelectFlags mode,
- void* obj,
- const ErlNifPid* pid,
- Eterm ref)
+enif_select(ErlNifEnv* env,
+ ErlNifEvent e,
+ enum ErlNifSelectFlags mode,
+ void* obj,
+ const ErlNifPid* pid,
+ Eterm ref)
{
int on;
ErtsResource* resource = DATA_TO_RESOURCE(obj);
ErtsSysFdType fd = (ErtsSysFdType) e;
ErtsPollEvents ctl_events = (ErtsPollEvents) 0;
ErtsPollEvents old_events;
+ ErtsPollOp ctl_op = ERTS_POLL_OP_MOD;
ErtsDrvEventState *state;
- int wake_poller;
- int ret;
+ int ret, wake_poller = 0;
enum { NO_STOP=0, CALL_STOP, CALL_STOP_AND_RELEASE } call_stop = NO_STOP;
ErtsDrvSelectDataState *free_select = NULL;
ErtsNifSelectDataState *free_nif = NULL;
-#ifdef USE_VM_PROBES
- DTRACE_CHARBUF(name, 64);
-#endif
ASSERT(!(resource->monitors && resource->monitors->is_dying));
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- if ((unsigned)fd >= (unsigned)erts_atomic_read_nob(&drv_ev_state.len)) {
- if (fd < 0) {
- return INT_MIN | ERL_NIF_SELECT_INVALID_EVENT;
- }
- if (fd >= drv_ev_state.max_fds) {
- nif_select_large_fd_error(fd, mode, resource, ref);
- return INT_MIN | ERL_NIF_SELECT_INVALID_EVENT;
- }
- grow_drv_ev_state(fd);
+ if (!grow_drv_ev_state(fd)) {
+ if (fd > 0) nif_select_large_fd_error(fd, mode, resource, ref);
+ return INT_MIN | ERL_NIF_SELECT_INVALID_EVENT;
}
#endif
erts_mtx_lock(fd_mtx(fd));
-#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- state = &drv_ev_state.v[(int) fd];
-#else
- state = hash_get_drv_ev_state(fd); /* may be NULL! */
-#endif
+ state = get_drv_ev_state(fd); /* may be NULL! */
+
+ DEBUG_PRINT_FD("enif_select(%T, %d, %s, %p, %T, %T)",
+ state, env->proc->common.id, fd, nifmode2str(mode), resource,
+ pid ? pid->pid : THE_NON_VALUE, ref);
if (mode & ERL_NIF_SELECT_STOP) {
ASSERT(resource->type->stop);
@@ -1193,13 +896,12 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
}
on = 0;
mode = ERL_DRV_READ | ERL_DRV_WRITE | ERL_DRV_USE;
- wake_poller = 1; /* to eject fd from pollset (if needed) */
ctl_events = ERTS_POLL_EV_IN | ERTS_POLL_EV_OUT;
+ ctl_op = ERTS_POLL_OP_DEL;
}
else {
on = 1;
ASSERT(mode);
- wake_poller = 0;
if (mode & ERL_DRV_READ) {
ctl_events |= ERTS_POLL_EV_IN;
}
@@ -1208,11 +910,7 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
}
}
-#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
- if (state == NULL) {
- state = hash_new_drv_ev_state(fd);
- }
-#endif
+ state = new_drv_ev_state(state,fd);
switch (state->type) {
case ERTS_EV_TYPE_NIF:
@@ -1243,7 +941,9 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
}
ASSERT(state->type == ERTS_EV_TYPE_NONE);
break;
- }}
+ }
+ default: break;
+ }
ASSERT((state->type == ERTS_EV_TYPE_NIF) ||
(state->type == ERTS_EV_TYPE_NONE && !state->events));
@@ -1253,20 +953,22 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
if (on) {
ctl_events &= ~old_events;
state->events |= ctl_events;
+ state->active_events |= ctl_events;
+ if (state->type == ERTS_EV_TYPE_NONE)
+ ctl_op = ERTS_POLL_OP_ADD;
}
else {
ctl_events &= old_events;
state->events &= ~ctl_events;
+ state->active_events &= ~ctl_events;
}
- if (ctl_events) {
+ if (ctl_events || ctl_op == ERTS_POLL_OP_DEL) {
ErtsPollEvents new_events;
- if (!state->pollset) {
- state->pollset = erts_get_scheduler_data()->pollset;
- }
-
- new_events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, ctl_events, on, &wake_poller);
+ new_events = erts_io_control_wakeup(state, ctl_op,
+ state->active_events,
+ &wake_poller);
if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
if (state->type == ERTS_EV_TYPE_NIF && !old_events) {
@@ -1274,8 +976,6 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
state->flags = 0;
state->driver.nif->in.pid = NIL;
state->driver.nif->out.pid = NIL;
- state->driver.nif->in.ddeselect_cnt = 0;
- state->driver.nif->out.ddeselect_cnt = 0;
state->driver.stop.resource = NULL;
}
ret = INT_MIN | ERL_NIF_SELECT_FAILED;
@@ -1307,11 +1007,9 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
ASSERT(is_internal_ref(ref));
refn = internal_ref_numbers(ref);
state->driver.nif->in.immed = THE_NON_VALUE;
- state->driver.nif->in.refn[0] = refn[0];
- state->driver.nif->in.refn[1] = refn[1];
- state->driver.nif->in.refn[2] = refn[2];
+ sys_memcpy(state->driver.nif->in.refn, refn,
+ sizeof(state->driver.nif->in.refn));
}
- state->driver.nif->in.ddeselect_cnt = 0;
}
if (mode & ERL_DRV_WRITE) {
state->driver.nif->out.pid = recipient;
@@ -1321,11 +1019,9 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
ASSERT(is_internal_ref(ref));
refn = internal_ref_numbers(ref);
state->driver.nif->out.immed = THE_NON_VALUE;
- state->driver.nif->out.refn[0] = refn[0];
- state->driver.nif->out.refn[1] = refn[1];
- state->driver.nif->out.refn[2] = refn[2];
+ sys_memcpy(state->driver.nif->out.refn, refn,
+ sizeof(state->driver.nif->out.refn));
}
- state->driver.nif->out.ddeselect_cnt = 0;
}
ret = 0;
}
@@ -1333,14 +1029,9 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
if (state->type == ERTS_EV_TYPE_NIF) {
state->driver.nif->in.pid = NIL;
state->driver.nif->out.pid = NIL;
- state->driver.nif->in.ddeselect_cnt = 0;
- state->driver.nif->out.ddeselect_cnt = 0;
- if (old_events != 0) {
- remember_removed(state);
- }
}
ASSERT(state->events==0);
- if (state->remove_cnt == 0 || !wake_poller) {
+ if (!wake_poller) {
/*
* Safe to close fd now as it is not in pollset
* or there was no need to eject fd (kernel poll)
@@ -1455,7 +1146,7 @@ print_driver_name(erts_dsprintf_buf_t *dsbufp, Eterm id)
static void
steal(erts_dsprintf_buf_t *dsbufp, ErtsDrvEventState *state, int mode)
{
- erts_dsprintf(dsbufp, "stealing control of fd=%d from ", (int) GET_FD(state->fd));
+ erts_dsprintf(dsbufp, "stealing control of fd=%d from ", (int) state->fd);
switch (state->type) {
case ERTS_EV_TYPE_DRV_SEL: {
int deselect_mode = 0;
@@ -1479,7 +1170,7 @@ steal(erts_dsprintf_buf_t *dsbufp, ErtsDrvEventState *state, int mode)
if (deselect_mode)
deselect(state, deselect_mode);
else {
- erts_dsprintf(dsbufp, "no one", (int) GET_FD(state->fd));
+ erts_dsprintf(dsbufp, "no one", (int) state->fd);
ASSERT(0);
}
erts_dsprintf(dsbufp, "\n");
@@ -1510,7 +1201,7 @@ steal(erts_dsprintf_buf_t *dsbufp, ErtsDrvEventState *state, int mode)
break;
}
default:
- erts_dsprintf(dsbufp, "no one\n", (int) GET_FD(state->fd));
+ erts_dsprintf(dsbufp, "no one\n", (int) state->fd);
ASSERT(0);
}
}
@@ -1524,7 +1215,7 @@ print_drv_select_op(erts_dsprintf_buf_t *dsbufp,
"driver_select(%p, %d,%s%s%s%s, %d) "
"by ",
ix,
- (int) GET_FD(fd),
+ (int) fd,
mode & ERL_DRV_READ ? " ERL_DRV_READ" : "",
mode & ERL_DRV_WRITE ? " ERL_DRV_WRITE" : "",
mode & ERL_DRV_USE ? " ERL_DRV_USE" : "",
@@ -1541,7 +1232,7 @@ print_nif_select_op(erts_dsprintf_buf_t *dsbufp,
{
erts_dsprintf(dsbufp,
"enif_select(_, %d,%s%s%s, %T:%T, %T) ",
- (int) GET_FD(fd),
+ (int) fd,
mode & ERL_NIF_SELECT_READ ? " READ" : "",
mode & ERL_NIF_SELECT_WRITE ? " WRITE" : "",
mode & ERL_NIF_SELECT_STOP ? " STOP" : "",
@@ -1673,7 +1364,7 @@ steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource* resource,
erts_dsprintf(dsbufp, "called before stop was called for NIF resource %T:%T\n",
rt->module, rt->name);
- enif_release_resource(state->driver.stop.resource);
+ enif_release_resource(state->driver.stop.resource->data);
state->type = ERTS_EV_TYPE_NONE;
state->flags = 0;
state->driver.stop.resource = NULL;
@@ -1687,8 +1378,7 @@ steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource* resource,
static ERTS_INLINE int
io_task_schedule_allowed(ErtsDrvEventState *state,
- ErtsPortTaskType type,
- erts_aint_t current_cio_time)
+ ErtsPortTaskType type)
{
ErtsIoTask *io_task;
@@ -1708,42 +1398,41 @@ io_task_schedule_allowed(ErtsDrvEventState *state,
return 0;
}
- return !is_iotask_active(io_task, current_cio_time);
+ return !is_iotask_active(io_task);
}
static ERTS_INLINE void
-iready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time)
+iready(Eterm id, ErtsDrvEventState *state)
{
if (io_task_schedule_allowed(state,
- ERTS_PORT_TASK_INPUT,
- current_cio_time)) {
+ ERTS_PORT_TASK_INPUT)) {
ErtsIoTask *iotask = &state->driver.select->iniotask;
- erts_atomic_set_nob(&iotask->executed_time, current_cio_time);
if (erts_port_task_schedule(id,
&iotask->task,
ERTS_PORT_TASK_INPUT,
(ErlDrvEvent) state->fd) != 0) {
stale_drv_select(id, state, ERL_DRV_READ);
- }
- add_active_fd(state->pollset, state->fd);
+ } else {
+ DEBUG_PRINT_FD("schedule ready_input(%T, %d)",
+ state, id, state->fd);
+ }
}
}
static ERTS_INLINE void
-oready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time)
+oready(Eterm id, ErtsDrvEventState *state)
{
if (io_task_schedule_allowed(state,
- ERTS_PORT_TASK_OUTPUT,
- current_cio_time)) {
+ ERTS_PORT_TASK_OUTPUT)) {
ErtsIoTask *iotask = &state->driver.select->outiotask;
- erts_atomic_set_nob(&iotask->executed_time, current_cio_time);
if (erts_port_task_schedule(id,
&iotask->task,
ERTS_PORT_TASK_OUTPUT,
(ErlDrvEvent) state->fd) != 0) {
stale_drv_select(id, state, ERL_DRV_WRITE);
- }
- add_active_fd(state->pollset, state->fd);
+ } else {
+ DEBUG_PRINT_FD("schedule ready_output(%T, %d)", state, id, state->fd);
+ }
}
}
@@ -1798,91 +1487,56 @@ send_event_tuple(struct erts_nif_select_event* e, ErtsResource* resource,
static void bad_fd_in_pollset(ErtsDrvEventState *, Eterm inport, Eterm outport);
void
-ERTS_CIO_EXPORT(erts_check_io_interrupt)(struct pollset_info *psi, int set)
+erts_check_io_interrupt(ErtsPollThread *psi, int set)
{
- ERTS_CIO_POLL_INTR(psi->ps, set);
+ if (psi) {
+#if ERTS_POLL_USE_FALLBACK
+ if (psi->ps == get_fallback()) {
+ erts_poll_interrupt_flbk(psi->ps, set);
+ return;
+ }
+#endif
+ erts_poll_interrupt(psi->ps, set);
+ }
}
-void
-ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(struct pollset_info *psi,
- int set,
- ErtsMonotonicTime timeout_time)
-{
- ERTS_CIO_POLL_INTR_TMD(psi->ps, set, timeout_time);
+ErtsPollThread *
+erts_create_pollset_thread(int id) {
+ return psiv+id;
}
-#ifndef __WIN32__
-/*
- * Number of ignored events, for a lingering fd added by enif_select(),
- * until we deselect fd-event from pollset.
- */
-# define ERTS_NIF_DELAYED_DESELECT 20
-#else
-/* Disable delayed deselect as pollset cannot handle active events */
-# define ERTS_NIF_DELAYED_DESELECT 1
-#endif
-
void
-ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
+erts_check_io(ErtsPollThread *psi)
{
- ErtsPollResFd *pollres;
int pollres_len;
- ErtsMonotonicTime timeout_time;
int poll_ret, i;
- erts_aint_t current_cio_time;
- ErtsSchedulerData *esdp = erts_get_scheduler_data();
- struct pollset_info *psi = esdp->pollset;
+ ERTS_MSACC_PUSH_AND_SET_STATE(ERTS_MSACC_STATE_CHECK_IO);
restart:
-#ifdef ERTS_BREAK_REQUESTED
- if (ERTS_BREAK_REQUESTED)
- erts_do_break_handling();
-#endif
-
- /* Figure out timeout value */
- timeout_time = (do_wait
- ? erts_check_next_timeout_time(esdp)
- : ERTS_POLL_NO_TIMEOUT /* poll only */);
-
- /*
- * No need for an atomic inc op when incrementing
- * erts_check_io_time, since only one thread can
- * check io at a time.
- */
- current_cio_time = erts_atomic_read_dirty(&psi->check_io_time);
- current_cio_time++;
- erts_atomic_set_relb(&psi->check_io_time, current_cio_time);
-
- check_cleanup_active_fds(psi,
- current_cio_time,
- timeout_time != ERTS_POLL_NO_TIMEOUT);
-
#ifdef ERTS_ENABLE_LOCK_CHECK
erts_lc_check_exact(NULL, 0); /* No locks should be locked */
#endif
- pollres_len = erts_atomic32_read_dirty(&psi->active_fd.no) + ERTS_CHECK_IO_POLL_RES_LEN;
+ pollres_len = psi->pollres_len;
- pollres = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsPollResFd)*pollres_len);
+#if ERTS_POLL_USE_FALLBACK
+ if (psi->ps == get_fallback()) {
- erts_atomic_set_nob(&psi->in_poll_wait, 1);
+ poll_ret = erts_poll_wait_flbk(psi->ps, psi->pollres, &pollres_len);
- poll_ret = ERTS_CIO_POLL_WAIT(psi->ps, pollres, &pollres_len, timeout_time);
+ } else
+#endif
+ {
+ poll_ret = erts_poll_wait(psi->ps, psi->pollres, &pollres_len);
+ }
#ifdef ERTS_ENABLE_LOCK_CHECK
erts_lc_check_exact(NULL, 0); /* No locks should be locked */
#endif
-#ifdef ERTS_BREAK_REQUESTED
- if (ERTS_BREAK_REQUESTED)
- erts_do_break_handling();
-#endif
-
if (poll_ret != 0) {
- erts_atomic_set_nob(&psi->in_poll_wait, 0);
- forget_removed(psi);
- erts_free(ERTS_ALC_T_TMP, pollres);
+
if (poll_ret == EAGAIN) {
goto restart;
}
@@ -1898,64 +1552,78 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
erl_errno_id(poll_ret), poll_ret);
erts_send_error_to_logger_nogl(dsbufp);
}
+ ERTS_MSACC_POP_STATE();
return;
}
for (i = 0; i < pollres_len; i++) {
- ErtsSysFdType fd = (ErtsSysFdType) pollres[i].fd;
+ erts_driver_t* drv_ptr = NULL;
+ ErtsResource* resource = NULL;
+ ErtsDrvSelectDataState *free_select = NULL;
+ ErtsNifSelectDataState *free_nif = NULL;
+ ErtsSysFdType fd = (ErtsSysFdType) ERTS_POLL_RES_GET_FD(&psi->pollres[i]);
ErtsDrvEventState *state;
+ ErtsPollEvents revents;
erts_mtx_lock(fd_mtx(fd));
+ state = get_drv_ev_state(fd);
-#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- state = &drv_ev_state.v[ (int) fd];
-#else
- state = hash_get_drv_ev_state(fd);
if (!state) {
- goto next_pollres;
+ erts_mtx_unlock(fd_mtx(fd));
+ continue;
}
-#endif
- /* Skip this fd if it was removed from pollset */
- if (is_removed(state) || state->pollset != psi) {
- goto next_pollres;
- }
+ revents = ERTS_POLL_RES_GET_EVTS(&psi->pollres[i]);
+
+ DEBUG_PRINT_FD("triggered %s", state, ev2str(revents));
+
+ if (revents & ERTS_POLL_EV_ERR) {
+ /*
+ * Handle error events by triggering all in/out events
+             * that have been selected on.
+ * We *do not* want to call a callback that corresponds
+ * to an event not selected.
+ */
+ revents = state->active_events;
+ state->active_events = 0;
+ } else {
+
+ /* Disregard any events that are not active at the moment,
+ for instance this could happen if the driver/nif does
+ select/deselect in rapid succession. */
+ revents &= state->active_events | ERTS_POLL_EV_NVAL;
+ state->active_events &= ~revents;
+
+ /* Reactivate the poll op if there are still active events */
+ if (state->active_events) {
+ DEBUG_PRINT_FD("re-enable %s", state, ev2str(state->active_events));
+ erts_io_control(state, ERTS_POLL_OP_MOD, state->active_events);
+ }
+ }
switch (state->type) {
case ERTS_EV_TYPE_DRV_SEL: { /* Requested via driver_select()... */
- ErtsPollEvents revents = pollres[i].events;
-
- if (revents & ERTS_POLL_EV_ERR) {
- /*
- * Handle error events by triggering all in/out events
- * that the driver has selected.
- * We *do not* want to call a callback that corresponds
- * to an event not selected.
- */
- revents = state->events;
- }
- else {
- revents &= (state->events | ERTS_POLL_EV_NVAL);
- }
if (revents & (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) {
if (revents & ERTS_POLL_EV_OUT) {
- oready(state->driver.select->outport, state, current_cio_time);
+ oready(state->driver.select->outport, state);
}
/* Someone might have deselected input since revents
- was read therefore, update revents... */
- revents &= state->events;
+ was read (true also on the non-smp emulator since
+ oready() may have been called); therefore, update
+ revents... */
+ revents &= state->events;
if (revents & ERTS_POLL_EV_IN) {
- iready(state->driver.select->inport, state, current_cio_time);
+ iready(state->driver.select->inport, state);
}
}
else if (revents & ERTS_POLL_EV_NVAL) {
bad_fd_in_pollset(state,
state->driver.select->inport,
state->driver.select->outport);
- add_active_fd(psi, state->fd);
+ check_fd_cleanup(state, &free_select, &free_nif);
}
break;
}
@@ -1964,89 +1632,116 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
struct erts_nif_select_event in = {NIL};
struct erts_nif_select_event out = {NIL};
ErtsResource* resource = NULL;
- ErtsPollEvents revents = pollres[i].events;
-
- if (revents & ERTS_POLL_EV_ERR) {
- /*
- * Handle error events by triggering all in/out events
- * that the NIF has selected.
- * We *do not* want to send a message that corresponds
- * to an event not selected.
- */
- revents = state->events;
- }
- else {
- revents &= (state->events | ERTS_POLL_EV_NVAL);
- }
if (revents & (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) {
if (revents & ERTS_POLL_EV_OUT) {
if (is_not_nil(state->driver.nif->out.pid)) {
out = state->driver.nif->out;
resource = state->driver.stop.resource;
- state->driver.nif->out.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT;
state->driver.nif->out.pid = NIL;
- add_active_fd(psi, state->fd);
- }
- else {
- ASSERT(state->driver.nif->out.ddeselect_cnt >= 2);
- state->driver.nif->out.ddeselect_cnt--;
}
}
if (revents & ERTS_POLL_EV_IN) {
if (is_not_nil(state->driver.nif->in.pid)) {
in = state->driver.nif->in;
resource = state->driver.stop.resource;
- state->driver.nif->in.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT;
state->driver.nif->in.pid = NIL;
- add_active_fd(psi, state->fd);
- }
- else {
- ASSERT(state->driver.nif->in.ddeselect_cnt >= 2);
- state->driver.nif->in.ddeselect_cnt--;
}
}
+ state->events &= ~revents;
}
else if (revents & ERTS_POLL_EV_NVAL) {
bad_fd_in_pollset(state, NIL, NIL);
- add_active_fd(psi, state->fd);
+ check_fd_cleanup(state, &free_select, &free_nif);
}
erts_mtx_unlock(fd_mtx(fd));
+
if (is_not_nil(in.pid)) {
send_event_tuple(&in, resource, am_ready_input);
}
if (is_not_nil(out.pid)) {
send_event_tuple(&out, resource, am_ready_output);
}
- goto next_pollres_unlocked;
+ continue;
}
+ case ERTS_EV_TYPE_STOP_NIF: {
+ resource = state->driver.stop.resource;
+ state->type = ERTS_EV_TYPE_NONE;
+ goto case_ERTS_EV_TYPE_NONE;
+ }
+
+ case ERTS_EV_TYPE_STOP_USE: {
+#if ERTS_POLL_USE_FALLBACK
+ ASSERT(psi->ps == get_fallback());
+#endif
+ drv_ptr = state->driver.stop.drv_ptr;
+ state->type = ERTS_EV_TYPE_NONE;
+ /* fallthrough */
case ERTS_EV_TYPE_NONE: /* Deselected ... */
+ case_ERTS_EV_TYPE_NONE:
+ ASSERT(!state->events && !state->active_events && !state->flags);
+ check_fd_cleanup(state, &free_select, &free_nif);
break;
+ }
default: { /* Error */
erts_dsprintf_buf_t *dsbufp;
dsbufp = erts_create_logger_dsbuf();
erts_dsprintf(dsbufp,
"Invalid event request type for fd in erts_poll()! "
- "fd=%d, event request type=%sd\n", (int) state->fd,
+ "fd=%d, event request type=%d\n", (int) state->fd,
(int) state->type);
ASSERT(0);
deselect(state, 0);
- add_active_fd(psi, state->fd);
break;
}
}
- next_pollres:;
erts_mtx_unlock(fd_mtx(fd));
- next_pollres_unlocked:;
- }
- erts_atomic_set_nob(&psi->in_poll_wait, 0);
- erts_free(ERTS_ALC_T_TMP, pollres);
- forget_removed(psi);
+ if (drv_ptr) {
+ int was_unmasked = erts_block_fpe();
+ DTRACE1(driver_stop_select, drv_ptr->name);
+ LTTNG1(driver_stop_select, drv_ptr->name);
+ (*drv_ptr->stop_select)((ErlDrvEvent) fd, NULL);
+ erts_unblock_fpe(was_unmasked);
+ if (drv_ptr->handle) {
+ erts_ddll_dereference_driver(drv_ptr->handle);
+ }
+ }
+ if (resource) {
+ erts_resource_stop(resource, (ErlNifEvent)fd, 1);
+ enif_release_resource(resource->data);
+ }
+ if (free_select)
+ free_drv_select_data(free_select);
+ if (free_nif)
+ free_nif_select_data(free_nif);
+ }
+
+ /* The entire pollres array was filled with events, so grow it for
+ * the next call. We do this for two reasons:
+ * 1. Pulling out more events in one go increases throughput.
+ * 2. If the polling implementation is not fair, this makes sure
+ *    that we get all fds that we can. I.e. if 12 fds are constantly
+ *    active, but we only have a pollres_len of 10, two of the fds
+ *    may never be triggered depending on what the kernel decides to do.
+ */
+ if (pollres_len == psi->pollres_len) {
+ int ev_state_len = drv_ev_state_len();
+ erts_free(ERTS_ALC_T_POLLSET, psi->pollres);
+ psi->pollres_len *= 2;
+ /* Never grow it larger than the current drv_ev_state.len size */
+ if (psi->pollres_len > ev_state_len)
+ psi->pollres_len = ev_state_len;
+ psi->pollres = erts_alloc(ERTS_ALC_T_POLLSET,
+ sizeof(ErtsPollResFd) * psi->pollres_len);
+ }
+
+ ERTS_MSACC_POP_STATE();
}
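
A note on the growth step just above: psi->pollres is doubled whenever a poll round fills it completely, capped at the current drv_ev_state length. A minimal stand-alone sketch of that policy, using hypothetical names and plain malloc/free instead of the erts allocators:

    #include <stdlib.h>

    typedef struct { int fd; int events; } poll_res_fd;  /* stand-in for ErtsPollResFd */

    /* Double the result array when a poll round filled it, but never let it
     * grow past the number of tracked fds (the drv_ev_state.len cap above). */
    static poll_res_fd *grow_pollres(poll_res_fd *res, int *len,
                                     int n_filled, int ev_state_len)
    {
        if (n_filled == *len && *len < ev_state_len) {
            int new_len = *len * 2;
            if (new_len > ev_state_len)
                new_len = ev_state_len;
            free(res);  /* entries from this round have already been handled */
            res = malloc(sizeof(poll_res_fd) * (size_t)new_len);  /* erts_alloc() cannot fail */
            *len = new_len;
        }
        return res;
    }
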
static void
@@ -2167,98 +1862,189 @@ static void drv_ev_state_free(void *des)
}
#endif
-#ifdef ERTS_ENABLE_KERNEL_POLL
+#define ERTS_MAX_NO_OF_POLL_THREADS ERTS_MAX_NO_OF_SCHEDULERS
-struct io_functions {
- int (*select)(ErlDrvPort, ErlDrvEvent, int, int);
- int (*enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, const ErlNifPid*, Eterm);
- void (*check_io_as_interrupt)(struct pollset_info*);
- void (*check_io_interrupt)(struct pollset_info*, int);
- void (*check_io_interrupt_tmd)(struct pollset_info*, int, ErtsMonotonicTime);
- void (*check_io)(int);
- struct pollset_info* (*get_pollset)(int sched_nr);
- void (*notify_port_task_executed)(ErtsPortTaskHandle *pthp);
- int (*max_files)(void);
- Uint (*size)(void);
- Eterm (*info)(void *);
- int (*check_io_debug)(ErtsCheckIoDebugInfo *);
-#ifdef ERTS_ENABLE_LOCK_COUNT
- void (*lcnt_update_cio_locks)(int enable);
-#endif
-};
+static char *
+get_arg(char* rest, char** argv, int* ip)
+{
+ int i = *ip;
+ if (*rest == '\0') {
+ if (argv[i+1] == NULL) {
+ erts_fprintf(stderr, "too few arguments\n");
+ erts_usage();
+ }
+ argv[i++] = NULL;
+ rest = argv[i];
+ }
+ argv[i] = NULL;
+ *ip = i;
+ return rest;
+}
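
get_arg() accepts a flag value either attached to the switch ("-IOt5") or as the next argv element ("-IOt 5"), NULLing every consumed slot so that parse_args() can compact argv afterwards. A self-contained illustration of that contract; demo_get_arg and main are hypothetical, and the error path is simplified (the real code prints "too few arguments" and calls erts_usage()):

    #include <stdio.h>

    static char *demo_get_arg(char *rest, char **argv, int *ip)
    {
        int i = *ip;
        if (*rest == '\0') {              /* value is in the next argv slot */
            if (argv[i + 1] == NULL)
                return NULL;              /* simplified error handling */
            argv[i++] = NULL;
            rest = argv[i];
        }
        argv[i] = NULL;                   /* mark the consumed slot */
        *ip = i;
        return rest;
    }

    int main(void)
    {
        char *attached[] = { "-IOt5", NULL };
        char *separate[] = { "-IOt", "5", NULL };
        int i = 0, j = 0;
        printf("%s %s\n",
               demo_get_arg(attached[0] + 4, attached, &i),   /* "5" */
               demo_get_arg(separate[0] + 4, separate, &j));  /* "5" */
        return 0;
    }
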
-# ifdef ERTS_KERNEL_POLL_VERSION
-struct io_functions erts_io_funcs = {0};
-# else
-extern struct io_functions erts_io_funcs;
-# endif
+static void
+parse_args(int *argc, char **argv, int concurrent_waiters)
+{
+ int i = 0, j;
+ int no_pollsets = 0, no_poll_threads = 0,
+ no_pollsets_percentage = 0,
+ no_poll_threads_percentage = 0;
+ ASSERT(argc && argv);
+ while (i < *argc) {
+ if(argv[i][0] == '-') {
+ switch (argv[i][1]) {
+ case 'I': {
+ if (strncmp(argv[i]+2, "Ot", 2) == 0) {
+ char *arg = get_arg(argv[i]+4, argv, &i);
+ if (sscanf(arg, "%d", &no_poll_threads) != 1 ||
+ no_poll_threads < 1 ||
+ ERTS_MAX_NO_OF_POLL_THREADS < no_poll_threads) {
+ erts_fprintf(stderr,"bad I/O poll threads number: %s\n", arg);
+ erts_usage();
+ }
+ } else if (strncmp(argv[i]+2, "Op", 3) == 0) {
+ char *arg = get_arg(argv[i]+4, argv, &i);
+ if (sscanf(arg, "%d", &no_pollsets) != 1 ||
+ no_pollsets < 1) {
+ erts_fprintf(stderr,"bad I/O pollset number: %s\n", arg);
+ erts_usage();
+ }
+ } else if (strncmp(argv[i]+2, "OPt", 4) == 0) {
+ char *arg = get_arg(argv[i]+5, argv, &i);
+ if (sscanf(arg, "%d", &no_poll_threads_percentage) != 1 ||
+ no_poll_threads_percentage < 0 ||
+ no_poll_threads_percentage > 100) {
+ erts_fprintf(stderr,"bad I/O poll thread percentage number: %s\n", arg);
+ erts_usage();
+ }
+ } else if (strncmp(argv[i]+2, "OPp", 4) == 0) {
+ char *arg = get_arg(argv[i]+5, argv, &i);
+ if (sscanf(arg, "%d", &no_pollsets_percentage) != 1 ||
+ no_pollsets_percentage < 0 ||
+ no_pollsets_percentage > 100) {
+ erts_fprintf(stderr,"bad I/O pollset percentage number: %s\n", arg);
+ erts_usage();
+ }
+ } else {
+ break;
+ }
+ break;
+ }
+ case 'K':
+ (void)get_arg(argv[i]+2, argv, &i);
+ break;
+ case '-':
+ goto args_parsed;
+ default:
+ break;
+ }
+ }
+ i++;
+ }
+
+args_parsed:
-#endif /* ERTS_ENABLE_KERNEL_POLL */
+ if (!concurrent_waiters) {
+ no_pollsets = no_poll_threads;
+ no_pollsets_percentage = 100;
+ }
+
+ if (no_poll_threads == 0) {
+ if (no_poll_threads_percentage == 0)
+ no_poll_threads = 1; /* This is the default */
+ else {
+ no_poll_threads = erts_no_schedulers * no_poll_threads_percentage / 100;
+ if (no_poll_threads < 1)
+ no_poll_threads = 1;
+ }
+ }
+
+ if (no_pollsets == 0) {
+ if (no_pollsets_percentage == 0)
+ no_pollsets = 1; /* This is the default */
+ else {
+ no_pollsets = no_poll_threads * no_pollsets_percentage / 100;
+ if (no_pollsets < 1)
+ no_pollsets = 1;
+ }
+ }
+
+ if (no_poll_threads < no_pollsets) {
+ erts_fprintf(stderr,
+ "the number of IO poll threads has to be greater than or equal to\n"
+ "the number of IO pollsets. Current values are:\n"
+ "  -IOt %d -IOp %d\n",
+ no_poll_threads, no_pollsets);
+ erts_usage();
+ }
+
+ /* Handled arguments have been marked with NULL. Slide the arguments
+ that were not handled towards the beginning of argv. */
+ for (i = 0, j = 0; i < *argc; i++) {
+ if (argv[i])
+ argv[j++] = argv[i];
+ }
+ *argc = j;
+
+ erts_no_pollsets = no_pollsets;
+ erts_no_poll_threads = no_poll_threads;
+}
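
A worked example of the defaulting rules in parse_args(): assuming 8 schedulers and a command line that only sets the poll-thread percentage, the counts come out as below (the numbers are illustrative, not taken from the patch):

    #include <stdio.h>

    int main(void)
    {
        int erts_no_schedulers = 8;                 /* assumed */
        int no_poll_threads_percentage = 50;        /* -IOPt 50 */
        int no_poll_threads, no_pollsets;

        no_poll_threads = erts_no_schedulers * no_poll_threads_percentage / 100;
        if (no_poll_threads < 1)
            no_poll_threads = 1;
        no_pollsets = 1;                            /* neither -IOp nor -IOPp given */

        /* 4 >= 1, so the consistency check passes */
        printf("-IOt %d -IOp %d\n", no_poll_threads, no_pollsets);   /* -IOt 4 -IOp 1 */
        return 0;
    }
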
void
-ERTS_CIO_EXPORT(erts_init_check_io)(void)
+erts_init_check_io(int *argc, char **argv)
{
- int j;
+ int j, concurrent_waiters, no_poll_threads;
ERTS_CT_ASSERT((INT_MIN & (ERL_NIF_SELECT_STOP_CALLED |
ERL_NIF_SELECT_STOP_SCHEDULED |
ERL_NIF_SELECT_INVALID_EVENT |
ERL_NIF_SELECT_FAILED)) == 0);
-#ifdef ERTS_ENABLE_KERNEL_POLL
- ASSERT(erts_io_funcs.select == NULL);
- erts_io_funcs.select = ERTS_CIO_EXPORT(driver_select);
- erts_io_funcs.enif_select = ERTS_CIO_EXPORT(enif_select);
- erts_io_funcs.check_io_interrupt = ERTS_CIO_EXPORT(erts_check_io_interrupt);
- erts_io_funcs.check_io_interrupt_tmd= ERTS_CIO_EXPORT(erts_check_io_interrupt_timed);
- erts_io_funcs.check_io = ERTS_CIO_EXPORT(erts_check_io);
- erts_io_funcs.get_pollset = ERTS_CIO_EXPORT(erts_get_pollset);
- erts_io_funcs.notify_port_task_executed = ERTS_CIO_EXPORT(erts_io_notify_port_task_executed);
- erts_io_funcs.max_files = ERTS_CIO_EXPORT(erts_check_io_max_files);
- erts_io_funcs.size = ERTS_CIO_EXPORT(erts_check_io_size);
- erts_io_funcs.info = ERTS_CIO_EXPORT(erts_check_io_info);
- erts_io_funcs.check_io_debug = ERTS_CIO_EXPORT(erts_check_io_debug);
-#ifdef ERTS_ENABLE_LOCK_COUNT
- erts_io_funcs.lcnt_update_cio_locks = ERTS_CIO_EXPORT(erts_lcnt_update_cio_locks);
-#endif
-#endif
-
- init_removed_fd_alloc();
-
- ERTS_CIO_POLL_INIT();
- pollsetv = erts_alloc(ERTS_ALC_T_POLLSET,
- sizeof(struct pollset_info)*erts_no_schedulers);
- for (j=0; j < erts_no_schedulers; j++) {
- struct pollset_info* psi = &pollsetv[j];
-
- erts_atomic_init_nob(&psi->check_io_time, 0);
- erts_atomic_init_nob(&psi->in_poll_wait, 0);
- psi->ps = ERTS_CIO_NEW_POLLSET();
- psi->active_fd.six = 0;
- psi->active_fd.eix = 0;
- erts_atomic32_init_nob(&psi->active_fd.no, 0);
- psi->active_fd.size = ERTS_ACTIVE_FD_INC;
- psi->active_fd.array = erts_alloc(ERTS_ALC_T_ACTIVE_FD_ARR,
- sizeof(ErtsSysFdType)*ERTS_ACTIVE_FD_INC);
-#ifdef DEBUG
- {
- int i;
- for (i = 0; i < ERTS_ACTIVE_FD_INC; i++)
- psi->active_fd.array[i] = ERTS_SYS_FD_INVALID;
- }
+
+ erts_poll_init(&concurrent_waiters);
+#if ERTS_POLL_USE_FALLBACK
+ erts_poll_init_flbk(NULL);
#endif
- erts_atomic_init_nob(&psi->removed_list, (erts_aint_t)NULL);
+ parse_args(argc, argv, concurrent_waiters);
+
+ /* Create the actual pollsets */
+ pollsetv = erts_alloc(ERTS_ALC_T_POLLSET,sizeof(ErtsPollSet *) * erts_no_pollsets);
+
+ for (j=0; j < erts_no_pollsets; j++)
+ pollsetv[j] = erts_poll_create_pollset(j);
+
+#if ERTS_POLL_USE_FALLBACK
+ flbk_pollset = erts_poll_create_pollset_flbk(-1);
+#endif
+
+ no_poll_threads = erts_no_poll_threads;
+#if ERTS_POLL_USE_FALLBACK
+ no_poll_threads++;
+#endif
+
+ psiv = erts_alloc(ERTS_ALC_T_POLLSET, sizeof(ErtsPollThread) * no_poll_threads);
+
+#if ERTS_POLL_USE_FALLBACK
+ psiv[0].pollres_len = ERTS_CHECK_IO_POLL_RES_LEN;
+ psiv[0].pollres = erts_alloc(ERTS_ALC_T_POLLSET,
+ sizeof(ErtsPollResFd) * ERTS_CHECK_IO_POLL_RES_LEN);
+ psiv[0].ps = get_fallback();
+ psiv++;
+#endif
+
+ for (j = 0; j < erts_no_poll_threads; j++) {
+ psiv[j].pollres_len = ERTS_CHECK_IO_POLL_RES_LEN;
+ psiv[j].pollres = erts_alloc(ERTS_ALC_T_POLLSET,
+ sizeof(ErtsPollResFd) * ERTS_CHECK_IO_POLL_RES_LEN);
+ psiv[j].ps = pollsetv[j % erts_no_pollsets];
}
- {
- int i;
- for (i=0; i<DRV_EV_STATE_LOCK_CNT; i++) {
- erts_mtx_init(&drv_ev_state.locks[i].lck, "drv_ev_state", make_small(i),
- ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_IO);
- }
+ for (j=0; j < ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT; j++) {
+ erts_mtx_init(&drv_ev_state.locks[j].lck, "drv_ev_state", make_small(j),
+ ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_IO);
}
+
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- drv_ev_state.max_fds = ERTS_CIO_POLL_MAX_FDS();
+ drv_ev_state.max_fds = erts_poll_max_fds();
erts_atomic_init_nob(&drv_ev_state.len, 0);
drv_ev_state.v = NULL;
erts_mtx_init(&drv_ev_state.grow_lock, "drv_ev_state_grow", NIL,
@@ -2281,24 +2067,29 @@ ERTS_CIO_EXPORT(erts_init_check_io)(void)
}
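
The loop above assigns poll threads to pollsets round-robin via psiv[j].ps = pollsetv[j % erts_no_pollsets]. A small sketch of the resulting mapping for hypothetical counts:

    #include <stdio.h>

    int main(void)
    {
        int erts_no_poll_threads = 5, erts_no_pollsets = 2, j;
        for (j = 0; j < erts_no_poll_threads; j++)
            printf("poll thread %d -> pollset %d\n", j, j % erts_no_pollsets);
        /* threads 0, 2, 4 poll pollset 0; threads 1, 3 poll pollset 1 */
        return 0;
    }
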
int
-ERTS_CIO_EXPORT(erts_check_io_max_files)(void)
+erts_check_io_max_files(void)
{
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
return drv_ev_state.max_fds;
#else
- return ERTS_POLL_EXPORT(erts_poll_max_fds)();
+ return erts_poll_max_fds();
#endif
}
Uint
-ERTS_CIO_EXPORT(erts_check_io_size)(void)
+erts_check_io_size(void)
{
Uint res = 0;
ErtsPollInfo pi;
int i;
- for (i = 0; i < erts_no_schedulers; i++) {
- ERTS_CIO_POLL_INFO(pollsetv[i].ps, &pi);
+#if ERTS_POLL_USE_FALLBACK
+ erts_poll_info(get_fallback(), &pi);
+ res += pi.memory_size;
+#endif
+
+ for (i = 0; i < erts_no_pollsets; i++) {
+ erts_poll_info(pollsetv[i], &pi);
res += pi.memory_size;
}
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
@@ -2318,58 +2109,77 @@ ERTS_CIO_EXPORT(erts_check_io_size)(void)
}
Eterm
-ERTS_CIO_EXPORT(erts_check_io_info)(void *proc)
+erts_check_io_info(void *proc)
{
Process *p = (Process *) proc;
Eterm tags[16], values[16], res, list = NIL;
Uint sz, *szp, *hp, **hpp;
ErtsPollInfo *piv;
- Sint i, j;
-
- piv = erts_alloc(ERTS_ALC_T_TMP,
- sizeof(ErtsPollInfo) * erts_no_schedulers);
+ Sint i, j = 0, len;
+ int no_pollsets = erts_no_pollsets + ERTS_POLL_USE_FALLBACK;
+ ERTS_CT_ASSERT(ERTS_POLL_USE_FALLBACK == 0 || ERTS_POLL_USE_FALLBACK == 1);
- for (j = 0; j < erts_no_schedulers; j++) {
- struct pollset_info *psi = &pollsetv[j];
- erts_aint_t cio_time = erts_atomic_read_acqb(&psi->check_io_time);
+ piv = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsPollInfo) * no_pollsets);
- piv[j].active_fds = (int) erts_atomic32_read_acqb(&psi->active_fd.no);
- while (1) {
- erts_aint_t post_cio_time;
- int post_active_fds;
+#if ERTS_POLL_USE_FALLBACK
+ erts_poll_info_flbk(get_fallback(), &piv[0]);
+ piv[0].poll_threads = 1;
+ piv[0].active_fds = 0;
+ piv++;
+#endif
- ERTS_CIO_POLL_INFO(psi->ps, &piv[j]);
+ for (j = 0; j < erts_no_pollsets; j++) {
+ erts_poll_info(pollsetv[j], &piv[j]);
+ piv[j].active_fds = 0;
+ piv[j].poll_threads = erts_no_poll_threads / erts_no_pollsets;
+ if (erts_no_poll_threads % erts_no_pollsets > j)
+ piv[j].poll_threads++;
+ }
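+ /* E.g. with 5 poll threads over 2 pollsets, pollset 0 reports
+ * poll_threads = 3 (2 plus one thread from the remainder) and
+ * pollset 1 reports poll_threads = 2. */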
- post_cio_time = erts_atomic_read_mb(&psi->check_io_time);
- post_active_fds = (int) erts_atomic32_read_acqb(&psi->active_fd.no);
- if (cio_time == post_cio_time && piv[j].active_fds == post_active_fds)
- break;
- cio_time = post_cio_time;
- piv[j].active_fds = post_active_fds;
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ i = 0;
+ erts_mtx_lock(&drv_ev_state.grow_lock);
+ len = erts_atomic_read_nob(&drv_ev_state.len);
+ for (i = 0; i < ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT; i++) {
+ erts_mtx_lock(&drv_ev_state.locks[i].lck);
+ for (j = i; j < len; j+=ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT) {
+ ErtsDrvEventState *state = get_drv_ev_state(j);
+ int pollsetid = get_pollset_id(j);
+ ASSERT(fd_mtx(j) == &drv_ev_state.locks[i].lck);
+ if (state->flags & ERTS_EV_FLAG_FALLBACK)
+ pollsetid = -1;
+ if (state->driver.select
+ && (state->type == ERTS_EV_TYPE_DRV_SEL)
+ && (is_iotask_active(&state->driver.select->iniotask)
+ || is_iotask_active(&state->driver.select->outiotask)))
+ piv[pollsetid].active_fds++;
}
+ erts_mtx_unlock(&drv_ev_state.locks[i].lck);
+ }
+ erts_mtx_unlock(&drv_ev_state.grow_lock);
- #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- piv[j].memory_size += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len);
- #else
- piv[j].memory_size += safe_hash_table_sz(&drv_ev_state.tab);
- {
- SafeHashInfo hi;
- safe_hash_get_info(&hi, &drv_ev_state.tab);
- piv[j].memory_size += hi.objs * sizeof(ErtsDrvEventState);
- }
- erts_spin_lock(&drv_ev_state.prealloc_lock);
- piv[j].memory_size += drv_ev_state.num_prealloc * sizeof(ErtsDrvEventState);
- erts_spin_unlock(&drv_ev_state.prealloc_lock);
- #endif
+ piv[0].memory_size += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len);
+#else
+ piv[0].memory_size += safe_hash_table_sz(&drv_ev_state.tab);
+ {
+ SafeHashInfo hi;
+ safe_hash_get_info(&hi, &drv_ev_state.tab);
+ piv[0].memory_size += hi.objs * sizeof(ErtsDrvEventState);
}
+ erts_spin_lock(&drv_ev_state.prealloc_lock);
+ piv[0].memory_size += drv_ev_state.num_prealloc * sizeof(ErtsDrvEventState);
+ erts_spin_unlock(&drv_ev_state.prealloc_lock);
+#endif
hpp = NULL;
szp = &sz;
sz = 0;
+ piv -= ERTS_POLL_USE_FALLBACK;
+
bld_it:
- for (j = erts_no_schedulers-1; j >= 0; j--) {
+ for (j = no_pollsets-1; j >= 0; j--) {
i = 0;
tags[i] = erts_bld_atom(hpp, szp, "name");
@@ -2378,9 +2188,6 @@ ERTS_CIO_EXPORT(erts_check_io_info)(void *proc)
tags[i] = erts_bld_atom(hpp, szp, "primary");
values[i++] = erts_bld_atom(hpp, szp, piv[j].primary);
- tags[i] = erts_bld_atom(hpp, szp, "fallback");
- values[i++] = erts_bld_atom(hpp, szp, piv[j].fallback ? piv[j].fallback : "false");
-
tags[i] = erts_bld_atom(hpp, szp, "kernel_poll");
values[i++] = erts_bld_atom(hpp, szp,
piv[j].kernel_poll ? piv[j].kernel_poll : "false");
@@ -2389,20 +2196,13 @@ ERTS_CIO_EXPORT(erts_check_io_info)(void *proc)
values[i++] = erts_bld_uint(hpp, szp, piv[j].memory_size);
tags[i] = erts_bld_atom(hpp, szp, "total_poll_set_size");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].poll_set_size);
-
- if (piv[j].fallback) {
- tags[i] = erts_bld_atom(hpp, szp, "fallback_poll_set_size");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].fallback_poll_set_size);
- }
+ values[i++] = erts_bld_uint(hpp, szp, piv[j].poll_set_size);
tags[i] = erts_bld_atom(hpp, szp, "lazy_updates");
values[i++] = piv[j].lazy_updates ? am_true : am_false;
- if (piv[j].lazy_updates) {
- tags[i] = erts_bld_atom(hpp, szp, "pending_updates");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].pending_updates);
- }
+ tags[i] = erts_bld_atom(hpp, szp, "pending_updates");
+ values[i++] = erts_bld_uint(hpp, szp, piv[j].pending_updates);
tags[i] = erts_bld_atom(hpp, szp, "batch_updates");
values[i++] = piv[j].batch_updates ? am_true : am_false;
@@ -2410,22 +2210,17 @@ ERTS_CIO_EXPORT(erts_check_io_info)(void *proc)
tags[i] = erts_bld_atom(hpp, szp, "concurrent_updates");
values[i++] = piv[j].concurrent_updates ? am_true : am_false;
+ tags[i] = erts_bld_atom(hpp, szp, "fallback");
+ values[i++] = piv[j].is_fallback ? am_true : am_false;
+
tags[i] = erts_bld_atom(hpp, szp, "max_fds");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].max_fds);
+ values[i++] = erts_bld_uint(hpp, szp, piv[j].max_fds);
tags[i] = erts_bld_atom(hpp, szp, "active_fds");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].active_fds);
-
- #ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
- tags[i] = erts_bld_atom(hpp, szp, "no_avoided_wakeups");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_avoided_wakeups);
+ values[i++] = erts_bld_uint(hpp, szp, piv[j].active_fds);
- tags[i] = erts_bld_atom(hpp, szp, "no_avoided_interrupts");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_avoided_interrupts);
-
- tags[i] = erts_bld_atom(hpp, szp, "no_interrupt_timed");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_interrupt_timed);
- #endif
+ tags[i] = erts_bld_atom(hpp, szp, "poll_threads");
+ values[i++] = erts_bld_uint(hpp, szp, piv[j].poll_threads);
res = erts_bld_2tup_list(hpp, szp, i, tags, values);
@@ -2454,6 +2249,10 @@ static ERTS_INLINE ErtsPollEvents
print_events(ErtsPollEvents ev)
{
int first = 1;
+ if(ev == ERTS_POLL_EV_NONE) {
+ erts_printf("N/A");
+ return 0;
+ }
if(ev & ERTS_POLL_EV_IN) {
ev &= ~ERTS_POLL_EV_IN;
erts_printf("%s%s", first ? "" : "|", "IN");
@@ -2486,15 +2285,40 @@ print_flags(EventStateFlags f)
erts_printf("%s","USED");
delim = "|";
}
- if(f & ERTS_EV_FLAG_DEFER_IN_EV) {
- erts_printf("%s%s", delim, "DRIN");
+ if(f & ERTS_EV_FLAG_FALLBACK) {
+ erts_printf("%s%s", delim, "FLBK");
delim = "|";
}
- if(f & ERTS_EV_FLAG_DEFER_OUT_EV) {
- erts_printf("%s%s", delim, "DROUT");
+}
+
+#ifdef DEBUG_PRINT_MODE
+
+static ERTS_INLINE char *
+drvmode2str(int mode) {
+ switch (mode) {
+ case ERL_DRV_READ|ERL_DRV_USE: return "READ|USE";
+ case ERL_DRV_WRITE|ERL_DRV_USE: return "WRITE|USE";
+ case ERL_DRV_READ|ERL_DRV_WRITE|ERL_DRV_USE: return "READ|WRITE|USE";
+ case ERL_DRV_USE: return "USE";
+ case ERL_DRV_READ: return "READ";
+ case ERL_DRV_WRITE: return "WRITE";
+ case ERL_DRV_READ|ERL_DRV_WRITE: return "READ|WRITE";
+ default: return "UNKNOWN";
+ }
+}
+
+static ERTS_INLINE char *
+nifmode2str(enum ErlNifSelectFlags mode) {
+ switch (mode) {
+ case ERL_NIF_SELECT_READ: return "READ";
+ case ERL_NIF_SELECT_WRITE: return "WRITE";
+ case ERL_NIF_SELECT_STOP: return "STOP";
+ default: return "UNKNOWN";
}
}
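+
+/* For example, drvmode2str(ERL_DRV_READ|ERL_DRV_USE) yields "READ|USE" and
+ * nifmode2str(ERL_NIF_SELECT_STOP) yields "STOP"; these helpers are only
+ * compiled when DEBUG_PRINT_MODE is defined. */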
+#endif
+
typedef struct {
int used_fds;
int num_errors;
@@ -2506,53 +2330,32 @@ typedef struct {
#endif
} IterDebugCounters;
-static void doit_erts_check_io_debug(void *vstate, void *vcounters)
+static int erts_debug_print_checkio_state(ErtsDrvEventState *state,
+ ErtsPollEvents ep_events,
+ int internal)
{
- ErtsDrvEventState *state = (ErtsDrvEventState *) vstate;
- IterDebugCounters *counters = (IterDebugCounters *) vcounters;
- ErtsPollEvents cio_events = state->events;
- ErtsSysFdType fd = state->fd;
-#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- int internal = 0;
- ErtsPollEvents ep_events = counters->epep[(int) fd];
-#endif
- int err = 0;
-
#if defined(HAVE_FSTAT) && !defined(NO_FSTAT_ON_SYS_FD_TYPE)
struct stat stat_buf;
#endif
- if (state->driver.select)
- counters->no_driver_select_structs++;
- if (state->driver.nif)
- counters->no_enif_select_structs++;
-
+ ErtsSysFdType fd = state->fd;
+ ErtsPollEvents cio_events = state->events;
+ int err = 0;
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- if (state->events || ep_events) {
- if (ep_events & ERTS_POLL_EV_NVAL) {
- ep_events &= ~ERTS_POLL_EV_NVAL;
- internal = 1;
- counters->internal_fds++;
- }
- else
- counters->used_fds++;
-#else
- if (state->events) {
- counters->used_fds++;
+ ErtsPollEvents aio_events = state->active_events;
#endif
-
- erts_printf("pollset=%d fd=%d ",
- (int)(state->pollset - pollsetv), (int) fd);
-
+ erts_printf("pollset=%d fd=%d ",
+ state->flags & ERTS_EV_FLAG_FALLBACK ? -1 : get_pollset_id(fd), (int) fd);
+
#if defined(HAVE_FSTAT) && !defined(NO_FSTAT_ON_SYS_FD_TYPE)
- if (fstat((int) fd, &stat_buf) < 0)
- erts_printf("type=unknown ");
- else {
- erts_printf("type=");
+ if (fstat((int) fd, &stat_buf) < 0)
+ erts_printf("type=unknown ");
+ else {
+ erts_printf("type=");
#ifdef S_ISSOCK
- if (S_ISSOCK(stat_buf.st_mode))
- erts_printf("sock ");
- else
+ if (S_ISSOCK(stat_buf.st_mode))
+ erts_printf("sock ");
+ else
#endif
#ifdef S_ISFIFO
if (S_ISFIFO(stat_buf.st_mode))
@@ -2560,196 +2363,239 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters)
else
#endif
#ifdef S_ISCHR
- if (S_ISCHR(stat_buf.st_mode))
- erts_printf("chr ");
- else
+ if (S_ISCHR(stat_buf.st_mode))
+ erts_printf("chr ");
+ else
#endif
#ifdef S_ISDIR
- if (S_ISDIR(stat_buf.st_mode))
- erts_printf("dir ");
- else
+ if (S_ISDIR(stat_buf.st_mode))
+ erts_printf("dir ");
+ else
#endif
#ifdef S_ISBLK
- if (S_ISBLK(stat_buf.st_mode))
- erts_printf("blk ");
- else
+ if (S_ISBLK(stat_buf.st_mode))
+ erts_printf("blk ");
+ else
#endif
#ifdef S_ISREG
- if (S_ISREG(stat_buf.st_mode))
- erts_printf("reg ");
- else
+ if (S_ISREG(stat_buf.st_mode))
+ erts_printf("reg ");
+ else
#endif
#ifdef S_ISLNK
- if (S_ISLNK(stat_buf.st_mode))
- erts_printf("lnk ");
- else
+ if (S_ISLNK(stat_buf.st_mode))
+ erts_printf("lnk ");
+ else
#endif
#ifdef S_ISDOOR
- if (S_ISDOOR(stat_buf.st_mode))
- erts_printf("door ");
- else
+ if (S_ISDOOR(stat_buf.st_mode))
+ erts_printf("door ");
+ else
#endif
#ifdef S_ISWHT
- if (S_ISWHT(stat_buf.st_mode))
- erts_printf("wht ");
- else
+ if (S_ISWHT(stat_buf.st_mode))
+ erts_printf("wht ");
+ else
#endif
#ifdef S_ISXATTR
- if (S_ISXATTR(stat_buf.st_mode))
- erts_printf("xattr ");
- else
+ if (S_ISXATTR(stat_buf.st_mode))
+ erts_printf("xattr ");
+ else
#endif
- erts_printf("unknown ");
- }
+ erts_printf("unknown ");
+ }
#else
- erts_printf("type=unknown ");
+ erts_printf("type=unknown ");
#endif
- if (state->type == ERTS_EV_TYPE_DRV_SEL) {
- erts_printf("driver_select ");
-
-#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- if (internal) {
- erts_printf("internal ");
- err = 1;
- }
-
- if (cio_events == ep_events) {
- erts_printf("ev=");
- if (print_events(cio_events) != 0)
- err = 1;
- }
- else {
- ErtsPollEvents ev = cio_events;
-#if ERTS_CIO_DEFER_ACTIVE_EVENTS
- if (state->flags & ERTS_EV_FLAG_DEFER_IN_EV)
- ev &= ~ERTS_POLL_EV_IN;
- if (state->flags & ERTS_EV_FLAG_DEFER_OUT_EV)
- ev &= ~ERTS_POLL_EV_OUT;
-#endif
- if (ev != ep_events)
- err = 1;
- erts_printf("cio_ev=");
- print_events(cio_events);
- erts_printf(" ep_ev=");
- print_events(ep_events);
- }
-#else
- if (print_events(cio_events) != 0)
- err = 1;
-#endif
- erts_printf(" ");
- if (cio_events & ERTS_POLL_EV_IN) {
- Eterm id = state->driver.select->inport;
- if (is_nil(id)) {
- erts_printf("inport=none inname=none indrv=none ");
- err = 1;
- }
- else {
- ErtsPortNames *pnp = erts_get_port_names(id, ERTS_INVALID_ERL_DRV_PORT);
- erts_printf(" inport=%T inname=%s indrv=%s ",
- id,
- pnp->name ? pnp->name : "unknown",
- (pnp->driver_name
- ? pnp->driver_name
- : "unknown"));
- erts_free_port_names(pnp);
- }
- }
- if (cio_events & ERTS_POLL_EV_OUT) {
- Eterm id = state->driver.select->outport;
- if (is_nil(id)) {
- erts_printf("outport=none outname=none outdrv=none ");
- err = 1;
- }
- else {
- ErtsPortNames *pnp = erts_get_port_names(id, ERTS_INVALID_ERL_DRV_PORT);
- erts_printf(" outport=%T outname=%s outdrv=%s ",
- id,
- pnp->name ? pnp->name : "unknown",
- (pnp->driver_name
- ? pnp->driver_name
- : "unknown"));
- erts_free_port_names(pnp);
- }
- }
- }
- else if (state->type == ERTS_EV_TYPE_NIF) {
- ErtsResource* r;
- erts_printf("enif_select ");
+ if (state->type == ERTS_EV_TYPE_DRV_SEL) {
+ erts_printf("driver_select ");
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- if (internal) {
- erts_printf("internal ");
- err = 1;
- }
-
+ if (internal) {
+ erts_printf("internal ");
+ err = 1;
+ }
+ if (aio_events == cio_events) {
if (cio_events == ep_events) {
erts_printf("ev=");
if (print_events(cio_events) != 0)
err = 1;
}
else {
- err = 1;
+ ErtsPollEvents ev = cio_events;
+ if (ev != ep_events && ep_events != ERTS_POLL_EV_NONE)
+ err = 1;
erts_printf("cio_ev=");
print_events(cio_events);
erts_printf(" ep_ev=");
print_events(ep_events);
}
+ } else {
+ erts_printf("cio_ev=");
+ print_events(cio_events);
+ erts_printf(" aio_ev=");
+ print_events(aio_events);
+ if ((aio_events != ep_events && ep_events != ERTS_POLL_EV_NONE) ||
+ (aio_events != 0 && ep_events == ERTS_POLL_EV_NONE)) {
+ erts_printf(" ep_ev=");
+ print_events(ep_events);
+ err = 1;
+ }
+ }
#else
+ if (print_events(cio_events) != 0)
+ err = 1;
+#endif
+ erts_printf(" ");
+ if (cio_events & ERTS_POLL_EV_IN) {
+ Eterm id = state->driver.select->inport;
+ if (is_nil(id)) {
+ erts_printf("inport=none inname=none indrv=none ");
+ err = 1;
+ }
+ else {
+ ErtsPortNames *pnp = erts_get_port_names(id, ERTS_INVALID_ERL_DRV_PORT);
+ erts_printf(" inport=%T inname=%s indrv=%s ",
+ id,
+ pnp->name ? pnp->name : "unknown",
+ (pnp->driver_name
+ ? pnp->driver_name
+ : "unknown"));
+ erts_free_port_names(pnp);
+ }
+ }
+ if (cio_events & ERTS_POLL_EV_OUT) {
+ Eterm id = state->driver.select->outport;
+ if (is_nil(id)) {
+ erts_printf("outport=none outname=none outdrv=none ");
+ err = 1;
+ }
+ else {
+ ErtsPortNames *pnp = erts_get_port_names(id, ERTS_INVALID_ERL_DRV_PORT);
+ erts_printf(" outport=%T outname=%s outdrv=%s ",
+ id,
+ pnp->name ? pnp->name : "unknown",
+ (pnp->driver_name
+ ? pnp->driver_name
+ : "unknown"));
+ erts_free_port_names(pnp);
+ }
+ }
+ }
+ else if (state->type == ERTS_EV_TYPE_NIF) {
+ ErtsResource* r;
+ erts_printf("enif_select ");
+
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ if (internal) {
+ erts_printf("internal ");
+ err = 1;
+ }
+
+ if (cio_events == ep_events) {
+ erts_printf("ev=");
if (print_events(cio_events) != 0)
err = 1;
+ }
+ else {
+ err = 1;
+ erts_printf("cio_ev=");
+ print_events(cio_events);
+ erts_printf(" ep_ev=");
+ print_events(ep_events);
+ }
+#else
+ if (print_events(cio_events) != 0)
+ err = 1;
#endif
- erts_printf(" inpid=%T dd_cnt=%b32d", state->driver.nif->in.pid,
- state->driver.nif->in.ddeselect_cnt);
- erts_printf(" outpid=%T dd_cnt=%b32d", state->driver.nif->out.pid,
- state->driver.nif->out.ddeselect_cnt);
- r = state->driver.stop.resource;
- erts_printf(" resource=%p(%T:%T)", r, r->type->module, r->type->name);
+ erts_printf(" inpid=%T", state->driver.nif->in.pid);
+ erts_printf(" outpid=%T", state->driver.nif->out.pid);
+ r = state->driver.stop.resource;
+ erts_printf(" resource=%p(%T:%T)", r, r->type->module, r->type->name);
+ }
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ else if (internal) {
+ erts_printf("internal ");
+ if (cio_events) {
+ err = 1;
+ erts_printf("cio_ev=");
+ print_events(cio_events);
+ }
+ if (ep_events) {
+ erts_printf("ep_ev=");
+ print_events(ep_events);
}
+ }
+#endif
+ else {
+ err = 1;
+ erts_printf("control_type=%d ", (int)state->type);
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- else if (internal) {
- erts_printf("internal ");
- if (cio_events) {
- err = 1;
- erts_printf("cio_ev=");
- print_events(cio_events);
- }
- if (ep_events) {
- erts_printf("ep_ev=");
- print_events(ep_events);
- }
- }
+ if (cio_events == ep_events) {
+ erts_printf("ev=");
+ print_events(cio_events);
+ }
+ else {
+ erts_printf("cio_ev="); print_events(cio_events);
+ erts_printf(" ep_ev="); print_events(ep_events);
+ }
+#else
+ erts_printf("ev=0x%b32x", (Uint32) cio_events);
#endif
- else {
- err = 1;
- erts_printf("control_type=%d ", (int)state->type);
+ }
+
+ erts_printf(" flags="); print_flags(state->flags);
+ if (err) {
+ erts_printf(" ERROR");
+ }
+ erts_printf("\r\n");
+ return err;
+}
+
+static void doit_erts_check_io_debug(void *vstate, void *vcounters)
+{
+ ErtsDrvEventState *state = (ErtsDrvEventState *) vstate;
+ IterDebugCounters *counters = (IterDebugCounters *) vcounters;
+ int internal = 0;
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- if (cio_events == ep_events) {
- erts_printf("ev=");
- print_events(cio_events);
- }
- else {
- erts_printf("cio_ev="); print_events(cio_events);
- erts_printf(" ep_ev="); print_events(ep_events);
- }
+ ErtsSysFdType fd = state->fd;
+ ErtsPollEvents ep_events = counters->epep[(int) fd];
#else
- erts_printf("ev=0x%b32x", (Uint32) cio_events);
+ ErtsPollEvents ep_events = ERTS_POLL_EV_NONE;
#endif
- }
-
- erts_printf(" flags="); print_flags(state->flags);
- if (err) {
+ if (state->driver.select) {
+ counters->no_driver_select_structs++;
+ ASSERT(state->events || (ep_events != 0 && ep_events != ERTS_POLL_EV_NONE));
+ }
+ if (state->driver.nif) {
+ counters->no_enif_select_structs++;
+ ASSERT(state->events || (ep_events != 0 && ep_events != ERTS_POLL_EV_NONE));
+ }
+
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ if (state->events || (ep_events != 0 && ep_events != ERTS_POLL_EV_NONE)) {
+ if (ep_events & ERTS_POLL_EV_NVAL) {
+ ep_events &= ~ERTS_POLL_EV_NVAL;
+ internal = 1;
+ counters->internal_fds++;
+ }
+ else
+ counters->used_fds++;
+#else
+ if (state->events) {
+ counters->used_fds++;
+#endif
+ if (erts_debug_print_checkio_state(state, ep_events, internal)) {
counters->num_errors++;
- erts_printf(" ERROR");
}
- erts_printf("\n");
}
}
-
+
+/* ciodip can be NULL when called from etp-commands */
int
-ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip)
+erts_check_io_debug(ErtsCheckIoDebugInfo *ciodip)
{
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
int fd, len, i;
@@ -2758,12 +2604,10 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip)
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
ErtsDrvEventState null_des;
- null_des.pollset = NULL;
null_des.driver.select = NULL;
null_des.driver.nif = NULL;
null_des.driver.stop.drv_ptr = NULL;
null_des.events = 0;
- null_des.remove_cnt = 0;
null_des.type = ERTS_EV_TYPE_NONE;
null_des.flags = 0;
@@ -2771,24 +2615,36 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip)
sizeof(ErtsPollEvents)*drv_ev_state.max_fds);
#endif
- erts_printf("--- fds in pollset --------------------------------------\n");
#if defined(ERTS_ENABLE_LOCK_CHECK)
erts_lc_check_exact(NULL, 0); /* No locks should be locked */
#endif
- erts_thr_progress_block(); /* stop the world to avoid messy locking */
+ if (ciodip)
+ erts_thr_progress_block(); /* stop the world to avoid messy locking */
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
len = erts_atomic_read_nob(&drv_ev_state.len);
- for (i = 0; i < erts_no_schedulers; i++) {
- ERTS_POLL_EXPORT(erts_poll_get_selected_events)(pollsetv[i].ps,
- counters.epep,
- drv_ev_state.max_fds);
+#if ERTS_POLL_USE_FALLBACK
+ erts_printf("--- fds in flbk pollset ---------------------------------\n");
+ erts_poll_get_selected_events_flbk(get_fallback(), counters.epep,
+ drv_ev_state.max_fds);
+ for (fd = 0; fd < len; fd++) {
+ if (drv_ev_state.v[fd].flags & ERTS_EV_FLAG_FALLBACK)
+ doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters);
+ }
+#endif
+ erts_printf("--- fds in pollset --------------------------------------\n");
+
+ for (i = 0; i < erts_no_pollsets; i++) {
+ erts_poll_get_selected_events(pollsetv[i],
+ counters.epep,
+ drv_ev_state.max_fds);
for (fd = 0; fd < len; fd++) {
- if (drv_ev_state.v[fd].pollset == &pollsetv[i])
- doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters);
+ if (!(drv_ev_state.v[fd].flags & ERTS_EV_FLAG_FALLBACK)
+ && get_pollset_id(fd) == i)
+ doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters);
}
}
for (fd = len ; fd < drv_ev_state.max_fds; fd++) {
@@ -2799,11 +2655,15 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip)
safe_hash_for_each(&drv_ev_state.tab, &doit_erts_check_io_debug,
&counters);
#endif
- erts_thr_progress_unblock();
- ciodip->no_used_fds = counters.used_fds;
- ciodip->no_driver_select_structs = counters.no_driver_select_structs;
- ciodip->no_enif_select_structs = counters.no_enif_select_structs;
+ if (ciodip)
+ erts_thr_progress_unblock();
+
+ if (ciodip) {
+ ciodip->no_used_fds = counters.used_fds;
+ ciodip->no_driver_select_structs = counters.no_driver_select_structs;
+ ciodip->no_enif_select_structs = counters.no_enif_select_structs;
+ }
erts_printf("\n");
erts_printf("used fds=%d\n", counters.used_fds);
@@ -2822,97 +2682,19 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip)
}
#ifdef ERTS_ENABLE_LOCK_COUNT
-void ERTS_CIO_EXPORT(erts_lcnt_update_cio_locks)(int enable) {
+void erts_lcnt_update_cio_locks(int enable) {
+ int i;
#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
erts_lcnt_enable_hash_lock_count(&drv_ev_state.tab, ERTS_LOCK_FLAGS_CATEGORY_IO, enable);
#else
(void)enable;
#endif
-}
-#endif /* ERTS_ENABLE_LOCK_COUNT */
-
-#ifdef ERTS_ENABLE_KERNEL_POLL
-# ifdef ERTS_KERNEL_POLL_VERSION
-
-/*
- * Compile these only once for kp/nkp
- */
-void erts_init_check_io(void)
-{
- if (erts_use_kernel_poll)
- erts_init_check_io_kp();
- else
- erts_init_check_io_nkp();
-}
-
-int
-driver_select(ErlDrvPort port, ErlDrvEvent event, int mode, int on)
-{
- return (*erts_io_funcs.select)(port, event, mode, on);
-}
-
-int enif_select(ErlNifEnv* env, ErlNifEvent event,
- enum ErlNifSelectFlags flags, void* obj, const ErlNifPid* pid, Eterm ref)
-{
- return (*erts_io_funcs.enif_select)(env, event, flags, obj, pid, ref);
-}
-
-struct pollset_info* erts_get_pollset(int sched_nr)
-{
- return (*erts_io_funcs.get_pollset)(sched_nr);
-}
-
-void erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp)
-{
- erts_io_funcs.notify_port_task_executed(pthp);
-}
-
-void erts_check_io(int do_wait)
-{
- erts_io_funcs.check_io(do_wait);
-}
-
-Uint erts_check_io_size(void)
-{
- return erts_io_funcs.size();
-}
-
-
-Eterm erts_check_io_info(void *p)
-{
- return (*erts_io_funcs.info)(p);
-}
-
-int erts_check_io_max_files(void)
-{
- return erts_io_funcs.max_files();
-}
-
-int
-erts_check_io_debug(ErtsCheckIoDebugInfo *ip)
-{
- return (*erts_io_funcs.check_io_debug)(ip);
-}
-
-void erts_check_io_interrupt(struct pollset_info* psi, int set)
-{
- erts_io_funcs.check_io_interrupt(psi, set);
-}
-
-void erts_check_io_interrupt_timed(struct pollset_info* psi, int set,
- ErtsMonotonicTime timeout_time)
-{
- erts_io_funcs.check_io_interrupt_tmd(psi, set, timeout_time);
-}
-
-#ifdef ERTS_ENABLE_LOCK_COUNT
-void erts_lcnt_update_cio_locks(int enable)
-{
- erts_io_funcs.lcnt_update_cio_locks(enable);
-}
+#if ERTS_POLL_USE_FALLBACK
+ erts_lcnt_enable_pollset_lock_count_flbk(get_fallback(), enable);
#endif
-#endif /* ERTS_KERNEL_POLL_VERSION */
-
-#endif /* !ERTS_ENABLE_KERNEL_POLL */
+ for (i = 0; i < erts_no_pollsets; i++)
+ erts_lcnt_enable_pollset_lock_count(pollsetv[i], enable);
+}
+#endif /* ERTS_ENABLE_LOCK_COUNT */