aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/sys/win32/erl_poll.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/sys/win32/erl_poll.c')
-rw-r--r--erts/emulator/sys/win32/erl_poll.c259
1 files changed, 70 insertions, 189 deletions
diff --git a/erts/emulator/sys/win32/erl_poll.c b/erts/emulator/sys/win32/erl_poll.c
index 7a87e81141..24c8c7bbc7 100644
--- a/erts/emulator/sys/win32/erl_poll.c
+++ b/erts/emulator/sys/win32/erl_poll.c
@@ -34,6 +34,7 @@
*/
/*#define HARDDEBUG */
+/*#define HARDTRACE */
#ifdef HARDDEBUG
#ifdef HARDTRACE
#define HARDTRACEF(X) my_debug_printf##X
@@ -50,7 +51,7 @@ static void my_debug_printf(char *fmt, ...)
va_start(args, fmt);
erts_vsnprintf(buffer,1024,fmt,args);
va_end(args);
- erts_fprintf(stderr,"%s\r\n",buffer);
+ erts_printf("%s\r\n",buffer);
}
#else
#define HARDTRACEF(X)
@@ -278,16 +279,13 @@ struct erts_pollset {
Waiter** waiter;
int allocated_waiters; /* Size ow waiter array */
int num_waiters; /* Number of waiter threads. */
- int restore_events; /* Tells us to restore waiters events
- next time around */
HANDLE event_io_ready; /* To be used when waiting for io */
/* These are used to wait for workers to enter standby */
volatile int standby_wait_counter; /* Number of threads to wait for */
CRITICAL_SECTION standby_crit; /* CS to guard the counter */
- HANDLE standby_wait_event; /* Event signalled when counte == 0 */
+ HANDLE standby_wait_event; /* Event signalled when counter == 0 */
erts_atomic32_t wakeup_state;
erts_mtx_t mtx;
- erts_atomic64_t timeout_time;
};
@@ -352,39 +350,19 @@ do { \
wait_standby(PS); \
} while(0)
-static ERTS_INLINE void
-init_timeout_time(ErtsPollSet ps)
-{
- erts_atomic64_init_nob(&ps->timeout_time,
- (erts_aint64_t) ERTS_MONOTONIC_TIME_MAX);
-}
-
-static ERTS_INLINE void
-set_timeout_time(ErtsPollSet ps, ErtsMonotonicTime time)
-{
- erts_atomic64_set_relb(&ps->timeout_time,
- (erts_aint64_t) time);
-}
-
-static ERTS_INLINE ErtsMonotonicTime
-get_timeout_time(ErtsPollSet ps)
-{
- return (ErtsMonotonicTime) erts_atomic64_read_acqb(&ps->timeout_time);
-}
-
#define ERTS_POLL_NOT_WOKEN ((erts_aint32_t) 0)
#define ERTS_POLL_WOKEN_IO_READY ((erts_aint32_t) 1)
#define ERTS_POLL_WOKEN_INTR ((erts_aint32_t) 2)
#define ERTS_POLL_WOKEN_TIMEDOUT ((erts_aint32_t) 3)
static ERTS_INLINE int
-is_io_ready(ErtsPollSet ps)
+is_io_ready(ErtsPollSet *ps)
{
return erts_atomic32_read_nob(&ps->wakeup_state) == ERTS_POLL_WOKEN_IO_READY;
}
static ERTS_INLINE void
-woke_up(ErtsPollSet ps)
+woke_up(ErtsPollSet *ps)
{
if (erts_atomic32_read_nob(&ps->wakeup_state) == ERTS_POLL_NOT_WOKEN)
erts_atomic32_cmpxchg_nob(&ps->wakeup_state,
@@ -407,7 +385,7 @@ woke_up(ErtsPollSet ps)
}
static ERTS_INLINE int
-wakeup_cause(ErtsPollSet ps)
+wakeup_cause(ErtsPollSet *ps)
{
int res;
erts_aint32_t wakeup_state = erts_atomic32_read_acqb(&ps->wakeup_state);
@@ -430,46 +408,8 @@ wakeup_cause(ErtsPollSet ps)
return res;
}
-static ERTS_INLINE DWORD
-poll_wait_timeout(ErtsPollSet ps, ErtsMonotonicTime timeout_time)
-{
- ErtsMonotonicTime current_time, diff_time, timeout;
-
- if (timeout_time == ERTS_POLL_NO_TIMEOUT) {
- no_timeout:
- set_timeout_time(ps, ERTS_MONOTONIC_TIME_MIN);
- woke_up(ps);
- return (DWORD) 0;
- }
-
- current_time = erts_get_monotonic_time(NULL);
- diff_time = timeout_time - current_time;
- if (diff_time <= 0)
- goto no_timeout;
-
- /* Round up to nearest milli second */
- timeout = (ERTS_MONOTONIC_TO_MSEC(diff_time - 1) + 1);
- if (timeout > INT_MAX)
- timeout = INT_MAX; /* Also prevents DWORD overflow */
-
- set_timeout_time(ps, current_time + ERTS_MSEC_TO_MONOTONIC(timeout));
-
- ResetEvent(ps->event_io_ready);
- /*
- * Since we don't know the internals of ResetEvent() we issue
- * a memory barrier as a safety precaution ensuring that
- * the load of wakeup_state wont be reordered with stores made
- * by ResetEvent().
- */
- ERTS_THR_MEMORY_BARRIER;
- if (erts_atomic32_read_nob(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN)
- return (DWORD) 0;
-
- return (DWORD) timeout;
-}
-
static ERTS_INLINE void
-wake_poller(ErtsPollSet ps, int io_ready)
+wake_poller(ErtsPollSet *ps, int io_ready)
{
erts_aint32_t wakeup_state;
if (io_ready) {
@@ -504,13 +444,13 @@ wake_poller(ErtsPollSet ps, int io_ready)
}
static ERTS_INLINE void
-reset_io_ready(ErtsPollSet ps)
+reset_io_ready(ErtsPollSet *ps)
{
erts_atomic32_set_nob(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN);
}
static ERTS_INLINE void
-restore_io_ready(ErtsPollSet ps)
+restore_io_ready(ErtsPollSet *ps)
{
erts_atomic32_set_nob(&ps->wakeup_state, ERTS_POLL_WOKEN_IO_READY);
}
@@ -520,13 +460,13 @@ restore_io_ready(ErtsPollSet ps)
* notifying a poller thread about I/O ready.
*/
static ERTS_INLINE void
-notify_io_ready(ErtsPollSet ps)
+notify_io_ready(ErtsPollSet *ps)
{
wake_poller(ps, 1);
}
static ERTS_INLINE void
-reset_interrupt(ErtsPollSet ps)
+reset_interrupt(ErtsPollSet *ps)
{
/* We need to keep io-ready if set */
erts_aint32_t wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state);
@@ -543,12 +483,12 @@ reset_interrupt(ErtsPollSet ps)
}
static ERTS_INLINE void
-set_interrupt(ErtsPollSet ps)
+set_interrupt(ErtsPollSet *ps)
{
wake_poller(ps, 0);
}
-static void setup_standby_wait(ErtsPollSet ps, int num_threads)
+static void setup_standby_wait(ErtsPollSet *ps, int num_threads)
{
EnterCriticalSection(&(ps->standby_crit));
ps->standby_wait_counter = num_threads;
@@ -556,7 +496,7 @@ static void setup_standby_wait(ErtsPollSet ps, int num_threads)
LeaveCriticalSection(&(ps->standby_crit));
}
-static void signal_standby(ErtsPollSet ps)
+static void signal_standby(ErtsPollSet *ps)
{
EnterCriticalSection(&(ps->standby_crit));
--(ps->standby_wait_counter);
@@ -570,7 +510,7 @@ static void signal_standby(ErtsPollSet ps)
LeaveCriticalSection(&(ps->standby_crit));
}
-static void wait_standby(ErtsPollSet ps)
+static void wait_standby(ErtsPollSet *ps)
{
WaitForSingleObject(ps->standby_wait_event,INFINITE);
}
@@ -638,7 +578,7 @@ static void consistency_check(Waiter* w)
#endif
-static void new_waiter(ErtsPollSet ps)
+static void new_waiter(ErtsPollSet *ps)
{
register Waiter* w;
DWORD tid; /* Id for thread. */
@@ -732,7 +672,7 @@ static void *break_waiter(void *param)
static void *threaded_waiter(void *param)
{
register Waiter* w = (Waiter *) param;
- ErtsPollSet ps = (ErtsPollSet) w->xdata;
+ ErtsPollSet *ps = (ErtsPollSet*) w->xdata;
#ifdef HARD_POLL_DEBUG2
HANDLE oold_fired[64];
int num_oold_fired;
@@ -835,9 +775,9 @@ event_happened:
ASSERT(i >= WAIT_OBJECT_0+1);
i -= WAIT_OBJECT_0;
ASSERT(i >= 1);
- w->active_events--;
HARDDEBUGF(("i = %d, a,h,t = %d,%d,%d",i,
w->active_events, w->highwater, w->total_events));
+ w->active_events--;
#ifdef HARD_POLL_DEBUG2
fired[num_fired++] = w->events[i];
#endif
@@ -867,7 +807,7 @@ event_happened:
* The actual adding and removing from pollset utilities
*/
-static int set_driver_select(ErtsPollSet ps, HANDLE event, ErtsPollEvents mode)
+static int set_driver_select(ErtsPollSet *ps, HANDLE event, ErtsPollEvents mode)
{
int i;
int best_waiter = -1; /* The waiter with lowest number of events. */
@@ -957,13 +897,13 @@ static int set_driver_select(ErtsPollSet ps, HANDLE event, ErtsPollEvents mode)
#endif
erts_mtx_unlock(&w->mtx);
START_WAITER(ps,w);
- HARDDEBUGF(("add select %d %d %d %d",best_waiter,
+ HARDDEBUGF(("%d: add select %d %d %d %d", event, best_waiter,
w->active_events,w->highwater,w->total_events));
return mode;
}
-static int cancel_driver_select(ErtsPollSet ps, HANDLE event)
+static int cancel_driver_select(ErtsPollSet *ps, HANDLE event)
{
int i;
@@ -1018,7 +958,7 @@ static int cancel_driver_select(ErtsPollSet ps, HANDLE event)
* Interface functions
*/
-void erts_poll_interrupt(ErtsPollSet ps, int set /* bool */)
+void erts_poll_interrupt(ErtsPollSet *ps, int set /* bool */)
{
HARDTRACEF(("In erts_poll_interrupt(%d)",set));
if (!set)
@@ -1028,35 +968,23 @@ void erts_poll_interrupt(ErtsPollSet ps, int set /* bool */)
HARDTRACEF(("Out erts_poll_interrupt(%d)",set));
}
-void erts_poll_interrupt_timed(ErtsPollSet ps,
- int set /* bool */,
- ErtsMonotonicTime timeout_time)
-{
- HARDTRACEF(("In erts_poll_interrupt_timed(%d,%ld)",set,timeout_time));
- if (!set)
- reset_interrupt(ps);
- else if (get_timeout_time(ps) > timeout_time)
- set_interrupt(ps);
- HARDTRACEF(("Out erts_poll_interrupt_timed"));
-}
-
/*
* Windows is special, there is actually only one event type, and
* the only difference between ERTS_POLL_EV_IN and ERTS_POLL_EV_OUT
* is which driver callback will eventually be called.
*/
-static ErtsPollEvents do_poll_control(ErtsPollSet ps,
- ErtsSysFdType fd,
- ErtsPollEvents pe,
- int on /* bool */)
+static ErtsPollEvents do_poll_control(ErtsPollSet *ps,
+ ErtsSysFdType fd,
+ ErtsPollOp op,
+ ErtsPollEvents pe)
{
HANDLE event = (HANDLE) fd;
ErtsPollEvents mode;
ErtsPollEvents result;
ASSERT(event != INVALID_HANDLE_VALUE);
- if (on) {
+ if (op != ERTS_POLL_OP_DEL) {
if (pe & ERTS_POLL_EV_IN || !(pe & ERTS_POLL_EV_OUT )) {
mode = ERTS_POLL_EV_IN;
} else {
@@ -1069,51 +997,30 @@ static ErtsPollEvents do_poll_control(ErtsPollSet ps,
return result;
}
-ErtsPollEvents erts_poll_control(ErtsPollSet ps,
+ErtsPollEvents erts_poll_control(ErtsPollSet *ps,
ErtsSysFdType fd,
+ ErtsPollOp op,
ErtsPollEvents pe,
- int on,
int* do_wake) /* In: Wake up polling thread */
/* Out: Poller is woken */
{
ErtsPollEvents result;
- HARDTRACEF(("In erts_poll_control(0x%08X, %u, %d)",(unsigned long) fd, (unsigned) pe, on));
+ HARDTRACEF(("In erts_poll_control(0x%08X, %s, %s)",
+ (unsigned long) fd, op2str(op), ev2str(pe)));
ERTS_POLLSET_LOCK(ps);
- result=do_poll_control(ps,fd,pe,on);
+ result=do_poll_control(ps, fd, op, pe);
ERTS_POLLSET_UNLOCK(ps);
*do_wake = 0; /* Never any need to wake polling threads on windows */
HARDTRACEF(("Out erts_poll_control -> %u",(unsigned) result));
return result;
}
-void erts_poll_controlv(ErtsPollSet ps,
- ErtsPollControlEntry pcev[],
- int len)
-{
- int i;
- int hshur = 0;
- int do_wake = 0;
-
- HARDTRACEF(("In erts_poll_controlv(%d)",len));
- ERTS_POLLSET_LOCK(ps);
-
- for (i = 0; i < len; i++) {
- pcev[i].events = do_poll_control(ps,
- pcev[i].fd,
- pcev[i].events,
- pcev[i].on);
- }
- ERTS_POLLSET_UNLOCK(ps);
- HARDTRACEF(("Out erts_poll_controlv"));
-}
-
-int erts_poll_wait(ErtsPollSet ps,
+int erts_poll_wait(ErtsPollSet *ps,
ErtsPollResFd pr[],
- int *len,
- ErtsMonotonicTime timeout_time)
+ int *len)
{
int no_fds;
- DWORD timeout;
+ DWORD timeout = INFINITE;
EventData* ev;
int res = 0;
int num = 0;
@@ -1124,42 +1031,6 @@ int erts_poll_wait(ErtsPollSet ps,
HARDTRACEF(("In erts_poll_wait"));
ERTS_POLLSET_LOCK(ps);
- if (!is_io_ready(ps) && ps->restore_events) {
- HARDDEBUGF(("Restore events: %d",ps->num_waiters));
- ps->restore_events = 0;
- for (i = 0; i < ps->num_waiters; ++i) {
- Waiter* w = ps->waiter[i];
- erts_mtx_lock(&w->mtx);
- HARDDEBUGF(("Maybe reset %d %d %d %d",i,
- w->active_events,w->highwater,w->total_events));
- if (w->active_events < w->total_events) {
- erts_mtx_unlock(&w->mtx);
- STOP_WAITER(ps,w);
- HARDDEBUGF(("Need reset %d %d %d %d",i,
- w->active_events,w->highwater,w->total_events));
- erts_mtx_lock(&w->mtx);
- /* Need reset, just check that it doesn't have got more to tell */
- if (w->highwater != w->active_events) {
- HARDDEBUGF(("Oups!"));
- /* Oups, got signalled before we took the lock, can't reset */
- if(!is_io_ready(ps)) {
- erts_exit(ERTS_ERROR_EXIT,"Internal error: "
- "Inconsistent io structures in erl_poll.\n");
- }
- START_WAITER(ps,w);
- erts_mtx_unlock(&w->mtx);
- ps->restore_events = 1;
- continue;
- }
- w->active_events = w->highwater = w->total_events;
- START_WAITER(ps,w);
- erts_mtx_unlock(&w->mtx);
- } else {
- erts_mtx_unlock(&w->mtx);
- }
- }
- }
-
no_fds = *len;
#ifdef ERTS_POLL_MAX_RES
@@ -1167,11 +1038,18 @@ int erts_poll_wait(ErtsPollSet ps,
no_fds = ERTS_POLL_MAX_RES;
#endif
- timeout = poll_wait_timeout(ps, timeout_time);
-
- /*HARDDEBUGF(("timeout = %ld",(long) timeout));*/
+ ResetEvent(ps->event_io_ready);
+ /*
+ * Since we don't know the internals of ResetEvent() we issue
+ * a memory barrier as a safety precaution ensuring that
+ * the load of wakeup_state wont be reordered with stores made
+ * by ResetEvent().
+ */
+ ERTS_THR_MEMORY_BARRIER;
+ if (erts_atomic32_read_nob(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN)
+ timeout = (DWORD) 0;
- if (timeout > 0 && !erts_atomic32_read_nob(&break_waiter_state)) {
+ if (!erts_atomic32_read_nob(&break_waiter_state)) {
HANDLE harr[2] = {ps->event_io_ready, break_happened_event};
int num_h = 2;
ERTS_MSACC_PUSH_STATE_M();
@@ -1215,7 +1093,7 @@ int erts_poll_wait(ErtsPollSet ps,
reset_io_ready(ps);
- n = ps->num_waiters;
+ n = ps->num_waiters;
for (i = 0; i < n; i++) {
Waiter* w = ps->waiter[i];
@@ -1241,11 +1119,10 @@ int erts_poll_wait(ErtsPollSet ps,
HARDDEBUGF(("To many FD's to report!"));
goto done;
}
- HARDDEBUGF(("SET! Restore events"));
- ps->restore_events = 1;
HARDDEBUGF(("Report %d,%d",i,j));
- pr[num].fd = (ErtsSysFdType) w->events[j];
- pr[num].events = w->evdata[j]->mode;
+ ERTS_POLL_RES_SET_FD(&pr[num], w->events[j]);
+ ERTS_POLL_RES_SET_EVTS(&pr[num], w->evdata[j]->mode);
+ remove_event_from_set(w, j);
#ifdef HARD_POLL_DEBUG
poll_debug_reported(w->events[j],w->highwater | (j << 16));
poll_debug_reported(w->events[j],first | (last << 16));
@@ -1253,13 +1130,14 @@ int erts_poll_wait(ErtsPollSet ps,
++num;
}
+ w->total_events = w->highwater = w->active_events;
+
#ifdef DEBUG
consistency_check(w);
#endif
erts_mtx_unlock(&w->mtx);
}
done:
- set_timeout_time(ps, ERTS_MONOTONIC_TIME_MAX);
*len = num;
ERTS_POLLSET_UNLOCK(ps);
HARDTRACEF(("Out erts_poll_wait"));
@@ -1274,7 +1152,7 @@ int erts_poll_max_fds(void)
return res;
}
-void erts_poll_info(ErtsPollSet ps,
+void erts_poll_info(ErtsPollSet *ps,
ErtsPollInfo *pip)
{
Uint size = 0;
@@ -1299,16 +1177,12 @@ void erts_poll_info(ErtsPollSet ps,
pip->primary = "WaitForMultipleObjects";
- pip->fallback = NULL;
-
pip->kernel_poll = NULL;
pip->memory_size = size;
pip->poll_set_size = num_events;
- pip->fallback_poll_set_size = 0;
-
pip->lazy_updates = 0;
pip->pending_updates = 0;
@@ -1316,6 +1190,8 @@ void erts_poll_info(ErtsPollSet ps,
pip->batch_updates = 0;
pip->concurrent_updates = 0;
+
+ pip->is_fallback = 0;
ERTS_POLLSET_UNLOCK(ps);
pip->max_fds = erts_poll_max_fds();
@@ -1323,9 +1199,9 @@ void erts_poll_info(ErtsPollSet ps,
}
-ErtsPollSet erts_poll_create_pollset(void)
+ErtsPollSet *erts_poll_create_pollset(int no)
{
- ErtsPollSet ps = SEL_ALLOC(ERTS_ALC_T_POLLSET,
+ ErtsPollSet *ps = SEL_ALLOC(ERTS_ALC_T_POLLSET,
sizeof(struct erts_pollset));
HARDTRACEF(("In erts_poll_create_pollset"));
@@ -1337,17 +1213,15 @@ ErtsPollSet erts_poll_create_pollset(void)
ps->standby_wait_counter = 0;
ps->event_io_ready = CreateManualEvent(FALSE);
ps->standby_wait_event = CreateManualEvent(FALSE);
- ps->restore_events = 0;
erts_atomic32_init_nob(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN);
erts_mtx_init(&ps->mtx, "pollset", NIL, ERTS_LOCK_FLAGS_CATEGORY_IO);
- init_timeout_time(ps);
HARDTRACEF(("Out erts_poll_create_pollset"));
return ps;
}
-void erts_poll_destroy_pollset(ErtsPollSet ps)
+void erts_poll_destroy_pollset(ErtsPollSet *ps)
{
int i;
HARDTRACEF(("In erts_poll_destroy_pollset"));
@@ -1378,14 +1252,16 @@ void erts_poll_destroy_pollset(ErtsPollSet ps)
/*
* Actually mostly initializes the friend module sys_interrupt...
*/
-void erts_poll_init(void)
+void erts_poll_init(int *concurrent_updates)
{
- erts_tid_t thread;
#ifdef HARD_POLL_DEBUG
poll_debug_init();
#endif
+ if (concurrent_updates)
+ *concurrent_updates = 0;
+
HARDTRACEF(("In erts_poll_init"));
erts_sys_break_event = CreateManualEvent(FALSE);
@@ -1394,21 +1270,26 @@ void erts_poll_init(void)
break_happened_event = CreateManualEvent(FALSE);
erts_atomic32_init_nob(&break_waiter_state, 0);
+ HARDTRACEF(("Out erts_poll_init"));
+}
+
+void erts_poll_late_init(void)
+{
+ erts_tid_t thread;
erts_thr_create(&thread, &break_waiter, NULL, NULL);
ERTS_UNSET_BREAK_REQUESTED;
- HARDTRACEF(("Out erts_poll_init"));
}
/*
* Non windows friendly interface, not used when fd's are not continous
*/
-void erts_poll_get_selected_events(ErtsPollSet ps,
+void erts_poll_get_selected_events(ErtsPollSet *ps,
ErtsPollEvents ev[],
int len)
{
int i;
HARDTRACEF(("In erts_poll_get_selected_events"));
for (i = 0; i < len; ++i)
- ev[i] = 0;
+ ev[i] = ERTS_POLL_EV_NONE;
HARDTRACEF(("Out erts_poll_get_selected_events"));
}