From 6d1f1d43aa0a9e0a2cb275ef760339c49518fc69 Mon Sep 17 00:00:00 2001 From: Patrik Nyblom Date: Fri, 16 Aug 2013 17:14:25 +0200 Subject: Create better distribution of files over async threads The actual port id is used to create a key from the pointer value which is the ErlDrvPort. To do this a new driver api function driver_async_port_key is added and the driver API minor version is updated. The documentation is updated and the faulty description of how to spread ports over async threads is updated to use the new API. Testcase also added. --- erts/doc/src/erl_driver.xml | 24 +++++++- erts/emulator/beam/erl_async.c | 14 +++++ erts/emulator/beam/erl_driver.h | 4 +- erts/emulator/drivers/common/efile_drv.c | 3 +- erts/emulator/sys/win32/erl_win_dyn_driver.h | 4 ++ erts/emulator/test/efile_SUITE.erl | 88 ++++++++++++++++++++++++++-- 6 files changed, 127 insertions(+), 10 deletions(-) (limited to 'erts') diff --git a/erts/doc/src/erl_driver.xml b/erts/doc/src/erl_driver.xml index f52d973709..540390e1b1 100644 --- a/erts/doc/src/erl_driver.xml +++ b/erts/doc/src/erl_driver.xml @@ -1981,7 +1981,7 @@ ERL_DRV_EXT2TERM char *buf, ErlDrvUInt len thread, the following call can be used:

@@ -2021,6 +2021,24 @@ ERL_DRV_EXT2TERM char *buf, ErlDrvUInt len + + unsigned intdriver_async_port_key (ErlDrvPort port) + Calculate an async key from an ErlDrvPort + + +

This function calculates a key for later use in driver_async(). The keys are + evenly distributed so that a fair mapping between port id's + and async thread id's is achieved.

+ +

Before OTP-R16, the actual port id could be used as a key + with proper casting, but after the rewrite of the port + subsystem, this is no longer the case. With this function, you + can achieve the same distribution based on port id's as before + OTP-R16.

+
+
+
intdriver_async_cancel(long id) Cancel an asynchronous call @@ -2033,10 +2051,10 @@ ERL_DRV_EXT2TERM char *buf, ErlDrvUInt len The user had to implement synchronization of cancellation anyway. It also unnecessarily complicated the implementation. Therefore, as of OTP-R15B driver_async_cancel() is deprecated, and - scheduled for removal in OTP-R16. It will currently always fail, + scheduled for removal in OTP-R17. It will currently always fail, and return 0.

driver_async_cancel() is deprecated and will - be removed in the OTP-R16 release.

+ be removed in the OTP-R17 release.

