aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator
diff options
context:
space:
mode:
authorPatrik Nyblom <[email protected]>2013-08-16 17:14:25 +0200
committerPatrik Nyblom <[email protected]>2013-08-23 17:03:20 +0200
commit6d1f1d43aa0a9e0a2cb275ef760339c49518fc69 (patch)
treec51d17da4e41d64eddb42a4918a3f9d2b5967ded /erts/emulator
parent7204e78d8d16e41769cfd4b7b4051545052c335e (diff)
downloadotp-6d1f1d43aa0a9e0a2cb275ef760339c49518fc69.tar.gz
otp-6d1f1d43aa0a9e0a2cb275ef760339c49518fc69.tar.bz2
otp-6d1f1d43aa0a9e0a2cb275ef760339c49518fc69.zip
Create better distribution of files over async threads
The actual port id is used to create a key from the pointer value which is the ErlDrvPort. To do this a new driver api function driver_async_port_key is added and the driver API minor version is updated. The documentation is updated and the faulty description of how to spread ports over async threads is updated to use the new API. Testcase also added.
Diffstat (limited to 'erts/emulator')
-rw-r--r--erts/emulator/beam/erl_async.c14
-rw-r--r--erts/emulator/beam/erl_driver.h4
-rw-r--r--erts/emulator/drivers/common/efile_drv.c3
-rw-r--r--erts/emulator/sys/win32/erl_win_dyn_driver.h4
-rw-r--r--erts/emulator/test/efile_SUITE.erl88
5 files changed, 106 insertions, 7 deletions
diff --git a/erts/emulator/beam/erl_async.c b/erts/emulator/beam/erl_async.c
index 054d1a48f6..e6d72f569b 100644
--- a/erts/emulator/beam/erl_async.c
+++ b/erts/emulator/beam/erl_async.c
@@ -583,6 +583,20 @@ int erts_async_ready_clean(void *varq, void *val)
#endif
/*
+** Generate a fair async key prom an ErlDrvPort
+** The port data gives a fair distribution grom port pointer
+** to unsigned integer - to be used in key for driver_async below.
+*/
+unsigned int driver_async_port_key(ErlDrvPort port)
+{
+ ErlDrvTermData td = driver_mk_port(port);
+ if (td == (ErlDrvTermData) NIL) {
+ return 0;
+ }
+ return (unsigned int) (UWord) internal_port_data(td);
+}
+
+/*
** Schedule async_invoke on a worker thread
** NOTE will be syncrounous when threads are unsupported
** return values:
diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h
index e280563de1..1ab6e17f56 100644
--- a/erts/emulator/beam/erl_driver.h
+++ b/erts/emulator/beam/erl_driver.h
@@ -133,7 +133,7 @@ typedef struct {
#define ERL_DRV_EXTENDED_MARKER (0xfeeeeeed)
#define ERL_DRV_EXTENDED_MAJOR_VERSION 2
-#define ERL_DRV_EXTENDED_MINOR_VERSION 1
+#define ERL_DRV_EXTENDED_MINOR_VERSION 2
/*
* The emulator will refuse to load a driver with different major
@@ -638,6 +638,8 @@ EXTERN int erl_drv_send_term(ErlDrvTermData port,
int len);
/* Async IO functions */
+EXTERN unsigned int driver_async_port_key(ErlDrvPort port);
+
EXTERN long driver_async(ErlDrvPort ix,
unsigned int* key,
void (*async_invoke)(void*),
diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c
index 779ecd1bd2..c997fe1bf9 100644
--- a/erts/emulator/drivers/common/efile_drv.c
+++ b/erts/emulator/drivers/common/efile_drv.c
@@ -772,6 +772,7 @@ file_init(void)
return 0;
}
+
/*********************************************************************
* Driver entry point -> start
*/
@@ -788,7 +789,7 @@ file_start(ErlDrvPort port, char* command)
}
desc->fd = FILE_FD_INVALID;
desc->port = port;
- desc->key = (unsigned int) (UWord) port;
+ desc->key = driver_async_port_key(port);
desc->flags = 0;
desc->invoke = NULL;
desc->d = NULL;
diff --git a/erts/emulator/sys/win32/erl_win_dyn_driver.h b/erts/emulator/sys/win32/erl_win_dyn_driver.h
index a7c53c904d..b9a9838a36 100644
--- a/erts/emulator/sys/win32/erl_win_dyn_driver.h
+++ b/erts/emulator/sys/win32/erl_win_dyn_driver.h
@@ -80,6 +80,7 @@ WDD_TYPEDEF(int, erl_drv_output_term, (ErlDrvTermData, ErlDrvTermData*, int));
WDD_TYPEDEF(int, driver_output_term, (ErlDrvPort, ErlDrvTermData*, int));
WDD_TYPEDEF(int, erl_drv_send_term, (ErlDrvTermData, ErlDrvTermData, ErlDrvTermData*, int));
WDD_TYPEDEF(int, driver_send_term, (ErlDrvPort, ErlDrvTermData, ErlDrvTermData*, int));
+WDD_TYPEDEF(unsigned int, driver_async_port_key, (ErlDrvPort));
WDD_TYPEDEF(long, driver_async, (ErlDrvPort,unsigned int*,void (*)(void*),void*,void (*)(void*)));
WDD_TYPEDEF(int, driver_async_cancel, (unsigned int));
WDD_TYPEDEF(int, driver_lock_driver, (ErlDrvPort));
@@ -197,6 +198,7 @@ typedef struct {
WDD_FTYPE(driver_output_term) *driver_output_term;
WDD_FTYPE(erl_drv_send_term) *erl_drv_send_term;
WDD_FTYPE(driver_send_term) *driver_send_term;
+ WDD_FTYPE(driver_async_port_key) *driver_async_port_key;
WDD_FTYPE(driver_async) *driver_async;
WDD_FTYPE(driver_async_cancel) *driver_async_cancel;
WDD_FTYPE(driver_lock_driver) *driver_lock_driver;
@@ -308,6 +310,7 @@ extern TWinDynDriverCallbacks WinDynDriverCallbacks;
#define driver_output_term (WinDynDriverCallbacks.driver_output_term)
#define erl_drv_send_term (WinDynDriverCallbacks.erl_drv_send_term)
#define driver_send_term (WinDynDriverCallbacks.driver_send_term)
+#define driver_async_port_key (WinDynDriverCallbacks.driver_async_port_key)
#define driver_async (WinDynDriverCallbacks.driver_async)
#define driver_async_cancel (WinDynDriverCallbacks.driver_async_cancel)
#define driver_lock_driver (WinDynDriverCallbacks.driver_lock_driver)
@@ -443,6 +446,7 @@ do { \
((W).driver_output_term) = driver_output_term; \
((W).erl_drv_send_term) = erl_drv_send_term; \
((W).driver_send_term) = driver_send_term; \
+((W).driver_async_port_key) = driver_async_port_key; \
((W).driver_async) = driver_async; \
((W).driver_async_cancel) = driver_async_cancel; \
((W).driver_lock_driver) = driver_lock_driver; \
diff --git a/erts/emulator/test/efile_SUITE.erl b/erts/emulator/test/efile_SUITE.erl
index 65367eab98..f79bb761d1 100644
--- a/erts/emulator/test/efile_SUITE.erl
+++ b/erts/emulator/test/efile_SUITE.erl
@@ -19,16 +19,16 @@
-module(efile_SUITE).
-export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1,
init_per_group/2,end_per_group/2]).
--export([iter_max_files/1]).
+-export([iter_max_files/1, async_dist/1]).
--export([do_iter_max_files/2]).
+-export([do_iter_max_files/2, do_async_dist/1]).
-include_lib("test_server/include/test_server.hrl").
suite() -> [{ct_hooks,[ts_install_cth]}].
all() ->
- [iter_max_files].
+ [iter_max_files, async_dist].
groups() ->
[].
@@ -45,6 +45,84 @@ init_per_group(_GroupName, Config) ->
end_per_group(_GroupName, Config) ->
Config.
+do_async_dist(Dir) ->
+ X = 100,
+ AT = erlang:system_info(thread_pool_size),
+ Keys = file_keys(Dir,AT*X,[],[]),
+ Tab = ets:new(x,[ordered_set]),
+ [ ets:insert(Tab,{N,0}) || N <- lists:seq(0,AT-1) ],
+ [ ets:update_counter(Tab,(N rem AT),1) || N <- Keys ],
+ Res = [ V || {_,V} <- ets:tab2list(Tab) ],
+ ets:delete(Tab),
+ {Res, sdev(Res)/X}.
+
+sdev(List) ->
+ Len = length(List),
+ Mean = lists:sum(List)/Len,
+ math:sqrt(lists:sum([ (X - Mean) * (X - Mean) || X <- List ]) / Len).
+
+file_keys(_,0,FdList,FnList) ->
+ [ file:close(FD) || FD <- FdList ],
+ [ file:delete(FN) || FN <- FnList ],
+ [];
+file_keys(Dir,Num,FdList,FnList) ->
+ Name = "dummy"++integer_to_list(Num),
+ FN = filename:join([Dir,Name]),
+ case file:open(FN,[write,raw]) of
+ {ok,FD} ->
+ {file_descriptor,prim_file,{Port,_}} = FD,
+ <<X:32/integer-big>> =
+ iolist_to_binary(erlang:port_control(Port,$K,[])),
+ [X | file_keys(Dir,Num-1,[FD|FdList],[FN|FnList])];
+ {error,_} ->
+ % Try freeing up FD's if there are any
+ case FdList of
+ [] ->
+ exit({cannot_open_file,FN});
+ _ ->
+ [ file:close(FD) || FD <- FdList ],
+ [ file:delete(F) || F <- FnList ],
+ file_keys(Dir,Num,[],[])
+ end
+ end.
+
+async_dist(doc) ->
+ "Check that the distribution of files over async threads is fair";
+async_dist(Config) when is_list(Config) ->
+ DataDir = ?config(data_dir,Config),
+ TestFile = filename:join(DataDir, "existing_file"),
+ Dir = filename:dirname(code:which(?MODULE)),
+ AsyncSizes = [7,10,100,255,256,64,63,65],
+ Max = 0.5,
+
+ lists:foreach(fun(Size) ->
+ {ok,Node} =
+ test_server:start_node
+ (test_iter_max_files,slave,
+ [{args,
+ "+A "++integer_to_list(Size)++
+ " -pa " ++ Dir}]),
+ {Distr,SD} = rpc:call(Node,?MODULE,do_async_dist,
+ [DataDir]),
+ test_server:stop_node(Node),
+ if
+ SD > Max ->
+ io:format("Bad async queue distribution for "
+ "~p async threads:~n"
+ " Standard deviation is ~p~n"
+ " Key distribution:~n ~lp~n",
+ [Size,SD,Distr]),
+ exit({bad_async_dist,Size,SD,Distr});
+ true ->
+ io:format("OK async queue distribution for "
+ "~p async threads:~n"
+ " Standard deviation is ~p~n"
+ " Key distribution:~n ~lp~n",
+ [Size,SD,Distr]),
+ ok
+ end
+ end, AsyncSizes),
+ ok.
%%
%% Open as many files as possible. Do this several times and check
@@ -98,7 +176,7 @@ open_files(Name) ->
?line case file:open(Name, [read,raw]) of
{ok, Fd} ->
[Fd| open_files(Name)];
- {error, Reason} ->
-% io:format("Error reason: ~p", [Reason]),
+ {error, _Reason} ->
+% io:format("Error reason: ~p", [_Reason]),
[]
end.