diff options
author | Rickard Green <[email protected]> | 2011-10-09 00:03:14 +0200 |
---|---|---|
committer | Rickard Green <[email protected]> | 2011-11-13 20:40:58 +0100 |
commit | dcc7ecbf6af5420af2d5dbd0e97fc7a2e0e894a6 (patch) | |
tree | 36335364e0e2ece36709ae73bf57f1d1d1b4b211 /erts/emulator/test | |
parent | 933790021e5fa95e4e6242e3f2eb2fcf64666a57 (diff) | |
download | otp-dcc7ecbf6af5420af2d5dbd0e97fc7a2e0e894a6.tar.gz otp-dcc7ecbf6af5420af2d5dbd0e97fc7a2e0e894a6.tar.bz2 otp-dcc7ecbf6af5420af2d5dbd0e97fc7a2e0e894a6.zip |
Use generic lock-free queue for async threads
Queues used for communication between async threads and scheduler threads
have been replaced with lock-free queues.
Drivers using the driver_async functionality are not automatically locked
to the system anymore, and can be unloaded as any dynamically linked in
driver.
Scheduling of ready async jobs is now also interleaved in between other
jobs. Previously all ready async jobs was performed at once.
Diffstat (limited to 'erts/emulator/test')
-rw-r--r-- | erts/emulator/test/driver_SUITE.erl | 77 | ||||
-rw-r--r-- | erts/emulator/test/driver_SUITE_data/Makefile.src | 3 | ||||
-rw-r--r-- | erts/emulator/test/driver_SUITE_data/async_blast_drv.c | 124 |
3 files changed, 193 insertions, 11 deletions
diff --git a/erts/emulator/test/driver_SUITE.erl b/erts/emulator/test/driver_SUITE.erl index bcb0257ed1..c07dbc5871 100644 --- a/erts/emulator/test/driver_SUITE.erl +++ b/erts/emulator/test/driver_SUITE.erl @@ -76,7 +76,8 @@ driver_select_use/1, thread_mseg_alloc_cache_clean/1, otp_9302/1, - thr_free_drv/1]). + thr_free_drv/1, + async_blast/1]). -export([bin_prefix/2]). @@ -145,7 +146,8 @@ all() -> smp_select, driver_select_use, thread_mseg_alloc_cache_clean, otp_9302, - thr_free_drv]. + thr_free_drv, + async_blast]. groups() -> [{timer, [], @@ -1911,17 +1913,30 @@ otp_9302(Config) when is_list(Config) -> ?line port_command(Port, ""), ?line {msg, block} = get_port_msg(Port, infinity), ?line {msg, job} = get_port_msg(Port, infinity), - ?line case erlang:system_info(thread_pool_size) of - 0 -> - {msg, cancel} = get_port_msg(Port, infinity); - _ -> - ok - end, - ?line {msg, job} = get_port_msg(Port, infinity), + ?line C = case erlang:system_info(thread_pool_size) of + 0 -> + ?line {msg, cancel} = get_port_msg(Port, infinity), + ?line {msg, job} = get_port_msg(Port, infinity), + ?line false; + _ -> + case get_port_msg(Port, infinity) of + {msg, cancel} -> %% Cancel always fail in Rel >= 15 + ?line {msg, job} = get_port_msg(Port, infinity), + ?line false; + {msg, job} -> + ?line ok, + ?line true + end + end, ?line {msg, end_of_jobs} = get_port_msg(Port, infinity), ?line no_msg = get_port_msg(Port, 2000), ?line port_close(Port), - ?line ok. + ?line case C of + true -> + ?line {comment, "Async job cancelled"}; + false -> + ?line {comment, "Async job not cancelled"} + end. thr_free_drv(Config) when is_list(Config) -> ?line Path = ?config(data_dir, Config), @@ -1954,6 +1969,48 @@ thr_free_drv_control(Port, N) -> % io:format("N=~p, SID=~p", [N, erlang:system_info(scheduler_id)]), thr_free_drv_control(Port, N+1) end. + +async_blast(Config) when is_list(Config) -> + ?line Path = ?config(data_dir, Config), + ?line erl_ddll:start(), + ?line ok = load_driver(Path, async_blast_drv), + ?line SchedOnln = erlang:system_info(schedulers_online), + ?line MemBefore = driver_alloc_size(), + ?line Start = os:timestamp(), + ?line Blast = fun () -> + Port = open_port({spawn, async_blast_drv}, []), + true = is_port(Port), + port_command(Port, ""), + receive + {Port, done} -> + ok + end, + port_close(Port) + end, + ?line Ps = lists:map(fun (N) -> + spawn_opt(Blast, + [{scheduler, + (N rem SchedOnln)+ 1}, + monitor]) + end, + lists:seq(1, 100)), + ?line MemMid = driver_alloc_size(), + ?line lists:foreach(fun ({Pid, Mon}) -> + receive + {'DOWN',Mon,process,Pid,_} -> ok + end + end, Ps), + ?line End = os:timestamp(), + ?line MemAfter = driver_alloc_size(), + ?line io:format("MemBefore=~p, MemMid=~p, MemAfter=~p~n", + [MemBefore, MemMid, MemAfter]), + ?line AsyncBlastTime = timer:now_diff(End,Start)/1000000, + ?line io:format("AsyncBlastTime=~p~n", [AsyncBlastTime]), + ?line MemBefore = MemAfter, + ?line erlang:display({async_blast_time, AsyncBlastTime}), + ?line ok. + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Utilities diff --git a/erts/emulator/test/driver_SUITE_data/Makefile.src b/erts/emulator/test/driver_SUITE_data/Makefile.src index 62ab5169c0..dd48f6a0f7 100644 --- a/erts/emulator/test/driver_SUITE_data/Makefile.src +++ b/erts/emulator/test/driver_SUITE_data/Makefile.src @@ -13,7 +13,8 @@ MISC_DRVS = outputv_drv@dll@ \ missing_callback_drv@dll@ \ thr_alloc_drv@dll@ \ otp_9302_drv@dll@ \ - thr_free_drv@dll@ + thr_free_drv@dll@ \ + async_blast_drv@dll@ SYS_INFO_DRVS = sys_info_1_0_drv@dll@ \ sys_info_1_1_drv@dll@ \ diff --git a/erts/emulator/test/driver_SUITE_data/async_blast_drv.c b/erts/emulator/test/driver_SUITE_data/async_blast_drv.c new file mode 100644 index 0000000000..3821f7e3dc --- /dev/null +++ b/erts/emulator/test/driver_SUITE_data/async_blast_drv.c @@ -0,0 +1,124 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2011. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +#include "erl_driver.h" + +#define NO_ASYNC_JOBS 10000 + +static void stop(ErlDrvData drv_data); +static ErlDrvData start(ErlDrvPort port, + char *command); +static void output(ErlDrvData drv_data, + char *buf, int len); +static void ready_async(ErlDrvData drv_data, + ErlDrvThreadData thread_data); + +static ErlDrvEntry async_blast_drv_entry = { + NULL /* init */, + start, + stop, + output, + NULL /* ready_input */, + NULL /* ready_output */, + "async_blast_drv", + NULL /* finish */, + NULL /* handle */, + NULL /* control */, + NULL /* timeout */, + NULL /* outputv */, + ready_async, + NULL /* flush */, + NULL /* call */, + NULL /* event */, + ERL_DRV_EXTENDED_MARKER, + ERL_DRV_EXTENDED_MAJOR_VERSION, + ERL_DRV_EXTENDED_MINOR_VERSION, + ERL_DRV_FLAG_USE_PORT_LOCKING, + NULL /* handle2 */, + NULL /* handle_monitor */ +}; + +typedef struct { + ErlDrvPort port; + ErlDrvTermData caller; + int counter; +} async_blast_data_t; + + +DRIVER_INIT(async_blast_drv) +{ + return &async_blast_drv_entry; +} + +static void stop(ErlDrvData drv_data) +{ + driver_free((void *) drv_data); +} + +static ErlDrvData start(ErlDrvPort port, + char *command) +{ + async_blast_data_t *abd; + + abd = driver_alloc(sizeof(async_blast_data_t)); + if (!abd) + return ERL_DRV_ERROR_GENERAL; + + abd->port = port; + abd->counter = 0; + return (ErlDrvData) abd; +} + +static void async_invoke(void *data) +{ + +} +#include <stdio.h> + +static void ready_async(ErlDrvData drv_data, + ErlDrvThreadData thread_data) +{ + async_blast_data_t *abd = (async_blast_data_t *) drv_data; + if (--abd->counter == 0) { + ErlDrvTermData spec[] = { + ERL_DRV_PORT, driver_mk_port(abd->port), + ERL_DRV_ATOM, driver_mk_atom("done"), + ERL_DRV_TUPLE, 2 + }; + driver_send_term(abd->port, abd->caller, + spec, sizeof(spec)/sizeof(spec[0])); + } +} + +static void output(ErlDrvData drv_data, + char *buf, int len) +{ + async_blast_data_t *abd = (async_blast_data_t *) drv_data; + if (abd->counter == 0) { + int i; + abd->caller = driver_caller(abd->port); + abd->counter = NO_ASYNC_JOBS; + for (i = 0; i < NO_ASYNC_JOBS; i++) { + if (0 > driver_async(abd->port, NULL, async_invoke, NULL, NULL)) { + driver_failure_atom(abd->port, "driver_async_failed"); + break; + } + } + } +} |