diff --git a/erts/emulator/beam/erl_async.c b/erts/emulator/beam/erl_async.c index 054d1a48f6..e6d72f569b 100644 --- a/erts/emulator/beam/erl_async.c +++ b/erts/emulator/beam/erl_async.c @@ -582,6 +582,20 @@ int erts_async_ready_clean(void *varq, void *val) #endif +/* +** Generate a fair async key prom an ErlDrvPort +** The port data gives a fair distribution grom port pointer +** to unsigned integer - to be used in key for driver_async below. +*/ +unsigned int driver_async_port_key(ErlDrvPort port) +{ + ErlDrvTermData td = driver_mk_port(port); + if (td == (ErlDrvTermData) NIL) { + return 0; + } + return (unsigned int) (UWord) internal_port_data(td); +} + /* ** Schedule async_invoke on a worker thread ** NOTE will be syncrounous when threads are unsupported diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h index e280563de1..1ab6e17f56 100644 --- a/erts/emulator/beam/erl_driver.h +++ b/erts/emulator/beam/erl_driver.h @@ -133,7 +133,7 @@ typedef struct { #define ERL_DRV_EXTENDED_MARKER (0xfeeeeeed) #define ERL_DRV_EXTENDED_MAJOR_VERSION 2 -#define ERL_DRV_EXTENDED_MINOR_VERSION 1 +#define ERL_DRV_EXTENDED_MINOR_VERSION 2 /* * The emulator will refuse to load a driver with different major @@ -638,6 +638,8 @@ EXTERN int erl_drv_send_term(ErlDrvTermData port, int len); /* Async IO functions */ +EXTERN unsigned int driver_async_port_key(ErlDrvPort port); + EXTERN long driver_async(ErlDrvPort ix, unsigned int* key, void (*async_invoke)(void*), diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 779ecd1bd2..c997fe1bf9 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -772,6 +772,7 @@ file_init(void) return 0; } + /********************************************************************* * Driver entry point -> start */ @@ -788,7 +789,7 @@ file_start(ErlDrvPort port, char* command) } desc->fd = FILE_FD_INVALID; desc->port = port; - desc->key = (unsigned int) (UWord) port; + desc->key = driver_async_port_key(port); desc->flags = 0; desc->invoke = NULL; desc->d = NULL; diff --git a/erts/emulator/sys/win32/erl_win_dyn_driver.h b/erts/emulator/sys/win32/erl_win_dyn_driver.h index a7c53c904d..b9a9838a36 100644 --- a/erts/emulator/sys/win32/erl_win_dyn_driver.h +++ b/erts/emulator/sys/win32/erl_win_dyn_driver.h @@ -80,6 +80,7 @@ WDD_TYPEDEF(int, erl_drv_output_term, (ErlDrvTermData, ErlDrvTermData*, int)); WDD_TYPEDEF(int, driver_output_term, (ErlDrvPort, ErlDrvTermData*, int)); WDD_TYPEDEF(int, erl_drv_send_term, (ErlDrvTermData, ErlDrvTermData, ErlDrvTermData*, int)); WDD_TYPEDEF(int, driver_send_term, (ErlDrvPort, ErlDrvTermData, ErlDrvTermData*, int)); +WDD_TYPEDEF(unsigned int, driver_async_port_key, (ErlDrvPort)); WDD_TYPEDEF(long, driver_async, (ErlDrvPort,unsigned int*,void (*)(void*),void*,void (*)(void*))); WDD_TYPEDEF(int, driver_async_cancel, (unsigned int)); WDD_TYPEDEF(int, driver_lock_driver, (ErlDrvPort)); @@ -197,6 +198,7 @@ typedef struct { WDD_FTYPE(driver_output_term) *driver_output_term; WDD_FTYPE(erl_drv_send_term) *erl_drv_send_term; WDD_FTYPE(driver_send_term) *driver_send_term; + WDD_FTYPE(driver_async_port_key) *driver_async_port_key; WDD_FTYPE(driver_async) *driver_async; WDD_FTYPE(driver_async_cancel) *driver_async_cancel; WDD_FTYPE(driver_lock_driver) *driver_lock_driver; @@ -308,6 +310,7 @@ extern TWinDynDriverCallbacks WinDynDriverCallbacks; #define driver_output_term (WinDynDriverCallbacks.driver_output_term) #define erl_drv_send_term (WinDynDriverCallbacks.erl_drv_send_term) #define driver_send_term (WinDynDriverCallbacks.driver_send_term) +#define driver_async_port_key (WinDynDriverCallbacks.driver_async_port_key) #define driver_async (WinDynDriverCallbacks.driver_async) #define driver_async_cancel (WinDynDriverCallbacks.driver_async_cancel) #define driver_lock_driver (WinDynDriverCallbacks.driver_lock_driver) @@ -443,6 +446,7 @@ do { \ ((W).driver_output_term) = driver_output_term; \ ((W).erl_drv_send_term) = erl_drv_send_term; \ ((W).driver_send_term) = driver_send_term; \ +((W).driver_async_port_key) = driver_async_port_key; \ ((W).driver_async) = driver_async; \ ((W).driver_async_cancel) = driver_async_cancel; \ ((W).driver_lock_driver) = driver_lock_driver; \ diff --git a/erts/emulator/test/efile_SUITE.erl b/erts/emulator/test/efile_SUITE.erl index 65367eab98..f79bb761d1 100644 --- a/erts/emulator/test/efile_SUITE.erl +++ b/erts/emulator/test/efile_SUITE.erl @@ -19,16 +19,16 @@ -module(efile_SUITE). -export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1, init_per_group/2,end_per_group/2]). --export([iter_max_files/1]). +-export([iter_max_files/1, async_dist/1]). --export([do_iter_max_files/2]). +-export([do_iter_max_files/2, do_async_dist/1]). -include_lib("test_server/include/test_server.hrl"). suite() -> [{ct_hooks,[ts_install_cth]}]. all() -> - [iter_max_files]. + [iter_max_files, async_dist]. groups() -> []. @@ -45,6 +45,84 @@ init_per_group(_GroupName, Config) -> end_per_group(_GroupName, Config) -> Config. +do_async_dist(Dir) -> + X = 100, + AT = erlang:system_info(thread_pool_size), + Keys = file_keys(Dir,AT*X,[],[]), + Tab = ets:new(x,[ordered_set]), + [ ets:insert(Tab,{N,0}) || N <- lists:seq(0,AT-1) ], + [ ets:update_counter(Tab,(N rem AT),1) || N <- Keys ], + Res = [ V || {_,V} <- ets:tab2list(Tab) ], + ets:delete(Tab), + {Res, sdev(Res)/X}. + +sdev(List) -> + Len = length(List), + Mean = lists:sum(List)/Len, + math:sqrt(lists:sum([ (X - Mean) * (X - Mean) || X <- List ]) / Len). + +file_keys(_,0,FdList,FnList) -> + [ file:close(FD) || FD <- FdList ], + [ file:delete(FN) || FN <- FnList ], + []; +file_keys(Dir,Num,FdList,FnList) -> + Name = "dummy"++integer_to_list(Num), + FN = filename:join([Dir,Name]), + case file:open(FN,[write,raw]) of + {ok,FD} -> + {file_descriptor,prim_file,{Port,_}} = FD, + <> = + iolist_to_binary(erlang:port_control(Port,$K,[])), + [X | file_keys(Dir,Num-1,[FD|FdList],[FN|FnList])]; + {error,_} -> + % Try freeing up FD's if there are any + case FdList of + [] -> + exit({cannot_open_file,FN}); + _ -> + [ file:close(FD) || FD <- FdList ], + [ file:delete(F) || F <- FnList ], + file_keys(Dir,Num,[],[]) + end + end. + +async_dist(doc) -> + "Check that the distribution of files over async threads is fair"; +async_dist(Config) when is_list(Config) -> + DataDir = ?config(data_dir,Config), + TestFile = filename:join(DataDir, "existing_file"), + Dir = filename:dirname(code:which(?MODULE)), + AsyncSizes = [7,10,100,255,256,64,63,65], + Max = 0.5, + + lists:foreach(fun(Size) -> + {ok,Node} = + test_server:start_node + (test_iter_max_files,slave, + [{args, + "+A "++integer_to_list(Size)++ + " -pa " ++ Dir}]), + {Distr,SD} = rpc:call(Node,?MODULE,do_async_dist, + [DataDir]), + test_server:stop_node(Node), + if + SD > Max -> + io:format("Bad async queue distribution for " + "~p async threads:~n" + " Standard deviation is ~p~n" + " Key distribution:~n ~lp~n", + [Size,SD,Distr]), + exit({bad_async_dist,Size,SD,Distr}); + true -> + io:format("OK async queue distribution for " + "~p async threads:~n" + " Standard deviation is ~p~n" + " Key distribution:~n ~lp~n", + [Size,SD,Distr]), + ok + end + end, AsyncSizes), + ok. %% %% Open as many files as possible. Do this several times and check @@ -98,7 +176,7 @@ open_files(Name) -> ?line case file:open(Name, [read,raw]) of {ok, Fd} -> [Fd| open_files(Name)]; - {error, Reason} -> -% io:format("Error reason: ~p", [Reason]), + {error, _Reason} -> +% io:format("Error reason: ~p", [_Reason]), [] end. -- cgit v1.2.3