diff options
Diffstat (limited to 'erts/emulator/sys/win32/erl_poll.c')
-rw-r--r-- | erts/emulator/sys/win32/erl_poll.c | 1361 |
1 files changed, 1361 insertions, 0 deletions
diff --git a/erts/emulator/sys/win32/erl_poll.c b/erts/emulator/sys/win32/erl_poll.c new file mode 100644 index 0000000000..d816cc2c07 --- /dev/null +++ b/erts/emulator/sys/win32/erl_poll.c @@ -0,0 +1,1361 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2007-2009. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#define WANT_NONBLOCKING + +#include "sys.h" +#include "erl_alloc.h" +#include "erl_poll.h" + +/* + * Some debug macros + */ + +/*#define HARDDEBUG */ +#ifdef HARDDEBUG +#ifdef HARDTRACE +#define HARDTRACEF(X) my_debug_printf##X +#else +#define HARDTRACEF(X) +#endif + +#define HARDDEBUGF(X) my_debug_printf##X +static void my_debug_printf(char *fmt, ...) +{ + char buffer[1024]; + va_list args; + + va_start(args, fmt); + erts_vsnprintf(buffer,1024,fmt,args); + va_end(args); + erts_fprintf(stderr,"%s\r\n",buffer); +} +#else +#define HARDTRACEF(X) +#define HARDDEBUGF(X) +#endif + +#ifdef DEBUG +#define NoMansLandFill 0xFD /* fill no-man's land with this */ +#define DeadLandFill 0xDD /* fill free objects with this */ +#define CleanLandFill 0xCD /* fill new objects with this */ + +static void consistency_check(struct _Waiter* w); +static void* debug_alloc(ErtsAlcType_t, Uint); +static void* debug_realloc(ErtsAlcType_t, void *, Uint, Uint); + +# define SEL_ALLOC debug_alloc +# define SEL_REALLOC debug_realloc +# define SEL_FREE erts_free + +static void *debug_alloc(ErtsAlcType_t type, Uint size) +{ + void* p = erts_alloc(type, size); + memset(p, CleanLandFill, size); + return p; +} + +static void *debug_realloc(ErtsAlcType_t type, void *ptr, Uint prev_size, + Uint size) +{ + void *p; + size_t fill_size; + void *fill_ptr; + + if (prev_size > size) { + size_t fill_size = (size_t) (prev_size - size); + void *fill_ptr = (void *) (((char *) ptr) + size); + memset(fill_ptr, NoMansLandFill, fill_size); + } + + p = erts_realloc(type, ptr, size); + + if (size > prev_size) { + size_t fill_size = (size_t) (size - prev_size); + void *fill_ptr = (void *) (((char *) p) + prev_size); + memset(fill_ptr, CleanLandFill, fill_size); + } + + return p; +} +#else +# define SEL_ALLOC erts_alloc +# define SEL_REALLOC realloc_wrap +# define SEL_FREE erts_free + +static ERTS_INLINE void * +realloc_wrap(ErtsAlcType_t t, void *p, Uint ps, Uint s) +{ + return erts_realloc(t, p, s); +} +#endif + + +#ifdef HARD_POLL_DEBUG +#define OP_SELECT 1 +#define OP_DESELECT 2 +#define OP_FIRED 3 +#define OP_READ_BEGIN 4 +#define OP_READ_DONE 5 +#define OP_WRITE_BEGIN 6 +#define OP_WRITE_DONE 7 +#define OP_REPORTED 8 +#define OP_DIED 9 +#define OP_ASYNC_INIT 10 +#define OP_ASYNC_IMMED 11 +#define OP_FD_MOVED 12 + +static struct { + int op; + ErtsSysFdType active; + int xdata; +} debug_save_ops[1024]; + +static int num_debug_save_ops = 0; + +static ErtsSysFdType active_debug_fd; +static int active_debug_fd_set = 0; + +static erts_mtx_t save_ops_mtx; + +static void poll_debug_init(void) +{ + erts_mtx_init(&save_ops_mtx, "save_ops_lock"); +} + +void poll_debug_set_active_fd(ErtsSysFdType fd) +{ + erts_mtx_lock(&save_ops_mtx); + active_debug_fd_set = 1; + active_debug_fd = fd; + erts_mtx_unlock(&save_ops_mtx); +} + +static void do_save_op(ErtsSysFdType fd, int op, int xdata) +{ + erts_mtx_lock(&save_ops_mtx); + if (fd == active_debug_fd && num_debug_save_ops < 1024) { + int x = num_debug_save_ops++; + debug_save_ops[x].op = op; + debug_save_ops[x].active = fd; + debug_save_ops[x].xdata = xdata; + } + erts_mtx_unlock(&save_ops_mtx); +} + +void poll_debug_moved(ErtsSysFdType fd, int s1, int s2) +{ + do_save_op(fd,OP_FD_MOVED,s1 | (s2 << 16)); +} + +void poll_debug_select(ErtsSysFdType fd, int mode) +{ + do_save_op(fd,OP_SELECT,mode); +} + +void poll_debug_deselect(ErtsSysFdType fd) +{ + do_save_op(fd,OP_DESELECT,0); +} + +void poll_debug_fired(ErtsSysFdType fd) +{ + do_save_op(fd,OP_FIRED,0); +} + +void poll_debug_read_begin(ErtsSysFdType fd) +{ + do_save_op(fd,OP_READ_BEGIN,0); +} + +void poll_debug_read_done(ErtsSysFdType fd, int bytes) +{ + do_save_op(fd,OP_READ_DONE,bytes); +} + +void poll_debug_async_initialized(ErtsSysFdType fd) +{ + do_save_op(fd,OP_ASYNC_INIT,0); +} + +void poll_debug_async_immediate(ErtsSysFdType fd, int bytes) +{ + do_save_op(fd,OP_ASYNC_IMMED,bytes); +} + +void poll_debug_write_begin(ErtsSysFdType fd) +{ + do_save_op(fd,OP_WRITE_BEGIN,0); +} + +void poll_debug_write_done(ErtsSysFdType fd, int bytes) +{ + do_save_op(fd,OP_WRITE_DONE,bytes); +} + +void poll_debug_reported(ErtsSysFdType fd, int mode) +{ + do_save_op(fd,OP_REPORTED,mode); +} + +void poll_debug_died(ErtsSysFdType fd) +{ + do_save_op(fd,OP_DIED,0); +} + +#endif /* DEBUG */ + +/* + * End of debug macros + */ + + + +/* + * Handles that we poll, but that are actually signalled from outside + * this module + */ + +extern HANDLE erts_service_event; +extern HANDLE erts_sys_break_event; + + +/* + * The structure we hold for each event (i.e. fd) + */ +typedef struct _EventData { + HANDLE event; /* For convenience. */ + ErtsPollEvents mode; /* The current select mode. */ + struct _EventData *next; /* Next in free or delete lists. */ +} EventData; + +/* + * The structure to represent a waiter thread + */ +typedef struct _Waiter { + HANDLE events[MAXIMUM_WAIT_OBJECTS]; /* The events. */ + EventData* evdata[MAXIMUM_WAIT_OBJECTS]; /* Pointers to associated data. */ + int active_events; /* Number of events to wait for */ + int total_events; /* Total number of events in the arrays. */ + int highwater; /* Events processed up to here */ + EventData evdata_heap[MAXIMUM_WAIT_OBJECTS]; /* Pre-allocated EventDatas */ + EventData* first_free_evdata; /* Index of first free EventData object. */ + HANDLE go_ahead; /* The waiter may continue. (Auto-reset) */ + void *xdata; /* used when thread parameter */ + erts_tid_t this; /* Thread "handle" of this waiter */ + erts_mtx_t mtx; /* Mutex for updating/reading pollset, but the + currently used set require thread stopping + to be updated */ +} Waiter; + +/* + * The structure for a pollset. There can currently be only one... + */ +struct ErtsPollSet_ { + Waiter** waiter; + int allocated_waiters; /* Size ow waiter array */ + int num_waiters; /* Number of waiter threads. */ + erts_atomic_t sys_io_ready; /* Tells us there is I/O ready (already). */ + int restore_events; /* Tells us to restore waiters events + next time around */ + HANDLE event_io_ready; /* To be used when waiting for io */ + /* These are used to wait for workers to enter standby */ + volatile int standby_wait_counter; /* Number of threads to wait for */ + CRITICAL_SECTION standby_crit; /* CS to guard the counter */ + HANDLE standby_wait_event; /* Event signalled when counte == 0 */ +#ifdef ERTS_SMP + erts_smp_atomic_t woken; + erts_smp_mtx_t mtx; + erts_smp_atomic_t interrupt; +#endif + erts_smp_atomic_t timeout; +}; + +#ifdef ERTS_SMP + +#define ERTS_POLLSET_LOCK(PS) \ + erts_smp_mtx_lock(&(PS)->mtx) +#define ERTS_POLLSET_UNLOCK(PS) \ + erts_smp_mtx_unlock(&(PS)->mtx) +#define ERTS_POLLSET_SET_POLLED_CHK(PS) \ + ((int) erts_smp_atomic_xchg(&(PS)->polled, (long) 1)) +#define ERTS_POLLSET_SET_POLLED(PS) \ + erts_smp_atomic_set(&(PS)->polled, (long) 1) +#define ERTS_POLLSET_UNSET_POLLED(PS) \ + erts_smp_atomic_set(&(PS)->polled, (long) 0) +#define ERTS_POLLSET_IS_POLLED(PS) \ + ((int) erts_smp_atomic_read(&(PS)->polled)) +#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) \ + ((int) erts_smp_atomic_xchg(&(PS)->woken, (long) 1)) +#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) \ + erts_smp_atomic_set(&(PS)->woken, (long) 1) +#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) \ + erts_smp_atomic_set(&(PS)->woken, (long) 0) +#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) \ + ((int) erts_smp_atomic_read(&(PS)->woken)) + +#define ERTS_POLLSET_UNSET_INTERRUPTED_CHK(PS) \ + ((int) erts_smp_atomic_xchg(&(PS)->interrupt, (long) 0)) +#define ERTS_POLLSET_UNSET_INTERRUPTED(PS) \ + erts_smp_atomic_set(&(PS)->interrupt, (long) 0) +#define ERTS_POLLSET_SET_INTERRUPTED(PS) \ + erts_smp_atomic_set(&(PS)->interrupt, (long) 1) +#define ERTS_POLLSET_IS_INTERRUPTED(PS) \ + ((int) erts_smp_atomic_read(&(PS)->interrupt)) + +#else + +#define ERTS_POLLSET_LOCK(PS) +#define ERTS_POLLSET_UNLOCK(PS) +#define ERTS_POLLSET_SET_POLLED_CHK(PS) 0 +#define ERTS_POLLSET_UNSET_POLLED(PS) +#define ERTS_POLLSET_IS_POLLED(PS) 0 +#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) 1 +#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) +#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) +#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) 1 + + +#endif + +/* + * While atomics are not yet implemented for windows in the common library... + * + * MSDN doc states that SMP machines and old compilers require + * InterLockedExchange to properly read and write interlocked + * variables, otherwise the processors might reschedule + * the access and order of atomics access is destroyed... + * While they only mention it in white-papers, the problem + * in VS2003 is due to the IA64 arch, so we can still count + * on the CPU not rescheduling the access to volatile in X86 arch using + * even the slightly older compiler... + * + * So here's (hopefully) a subset of the generally working atomic + * variable access... + */ + +#if defined(__GNUC__) +# if defined(__i386__) || defined(__x86_64__) +# define VOLATILE_IN_SEQUENCE 1 +# else +# define VOLATILE_IN_SEQUENCE 0 +# endif +#elif defined(_MSC_VER) +# if _MSC_VER < 1300 +# define VOLATILE_IN_SEQUENCE 0 /* Dont trust really old compilers */ +# else +# if defined(_M_IX86) +# define VOLATILE_IN_SEQUENCE 1 +# else /* I.e. IA64 */ +# if _MSC_VER >= 1400 +# define VOLATILE_IN_SEQUENCE 1 +# else +# define VOLATILE_IN_SEQUENCE 0 +# endif +# endif +# endif +#else +# define VOLATILE_IN_SEQUENCE 0 +#endif + + + +/* + * Communication with sys_interrupt + */ + +#ifdef ERTS_SMP +extern erts_smp_atomic_t erts_break_requested; +#define ERTS_SET_BREAK_REQUESTED \ + erts_smp_atomic_set(&erts_break_requested, (long) 1) +#define ERTS_UNSET_BREAK_REQUESTED \ + erts_smp_atomic_set(&erts_break_requested, (long) 0) +#else +extern volatile int erts_break_requested; +#define ERTS_SET_BREAK_REQUESTED (erts_break_requested = 1) +#define ERTS_UNSET_BREAK_REQUESTED (erts_break_requested = 0) +#endif + +static erts_mtx_t break_waiter_lock; +static HANDLE break_happened_event; +static erts_atomic_t break_waiter_state; +#define BREAK_WAITER_GOT_BREAK 1 +#define BREAK_WAITER_GOT_HALT 2 + + +/* + * Forward declarations + */ + +static void *threaded_waiter(void *param); +static void *break_waiter(void *param); + +/* + * Sychronization macros and functions + */ +#define START_WAITER(PS, w) \ + SetEvent((w)->go_ahead) + +#define STOP_WAITER(PS,w) \ +do { \ + setup_standby_wait((PS),1); \ + SetEvent((w)->events[0]); \ + wait_standby(PS); \ +} while(0) + +#define START_WAITERS(PS) \ +do { \ + int i; \ + for (i = 0; i < (PS)->num_waiters; i++) { \ + SetEvent((PS)->waiter[i]->go_ahead); \ + } \ + } while(0) + +#define STOP_WAITERS(PS) \ +do { \ + int i; \ + setup_standby_wait((PS),(PS)->num_waiters); \ + for (i = 0; i < (PS)->num_waiters; i++) { \ + SetEvent((PS)->waiter[i]->events[0]); \ + } \ + wait_standby(PS); \ + } while(0) + +#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP) + +static ERTS_INLINE int +unset_interrupted_chk(ErtsPollSet ps) +{ + /* This operation isn't atomic, but we have no need at all for an + atomic operation here... */ + int res = ps->interrupt; + ps->interrupt = 0; + return res; +} + +#endif + +#ifdef ERTS_SMP +static ERTS_INLINE void +wake_poller(ErtsPollSet ps) +{ + if (!ERTS_POLLSET_SET_POLLER_WOKEN_CHK(ps)) { + SetEvent(ps->event_io_ready); + } +} +#endif + +static void setup_standby_wait(ErtsPollSet ps, int num_threads) +{ + EnterCriticalSection(&(ps->standby_crit)); + ps->standby_wait_counter = num_threads; + ResetEvent(ps->standby_wait_event); + LeaveCriticalSection(&(ps->standby_crit)); +} + +static void signal_standby(ErtsPollSet ps) +{ + EnterCriticalSection(&(ps->standby_crit)); + --(ps->standby_wait_counter); + if (ps->standby_wait_counter < 0) { + LeaveCriticalSection(&(ps->standby_crit)); + erl_exit(1,"Standby signalled by more threads than expected"); + } + if (!(ps->standby_wait_counter)) { + SetEvent(ps->standby_wait_event); + } + LeaveCriticalSection(&(ps->standby_crit)); +} + +static void wait_standby(ErtsPollSet ps) +{ + WaitForSingleObject(ps->standby_wait_event,INFINITE); +} + +static void remove_event_from_set(Waiter *w, int j) +{ + w->evdata[j]->event = INVALID_HANDLE_VALUE; + w->evdata[j]->mode = 0; + w->evdata[j]->next = w->first_free_evdata; + w->first_free_evdata = w->evdata[j]; + + /* + * If the event is active, we will overwrite it + * with the last active event and make the hole + * the first non-active event. + */ + + if (j < w->active_events) { + w->active_events--; + w->highwater--; + w->total_events--; + w->events[j] = w->events[w->active_events]; + w->evdata[j] = w->evdata[w->active_events]; + w->events[w->active_events] = w->events[w->highwater]; + w->evdata[w->active_events] = w->evdata[w->highwater]; + w->events[w->highwater] = w->events[w->total_events]; + w->evdata[w->highwater] = w->evdata[w->total_events]; + } else if (j < w->highwater) { + w->highwater--; + w->total_events--; + w->events[j] = w->events[w->highwater]; + w->evdata[j] = w->evdata[w->highwater]; + w->events[w->highwater] = w->events[w->total_events]; + w->evdata[w->highwater] = w->evdata[w->total_events]; + } else { + w->total_events--; + w->events[j] = w->events[w->total_events]; + w->evdata[j] = w->evdata[w->total_events]; + } + +#ifdef DEBUG + w->events[w->total_events] = (HANDLE) CleanLandFill; + w->evdata[w->total_events] = (EventData *) CleanLandFill; + consistency_check(w); +#endif +} + +/* + * Thread handling + */ + +#ifdef DEBUG +static void consistency_check(Waiter* w) +{ + int i; + + ASSERT(w->active_events <= w->total_events); + ASSERT(w->evdata[0] == NULL); + + for (i = 1; i < w->total_events; i++) { + ASSERT(w->events[i] == w->evdata[i]->event); + ASSERT(w->evdata[i]->mode != 0); + } +} + +#endif + +static void new_waiter(ErtsPollSet ps) +{ + register Waiter* w; + DWORD tid; /* Id for thread. */ + erts_tid_t thread; + int i; + int tres; + + if (ps->num_waiters == ps->allocated_waiters) { + Uint old_size = sizeof(Waiter *)*ps->allocated_waiters; + ps->allocated_waiters += 64; + ps->waiter = SEL_REALLOC(ERTS_ALC_T_WAITER_OBJ, + (void *) ps->waiter, + old_size, + sizeof(Waiter *) * (ps->allocated_waiters)); + } + + w = (Waiter *) SEL_ALLOC(ERTS_ALC_T_WAITER_OBJ, sizeof(Waiter)); + ps->waiter[ps->num_waiters] = w; + + w->events[0] = CreateAutoEvent(FALSE); + w->evdata[0] = NULL; /* Should never be used. */ + w->active_events = 1; + w->highwater = 1; + w->total_events = 1; + erts_mtx_init(&w->mtx, "pollwaiter"); + + + /* + * Form the free list of EventData objects. + */ + + w->evdata_heap[0].next = 0; /* Last in free list. */ + for (i = 1; i < MAXIMUM_WAIT_OBJECTS; i++) { + w->evdata_heap[i].next = w->evdata_heap+i-1; + } + w->first_free_evdata = w->evdata_heap+MAXIMUM_WAIT_OBJECTS-1; + + /* + * Create the other events. + */ + + w->go_ahead = CreateAutoEvent(FALSE); + + /* + * Create the thread. + */ + w->xdata = ps; + erts_thr_create(&thread, &threaded_waiter, w, NULL); + w->this = thread; + + /* + * Finally, done. + */ + + (ps->num_waiters)++; +} + +static void *break_waiter(void *param) +{ + HANDLE harr[2]; + int i = 0; + harr[i++] = erts_sys_break_event; + if (erts_service_event != NULL) { + harr[i++] = erts_service_event; + } + + for(;;) { + switch (WaitForMultipleObjects(i,harr,FALSE,INFINITE)) { + case WAIT_OBJECT_0: + ResetEvent(harr[0]); + erts_mtx_lock(&break_waiter_lock); + erts_atomic_set(&break_waiter_state,BREAK_WAITER_GOT_BREAK); + SetEvent(break_happened_event); + erts_mtx_unlock(&break_waiter_lock); + break; + case (WAIT_OBJECT_0+1): + ResetEvent(harr[1]); + erts_mtx_lock(&break_waiter_lock); + erts_atomic_set(&break_waiter_state,BREAK_WAITER_GOT_HALT); + SetEvent(break_happened_event); + erts_mtx_unlock(&break_waiter_lock); + break; + default: + erl_exit(1,"Unexpected event in break_waiter"); + } + } +} + +static void *threaded_waiter(void *param) +{ + register Waiter* w = (Waiter *) param; + ErtsPollSet ps = (ErtsPollSet) w->xdata; +#ifdef HARD_POLL_DEBUG2 + HANDLE oold_fired[64]; + int num_oold_fired; + HANDLE old_fired[64]; + int num_old_fired = 0; + HANDLE fired[64]; + int num_fired = 0; + HANDLE errors[1024]; + int num_errors = 0; + HANDLE save_events[64]; + int save_active_events; + int save_total_events; + int save_highwater; +#endif + + again: + WaitForSingleObject(w->go_ahead, INFINITE); + /* Atomic enough when just checking, skip lock */ + if (w->total_events == 0) { + return NULL; + } + if (w->active_events == 0) { + goto again; + } + ASSERT(w->evdata[0] == NULL); +#ifdef HARD_POLL_DEBUG2 + num_oold_fired = num_old_fired; + memcpy(oold_fired,old_fired,num_old_fired*sizeof(HANDLE)); + num_old_fired = num_fired; + memcpy(old_fired,fired,num_fired*sizeof(HANDLE)); + num_fired = 0; +#endif + for (;;) { + int i; + int j; +#ifdef HARD_POLL_DEBUG2 + erts_mtx_lock(&w->mtx); + memcpy(save_events,w->events,w->active_events*sizeof(HANDLE)); + save_active_events = w->active_events; + save_total_events = w->total_events; + save_highwater = w->highwater; + erts_mtx_unlock(&w->mtx); +#endif + i = WaitForMultipleObjects(w->active_events, w->events, FALSE, INFINITE); + switch (i) { + case WAIT_FAILED: + DEBUGF(("Wait failed: %s\n", last_error())); + erts_mtx_lock(&w->mtx); + /* Dont wait for our signal event */ + for (j = 1; j < w->active_events; j++) { + int tmp; + if ((tmp = WaitForSingleObject(w->events[j], 0)) + == WAIT_FAILED) { + DEBUGF(("Invalid handle: i = %d, handle = 0x%0x\n", + j, w->events[j])); +#ifdef HARD_POLL_DEBUG2 + if (num_errors < 1024) + errors[num_errors++] = w->events[j]; +#endif +#ifdef HARD_POLL_DEBUG + poll_debug_died(w->events[j]); +#endif + remove_event_from_set(w,j); +#ifdef DEBUG + consistency_check(w); +#endif + } else if (tmp == WAIT_OBJECT_0) { + i = WAIT_OBJECT_0 + j; + goto event_happened; + } + } + erts_mtx_unlock(&w->mtx); + break; + case WAIT_OBJECT_0: + signal_standby(ps); + goto again; +#ifdef DEBUG + case WAIT_TIMEOUT: + ASSERT(0); +#endif + default: + erts_mtx_lock(&w->mtx); +#ifdef HARD_POLL_DEBUG2 + { + int x = memcmp(save_events,w->events,w->active_events*sizeof(HANDLE)); + ASSERT(x == 0 && save_active_events == w->active_events); + } +#endif +event_happened: +#ifdef DEBUG + consistency_check(w); +#endif + ASSERT(WAIT_OBJECT_0 < i && i < WAIT_OBJECT_0+w->active_events); + if (!erts_atomic_xchg(&ps->sys_io_ready,1)) { + HARDDEBUGF(("SET EventIoReady (%d)",erts_atomic_read(&ps->sys_io_ready))); + SetEvent(ps->event_io_ready); + } else { + HARDDEBUGF(("DONT SET EventIoReady")); + } + + /* + * The main thread wont start working on our arrays untill we're + * stopped, so we can work in peace although the main thread runs + */ + ASSERT(i >= WAIT_OBJECT_0+1); + i -= WAIT_OBJECT_0; + ASSERT(i >= 1); + w->active_events--; + HARDDEBUGF(("i = %d, a,h,t = %d,%d,%d",i, + w->active_events, w->highwater, w->total_events)); +#ifdef HARD_POLL_DEBUG2 + fired[num_fired++] = w->events[i]; +#endif +#ifdef HARD_POLL_DEBUG + poll_debug_fired(w->events[i]); +#endif + if (i < w->active_events) { + HANDLE te = w->events[i]; + EventData* tp = w->evdata[i]; + w->events[i] = w->events[w->active_events]; + w->evdata[i] = w->evdata[w->active_events]; + w->events[w->active_events] = te; + w->evdata[w->active_events] = tp; + } + HARDDEBUGF(("i = %d, a,h,t = %d,%d,%d",i, + w->active_events, w->highwater, w->total_events)); +#ifdef DEBUG + consistency_check(w); +#endif + erts_mtx_unlock(&w->mtx); + break; + } + } +} + +/* + * The actual adding and removing from pollset utilities + */ + +static int set_driver_select(ErtsPollSet ps, HANDLE event, ErtsPollEvents mode) +{ + int i; + int best_waiter = -1; /* The waiter with lowest number of events. */ + int lowest = MAXIMUM_WAIT_OBJECTS; /* Lowest number of events + * in any waiter. + */ + EventData* ev; + Waiter* w; + + /* + * Find the waiter which is least busy. + */ + +#ifdef HARD_POLL_DEBUG + poll_debug_select(event, mode); +#endif + + /* total_events can no longer be read without the lock, it's changed in the waiter */ + for (i = 0; i < ps->num_waiters; i++) { + erts_mtx_lock(&(ps->waiter[i]->mtx)); + if (ps->waiter[i]->total_events < lowest) { + lowest = ps->waiter[i]->total_events; + best_waiter = i; + } + erts_mtx_unlock(&(ps->waiter[i]->mtx)); + } + + /* + * Stop the selected waiter, or start a new waiter if all were busy. + */ + + if (best_waiter >= 0) { + w = ps->waiter[best_waiter]; + STOP_WAITER(ps,w); + erts_mtx_lock(&w->mtx); + } else { + new_waiter(ps); + w = ps->waiter[(ps->num_waiters)-1]; + erts_mtx_lock(&w->mtx); + } + +#ifdef DEBUG + consistency_check(w); +#endif + + /* + * Allocate and initialize an EventData structure. + */ + + ev = w->first_free_evdata; + w->first_free_evdata = ev->next; + ev->event = event; + ev->mode = mode; + ev->next = NULL; + + /* + * At this point, the selected waiter (newly-created or not) is + * standing by. Put the new event into the active part of the array. + */ + + if (w->active_events < w->total_events) { + /* + * Move the first event beyond the active part of the array to + * the very end to make place for the new event. + */ + +#ifdef HARD_POLL_DEBUG + poll_debug_moved(w->events[w->highwater],w->highwater,w->total_events); +#endif + w->events[w->total_events] = w->events[w->highwater]; + w->evdata[w->total_events] = w->evdata[w->highwater]; +#ifdef HARD_POLL_DEBUG + poll_debug_moved(w->events[w->active_events],w->active_events,w->highwater); +#endif + w->events[w->highwater] = w->events[w->active_events]; + w->evdata[w->highwater] = w->evdata[w->active_events]; + + } + w->events[w->active_events] = event; + w->evdata[w->active_events] = ev; + w->active_events++; + w->highwater++; + w->total_events++; + +#ifdef DEBUG + consistency_check(w); +#endif + erts_mtx_unlock(&w->mtx); + START_WAITER(ps,w); + HARDDEBUGF(("add select %d %d %d %d",best_waiter, + w->active_events,w->highwater,w->total_events)); + return mode; +} + + +static int cancel_driver_select(ErtsPollSet ps, HANDLE event) +{ + int i; + + ASSERT(event != INVALID_HANDLE_VALUE); + restart: + for (i = 0; i < ps->num_waiters; i++) { + Waiter* w = ps->waiter[i]; + int j; + + erts_mtx_lock(&w->mtx); +#ifdef DEBUG + consistency_check(w); +#endif + for (j = 0; j < w->total_events; j++) { + if (w->events[j] == event) { + int stopped = 0; + /* + * Free the event's EventData structure. + */ + + if (j < w->active_events) { + HARDDEBUGF(("Stopped in remove select")); + stopped = 1; + erts_mtx_unlock(&w->mtx); + STOP_WAITER(ps,w); + erts_mtx_lock(&w->mtx); + if ( j >= w->active_events || w->events[j] != event) { + /* things happened while unlocked */ + START_WAITER(ps,w); + erts_mtx_unlock(&w->mtx); + goto restart; + } + } +#ifdef HARD_POLL_DEBUG + poll_debug_deselect(w->events[j]); +#endif + remove_event_from_set(w, j); + if (stopped) { + START_WAITER(ps,w); + } + HARDDEBUGF(("removed select %d,%d %d %d %d",i,j, + w->active_events,w->highwater,w->total_events)); + break; + } + } + erts_mtx_unlock(&w->mtx); + } + return 0; +} + +/* + * Interface functions + */ + +void erts_poll_interrupt(ErtsPollSet ps, int set /* bool */) +{ + HARDTRACEF(("In erts_poll_interrupt(%d)",set)); +#ifdef ERTS_SMP + if (set) { + ERTS_POLLSET_SET_INTERRUPTED(ps); + wake_poller(ps); + } + else { + ERTS_POLLSET_UNSET_INTERRUPTED(ps); + } +#endif + HARDTRACEF(("Out erts_poll_interrupt(%d)",set)); +} + +void erts_poll_interrupt_timed(ErtsPollSet ps, + int set /* bool */, + long msec) +{ + HARDTRACEF(("In erts_poll_interrupt_timed(%d,%ld)",set,msec)); +#ifdef ERTS_SMP + if (set) { + if (erts_smp_atomic_read(&ps->timeout) > msec) { + ERTS_POLLSET_SET_INTERRUPTED(ps); + wake_poller(ps); + } + } + else { + ERTS_POLLSET_UNSET_INTERRUPTED(ps); + } +#endif + HARDTRACEF(("Out erts_poll_interrupt_timed")); +} + + +/* + * Windows is special, there is actually only one event type, and + * the only difference between ERTS_POLL_EV_IN and ERTS_POLL_EV_OUT + * is which driver callback will eventually be called. + */ +static ErtsPollEvents do_poll_control(ErtsPollSet ps, + ErtsSysFdType fd, + ErtsPollEvents pe, + int on /* bool */) +{ + HANDLE event = (HANDLE) fd; + ErtsPollEvents mode; + ErtsPollEvents result; + ASSERT(event != INVALID_HANDLE_VALUE); + + if (on) { + if (pe & ERTS_POLL_EV_IN || !(pe & ERTS_POLL_EV_OUT )) { + mode = ERTS_POLL_EV_IN; + } else { + mode = ERTS_POLL_EV_OUT; /* ready output only in this case */ + } + result = set_driver_select(ps, event, mode); + } else { + result = cancel_driver_select(ps, event); + } + return result; +} + +ErtsPollEvents erts_poll_control(ErtsPollSet ps, + ErtsSysFdType fd, + ErtsPollEvents pe, + int on, + int* do_wake) /* In: Wake up polling thread */ + /* Out: Poller is woken */ +{ + ErtsPollEvents result; + HARDTRACEF(("In erts_poll_control(0x%08X, %u, %d)",(unsigned long) fd, (unsigned) pe, on)); + ERTS_POLLSET_LOCK(ps); + result=do_poll_control(ps,fd,pe,on); + ERTS_POLLSET_UNLOCK(ps); + *do_wake = 0; /* Never any need to wake polling threads on windows */ + HARDTRACEF(("Out erts_poll_control -> %u",(unsigned) result)); + return result; +} + +void erts_poll_controlv(ErtsPollSet ps, + ErtsPollControlEntry pcev[], + int len) +{ + int i; + int hshur = 0; + int do_wake = 0; + + HARDTRACEF(("In erts_poll_controlv(%d)",len)); + ERTS_POLLSET_LOCK(ps); + + for (i = 0; i < len; i++) { + pcev[i].events = do_poll_control(ps, + pcev[i].fd, + pcev[i].events, + pcev[i].on); + } + ERTS_POLLSET_LOCK(ps); + HARDTRACEF(("Out erts_poll_controlv")); +} + +int erts_poll_wait(ErtsPollSet ps, + ErtsPollResFd pr[], + int *len, + SysTimeval *utvp) +{ + SysTimeval *tvp = utvp; + SysTimeval itv; + int no_fds; + DWORD timeout; + EventData* ev; + int res = 0; + int num = 0; + int n; + int i; + int break_state; + + HARDTRACEF(("In erts_poll_wait")); + ERTS_POLLSET_LOCK(ps); + + if (!erts_atomic_read(&ps->sys_io_ready) && ps->restore_events) { + HARDDEBUGF(("Restore events: %d",ps->num_waiters)); + ps->restore_events = 0; + for (i = 0; i < ps->num_waiters; ++i) { + Waiter* w = ps->waiter[i]; + erts_mtx_lock(&w->mtx); + HARDDEBUGF(("Maybe reset %d %d %d %d",i, + w->active_events,w->highwater,w->total_events)); + if (w->active_events < w->total_events) { + erts_mtx_unlock(&w->mtx); + STOP_WAITER(ps,w); + HARDDEBUGF(("Need reset %d %d %d %d",i, + w->active_events,w->highwater,w->total_events)); + erts_mtx_lock(&w->mtx); + /* Need reset, just check that it doesn't have got more to tell */ + if (w->highwater != w->active_events) { + HARDDEBUGF(("Oups!")); + /* Oups, got signalled before we took the lock, can't reset */ + if(erts_atomic_read(&ps->sys_io_ready) == 0) { + erl_exit(1,"Internal error: " + "Inconsistent io structures in erl_poll.\n"); + } + START_WAITER(ps,w); + erts_mtx_unlock(&w->mtx); + ps->restore_events = 1; + continue; + } + w->active_events = w->highwater = w->total_events; + START_WAITER(ps,w); + erts_mtx_unlock(&w->mtx); + } else { + erts_mtx_unlock(&w->mtx); + } + } + } + + no_fds = *len; + +#ifdef ERTS_POLL_MAX_RES + if (no_fds >= ERTS_POLL_MAX_RES) + no_fds = ERTS_POLL_MAX_RES; +#endif + + + ResetEvent(ps->event_io_ready); + ERTS_POLLSET_UNSET_POLLER_WOKEN(ps); + +#ifdef ERTS_SMP + if (ERTS_POLLSET_IS_INTERRUPTED(ps)) { + /* Interrupt use zero timeout */ + itv.tv_sec = 0; + itv.tv_usec = 0; + tvp = &itv; + } +#endif + + timeout = tvp->tv_sec * 1000 + tvp->tv_usec / 1000; + /*HARDDEBUGF(("timeout = %ld",(long) timeout));*/ + erts_smp_atomic_set(&ps->timeout, timeout); + + if (timeout > 0 && ! erts_atomic_read(&ps->sys_io_ready) && ! erts_atomic_read(&break_waiter_state)) { + HANDLE harr[2] = {ps->event_io_ready, break_happened_event}; + int num_h = 2; + + HARDDEBUGF(("Start waiting %d [%d]",num_h, (long) timeout)); + ERTS_POLLSET_UNLOCK(ps); + WaitForMultipleObjects(num_h, harr, FALSE, timeout); + ERTS_POLLSET_LOCK(ps); + HARDDEBUGF(("Stop waiting %d [%d]",num_h, (long) timeout)); + } + + ERTS_UNSET_BREAK_REQUESTED; + if(erts_atomic_read(&break_waiter_state)) { + erts_mtx_lock(&break_waiter_lock); + break_state = erts_atomic_read(&break_waiter_state); + erts_atomic_set(&break_waiter_state,0); + ResetEvent(break_happened_event); + erts_mtx_unlock(&break_waiter_lock); + switch (break_state) { + case BREAK_WAITER_GOT_BREAK: + ERTS_SET_BREAK_REQUESTED; + break; + case BREAK_WAITER_GOT_HALT: + erl_exit(0,""); + break; + default: + break; + } + } + + ERTS_POLLSET_SET_POLLER_WOKEN(ps); + + if (!erts_atomic_read(&ps->sys_io_ready)) { + res = EINTR; + HARDDEBUGF(("EINTR!")); + goto done; + } + + erts_atomic_set(&ps->sys_io_ready,0); + + n = ps->num_waiters; + + for (i = 0; i < n; i++) { + Waiter* w = ps->waiter[i]; + int j; + int first; + int last; + erts_mtx_lock(&w->mtx); +#ifdef DEBUG + consistency_check(w); +#endif + + first = w->active_events; + last = w->highwater; + w->highwater = w->active_events; + + for (j = last-1; j >= first; --j) { + if (num >= no_fds) { + w->highwater=j+1; + erts_mtx_unlock(&w->mtx); + /* This might mean we still have data to report, set + back the global flag! */ + erts_atomic_set(&ps->sys_io_ready,1); + HARDDEBUGF(("To many FD's to report!")); + goto done; + } + HARDDEBUGF(("SET! Restore events")); + ps->restore_events = 1; + HARDDEBUGF(("Report %d,%d",i,j)); + pr[num].fd = (ErtsSysFdType) w->events[j]; + pr[num].events = w->evdata[j]->mode; +#ifdef HARD_POLL_DEBUG + poll_debug_reported(w->events[j],w->highwater | (j << 16)); + poll_debug_reported(w->events[j],first | (last << 16)); +#endif + ++num; + } + +#ifdef DEBUG + consistency_check(w); +#endif + erts_mtx_unlock(&w->mtx); + } + done: + erts_smp_atomic_set(&ps->timeout, LONG_MAX); + *len = num; + ERTS_POLLSET_UNLOCK(ps); + HARDTRACEF(("Out erts_poll_wait")); + return res; + +} + +int erts_poll_max_fds(void) +{ + int res = sys_max_files(); + HARDTRACEF(("In/Out erts_poll_max_fds -> %d",res)); + return res; +} + +void erts_poll_info(ErtsPollSet ps, + ErtsPollInfo *pip) +{ + Uint size = 0; + Uint num_events = 0; + int i; + + HARDTRACEF(("In erts_poll_info")); + ERTS_POLLSET_LOCK(ps); + + size += sizeof(struct ErtsPollSet_); + size += sizeof(Waiter *) * ps->allocated_waiters; + for (i = 0; i < ps->num_waiters; ++i) { + Waiter *w = ps->waiter[i]; + if (w != NULL) { + size += sizeof(Waiter); + erts_mtx_lock(&w->mtx); + size += sizeof(EventData) * w->total_events; + num_events += (w->total_events - 1); /* First event is internal */ + erts_mtx_unlock(&w->mtx); + } + } + + pip->primary = "WaitForMultipleObjects"; + + pip->fallback = NULL; + + pip->kernel_poll = NULL; + + pip->memory_size = size; + + pip->poll_set_size = num_events; + + pip->fallback_poll_set_size = 0; + + pip->lazy_updates = 0; + + pip->pending_updates = 0; + + pip->batch_updates = 0; + + pip->concurrent_updates = 0; + ERTS_POLLSET_UNLOCK(ps); + + pip->max_fds = erts_poll_max_fds(); + HARDTRACEF(("Out erts_poll_info")); + +} + +ErtsPollSet erts_poll_create_pollset(void) +{ + ErtsPollSet ps = SEL_ALLOC(ERTS_ALC_T_POLLSET, + sizeof(struct ErtsPollSet_)); + HARDTRACEF(("In erts_poll_create_pollset")); + + ps->num_waiters = 0; + ps->allocated_waiters = 64; + ps->waiter = SEL_ALLOC(ERTS_ALC_T_WAITER_OBJ, + sizeof(Waiter *)*ps->allocated_waiters); + InitializeCriticalSection(&(ps->standby_crit)); + ps->standby_wait_counter = 0; + ps->event_io_ready = CreateManualEvent(FALSE); + ps->standby_wait_event = CreateManualEvent(FALSE); + erts_atomic_init(&ps->sys_io_ready,0); + ps->restore_events = 0; + +#ifdef ERTS_SMP + erts_smp_atomic_init(&ps->woken, 0); + erts_smp_mtx_init(&ps->mtx, "pollset"); + erts_smp_atomic_init(&ps->interrupt, 0); +#endif + erts_smp_atomic_init(&ps->timeout, LONG_MAX); + + HARDTRACEF(("Out erts_poll_create_pollset")); + return ps; +} + +void erts_poll_destroy_pollset(ErtsPollSet ps) +{ + int i; + HARDTRACEF(("In erts_poll_destroy_pollset")); + ERTS_POLLSET_LOCK(ps); + STOP_WAITERS(ps); + for (i=0;i<ps->num_waiters;++i) { + Waiter *w = ps->waiter[i]; + void *dummy; + erts_tid_t t = w->this; + /* Assume we're alone, no locking here... */ + w->active_events = w->total_events = w->highwater = 0; + START_WAITER(ps,w); + erts_thr_join(t,&dummy); + CloseHandle(w->go_ahead); + CloseHandle(w->events[0]); + erts_mtx_destroy(&w->mtx); + SEL_FREE(ERTS_ALC_T_WAITER_OBJ, (void *) w); + } + SEL_FREE(ERTS_ALC_T_WAITER_OBJ,ps->waiter); + CloseHandle(ps->event_io_ready); + CloseHandle(ps->standby_wait_event); + ERTS_POLLSET_UNLOCK(ps); +#ifdef ERTS_SMP + erts_smp_mtx_destroy(&ps->mtx); +#endif + SEL_FREE(ERTS_ALC_T_POLLSET, (void *) ps); + HARDTRACEF(("Out erts_poll_destroy_pollset")); +} + +/* + * Actually mostly initializes the friend module sys_interrupt... + */ +void erts_poll_init(void) +{ + erts_tid_t thread; + +#ifdef HARD_POLL_DEBUG + poll_debug_init(); +#endif + + HARDTRACEF(("In erts_poll_init")); + erts_sys_break_event = CreateManualEvent(FALSE); + + erts_mtx_init(&break_waiter_lock,"break_waiter_lock"); + break_happened_event = CreateManualEvent(FALSE); + erts_atomic_init(&break_waiter_state, 0); + + erts_thr_create(&thread, &break_waiter, NULL, NULL); + ERTS_UNSET_BREAK_REQUESTED; + HARDTRACEF(("Out erts_poll_init")); +} + +/* + * Non windows friendly interface, not used when fd's are not continous + */ +void erts_poll_get_selected_events(ErtsPollSet ps, + ErtsPollEvents ev[], + int len) +{ + int i; + HARDTRACEF(("In erts_poll_get_selected_events")); + for (i = 0; i < len; ++i) + ev[i] = 0; + HARDTRACEF(("Out erts_poll_get_selected_events")); +} |