diff options
Diffstat (limited to 'erts/preloaded/src')
-rw-r--r-- | erts/preloaded/src/Makefile | 5 | ||||
-rw-r--r-- | erts/preloaded/src/atomics.erl | 119 | ||||
-rw-r--r-- | erts/preloaded/src/counters.erl | 104 | ||||
-rw-r--r-- | erts/preloaded/src/erl_prim_loader.erl | 7 | ||||
-rw-r--r-- | erts/preloaded/src/erlang.erl | 63 | ||||
-rw-r--r-- | erts/preloaded/src/erts.app.src | 4 | ||||
-rw-r--r-- | erts/preloaded/src/erts_internal.erl | 44 | ||||
-rw-r--r-- | erts/preloaded/src/init.erl | 8 | ||||
-rw-r--r-- | erts/preloaded/src/persistent_term.erl | 62 | ||||
-rw-r--r-- | erts/preloaded/src/prim_file.erl | 32 | ||||
-rw-r--r-- | erts/preloaded/src/prim_inet.erl | 312 |
11 files changed, 621 insertions, 139 deletions
diff --git a/erts/preloaded/src/Makefile b/erts/preloaded/src/Makefile index 4333f6643a..e1bd5bc295 100644 --- a/erts/preloaded/src/Makefile +++ b/erts/preloaded/src/Makefile @@ -47,7 +47,10 @@ PRE_LOADED_ERL_MODULES = \ erts_internal \ erl_tracer \ erts_literal_area_collector \ - erts_dirty_process_signal_handler + erts_dirty_process_signal_handler \ + atomics \ + counters \ + persistent_term PRE_LOADED_BEAM_MODULES = \ prim_eval diff --git a/erts/preloaded/src/atomics.erl b/erts/preloaded/src/atomics.erl new file mode 100644 index 0000000000..d1fe5e65cf --- /dev/null +++ b/erts/preloaded/src/atomics.erl @@ -0,0 +1,119 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +%% Purpose : Main atomics API module. + +-module(atomics). + +-export([new/2, + put/3, get/2, + add/3, add_get/3, + sub/3, sub_get/3, + exchange/3, compare_exchange/4, + info/1]). + +-export_type([atomics_ref/0]). + +-opaque atomics_ref() :: reference(). + +-define(OPT_SIGNED, (1 bsl 0)). +-define(OPT_DEFAULT, ?OPT_SIGNED). + +-spec new(Arity, Opts) -> atomics_ref() when + Arity :: pos_integer(), + Opts :: [Opt], + Opt :: {signed, boolean()}. +new(Arity, Opts) -> + erts_internal:atomics_new(Arity, encode_opts(Opts, ?OPT_DEFAULT)). + +encode_opts([{signed, true}|T], Acc) -> + encode_opts(T, Acc bor ?OPT_SIGNED); +encode_opts([{signed, false}|T], Acc) -> + encode_opts(T, Acc band (bnot ?OPT_SIGNED)); +encode_opts([], Acc) -> + Acc; +encode_opts(_, _) -> + erlang:error(badarg). + +-spec put(Ref, Ix, Value) -> ok when + Ref :: atomics_ref(), + Ix :: integer(), + Value :: integer(). +put(_Ref, _Ix, _Value) -> + erlang:nif_error(undef). + +-spec get(Ref, Ix) -> integer() when + Ref :: atomics_ref(), + Ix :: integer(). +get(_Ref, _Ix) -> + erlang:nif_error(undef). + +-spec add(Ref, Ix, Incr) -> ok when + Ref :: atomics_ref(), + Ix :: integer(), + Incr :: integer(). +add(_Ref, _Ix, _Incr) -> + erlang:nif_error(undef). + +-spec add_get(Ref, Ix, Incr) -> integer() when + Ref :: atomics_ref(), + Ix :: integer(), + Incr :: integer(). +add_get(_Ref, _Ix, _Incr) -> + erlang:nif_error(undef). + +-spec sub(Ref, Ix, Decr) -> ok when + Ref :: atomics_ref(), + Ix :: integer(), + Decr :: integer(). +sub(Ref, Ix, Decr) -> + ?MODULE:add(Ref, Ix, -Decr). + +-spec sub_get(Ref, Ix, Decr) -> integer() when + Ref :: atomics_ref(), + Ix :: integer(), + Decr :: integer(). +sub_get(Ref, Ix, Decr) -> + ?MODULE:add_get(Ref, Ix, -Decr). + +-spec exchange(Ref, Ix, Desired) -> integer() when + Ref :: atomics_ref(), + Ix :: integer(), + Desired :: integer(). +exchange(_Ref, _Ix, _Desired) -> + erlang:nif_error(undef). + +-spec compare_exchange(Ref, Ix, Expected, Desired) -> ok | integer() when + Ref :: atomics_ref(), + Ix :: integer(), + Expected :: integer(), + Desired :: integer(). +compare_exchange(_Ref, _Ix, _Expected, _Desired) -> + erlang:nif_error(undef). + +-spec info(Ref) -> Info when + Ref :: atomics_ref(), + Info :: #{'size':=Size,'max':=Max,'min':=Min,'memory':=Memory}, + Size :: non_neg_integer(), + Max :: integer(), + Min :: integer(), + Memory :: non_neg_integer(). +info(_Ref) -> + erlang:nif_error(undef). diff --git a/erts/preloaded/src/counters.erl b/erts/preloaded/src/counters.erl new file mode 100644 index 0000000000..a0e3035e0f --- /dev/null +++ b/erts/preloaded/src/counters.erl @@ -0,0 +1,104 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +%% Purpose : Main atomics API module. + +-module(counters). + +-export([new/2, + get/2, + add/3, + sub/3, + put/3, + info/1]). + +-export_type([counters_ref/0]). + +-opaque counters_ref() :: {atomics, reference()} | {write_concurrency, reference()}. + +-spec new(Size, Opts) -> counters_ref() when + Size :: pos_integer(), + Opts :: [Opt], + Opt :: atomics | write_concurrency. +new(Size, [atomics]) -> + {atomics, atomics:new(Size, [{signed, true}])}; +new(Size, [write_concurrency]) -> + {write_concurrency, erts_internal:counters_new(Size)}; +new(Size, []) -> + new(Size, [atomics]); +new(_, _) -> + erlang:error(badarg). + +-spec get(Ref, Ix) -> integer() when + Ref :: counters_ref(), + Ix :: integer(). +get({atomics,Ref}, Ix) -> + atomics:get(Ref, Ix); +get({write_concurrency, Ref}, Ix) -> + erts_internal:counters_get(Ref, Ix); +get(_, _) -> + erlang:error(badarg). + + + +-spec add(Ref, Ix, Incr) -> ok when + Ref :: counters_ref(), + Ix :: integer(), + Incr :: integer(). +add({atomics, Ref}, Ix, Incr) -> + atomics:add(Ref, Ix, Incr); +add({write_concurrency, Ref}, Ix, Incr) -> + erts_internal:counters_add(Ref, Ix, Incr); +add(_, _, _) -> + erlang:error(badarg). + + +-spec sub(Ref, Ix, Decr) -> ok when + Ref :: counters_ref(), + Ix :: integer(), + Decr :: integer(). +sub(Ref, Ix, Decr) -> + add(Ref, Ix, -Decr). + + +-spec put(Ref, Ix, Value) -> ok when + Ref :: counters_ref(), + Ix :: integer(), + Value :: integer(). +put({atomics, Ref}, Ix, Value) -> + atomics:put(Ref, Ix, Value); +put({write_concurrency, Ref}, Ix, Value) -> + erts_internal:counters_put(Ref, Ix, Value); +put(_, _, _) -> + erlang:error(badarg). + + +-spec info(Ref) -> Info when + Ref :: counters_ref(), + Info :: #{'size':=Size, 'memory':=Memory}, + Size :: non_neg_integer(), + Memory :: non_neg_integer(). +info({atomics, Ref}) -> + atomics:info(Ref); +info({write_concurrency, Ref}) -> + erts_internal:counters_info(Ref); +info(_) -> + erlang:error(badarg). + diff --git a/erts/preloaded/src/erl_prim_loader.erl b/erts/preloaded/src/erl_prim_loader.erl index ae5f86e017..1605c20f2c 100644 --- a/erts/preloaded/src/erl_prim_loader.erl +++ b/erts/preloaded/src/erl_prim_loader.erl @@ -297,12 +297,13 @@ check_file_result(Func, Target, {error,Reason}) -> "Target: " ++ TargetStr ++ ". " ++ "Function: " ++ atom_to_list(Func) ++ ". " ++ Process end, - %% this is equal to calling error_logger:error_report/1 which - %% we don't want to do from code_server during system boot + %% This is equal to calling logger:error/2 which + %% we don't want to do from code_server during system boot. + %% We don't want to call logger:timestamp() either. logger ! {log,error,#{label=>{?MODULE,file_error},report=>Report}, #{pid=>self(), gl=>group_leader(), - time=>erlang:monotonic_time(microsecond), + time=>os:system_time(microsecond), error_logger=>#{tag=>error_report, type=>std_error}}}, error diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 1ed6b6b284..5730e999cb 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -2526,6 +2526,9 @@ subtract(_,_) -> OldSchedulersOnline when SchedulersOnline :: pos_integer(), OldSchedulersOnline :: pos_integer(); + (system_logger, Logger) -> PrevLogger when + Logger :: logger | undefined | pid(), + PrevLogger :: logger | undefined | pid(); (trace_control_word, TCW) -> OldTCW when TCW :: non_neg_integer(), OldTCW :: non_neg_integer(); @@ -2534,7 +2537,7 @@ subtract(_,_) -> %% These are deliberately not documented (internal_cpu_topology, term()) -> term(); (sequential_tracer, pid() | port() | {module(), term()} | false) -> pid() | port() | false; - (1,0) -> true. + (reset_seq_trace,true) -> true. system_flag(_Flag, _Value) -> erlang:nif_error(undefined). @@ -2731,8 +2734,9 @@ tuple_to_list(_Tuple) -> (schedulers | schedulers_online) -> pos_integer(); (smp_support) -> boolean(); (start_time) -> integer(); - (system_version) -> string(); (system_architecture) -> string(); + (system_logger) -> logger | undefined | pid(); + (system_version) -> string(); (threads) -> boolean(); (thread_pool_size) -> non_neg_integer(); (time_correction) -> true | false; @@ -3395,60 +3399,15 @@ get_cookie() -> -spec integer_to_list(Integer, Base) -> string() when Integer :: integer(), Base :: 2..36. -integer_to_list(I, 10) -> - erlang:integer_to_list(I); -integer_to_list(I, Base) - when erlang:is_integer(I), erlang:is_integer(Base), - Base >= 2, Base =< 1+$Z-$A+10 -> - if I < 0 -> - [$-|integer_to_list(-I, Base, [])]; - true -> - integer_to_list(I, Base, []) - end; -integer_to_list(I, Base) -> - erlang:error(badarg, [I, Base]). - -integer_to_list(I0, Base, R0) -> - D = I0 rem Base, - I1 = I0 div Base, - R1 = if D >= 10 -> - [D-10+$A|R0]; - true -> - [D+$0|R0] - end, - if I1 =:= 0 -> - R1; - true -> - integer_to_list(I1, Base, R1) - end. +integer_to_list(_I, _Base) -> + erlang:nif_error(undefined). -spec integer_to_binary(Integer, Base) -> binary() when Integer :: integer(), Base :: 2..36. -integer_to_binary(I, 10) -> - erlang:integer_to_binary(I); -integer_to_binary(I, Base) - when erlang:is_integer(I), erlang:is_integer(Base), - Base >= 2, Base =< 1+$Z-$A+10 -> - if I < 0 -> - <<$-,(integer_to_binary(-I, Base, <<>>))/binary>>; - true -> - integer_to_binary(I, Base, <<>>) - end; -integer_to_binary(I, Base) -> - erlang:error(badarg, [I, Base]). - -integer_to_binary(I0, Base, R0) -> - D = I0 rem Base, - I1 = I0 div Base, - R1 = if - D >= 10 -> <<(D-10+$A),R0/binary>>; - true -> <<(D+$0),R0/binary>> - end, - if - I1 =:= 0 -> R1; - true -> integer_to_binary(I1, Base, R1) - end. +integer_to_binary(_I, _Base) -> + erlang:nif_error(undefined). + -record(cpu, {node = -1, processor = -1, diff --git a/erts/preloaded/src/erts.app.src b/erts/preloaded/src/erts.app.src index 8c34c99a98..ab0b9494b0 100644 --- a/erts/preloaded/src/erts.app.src +++ b/erts/preloaded/src/erts.app.src @@ -33,12 +33,14 @@ prim_file, prim_inet, prim_zip, + atomics, + counters, zlib ]}, {registered, []}, {applications, []}, {env, []}, - {runtime_dependencies, ["stdlib-3.5", "kernel-6.1", "sasl-3.0.1"]} + {runtime_dependencies, ["stdlib-3.5", "kernel-6.1", "sasl-3.3"]} ]}. %% vim: ft=erlang diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index 88f47e917b..305b524438 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -90,6 +90,15 @@ -export([create_dist_channel/4]). +-export([erase_persistent_terms/0]). + +-export([atomics_new/2]). + +-export([counters_new/1, counters_get/2, counters_add/3, + counters_put/3, counters_info/1]). + +-export([spawn_system_process/3]). + %% %% Await result of send to port %% @@ -691,3 +700,38 @@ process_flag(_Pid, _Flag, _Value) -> create_dist_channel(_Node, _DistCtrlr, _Flags, _Ver) -> erlang:nif_error(undefined). + +-spec erase_persistent_terms() -> 'ok'. +erase_persistent_terms() -> + erlang:nif_error(undefined). + +-spec atomics_new(pos_integer(), pos_integer()) -> reference(). +atomics_new(_Arity, _EncOpts) -> + erlang:nif_error(undef). + +-spec counters_new(pos_integer()) -> reference(). +counters_new(_Size) -> + erlang:nif_error(undef). + +-spec counters_get(reference(), pos_integer()) -> integer(). +counters_get(_Ref, _Ix) -> + erlang:nif_error(undef). + +-spec counters_add(reference(), pos_integer(), integer()) -> ok. +counters_add(_Ref, _Ix, _Incr) -> + erlang:nif_error(undef). + +-spec counters_put(reference(), pos_integer(), integer()) -> ok. +counters_put(_Ref, _Ix, _Value) -> + erlang:nif_error(undef). + +-spec counters_info(reference()) -> #{}. +counters_info(_Ref) -> + erlang:nif_error(undef). + +-spec spawn_system_process(Mod, Func, Args) -> pid() when + Mod :: atom(), + Func :: atom(), + Args :: list(). +spawn_system_process(_Mod, _Func, _Args) -> + erlang:nif_error(undefined). diff --git a/erts/preloaded/src/init.erl b/erts/preloaded/src/init.erl index 253fcf7a1f..86b4f35ae5 100644 --- a/erts/preloaded/src/init.erl +++ b/erts/preloaded/src/init.erl @@ -483,13 +483,16 @@ do_handle_msg(Msg,State) -> {From, {ensure_loaded, _}} -> From ! {init, not_allowed}; X -> + %% This is equal to calling logger:info/3 which we don't + %% want to do from the init process, at least not during + %% system boot. We don't want to call logger:timestamp() + %% either. case whereis(user) of undefined -> - Time = erlang:monotonic_time(microsecond), catch logger ! {log, info, "init got unexpected: ~p", [X], #{pid=>self(), gl=>self(), - time=>Time, + time=>os:system_time(microsecond), error_logger=>#{tag=>info_msg}}}; User -> User ! X, @@ -552,6 +555,7 @@ stop(Reason,State) -> do_stop(restart,#state{start = Start, flags = Flags, args = Args}) -> %% Make sure we don't have any outstanding messages before doing the restart. flush(), + erts_internal:erase_persistent_terms(), boot(Start,Flags,Args); do_stop(reboot,_) -> halt(); diff --git a/erts/preloaded/src/persistent_term.erl b/erts/preloaded/src/persistent_term.erl new file mode 100644 index 0000000000..ee7e49b6cb --- /dev/null +++ b/erts/preloaded/src/persistent_term.erl @@ -0,0 +1,62 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% +-module(persistent_term). + +-export([erase/1,get/0,get/1,get/2,info/0,put/2]). + +-type key() :: term(). +-type value() :: term(). + +-spec erase(Key) -> Result when + Key :: key(), + Result :: boolean(). +erase(_Key) -> + erlang:nif_error(undef). + +-spec get() -> List when + List :: [{key(),value()}]. +get() -> + erlang:nif_error(undef). + +-spec get(Key) -> Value when + Key :: key(), + Value :: value(). +get(_Key) -> + erlang:nif_error(undef). + +-spec get(Key, Default) -> Value when + Key :: key(), + Default :: value(), + Value :: value(). +get(_Key, _Default) -> + erlang:nif_error(undef). + +-spec info() -> Info when + Info :: #{'count':=Count,'memory':=Memory}, + Count :: non_neg_integer(), + Memory :: non_neg_integer(). +info() -> + erlang:nif_error(undef). + +-spec put(Key, Value) -> 'ok' when + Key :: key(), + Value :: value(). +put(_Key, _Value) -> + erlang:nif_error(undef). diff --git a/erts/preloaded/src/prim_file.erl b/erts/preloaded/src/prim_file.erl index 6d85868183..1aa5d85c64 100644 --- a/erts/preloaded/src/prim_file.erl +++ b/erts/preloaded/src/prim_file.erl @@ -46,7 +46,7 @@ -define(MIN_READLINE_SIZE, 256). -define(LARGEFILESIZE, (1 bsl 63)). --export([copy/3]). +-export([copy/3, start/0]). -include("file_int.hrl"). @@ -83,7 +83,23 @@ internal_normalize_utf8(_) -> is_translatable(_) -> erlang:nif_error(undefined). -%% +%% This is a janitor process used to close files whose controlling process has +%% died. The emulator will be torn down if this is killed. +start() -> + helper_loop(). + +helper_loop() -> + receive + {close, FRef} when is_reference(FRef) -> delayed_close_nif(FRef); + _ -> ok + end, + helper_loop(). + +on_load() -> + %% This is spawned as a system process to prevent init:restart/0 from + %% killing it. + Pid = erts_internal:spawn_system_process(?MODULE, start, []), + ok = erlang:load_nif(atom_to_list(?MODULE), Pid). %% Returns {error, Reason} | {ok, BytesCopied} copy(#file_descriptor{module = ?MODULE} = Source, @@ -94,9 +110,6 @@ copy(#file_descriptor{module = ?MODULE} = Source, %% XXX Should be moved down to the driver for optimization. file:copy_opened(Source, Dest, Length). -on_load() -> - ok = erlang:load_nif(atom_to_list(?MODULE), 0). - open(Name, Modes) -> %% The try/catch pattern seen here is used throughout the file to adhere to %% the public file interface, which has leaked through for ages because of @@ -482,6 +495,8 @@ truncate_nif(_FileRef) -> erlang:nif_error(undef). get_handle_nif(_FileRef) -> erlang:nif_error(undef). +delayed_close_nif(_FileRef) -> + erlang:nif_error(undef). %% %% Quality-of-life helpers @@ -557,12 +572,13 @@ list_dir_convert([RawName | Rest], SkipInvalid, Result) -> {error, ignore} -> list_dir_convert(Rest, SkipInvalid, Result); {error, warning} -> - %% this is equal to calling error_logger:warning_msg/2 which - %% we don't want to do from code_server during system boot + %% This is equal to calling logger:warning/3 which + %% we don't want to do from code_server during system boot. + %% We don't want to call logger:timestamp() either. logger ! {log,warning,"Non-unicode filename ~p ignored\n", [RawName], #{pid=>self(), gl=>group_leader(), - time=>erlang:system_time(microsecond), + time=>os:system_time(microsecond), error_logger=>#{tag=>warning_msg}}}, list_dir_convert(Rest, SkipInvalid, Result); {error, _} -> diff --git a/erts/preloaded/src/prim_inet.erl b/erts/preloaded/src/prim_inet.erl index 8169943dde..2820a5bef4 100644 --- a/erts/preloaded/src/prim_inet.erl +++ b/erts/preloaded/src/prim_inet.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2000-2018. All Rights Reserved. +%% Copyright Ericsson AB 2000-2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -49,9 +49,15 @@ -include("inet_sctp.hrl"). -include("inet_int.hrl"). -%-define(DEBUG, 1). +%%%-define(DEBUG, 1). -ifdef(DEBUG). --define(DBG_FORMAT(Format, Args), (io:format((Format), (Args)))). +-define( + DBG_FORMAT(Format, Args), + begin + %% io:format((Format), (Args)), + erlang:display(lists:flatten(io_lib:format((Format), (Args)))), + ok + end). -else. -define(DBG_FORMAT(Format, Args), ok). -endif. @@ -150,39 +156,106 @@ shutdown_1(S, How) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% close(S) when is_port(S) -> + ?DBG_FORMAT("prim_inet:close(~p)~n", [S]), case getopt(S, linger) of {ok,{true,0}} -> close_port(S); - _ -> - case subscribe(S, [subs_empty_out_q]) of - {ok, [{subs_empty_out_q,N}]} when N > 0 -> - close_pend_loop(S, N); %% wait for pending output to be sent - _ -> - close_port(S) - end + {ok,{true,T}} -> + %% Wait for T seconds for pending output to be sent + %% + %% Note that this handling of Linger may look ok, + %% but sweeps some problems under the rug since + %% there are OS buffers that may have remaining data + %% after the inet driver has emptied its buffers. + %% But Linger for nonblocking sockets is broken + %% anyway on all OS:es, according to hearsay, + %% and is a contradiction in itself. + %% We have hereby done our best... + %% + case subscribe(S, [subs_empty_out_q]) of + {ok, [{subs_empty_out_q,0}]} -> + close_port(S); + {ok, [{subs_empty_out_q,N}]} when N > 0 -> + %% Wait for pending output to be sent + Tref = erlang:start_timer(T * 1000, self(), close_port), + close_pend_loop(S, Tref, N); + _ -> + %% Subscribe failed - wait full time + Tref = erlang:start_timer(T * 1000, self(), close_port), + close_pend_loop(S, Tref, undefined) + end; + _ -> % Regard this as {ok,{false,_}} + case subscribe(S, [subs_empty_out_q]) of + {ok, [{subs_empty_out_q,N}]} when N > 0 -> + %% Wait for pending output to be sent + DefaultT = 180000, % Arbitrary system timeout 3 min + Tref = erlang:start_timer(DefaultT, self(), close_port), + close_pend_loop(S, Tref, N); + _ -> + %% Subscribe failed or empty out q - give up or done + close_port(S) + end end. -close_pend_loop(S, N) -> +close_pend_loop(S, Tref, N) -> + ?DBG_FORMAT("prim_inet:close_pend_loop(~p, _, ~p)~n", [S,N]), receive - {empty_out_q,S} -> - close_port(S) + {timeout,Tref,_} -> % Linger timeout + ?DBG_FORMAT("prim_inet:close_pend_loop(~p, _, _) timeout~n", [S]), + close_port(S); + {empty_out_q,S} when N =/= undefined -> + ?DBG_FORMAT( + "prim_inet:close_pend_loop(~p, _, _) empty_out_q~n", [S]), + close_port(S, Tref) after ?INET_CLOSE_TIMEOUT -> case getstat(S, [send_pend]) of {ok, [{send_pend,N1}]} -> + ?DBG_FORMAT( + "prim_inet:close_pend_loop(~p, _, _) send_pend ~p~n", + [S,N1]), if - N1 =:= N -> - close_port(S); - true -> - close_pend_loop(S, N1) + N1 =:= 0 -> + %% Empty outq - done + close_port(S, Tref); + N =:= undefined -> + %% Within linger time - wait some more + close_pend_loop(S, Tref, N); + N1 =:= N -> + %% Inactivity - give up + close_port(S, Tref); + true -> + %% Still moving - wait some more + close_pend_loop(S, Tref, N) end; - _ -> - close_port(S) - end + _Stat -> + %% Failed getstat - give up + ?DBG_FORMAT( + "prim_inet:close_pend_loop(~p, _, _) getstat ~p~n", + [S,_Stat]), + close_port(S, Tref) + end end. + +close_port(S, Tref) -> + ?DBG_FORMAT("prim_inet:close_port(~p, _)~n", [S]), + case erlang:cancel_timer(Tref) of + false -> + receive + {timeout,Tref,_} -> + ok + end; + _N -> + ok + end, + close_port(S). +%% close_port(S) -> - catch erlang:port_close(S), - receive {'EXIT',S,_} -> ok after 0 -> ok end. + ?DBG_FORMAT("prim_inet:close_port(~p)~n", [S]), + _Closed = (catch erlang:port_close(S)), + receive {'EXIT',S,_} -> ok after 0 -> ok end, + ?DBG_FORMAT("prim_inet:close_port(~p) ~p~n", [S,_Closed]), + ok. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% @@ -424,23 +497,49 @@ peeloff(S, AssocId) -> %% be called directly -- use "sendmsg" instead: %% send(S, Data, OptList) when is_port(S), is_list(OptList) -> - ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]), + ?DBG_FORMAT("prim_inet:send(~p, _, ~p)~n", [S,OptList]), try erlang:port_command(S, Data, OptList) of false -> % Port busy and nosuspend option passed ?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []), {error,busy}; true -> - receive - {inet_reply,S,Status} -> - ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]), - Status - end + send_recv_reply(S, undefined) catch error:_Error -> ?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []), {error,einval} end. +send_recv_reply(S, Mref) -> + ReplyTimeout = + case Mref of + undefined -> + ?INET_CLOSE_TIMEOUT; + _ -> + infinity + end, + receive + {inet_reply,S,Status} -> + ?DBG_FORMAT( + "prim_inet:send_recv_reply(~p, _): inet_reply ~p~n", + [S,Status]), + case Mref of + undefined -> ok; + _ -> + demonitor(Mref, [flush]), + ok + end, + Status; + {'DOWN',Mref,_,_,_Reason} when Mref =/= undefined -> + ?DBG_FORMAT( + "prim_inet:send_recv_reply(~p, _) 'DOWN' ~p~n", + [S,_Reason]), + {error,closed} + after ReplyTimeout -> + send_recv_reply(S, monitor(port, S)) + end. + + send(S, Data) -> send(S, Data, []). @@ -520,13 +619,35 @@ sendfile(S, FileHandle, Offset, Length) sendfile(S, FileHandle, Offset, Length) -> case erlang:port_info(S, connected) of {connected, Pid} when Pid =:= self() -> - sendfile_1(S, FileHandle, Offset, Length); + Uncork = sendfile_maybe_cork(S), + Result = sendfile_1(S, FileHandle, Offset, Length), + sendfile_maybe_uncork(S, Uncork), + Result; {connected, Pid} when Pid =/= self() -> {error, not_owner}; _Other -> {error, einval} end. +sendfile_maybe_cork(S) -> + case getprotocol(S) of + tcp -> + case getopts(S, [nopush]) of + {ok, [{nopush,false}]} -> + _ = setopts(S, [{nopush,true}]), + true; + _ -> + false + end; + _ -> false + end. + +sendfile_maybe_uncork(S, true) -> + _ = setopts(S, [{nopush,false}]), + ok; +sendfile_maybe_uncork(_, false) -> + ok. + sendfile_1(S, FileHandle, Offset, 0) -> sendfile_1(S, FileHandle, Offset, (1 bsl 63) - 1); sendfile_1(_S, _FileHandle, Offset, Length) when @@ -870,9 +991,9 @@ chgopts(S, Opts) when is_port(S), is_list(Opts) -> getifaddrs(S) when is_port(S) -> case ctl_cmd(S, ?INET_REQ_GETIFADDRS, []) of - {ok, Data} -> - {ok, comp_ifaddrs(build_ifaddrs(Data), ktree_empty())}; - {error,enotsup} -> + {ok, Data} -> + {ok, comp_ifaddrs(build_ifaddrs(Data))}; + {error,enotsup} -> case getiflist(S) of {ok, IFs} -> {ok, getifaddrs_ifget(S, IFs)}; @@ -881,30 +1002,75 @@ getifaddrs(S) when is_port(S) -> Err2 -> Err2 end. -%% Restructure interface properties per interface and remove duplicates - -comp_ifaddrs([{If,Opts}|IfOpts], T) -> - case ktree_is_defined(If, T) of - true -> - OptSet = comp_ifaddrs_add(ktree_get(If, T), Opts), - comp_ifaddrs(IfOpts, ktree_update(If, OptSet, T)); - false -> - OptSet = comp_ifaddrs_add(ktree_empty(), Opts), - comp_ifaddrs(IfOpts, ktree_insert(If, OptSet, T)) - end; -comp_ifaddrs([], T) -> - [{If,ktree_keys(ktree_get(If, T))} || If <- ktree_keys(T)]. - -comp_ifaddrs_add(OptSet, [Opt|Opts]) -> - case ktree_is_defined(Opt, OptSet) of - true - when element(1, Opt) =:= flags; - element(1, Opt) =:= hwaddr -> - comp_ifaddrs_add(OptSet, Opts); - _ -> - comp_ifaddrs_add(ktree_insert(Opt, undefined, OptSet), Opts) +%% Restructure interface properties per interface + +comp_ifaddrs(IfOpts) -> + comp_ifaddrs(IfOpts, ktree_empty()). +%% +comp_ifaddrs([{If,[{flags,Flags}|Opts]}|IfOpts], IfT) -> + case ktree_is_defined(If, IfT) of + true -> + comp_ifaddrs( + IfOpts, + ktree_update( + If, + comp_ifaddrs_flags(Flags, Opts, ktree_get(If, IfT)), + IfT)); + false -> + comp_ifaddrs( + IfOpts, + ktree_insert( + If, + comp_ifaddrs_flags(Flags, Opts, ktree_empty()), + IfT)) end; -comp_ifaddrs_add(OptSet, []) -> OptSet. +comp_ifaddrs([], IfT) -> + comp_ifaddrs_2(ktree_keys(IfT), IfT). + +comp_ifaddrs_flags(Flags, Opts, FlagsT) -> + case ktree_is_defined(Flags, FlagsT) of + true -> + ktree_update( + Flags, + rev(Opts, ktree_get(Flags, FlagsT)), + FlagsT); + false -> + ktree_insert(Flags, rev(Opts), FlagsT) + end. + +comp_ifaddrs_2([If|Ifs], IfT) -> + FlagsT = ktree_get(If, IfT), + [{If,comp_ifaddrs_3(ktree_keys(FlagsT), FlagsT)} + | comp_ifaddrs_2(Ifs, IfT)]; +comp_ifaddrs_2([], _IfT) -> + []. +%% +comp_ifaddrs_3([Flags|FlagsL], FlagsT) -> + [{flags,Flags}|hwaddr_last(rev(ktree_get(Flags, FlagsT)))] + ++ hwaddr_last(comp_ifaddrs_3(FlagsL, FlagsT)); +comp_ifaddrs_3([], _FlagsT) -> + []. + +%% Place hwaddr last to look more like legacy emulation +hwaddr_last(Opts) -> + hwaddr_last(Opts, Opts, []). +%% +hwaddr_last([{hwaddr,_} = Opt|Opts], L, R) -> + hwaddr_last(Opts, L, [Opt|R]); +hwaddr_last([_|Opts], L, R) -> + hwaddr_last(Opts, L, R); +hwaddr_last([], L, []) -> + L; +hwaddr_last([], L, R) -> + rev(hwaddr_last(L, []), rev(R)). +%% +hwaddr_last([{hwaddr,_}|Opts], R) -> + hwaddr_last(Opts, R); +hwaddr_last([Opt|Opts], R) -> + hwaddr_last(Opts, [Opt|R]); +hwaddr_last([], R) -> + R. + %% Legacy emulation of getifaddrs @@ -912,21 +1078,19 @@ getifaddrs_ifget(_, []) -> []; getifaddrs_ifget(S, [IF|IFs]) -> case ifget(S, IF, [flags]) of {ok,[{flags,Flags}]=FlagsVals} -> - BroadOpts = - case member(broadcast, Flags) of - true -> - [broadaddr,hwaddr]; - false -> - [hwaddr] - end, - P2POpts = - case member(pointtopoint, Flags) of - true -> - [dstaddr|BroadOpts]; - false -> - BroadOpts - end, - getifaddrs_ifget(S, IFs, IF, FlagsVals, [addr,netmask|P2POpts]); + GetOpts = + case member(pointtopoint, Flags) of + true -> + [dstaddr,hwaddr]; + false -> + case member(broadcast, Flags) of + true -> + [broadaddr,hwaddr]; + false -> + [hwaddr] + end + end, + getifaddrs_ifget(S, IFs, IF, FlagsVals, [addr,netmask|GetOpts]); _ -> getifaddrs_ifget(S, IFs, IF, [], [addr,netmask,hwaddr]) end. @@ -1275,6 +1439,7 @@ enc_opt(pktoptions) -> ?INET_OPT_PKTOPTIONS; enc_opt(ttl) -> ?INET_OPT_TTL; enc_opt(recvttl) -> ?INET_OPT_RECVTTL; enc_opt(nodelay) -> ?TCP_OPT_NODELAY; +enc_opt(nopush) -> ?TCP_OPT_NOPUSH; enc_opt(multicast_if) -> ?UDP_OPT_MULTICAST_IF; enc_opt(multicast_ttl) -> ?UDP_OPT_MULTICAST_TTL; enc_opt(multicast_loop) -> ?UDP_OPT_MULTICAST_LOOP; @@ -1336,6 +1501,7 @@ dec_opt(?INET_OPT_PRIORITY) -> priority; dec_opt(?INET_OPT_TOS) -> tos; dec_opt(?INET_OPT_TCLASS) -> tclass; dec_opt(?TCP_OPT_NODELAY) -> nodelay; +dec_opt(?TCP_OPT_NOPUSH) -> nopush; dec_opt(?INET_OPT_RECVTOS) -> recvtos; dec_opt(?INET_OPT_RECVTCLASS) -> recvtclass; dec_opt(?INET_OPT_PKTOPTIONS) -> pktoptions; @@ -1422,6 +1588,7 @@ type_opt_1(pktoptions) -> opts; type_opt_1(ttl) -> int; type_opt_1(recvttl) -> bool; type_opt_1(nodelay) -> bool; +type_opt_1(nopush) -> bool; type_opt_1(ipv6_v6only) -> bool; %% multicast type_opt_1(multicast_ttl) -> int; @@ -2500,7 +2667,7 @@ get_addrs([F|Addrs]) -> [Addr|get_addrs(Rest)]. get_addr(?INET_AF_LOCAL, [N|Addr]) -> - {A,Rest} = lists:split(N, Addr), + {A,Rest} = split(N, Addr), {{local,iolist_to_binary(A)},Rest}; get_addr(?INET_AF_UNSPEC, Rest) -> {{unspec,<<>>},Rest}; @@ -2522,12 +2689,13 @@ get_ip6([X1,X2,X3,X4,X5,X6,X7,X8,X9,X10,X11,X12,X13,X14,X15,X16 | T]) -> ?u16(X9,X10),?u16(X11,X12),?u16(X13,X14),?u16(X15,X16)}, T }. +-define(ERTS_INET_DRV_CONTROL_MAGIC_NUMBER, 16#03f1a300). %% Control command ctl_cmd(Port, Cmd, Args) -> ?DBG_FORMAT("prim_inet:ctl_cmd(~p, ~p, ~p)~n", [Port,Cmd,Args]), Result = - try erlang:port_control(Port, Cmd, Args) of + try erlang:port_control(Port, Cmd+?ERTS_INET_DRV_CONTROL_MAGIC_NUMBER, Args) of [?INET_REP_OK|Reply] -> {ok,Reply}; [?INET_REP] -> inet_reply; [?INET_REP_ERROR|Err] -> {error,list_to_atom(Err)} |