From dcc7ecbf6af5420af2d5dbd0e97fc7a2e0e894a6 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Sun, 9 Oct 2011 00:03:14 +0200 Subject: Use generic lock-free queue for async threads Queues used for communication between async threads and scheduler threads have been replaced with lock-free queues. Drivers using the driver_async functionality are not automatically locked to the system anymore, and can be unloaded as any dynamically linked in driver. Scheduling of ready async jobs is now also interleaved in between other jobs. Previously all ready async jobs was performed at once. --- erts/emulator/sys/unix/sys.c | 171 ++++---------------------------------- erts/emulator/sys/vxworks/sys.c | 6 ++ erts/emulator/sys/win32/sys.c | 137 ------------------------------ erts/emulator/sys/win32/sys_env.c | 14 +++- 4 files changed, 32 insertions(+), 296 deletions(-) (limited to 'erts/emulator/sys') diff --git a/erts/emulator/sys/unix/sys.c b/erts/emulator/sys/unix/sys.c index 8e8d4cce61..7f851e6007 100644 --- a/erts/emulator/sys/unix/sys.c +++ b/erts/emulator/sys/unix/sys.c @@ -128,7 +128,6 @@ static ErtsSysReportExit *report_exit_list; static ErtsSysReportExit *report_exit_transit_list; #endif -extern int check_async_ready(void); extern int driver_interrupt(int, int); extern void do_break(void); @@ -1125,31 +1124,6 @@ struct erl_drv_entry vanilla_driver_entry = { stop_select }; -#if defined(USE_THREADS) && !defined(ERTS_SMP) -static int async_drv_init(void); -static ErlDrvData async_drv_start(ErlDrvPort, char*, SysDriverOpts*); -static void async_drv_stop(ErlDrvData); -static void async_drv_input(ErlDrvData, ErlDrvEvent); - -/* INTERNAL use only */ - -struct erl_drv_entry async_driver_entry = { - async_drv_init, - async_drv_start, - async_drv_stop, - NULL, - async_drv_input, - NULL, - "async", - NULL, - NULL, - NULL, - NULL, - NULL, - NULL -}; -#endif - /* Handle SIGCHLD signals. */ #if (defined(SIG_SIGSET) || defined(SIG_SIGNAL)) static RETSIGTYPE onchld(void) @@ -2334,87 +2308,6 @@ static void stop_select(ErlDrvEvent fd, void* _) close((int)fd); } -/* -** Async opertation support -*/ -#if defined(USE_THREADS) && !defined(ERTS_SMP) -static void -sys_async_ready_failed(int fd, int r, int err) -{ - char buf[120]; - sprintf(buf, "sys_async_ready(): Fatal error: fd=%d, r=%d, errno=%d\n", - fd, r, err); - erts_silence_warn_unused_result(write(2, buf, strlen(buf))); - abort(); -} - -/* called from threads !! */ -void sys_async_ready(int fd) -{ - int r; - while (1) { - r = write(fd, "0", 1); /* signal main thread fd MUST be async_fd[1] */ - if (r == 1) { - DEBUGF(("sys_async_ready(): r = 1\r\n")); - break; - } - if (r < 0 && errno == EINTR) { - DEBUGF(("sys_async_ready(): r = %d\r\n", r)); - continue; - } - sys_async_ready_failed(fd, r, errno); - } -} - -static int async_drv_init(void) -{ - async_fd[0] = -1; - async_fd[1] = -1; - return 0; -} - -static ErlDrvData async_drv_start(ErlDrvPort port_num, - char* name, SysDriverOpts* opts) -{ - if (async_fd[0] != -1) - return ERL_DRV_ERROR_GENERAL; - if (pipe(async_fd) < 0) - return ERL_DRV_ERROR_GENERAL; - - DEBUGF(("async_drv_start: %d\r\n", port_num)); - - SET_NONBLOCKING(async_fd[0]); - driver_select(port_num, async_fd[0], ERL_DRV_READ, 1); - - if (init_async(async_fd[1]) < 0) - return ERL_DRV_ERROR_GENERAL; - return (ErlDrvData)port_num; -} - -static void async_drv_stop(ErlDrvData e) -{ - int port_num = (int)(long)e; - - DEBUGF(("async_drv_stop: %d\r\n", port_num)); - - exit_async(); - - driver_select(port_num, async_fd[0], ERL_DRV_READ, 0); - - close(async_fd[0]); - close(async_fd[1]); - async_fd[0] = async_fd[1] = -1; -} - - -static void async_drv_input(ErlDrvData e, ErlDrvEvent fd) -{ - char *buf[32]; - DEBUGF(("async_drv_input\r\n")); - while (read((int) fd, (void *) buf, 32) > 0); /* fd MUST be async_fd[0] */ - check_async_ready(); /* invoke all async_ready */ -} -#endif void erts_do_break_handling(void) { @@ -2488,12 +2381,10 @@ erts_sys_putenv(char *buffer, int sep_ix) } int -erts_sys_getenv(char *key, char *value, size_t *size) +erts_sys_getenv__(char *key, char *value, size_t *size) { - char *orig_value; int res; - erts_smp_rwmtx_rlock(&environ_rwmtx); - orig_value = getenv(key); + char *orig_value = getenv(key); if (!orig_value) res = -1; else { @@ -2508,6 +2399,15 @@ erts_sys_getenv(char *key, char *value, size_t *size) res = 0; } } + return res; +} + +int +erts_sys_getenv(char *key, char *value, size_t *size) +{ + int res; + erts_smp_rwmtx_rlock(&environ_rwmtx); + res = erts_sys_getenv__(key, value, size); erts_smp_rwmtx_runlock(&environ_rwmtx); return res; } @@ -2519,31 +2419,6 @@ sys_init_io(void) erts_alloc(ERTS_ALC_T_FD_TAB, max_files * sizeof(struct fd_data)); erts_smp_atomic_add_nob(&sys_misc_mem_sz, max_files * sizeof(struct fd_data)); - -#ifdef USE_THREADS -#ifdef ERTS_SMP - if (init_async(-1) < 0) - erl_exit(1, "Failed to initialize async-threads\n"); -#else - { - /* This is speical stuff, starting a driver from the - * system routines, but is a nice way of handling stuff - * the erlang way - */ - SysDriverOpts dopts; - int ret; - - sys_memset((void*)&dopts, 0, sizeof(SysDriverOpts)); - add_driver_entry(&async_driver_entry); - ret = erts_open_driver(NULL, NIL, "async", &dopts, NULL); - DEBUGF(("open_driver = %d\n", ret)); - if (ret < 0) - erl_exit(1, "Failed to open async driver\n"); - erts_port[ret].status |= ERTS_PORT_SFLG_IMMORTAL; - } -#endif -#endif - } #if (0) /* unused? */ @@ -2770,15 +2645,7 @@ initiate_report_exit_status(ErtsSysReportExit *rep, int status) rep->next = report_exit_transit_list; rep->status = status; report_exit_transit_list = rep; - /* - * We need the scheduler thread to call check_children(). - * If the scheduler thread is sleeping in a poll with a - * timeout, we need to wake the scheduler thread. We use the - * functionality of the async driver to do this, instead of - * implementing yet another driver doing the same thing. A - * little bit ugly, but it works... - */ - sys_async_ready(async_fd[1]); + erts_sys_schedule_interrupt(1); } static int check_children(void) @@ -2865,19 +2732,11 @@ erl_sys_schedule(int runnable) { #ifdef ERTS_SMP ERTS_CHK_IO(!runnable); - ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); #else - if (runnable) { - ERTS_CHK_IO(0); /* Poll for I/O */ - check_async_ready(); /* Check async completions */ - } else { - int wait_for_io = !check_async_ready(); - if (wait_for_io) - wait_for_io = !check_children(); - ERTS_CHK_IO(wait_for_io); - } - (void) check_children(); + ERTS_CHK_IO(runnable ? 0 : !check_children()); #endif + ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); + (void) check_children(); } diff --git a/erts/emulator/sys/vxworks/sys.c b/erts/emulator/sys/vxworks/sys.c index 08c4f3f4e5..fc7e6cec08 100644 --- a/erts/emulator/sys/vxworks/sys.c +++ b/erts/emulator/sys/vxworks/sys.c @@ -1520,6 +1520,12 @@ erts_sys_getenv(char *key, char *value, size_t *size) return res; } +int +erts_sys_getenv__(char *key, char *value, size_t *size) +{ + return erts_sys_getenv(key, value, size); +} + void sys_init_io(void) { diff --git a/erts/emulator/sys/win32/sys.c b/erts/emulator/sys/win32/sys.c index ace1e1fca0..02d16b83a2 100644 --- a/erts/emulator/sys/win32/sys.c +++ b/erts/emulator/sys/win32/sys.c @@ -566,51 +566,6 @@ struct erl_drv_entry vanilla_driver_entry = { stop_select }; -#if defined(USE_THREADS) && !defined(ERTS_SMP) - -static int async_drv_init(void); -static ErlDrvData async_drv_start(ErlDrvPort, char*, SysDriverOpts*); -static void async_drv_stop(ErlDrvData); -static void async_drv_input(ErlDrvData, ErlDrvEvent); - -/* INTERNAL use only */ - -void null_output(ErlDrvData drv_data, char* buf, int len) -{ -} - -void null_ready_output(ErlDrvData drv_data, ErlDrvEvent event) -{ -} - -struct erl_drv_entry async_driver_entry = { - async_drv_init, - async_drv_start, - async_drv_stop, - null_output, - async_drv_input, - null_ready_output, - "async", - NULL, /* finish */ - NULL, /* handle */ - NULL, /* control */ - NULL, /* timeout */ - NULL, /* outputv */ - NULL, /* ready_async */ - NULL, /* flush */ - NULL, /* call */ - NULL, /* event */ - ERL_DRV_EXTENDED_MARKER, - ERL_DRV_EXTENDED_MAJOR_VERSION, - ERL_DRV_EXTENDED_MINOR_VERSION, - 0, /* ERL_DRV_FLAGs */ - NULL, - NULL, /* process_exit */ - stop_select -}; - -#endif - /* * Initialises a DriverData structure. * @@ -2825,30 +2780,6 @@ sys_init_io(void) We estimate the number to twice the amount of ports. We really dont know on windows, do we? */ max_files = 2*erts_max_ports; - -#ifdef USE_THREADS -#ifdef ERTS_SMP - if (init_async(-1) < 0) - erl_exit(1, "Failed to initialize async-threads\n"); -#else - { - /* This is special stuff, starting a driver from the - * system routines, but is a nice way of handling stuff - * the erlang way - */ - SysDriverOpts dopts; - int ret; - - sys_memset((void*)&dopts, 0, sizeof(SysDriverOpts)); - add_driver_entry(&async_driver_entry); - ret = erts_open_driver(NULL, NIL, "async", &dopts, NULL); - DEBUGF(("open_driver = %d\n", ret)); - if (ret < 0) - erl_exit(1, "Failed to open async driver\n"); - erts_port[ret].status |= ERTS_PORT_SFLG_IMMORTAL; - } -#endif -#endif } #ifdef ERTS_SMP @@ -3382,75 +3313,7 @@ erts_sys_schedule_interrupt_timed(int set, long msec) void erl_sys_schedule(int runnable) { -#ifdef ERTS_SMP erts_check_io(!runnable); ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); -#else - if (runnable) { - erts_check_io(0); /* Poll for I/O */ - check_async_ready(); /* Check async completions */ - } else { - erts_check_io(check_async_ready() ? 0 : 1); - } -#endif -} - -#if defined(USE_THREADS) && !defined(ERTS_SMP) -/* - * Async operation support. - */ - -static ErlDrvEvent async_drv_event; - -void -sys_async_ready(int fd) -{ - SetEvent((HANDLE)async_drv_event); } -static int -async_drv_init(void) -{ - async_drv_event = (ErlDrvEvent) NULL; - return 0; -} - -static ErlDrvData -async_drv_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts) -{ - if (async_drv_event != (ErlDrvEvent) NULL) { - return ERL_DRV_ERROR_GENERAL; - } - if ((async_drv_event = (ErlDrvEvent)CreateAutoEvent(FALSE)) == (ErlDrvEvent) NULL) { - return ERL_DRV_ERROR_GENERAL; - } - - driver_select(port_num, async_drv_event, ERL_DRV_READ|ERL_DRV_USE, 1); - if (init_async(async_drv_event) < 0) { - return ERL_DRV_ERROR_GENERAL; - } - return (ErlDrvData)port_num; -} - -static void -async_drv_stop(ErlDrvData port_num) -{ - exit_async(); - driver_select((ErlDrvPort)port_num, async_drv_event, ERL_DRV_READ|ERL_DRV_USE, 0); - /*CloseHandle((HANDLE)async_drv_event);*/ - async_drv_event = (ErlDrvEvent) NULL; -} - - -static void -async_drv_input(ErlDrvData port_num, ErlDrvEvent e) -{ - check_async_ready(); - - /* - * Our event is auto-resetting. - */ -} - -#endif - diff --git a/erts/emulator/sys/win32/sys_env.c b/erts/emulator/sys/win32/sys_env.c index 02c8433a10..7acc7f07ee 100644 --- a/erts/emulator/sys/win32/sys_env.c +++ b/erts/emulator/sys/win32/sys_env.c @@ -55,19 +55,17 @@ erts_sys_putenv(char *key_value, int sep_ix) } int -erts_sys_getenv(char *key, char *value, size_t *size) +erts_sys_getenv__(char *key, char *value, size_t *size) { size_t req_size = 0; int res = 0; DWORD new_size; - erts_smp_rwmtx_rlock(&environ_rwmtx); SetLastError(0); new_size = GetEnvironmentVariable((LPCTSTR) key, (LPTSTR) value, (DWORD) *size); res = !new_size && GetLastError() == ERROR_ENVVAR_NOT_FOUND ? -1 : 0; - erts_smp_rwmtx_runlock(&environ_rwmtx); if (res < 0) return res; res = new_size > *size ? 1 : 0; @@ -75,6 +73,16 @@ erts_sys_getenv(char *key, char *value, size_t *size) return res; } +int +erts_sys_getenv(char *key, char *value, size_t *size) +{ + int res; + erts_smp_rwmtx_rlock(&environ_rwmtx); + res = erts_sys_getenv__(key, value, size); + erts_smp_rwmtx_runlock(&environ_rwmtx); + return res; +} + struct win32_getenv_state { char *env; char *next; -- cgit v1.2.3