diff options
44 files changed, 2747 insertions, 1088 deletions
diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml index 7699f64c25..fabca87e9f 100644 --- a/erts/doc/src/erlang.xml +++ b/erts/doc/src/erlang.xml @@ -3339,7 +3339,7 @@ RealSystem = system + MissedSystem</code> <func> <name name="monitor" arity="2" clause_i="1" since=""/> - <name name="monitor" arity="2" clause_i="2" since="?"/> + <name name="monitor" arity="2" clause_i="2" since="OTP 19.0"/> <name name="monitor" arity="2" clause_i="3" since="OTP 18.0"/> <fsummary>Start monitoring.</fsummary> <type name="registered_name"/> @@ -4526,7 +4526,7 @@ RealSystem = system + MissedSystem</code> </func> <func> - <name name="port_info" arity="2" clause_i="5" since="?"/> + <name name="port_info" arity="2" clause_i="5" since="OTP R16B"/> <fsummary>Information about the locking of a port.</fsummary> <desc> <p><c><anno>Locking</anno></c> is one of the following:</p> @@ -4547,7 +4547,7 @@ RealSystem = system + MissedSystem</code> </func> <func> - <name name="port_info" arity="2" clause_i="6" since="?"/> + <name name="port_info" arity="2" clause_i="6" since="OTP R16B"/> <fsummary>Information about the memory size of a port.</fsummary> <desc> <p><c><anno>Bytes</anno></c> is the total number of @@ -4565,7 +4565,7 @@ RealSystem = system + MissedSystem</code> </func> <func> - <name name="port_info" arity="2" clause_i="7" since="?"/> + <name name="port_info" arity="2" clause_i="7" since="OTP R16B"/> <fsummary>Information about the monitors of a port.</fsummary> <desc> <p><c><anno>Monitors</anno></c> represent processes monitored by @@ -4581,7 +4581,7 @@ RealSystem = system + MissedSystem</code> </func> <func> - <name name="port_info" arity="2" clause_i="8" since="?"/> + <name name="port_info" arity="2" clause_i="8" since="OTP 19.0"/> <fsummary>Which processes are monitoring this port.</fsummary> <desc> <p>Returns list of pids that are monitoring given port at the @@ -4613,7 +4613,7 @@ RealSystem = system + MissedSystem</code> </func> <func> - <name name="port_info" arity="2" clause_i="10" since="?"/> + <name name="port_info" arity="2" clause_i="10" since="OTP R16B"/> <fsummary>Information about the OS pid of a port.</fsummary> <desc> <p><c><anno>OsPid</anno></c> is the process identifier (or equivalent) @@ -4651,7 +4651,7 @@ RealSystem = system + MissedSystem</code> </func> <func> - <name name="port_info" arity="2" clause_i="12" since="?"/> + <name name="port_info" arity="2" clause_i="12" since="OTP R16B"/> <fsummary>Information about the parallelism hint of a port.</fsummary> <desc> <p><c><anno>Boolean</anno></c> corresponds to the port parallelism @@ -4662,7 +4662,7 @@ RealSystem = system + MissedSystem</code> </func> <func> - <name name="port_info" arity="2" clause_i="13" since="?"/> + <name name="port_info" arity="2" clause_i="13" since="OTP R16B"/> <fsummary>Information about the queue size of a port.</fsummary> <desc> <p><c><anno>Bytes</anno></c> is the total number @@ -4782,7 +4782,7 @@ RealSystem = system + MissedSystem</code> </func> <func> - <name name="process_flag" arity="2" clause_i="4" since="?"/> + <name name="process_flag" arity="2" clause_i="4" since="OTP R13B04"/> <fsummary>Set process flag min_bin_vheap_size for the calling process. </fsummary> <desc> @@ -4794,7 +4794,7 @@ RealSystem = system + MissedSystem</code> <func> <name name="process_flag" arity="2" clause_i="5" - anchor="process_flag_max_heap_size" since="?"/> + anchor="process_flag_max_heap_size" since="OTP 19.0"/> <fsummary>Set process flag max_heap_size for the calling process. </fsummary> <type name="max_heap_size"/> @@ -4868,7 +4868,7 @@ RealSystem = system + MissedSystem</code> <func> <name name="process_flag" arity="2" clause_i="6" - anchor="process_flag_message_queue_data" since="?"/> + anchor="process_flag_message_queue_data" since="OTP 19.0"/> <fsummary>Set process flag message_queue_data for the calling process. </fsummary> <type name="message_queue_data"/> @@ -5047,7 +5047,7 @@ RealSystem = system + MissedSystem</code> </func> <func> - <name name="process_flag" arity="3" since="?"/> + <name name="process_flag" arity="3" since=""/> <fsummary>Set process flags for a process.</fsummary> <desc> <p>Sets certain flags for the process <c><anno>Pid</anno></c>, @@ -6399,7 +6399,7 @@ true</pre> <func> <name name="statistics" arity="1" clause_i="1" - anchor="statistics_active_tasks" since="?"/> + anchor="statistics_active_tasks" since="OTP 18.3"/> <fsummary>Information about active processes and ports.</fsummary> <desc> <p>Returns the same as @@ -6414,7 +6414,7 @@ true</pre> <func> <name name="statistics" arity="1" clause_i="2" - anchor="statistics_active_tasks_all" since="?"/> + anchor="statistics_active_tasks_all" since="OTP 20.0"/> <fsummary>Information about active processes and ports.</fsummary> <desc> <p>Returns a list where each element represents the amount @@ -6503,7 +6503,7 @@ true</pre> <func> <name name="statistics" arity="1" clause_i="7" - anchor="statistics_microstate_accounting" since="?"/> + anchor="statistics_microstate_accounting" since="OTP 19.0"/> <fsummary>Information about microstate accounting.</fsummary> <desc> <p>Microstate accounting can be used to measure how much time the Erlang @@ -6682,7 +6682,7 @@ lists:map( <func> <name name="statistics" arity="1" clause_i="10" - anchor="statistics_run_queue_lengths" since="?"/> + anchor="statistics_run_queue_lengths" since="OTP 18.3"/> <fsummary>Information about the run-queue lengths.</fsummary> <desc> <p>Returns the same as @@ -6697,7 +6697,7 @@ lists:map( <func> <name name="statistics" arity="1" clause_i="11" - anchor="statistics_run_queue_lengths_all" since="?"/> + anchor="statistics_run_queue_lengths_all" since="OTP 20.0"/> <fsummary>Information about the run-queue lengths.</fsummary> <desc> <p>Returns a list where each element represents the amount @@ -6758,7 +6758,7 @@ lists:map( <func> <name name="statistics" arity="1" clause_i="13" - anchor="statistics_scheduler_wall_time" since="?"/> + anchor="statistics_scheduler_wall_time" since="OTP R15B01"/> <fsummary>Information about each schedulers work time.</fsummary> <desc> <p>Returns a list of tuples with @@ -6882,7 +6882,7 @@ ok <func> <name name="statistics" arity="1" clause_i="14" - anchor="statistics_scheduler_wall_time_all" since="?"/> + anchor="statistics_scheduler_wall_time_all" since="OTP 20.0"/> <fsummary>Information about each schedulers work time.</fsummary> <desc> <p>The same as @@ -6910,7 +6910,7 @@ ok </func> <func> <name name="statistics" arity="1" clause_i="15" - anchor="statistics_total_active_tasks" since="?"/> + anchor="statistics_total_active_tasks" since="OTP 18.3"/> <fsummary>Information about active processes and ports.</fsummary> <desc> <p>The same as calling @@ -6921,7 +6921,7 @@ ok <func> <name name="statistics" arity="1" clause_i="16" - anchor="statistics_total_active_tasks_all" since="?"/> + anchor="statistics_total_active_tasks_all" since="OTP 20.0"/> <fsummary>Information about active processes and ports.</fsummary> <desc> <p>The same as calling @@ -6932,7 +6932,7 @@ ok <func> <name name="statistics" arity="1" clause_i="17" - anchor="statistics_total_run_queue_lengths" since="?"/> + anchor="statistics_total_run_queue_lengths" since="OTP 18.3"/> <fsummary>Information about the run-queue lengths.</fsummary> <desc> <p>The same as calling @@ -6943,7 +6943,7 @@ ok <func> <name name="statistics" arity="1" clause_i="18" - anchor="statistics_total_run_queue_lengths_all" since="?"/> + anchor="statistics_total_run_queue_lengths_all" since="OTP 20.0"/> <fsummary>Information about the run-queue lengths.</fsummary> <desc> <p>The same as calling @@ -7186,7 +7186,7 @@ ok <func> <name name="system_flag" arity="2" clause_i="3" - anchor="system_flag_dirty_cpu_schedulers_online" since="?"/> + anchor="system_flag_dirty_cpu_schedulers_online" since="OTP 17.0"/> <fsummary>Set system_flag_dirty_cpu_schedulers_online.</fsummary> <desc> <p> @@ -7214,7 +7214,7 @@ ok </func> <func> - <name name="system_flag" arity="2" clause_i="4" since="?"/> + <name name="system_flag" arity="2" clause_i="4" since="OTP 20.2.3"/> <fsummary>Set system flag for erts_alloc.</fsummary> <desc> <p>Sets system flags for @@ -7251,7 +7251,7 @@ ok <func> <name name="system_flag" arity="2" clause_i="6" - anchor="system_flag_microstate_accounting" since="?"/> + anchor="system_flag_microstate_accounting" since="OTP 19.0"/> <fsummary>Set system flag microstate_accounting.</fsummary> <desc> <p> @@ -7279,7 +7279,7 @@ ok </func> <func> - <name name="system_flag" arity="2" clause_i="8" since="?"/> + <name name="system_flag" arity="2" clause_i="8" since="OTP R13B04"/> <fsummary>Set system flag min_bin_vheap_size.</fsummary> <desc> <p>Sets the default minimum binary virtual heap size for @@ -7297,7 +7297,7 @@ ok <func> <name name="system_flag" arity="2" clause_i="9" - anchor="system_flag_max_heap_size" since="?"/> + anchor="system_flag_max_heap_size" since="OTP 19.0"/> <fsummary>Set system flag max_heap_size.</fsummary> <type name="max_heap_size"/> <desc> @@ -7498,7 +7498,7 @@ ok <func> <name name="system_flag" arity="2" clause_i="12" - anchor="system_flag_scheduler_wall_time" since="?"/> + anchor="system_flag_scheduler_wall_time" since="OTP R15B01"/> <fsummary>Set system flag scheduler_wall_time.</fsummary> <desc> <p> @@ -7586,7 +7586,7 @@ Metadata = #{ pid => pid(), <func> <name name="system_flag" arity="2" clause_i="16" - anchor="system_flag_time_offset" since="?"/> + anchor="system_flag_time_offset" since="OTP 18.0"/> <fsummary>Finalize the time offset.</fsummary> <desc> <p> @@ -7909,7 +7909,7 @@ Metadata = #{ pid => pid(), anchor="system_info_cpu_topology" since=""/> <!-- cpu_topology --> <name name="system_info" arity="1" clause_i="13" since=""/> <!-- {cpu_topology, _} --> <name name="system_info" arity="1" clause_i="38" since=""/> <!-- logical_processors --> - <name name="system_info" arity="1" clause_i="74" since="?"/> <!-- update_cpu_info --> + <name name="system_info" arity="1" clause_i="74" since="OTP R14B"/> <!-- update_cpu_info --> <fsummary>Information about the CPU topology of the system.</fsummary> <type name="cpu_topology"/> <type name="level_entry"/> @@ -8061,14 +8061,14 @@ Metadata = #{ pid => pid(), <func> <name name="system_info" arity="1" clause_i="31" - anchor="system_info_process" since="?"/> <!-- fullsweep_after --> + anchor="system_info_process" since=""/> <!-- fullsweep_after --> <name name="system_info" arity="1" clause_i="32" since=""/> <!-- garbage_collection --> <name name="system_info" arity="1" clause_i="33" since=""/> <!-- heap_sizes --> <name name="system_info" arity="1" clause_i="34" since=""/> <!-- heap_type --> - <name name="system_info" arity="1" clause_i="40" since="?"/> <!-- max_heap_size --> - <name name="system_info" arity="1" clause_i="41" since="?"/> <!-- message_queue_data --> - <name name="system_info" arity="1" clause_i="42" since="?"/> <!-- min_heap_size --> - <name name="system_info" arity="1" clause_i="43" since="?"/> <!-- min_bin_vheap_size --> + <name name="system_info" arity="1" clause_i="40" since="OTP 19.0"/> <!-- max_heap_size --> + <name name="system_info" arity="1" clause_i="41" since="OTP 19.0"/> <!-- message_queue_data --> + <name name="system_info" arity="1" clause_i="42" since="OTP R13B04"/> <!-- min_heap_size --> + <name name="system_info" arity="1" clause_i="43" since="OTP R13B04"/> <!-- min_bin_vheap_size --> <name name="system_info" arity="1" clause_i="57" since=""/> <!-- procs --> <fsummary>Information about the default process heap settings.</fsummary> <type name="message_queue_data"/> @@ -8179,12 +8179,12 @@ Metadata = #{ pid => pid(), </func> <func> - <name name="system_info" arity="1" clause_i="6" anchor="system_info_limits" since="?"/> <!-- atom_count --> - <name name="system_info" arity="1" clause_i="7" since="?"/> <!-- atom_limit --> - <name name="system_info" arity="1" clause_i="29" since="?"/> <!-- ets_count --> - <name name="system_info" arity="1" clause_i="30" since="?"/> <!-- ets_limit --> - <name name="system_info" arity="1" clause_i="53" since="?"/> <!-- port_count --> - <name name="system_info" arity="1" clause_i="54" since="?"/> <!-- port_limit --> + <name name="system_info" arity="1" clause_i="6" anchor="system_info_limits" since="OTP 20.0"/> <!-- atom_count --> + <name name="system_info" arity="1" clause_i="7" since="OTP 20.0"/> <!-- atom_limit --> + <name name="system_info" arity="1" clause_i="29" since="OTP 21.1"/> <!-- ets_count --> + <name name="system_info" arity="1" clause_i="30" since="OTP R16B03"/> <!-- ets_limit --> + <name name="system_info" arity="1" clause_i="53" since="OTP R16B"/> <!-- port_count --> + <name name="system_info" arity="1" clause_i="54" since="OTP R16B"/> <!-- port_limit --> <name name="system_info" arity="1" clause_i="55" since=""/> <!-- process_count --> <name name="system_info" arity="1" clause_i="56" since=""/> <!-- process_limit --> <fsummary>Information about various system limits.</fsummary> @@ -8267,7 +8267,7 @@ Metadata = #{ pid => pid(), <name name="system_info" arity="1" clause_i="69" since="OTP 18.0"/> <!-- time_correction --> <name name="system_info" arity="1" clause_i="70" since="OTP 18.0"/> <!-- time_offset --> <name name="system_info" arity="1" clause_i="71" since="OTP 18.0"/> <!-- time_warp_mode --> - <name name="system_info" arity="1" clause_i="72" since="?"/> <!-- tolerant_timeofday --> + <name name="system_info" arity="1" clause_i="72" since="OTP 17.1"/> <!-- tolerant_timeofday --> <fsummary>Information about system time.</fsummary> <desc> <marker id="system_info_time_tags"/> @@ -8488,12 +8488,12 @@ Metadata = #{ pid => pid(), <func> <name name="system_info" arity="1" clause_i="17" - anchor="system_info_scheduler" since="?"/> <!-- dirty_cpu_schedulers --> - <name name="system_info" arity="1" clause_i="18" since="?"/> <!-- dirty_cpu_schedulers_online --> - <name name="system_info" arity="1" clause_i="19" since="?"/> <!-- dirty_io_schedulers --> + anchor="system_info_scheduler" since="OTP 17.0"/> <!-- dirty_cpu_schedulers --> + <name name="system_info" arity="1" clause_i="18" since="OTP 17.0"/> <!-- dirty_cpu_schedulers_online --> + <name name="system_info" arity="1" clause_i="19" since="OTP 17.0"/> <!-- dirty_io_schedulers --> <name name="system_info" arity="1" clause_i="45" since=""/> <!-- multi_scheduling --> <name name="system_info" arity="1" clause_i="46" since=""/> <!-- multi_scheduling_blockers --> - <name name="system_info" arity="1" clause_i="49" since="?"/> <!-- normal_multi_scheduling_blockers --> + <name name="system_info" arity="1" clause_i="49" since="OTP 19.0"/> <!-- normal_multi_scheduling_blockers --> <name name="system_info" arity="1" clause_i="58" since=""/> <!-- scheduler_bind_type --> <name name="system_info" arity="1" clause_i="59" since=""/> <!-- scheduler_bindings --> <name name="system_info" arity="1" clause_i="60" since=""/> <!-- scheduler_id --> @@ -8789,9 +8789,9 @@ Metadata = #{ pid => pid(), <func> <name name="system_info" arity="1" clause_i="14" anchor="system_info_dist" since=""/> <!-- creation --> - <name name="system_info" arity="1" clause_i="16" since="?"/> <!-- delayed_node_table_gc --> + <name name="system_info" arity="1" clause_i="16" since="OTP 18.0"/> <!-- delayed_node_table_gc --> <name name="system_info" arity="1" clause_i="20" since=""/> <!-- dist --> - <name name="system_info" arity="1" clause_i="21" since="?"/> <!-- dist_buf_busy_limit --> + <name name="system_info" arity="1" clause_i="21" since="OTP R14B01"/> <!-- dist_buf_busy_limit --> <name name="system_info" arity="1" clause_i="22" since=""/> <!-- dist_ctrl --> <fsummary>Information about erlang distribution.</fsummary> <desc> @@ -8866,7 +8866,7 @@ Metadata = #{ pid => pid(), <!-- <name name="system_info" arity="1" clause_i="6"/> atom_count --> <!-- <name name="system_info" arity="1" clause_i="7"/> atom_limit --> <name name="system_info" arity="1" clause_i="8" - anchor="system_info_misc" since="?"/> <!-- build_type --> + anchor="system_info_misc" since="OTP R14B"/> <!-- build_type --> <name name="system_info" arity="1" clause_i="9" since=""/> <!-- c_compiler_used --> <name name="system_info" arity="1" clause_i="10" since=""/> <!-- check_io --> <name name="system_info" arity="1" clause_i="11" since=""/> <!-- compat_rel --> @@ -8882,8 +8882,8 @@ Metadata = #{ pid => pid(), <!-- <name name="system_info" arity="1" clause_i="21"/> dist_buf_busy_limit --> <!-- <name name="system_info" arity="1" clause_i="22"/> dist_ctrl --> <name name="system_info" arity="1" clause_i="23" since=""/> <!-- driver_version --> - <name name="system_info" arity="1" clause_i="24" since="?"/> <!-- dynamic_trace --> - <name name="system_info" arity="1" clause_i="25" since="?"/> <!-- dynamic_trace_probes --> + <name name="system_info" arity="1" clause_i="24" since="OTP R15B01"/> <!-- dynamic_trace --> + <name name="system_info" arity="1" clause_i="25" since="OTP R15B01"/> <!-- dynamic_trace_probes --> <!-- <name name="system_info" arity="1" clause_i="26"/> end_time --> <!-- <name name="system_info" arity="1" clause_i="27"/> elib_malloc --> <!-- <name name="system_info" arity="1" clause_i="28"/> eager_check_io, removed --> @@ -8905,12 +8905,12 @@ Metadata = #{ pid => pid(), <name name="system_info" arity="1" clause_i="44" since=""/> <!-- modified_timing_level --> <!-- <name name="system_info" arity="1" clause_i="45"/> multi_scheduling --> <!-- <name name="system_info" arity="1" clause_i="46"/> multi_scheduling_blockers --> - <name name="system_info" arity="1" clause_i="47" since="?"/> <!-- nif_version --> + <name name="system_info" arity="1" clause_i="47" since="OTP 17.4"/> <!-- nif_version --> <!-- n<name name="system_info" arity="1" clause_i="48"/> ormal_multi_scheduling_blockers --> <name name="system_info" arity="1" clause_i="49" since=""/> <!-- otp_release --> <!-- <name name="system_info" arity="1" clause_i="50"/> os_monotonic_time_source --> <!-- <name name="system_info" arity="1" clause_i="51"/> os_system_time_source --> - <name name="system_info" arity="1" clause_i="52" since="?"/> <!-- port_parallelism --> + <name name="system_info" arity="1" clause_i="52" since="OTP R16B"/> <!-- port_parallelism --> <!-- <name name="system_info" arity="1" clause_i="53"/> port_count --> <!-- <name name="system_info" arity="1" clause_i="54"/> port_limit --> <!-- <name name="system_info" arity="1" clause_i="55"/> process_count --> @@ -10537,7 +10537,7 @@ timestamp() -> </func> <func> - <name name="trace_pattern" arity="3" clause_i="1" since="?"/> + <name name="trace_pattern" arity="3" clause_i="1" since="OTP 19.0"/> <fsummary>Set trace pattern for message sending.</fsummary> <type name="trace_match_spec"/> <type name="match_variable"/> @@ -10608,7 +10608,7 @@ timestamp() -> </func> <func> - <name name="trace_pattern" arity="3" clause_i="2" since="?"/> + <name name="trace_pattern" arity="3" clause_i="2" since="OTP 19.0"/> <fsummary>Set trace pattern for tracing of message receiving.</fsummary> <type name="trace_match_spec"/> <type name="match_variable"/> diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 19093ebfdd..dca502c939 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -340,6 +340,7 @@ erts_sched_stat_t erts_sched_stat; static erts_tsd_key_t ERTS_WRITE_UNLIKELY(sched_data_key); #if ERTS_POLL_USE_SCHEDULER_POLLING +static erts_atomic32_t function_calls; static erts_atomic32_t doing_sys_schedule; #endif static erts_atomic32_t no_empty_run_queues; @@ -3247,6 +3248,7 @@ poll_thread(void *arg) static ERTS_INLINE void clear_sys_scheduling(void) { + erts_atomic32_set_relb(&function_calls, 0); erts_atomic32_set_mb(&doing_sys_schedule, 0); } @@ -3269,28 +3271,6 @@ prepare_for_sys_schedule(void) return 0; } -static void -check_io_timer(void *null) -{ - ErtsSchedulerData *esdp = erts_get_scheduler_data(); - if (prepare_for_sys_schedule()) { - erts_check_io(esdp->ssi->psi, ERTS_POLL_NO_TIMEOUT); - clear_sys_scheduling(); - } - - /* The timer is cleared if this schedulers run-queue became empty - or if the CHECKIO flag was cleared. The CHECKIO flags is cleared - when a check_balance assigns another scheduler to be the poller in - the overload scenario. */ - if ((ERTS_RUNQ_FLGS_GET_NOB(esdp->run_queue) & (ERTS_RUNQ_FLG_OUT_OF_WORK|ERTS_RUNQ_FLG_CHECKIO)) - == ERTS_RUNQ_FLG_CHECKIO) { - erts_start_timer_callback(ERTS_POLL_SCHEDULER_POLLING_TIMEOUT, - check_io_timer, NULL); - } else { - ERTS_RUNQ_FLGS_UNSET(esdp->run_queue, ERTS_RUNQ_FLG_CHECKIO); - } -} - #else #define clear_sys_scheduling() #define prepare_for_sys_schedule() 0 @@ -3448,6 +3428,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) current_time = erts_get_monotonic_time(esdp); } } + *fcalls = 0; clear_sys_scheduling(); } else { if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) { @@ -4711,15 +4692,6 @@ check_balance(ErtsRunQueue *c_rq) if (blnc_no_rqs == 1) { c_rq->check_balance_reds = INT_MAX; erts_atomic32_set_nob(&balance_info.checking_balance, 0); -#if ERTS_POLL_USE_SCHEDULER_POLLING - c_rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; - if ((ERTS_RUNQ_FLGS_GET_NOB(c_rq) & (ERTS_RUNQ_FLG_OUT_OF_WORK|ERTS_RUNQ_FLG_CHECKIO)) - == 0) { - ERTS_RUNQ_FLGS_SET(c_rq, ERTS_RUNQ_FLG_CHECKIO); - erts_start_timer_callback(ERTS_POLL_SCHEDULER_POLLING_TIMEOUT, check_io_timer, NULL); - } - ERTS_RUNQ_FLGS_UNSET(c_rq, ERTS_RUNQ_FLGS_MIGRATION_INFO); -#endif return; } @@ -5239,19 +5211,6 @@ erts_fprintf(stderr, "--------------------------------\n"); /* Publish new migration paths... */ erts_atomic_set_wb(&erts_migration_paths, (erts_aint_t) new_mpaths); -#if ERTS_POLL_USE_SCHEDULER_POLLING - if (full_scheds == current_active) { - ERTS_ASSERT(full_scheds <= current_active); - /* All active schedulers ran for full, we need to do active polling, - so we setup a timer that does active polling */ - if (!(ERTS_RUNQ_FLGS_GET_NOB(c_rq) & ERTS_RUNQ_FLG_CHECKIO)) { - /* Active polling is not running, start it */ - erts_start_timer_callback(ERTS_POLL_SCHEDULER_POLLING_TIMEOUT, check_io_timer, NULL); - } - run_queue_info[c_rq->ix].flags |= ERTS_RUNQ_FLG_CHECKIO; - } -#endif - /* Reset balance statistics in all online queues */ for (qix = 0; qix < blnc_no_rqs; qix++) { Uint32 flags = run_queue_info[qix].flags; @@ -5261,8 +5220,6 @@ erts_fprintf(stderr, "--------------------------------\n"); ASSERT(!(flags & ERTS_RUNQ_FLG_OUT_OF_WORK)); if (rq->waiting) flags |= ERTS_RUNQ_FLG_OUT_OF_WORK; - if (rq != c_rq) - flags &= ~ERTS_RUNQ_FLG_CHECKIO; rq->full_reds_history_sum = run_queue_info[qix].full_reds_history_sum; @@ -5272,7 +5229,7 @@ erts_fprintf(stderr, "--------------------------------\n"); ERTS_DBG_CHK_FULL_REDS_HISTORY(rq); rq->out_of_work_count = 0; - (void) ERTS_RUNQ_FLGS_READ_BSET(rq, ERTS_RUNQ_FLGS_MIGRATION_INFO|ERTS_RUNQ_FLG_CHECKIO, flags); + (void) ERTS_RUNQ_FLGS_READ_BSET(rq, ERTS_RUNQ_FLGS_MIGRATION_INFO, flags); rq->max_len = erts_atomic32_read_dirty(&rq->len); for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { ErtsRunQueueInfo *rqi; @@ -5923,6 +5880,7 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online, int no_poll_th erts_alloc_permanent_cache_aligned(ERTS_ALC_T_RUNQS, size_runqs); #if ERTS_POLL_USE_SCHEDULER_POLLING erts_atomic32_init_nob(&doing_sys_schedule, 0); + erts_atomic32_init_nob(&function_calls, 0); #endif erts_atomic32_init_nob(&no_empty_run_queues, 0); @@ -9277,7 +9235,7 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) Process *proxy_p = NULL; ErtsRunQueue *rq; int context_reds; - int fcalls; + int fcalls = 0; int actual_reds; int reds; Uint32 flags; @@ -9351,6 +9309,10 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) reds = ERTS_PROC_MIN_CONTEXT_SWITCH_REDS_COST; esdp->virtual_reds = 0; +#if ERTS_POLL_USE_SCHEDULER_POLLING + fcalls = (int) erts_atomic32_add_read_acqb(&function_calls, reds); +#endif + ASSERT(esdp && esdp == erts_get_scheduler_data()); rq = erts_get_runq_current(esdp); @@ -9567,7 +9529,33 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) non_empty_runq(rq); goto check_activities_to_run; - } + } else if (is_normal_sched && + fcalls > (2 * context_reds) && + prepare_for_sys_schedule()) { + ErtsMonotonicTime current_time; + /* + * Schedule system-level activities. + */ + + ERTS_MSACC_PUSH_STATE_CACHED_M(); + + erts_runq_unlock(rq); + + ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_CHECK_IO); + LTTNG2(scheduler_poll, esdp->no, 1); + + erts_check_io(esdp->ssi->psi, ERTS_POLL_NO_TIMEOUT); + ERTS_MSACC_POP_STATE_M(); + + current_time = erts_get_monotonic_time(esdp); + if (current_time >= erts_next_timeout_time(esdp->next_tmo_ref)) + erts_bump_timers(esdp->timer_wheel, current_time); + + erts_runq_lock(rq); + fcalls = 0; + clear_sys_scheduling(); + goto continue_check_activities_to_run; + } if (flags & ERTS_RUNQ_FLG_MISC_OP) exec_misc_ops(rq); diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 0aa19e7bde..43937f216c 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -173,8 +173,6 @@ extern int erts_dio_sched_thread_suggested_stack_size; (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 9)) #define ERTS_RUNQ_FLG_HALTING \ (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 10)) -#define ERTS_RUNQ_FLG_CHECKIO \ - (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 11)) #define ERTS_RUNQ_FLG_MAX (ERTS_RUNQ_FLG_BASE2 + 12) diff --git a/erts/emulator/beam/erl_trace.c b/erts/emulator/beam/erl_trace.c index 701fb38147..ae7084b7f4 100644 --- a/erts/emulator/beam/erl_trace.c +++ b/erts/emulator/beam/erl_trace.c @@ -72,7 +72,7 @@ static ErtsTracer default_port_tracer; static Eterm system_monitor; static Eterm system_profile; -static erts_aint_t system_logger; +static erts_atomic_t system_logger; #ifdef HAVE_ERTS_NOW_CPU int erts_cpu_timestamp; diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index b44464d6da..ed687b8d70 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -12960,38 +12960,40 @@ make_noninheritable_handle(SOCKET s) static void fire_multi_timers(tcp_descriptor *desc, ErlDrvPort port, ErlDrvData data) { - ErlDrvTime next_timeout; - MultiTimerData *curr = desc->mtd; - if (!curr) { - ASSERT(0); - return; + ErlDrvTime next_timeout = 0; + if (!desc->mtd) { + ASSERT(0); + return; } #ifdef DEBUG { ErlDrvTime chk = erl_drv_monotonic_time(ERL_DRV_MSEC); - ASSERT(chk >= curr->when); + ASSERT(chk >= desc->mtd->when); } #endif do { - MultiTimerData *save = curr; + MultiTimerData save = *desc->mtd; - (*(save->timeout_function))(data,save->caller); + /* We first remove the timer so that the timeout_functions has + can call clean_multi_timers without breaking anything */ + if (desc->mtd_cache == NULL) { + desc->mtd_cache = desc->mtd; + } else { + FREE(desc->mtd); + } - curr = curr->next; + desc->mtd = save.next; + if (desc->mtd != NULL) + desc->mtd->prev = NULL; - if (desc->mtd_cache == NULL) - desc->mtd_cache = save; - else - FREE(save); + (*(save.timeout_function))(data,save.caller); - if (curr == NULL) { - desc->mtd = NULL; + if (desc->mtd == NULL) return; - } - curr->prev = NULL; - next_timeout = curr->when - erl_drv_monotonic_time(ERL_DRV_MSEC); + + next_timeout = desc->mtd->when - erl_drv_monotonic_time(ERL_DRV_MSEC); } while (next_timeout <= 0); - desc->mtd = curr; + driver_set_timer(port, (unsigned long) next_timeout); } diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c index c681fa481f..ac9a070bce 100644 --- a/erts/emulator/sys/common/erl_check_io.c +++ b/erts/emulator/sys/common/erl_check_io.c @@ -2262,14 +2262,14 @@ erts_check_io_info(void *proc) #if ERTS_POLL_USE_FALLBACK erts_poll_info_flbk(get_fallback_pollset(), &piv[0]); - piv[0].poll_threads = 1; + piv[0].poll_threads = 0; piv[0].active_fds = 0; piv++; #endif #if ERTS_POLL_USE_SCHEDULER_POLLING erts_poll_info(get_scheduler_pollset(0), &piv[0]); - piv[0].poll_threads = 1; + piv[0].poll_threads = 0; piv[0].active_fds = 0; piv++; #endif diff --git a/erts/emulator/sys/common/erl_poll.c b/erts/emulator/sys/common/erl_poll.c index 51d50933ff..27ffba58bd 100644 --- a/erts/emulator/sys/common/erl_poll.c +++ b/erts/emulator/sys/common/erl_poll.c @@ -2326,6 +2326,7 @@ uint32_t epoll_events(int kp_fd, int fd) { /* For epoll we read the information about what is selected upon from the proc fs.*/ char fname[30]; + char s[256]; FILE *f; unsigned int pos, flags, mnt_id; int line = 0; @@ -2343,12 +2344,12 @@ uint32_t epoll_events(int kp_fd, int fd) } if (fscanf(f,"\nmnt_id:\t%x\n", &mnt_id)); line += 3; - while (!feof(f)) { + while (fgets(s, sizeof(s) / sizeof(*s), f)) { /* tfd: 10 events: 40000019 data: 180000000a */ int ev_fd; uint32_t events; uint64_t data; - if (fscanf(f,"tfd:%d events:%x data:%llx\n", &ev_fd, &events, + if (sscanf(s,"tfd:%d events:%x data:%llx", &ev_fd, &events, (unsigned long long*)&data) != 3) { fprintf(stderr,"failed to parse file %s on line %d, errno = %d\n", fname, line, @@ -2392,6 +2393,7 @@ ERTS_POLL_EXPORT(erts_poll_get_selected_events)(ErtsPollSet *ps, /* For epoll we read the information about what is selected upon from the proc fs.*/ char fname[30]; + char s[256]; FILE *f; unsigned int pos, flags, mnt_id; int line = 0; @@ -2410,12 +2412,12 @@ ERTS_POLL_EXPORT(erts_poll_get_selected_events)(ErtsPollSet *ps, } if (fscanf(f,"\nmnt_id:\t%x\n", &mnt_id)); line += 3; - while (!feof(f)) { + while (fgets(s, sizeof(s) / sizeof(*s), f)) { /* tfd: 10 events: 40000019 data: 180000000a */ int fd; uint32_t events; uint64_t data; - if (fscanf(f,"tfd:%d events:%x data:%llx\n", &fd, &events, + if (sscanf(s,"tfd:%d events:%x data:%llx", &fd, &events, (unsigned long long*)&data) != 3) { fprintf(stderr,"failed to parse file %s on line %d, errno = %d\n", fname, line, errno); diff --git a/erts/emulator/sys/unix/sys_drivers.c b/erts/emulator/sys/unix/sys_drivers.c index 816bdea9c5..2f5459bee5 100644 --- a/erts/emulator/sys/unix/sys_drivers.c +++ b/erts/emulator/sys/unix/sys_drivers.c @@ -998,9 +998,9 @@ static void clear_fd_data(ErtsSysFdData *fdd) fdd->psz = 0; } -static void nbio_stop_fd(ErlDrvPort prt, ErtsSysFdData *fdd) +static void nbio_stop_fd(ErlDrvPort prt, ErtsSysFdData *fdd, int use) { - driver_select(prt, abs(fdd->fd), ERL_DRV_USE_NO_CALLBACK|DO_READ|DO_WRITE, 0); + driver_select(prt, abs(fdd->fd), use ? ERL_DRV_USE_NO_CALLBACK : 0|DO_READ|DO_WRITE, 0); clear_fd_data(fdd); SET_BLOCKING(abs(fdd->fd)); @@ -1020,11 +1020,11 @@ static void fd_stop(ErlDrvData ev) /* Does not close the fds */ if (dd->ifd) { sz += sizeof(ErtsSysFdData); - nbio_stop_fd(prt, dd->ifd); + nbio_stop_fd(prt, dd->ifd, 1); } if (dd->ofd && dd->ofd != dd->ifd) { sz += sizeof(ErtsSysFdData); - nbio_stop_fd(prt, dd->ofd); + nbio_stop_fd(prt, dd->ofd, 1); } erts_free(ERTS_ALC_T_DRV_TAB, dd); @@ -1070,12 +1070,12 @@ static void stop(ErlDrvData ev) ErlDrvPort prt = dd->port_num; if (dd->ifd) { - nbio_stop_fd(prt, dd->ifd); + nbio_stop_fd(prt, dd->ifd, 0); driver_select(prt, abs(dd->ifd->fd), ERL_DRV_USE, 0); /* close(ifd); */ } if (dd->ofd && dd->ofd != dd->ifd) { - nbio_stop_fd(prt, dd->ofd); + nbio_stop_fd(prt, dd->ofd, 0); driver_select(prt, abs(dd->ofd->fd), ERL_DRV_USE, 0); /* close(ofd); */ } diff --git a/erts/emulator/test/driver_SUITE.erl b/erts/emulator/test/driver_SUITE.erl index bd62708aa7..1d2ae4fb51 100644 --- a/erts/emulator/test/driver_SUITE.erl +++ b/erts/emulator/test/driver_SUITE.erl @@ -1069,14 +1069,19 @@ get_stable_check_io_info(N) -> get_check_io_total(ChkIo) -> ct:log("ChkIo = ~p~n",[ChkIo]), {Fallback, Rest} = get_fallback(ChkIo), + OnlyPollThreads = [PS || PS <- Rest, not is_scheduler_pollset(PS)], add_fallback_infos(Fallback, - lists:foldl(fun(Pollset, Acc) -> - lists:zipwith(fun(A, B) -> - add_pollset_infos(A,B) - end, - Pollset, Acc) - end, - hd(Rest), tl(Rest))). + lists:foldl( + fun(Pollset, Acc) -> + lists:zipwith(fun(A, B) -> + add_pollset_infos(A,B) + end, + Pollset, Acc) + end, + hd(OnlyPollThreads), tl(OnlyPollThreads))). + +is_scheduler_pollset(Pollset) -> + proplists:get_value(poll_threads, Pollset) == 0. add_pollset_infos({Tag, A}=TA , {Tag, B}=TB) -> case tag_type(Tag) of diff --git a/erts/emulator/test/scheduler_SUITE.erl b/erts/emulator/test/scheduler_SUITE.erl index 2e0dfa42f3..f61949c75b 100644 --- a/erts/emulator/test/scheduler_SUITE.erl +++ b/erts/emulator/test/scheduler_SUITE.erl @@ -1450,28 +1450,26 @@ poll_threads(Config) when is_list(Config) -> {Conc, PollType, KP} = get_ioconfig(Config), {Sched, SchedOnln, _} = get_sstate(Config, ""), + [1, 1] = get_ionum(Config,"+IOt 2 +IOp 2"), + [1, 1, 1, 1, 1] = get_ionum(Config,"+IOt 5 +IOp 5"), + [1, 1] = get_ionum(Config, "+S 2 +IOPt 100 +IOPp 100"), + if Conc -> - [1, 1, 1] = get_ionum(Config,"+IOt 2 +IOp 2"), - [1, 1, 1, 1, 1, 1] = get_ionum(Config,"+IOt 5 +IOp 5"), - [1, 1, 1] = get_ionum(Config, "+S 2 +IOPt 100 +IOPp 100"), - [5, 1] = get_ionum(Config,"+IOt 5 +IOp 1"), - [3, 2, 1] = get_ionum(Config,"+IOt 5 +IOp 2"), - [2, 2, 2, 2, 2, 1] = get_ionum(Config,"+IOt 10 +IOPp 50"), + [5] = get_ionum(Config,"+IOt 5 +IOp 1"), + [3, 2] = get_ionum(Config,"+IOt 5 +IOp 2"), + [2, 2, 2, 2, 2] = get_ionum(Config,"+IOt 10 +IOPp 50"), - [2, 1] = get_ionum(Config, "+S 2 +IOPt 100"), - [4, 1] = get_ionum(Config, "+S 4 +IOPt 100"), - [4, 1] = get_ionum(Config, "+S 4:2 +IOPt 100"), - [4, 4, 1] = get_ionum(Config, "+S 8 +IOPt 100 +IOPp 25"), + [2] = get_ionum(Config, "+S 2 +IOPt 100"), + [4] = get_ionum(Config, "+S 4 +IOPt 100"), + [4] = get_ionum(Config, "+S 4:2 +IOPt 100"), + [4, 4] = get_ionum(Config, "+S 8 +IOPt 100 +IOPp 25"), fail = get_ionum(Config, "+IOt 1 +IOp 2"), ok; not Conc -> - [1, 1] = get_ionum(Config,"+IOt 2 +IOp 2"), - [1, 1, 1, 1, 1] = get_ionum(Config,"+IOt 5 +IOp 5"), - [1, 1] = get_ionum(Config, "+S 2 +IOPt 100 +IOPp 100"), [1, 1, 1, 1, 1] = get_ionum(Config,"+IOt 5 +IOp 1"), [1, 1, 1, 1, 1] = get_ionum(Config,"+IOt 5 +IOp 2"), @@ -1515,7 +1513,8 @@ get_iostate(Config, Cmd)-> erlang:system_info(check_io) end]), IO = [IOState || IOState <- IOStates, - proplists:get_value(fallback, IOState) == false], + proplists:get_value(fallback, IOState) == false, + proplists:get_value(poll_threads, IOState) /= 0], stop_node(Node), IO; {error,timeout} -> diff --git a/erts/emulator/test/z_SUITE.erl b/erts/emulator/test/z_SUITE.erl index 1c52e1a934..6549108126 100644 --- a/erts/emulator/test/z_SUITE.erl +++ b/erts/emulator/test/z_SUITE.erl @@ -251,7 +251,7 @@ pollset_size(Config) when is_list(Config) -> end. check_io_debug(Config) when is_list(Config) -> - case lists:keysearch(name, 1, erlang:system_info(check_io)) of + case lists:keysearch(name, 1, hd(erlang:system_info(check_io))) of {value, {name, erts_poll}} -> check_io_debug_test(); _ -> {skipped, "Not implemented in this emulator"} end. diff --git a/lib/common_test/doc/src/ct_netconfc.xml b/lib/common_test/doc/src/ct_netconfc.xml index 32a1175d81..8fbe5f3df6 100644 --- a/lib/common_test/doc/src/ct_netconfc.xml +++ b/lib/common_test/doc/src/ct_netconfc.xml @@ -412,11 +412,11 @@ </func> <func> - <name since="OTP 18.3">create_subscription(Client) -> Result</name> - <name since="OTP 18.3">create_subscription(Client, Stream) -> Result</name> - <name since="OTP 18.3">create_subscription(Client, Stream, Filter) -> Result</name> - <name since="OTP 18.3">create_subscription(Client, Stream, Filter, Timeout) -> Result</name> - <name name="create_subscription" arity="5" clause_i="2" since="OTP 18.3"/> + <name since="OTP R15B02">create_subscription(Client) -> Result</name> + <name since="OTP R15B02">create_subscription(Client, Stream) -> Result</name> + <name since="OTP R15B02">create_subscription(Client, Stream, Filter) -> Result</name> + <name since="OTP R15B02">create_subscription(Client, Stream, Filter, Timeout) -> Result</name> + <name name="create_subscription" arity="5" clause_i="2" since="OTP R15B02"/> <name name="create_subscription" arity="6" since="OTP R15B02"/> <fsummary>Creates a subscription for event notifications.</fsummary> <desc> @@ -515,7 +515,7 @@ create_subscription(Client, Stream, Filter, StartTime, StopTime, Timeout)</pre> <func> <name name="edit_config" arity="3" since="OTP R15B02"/> - <name name="edit_config" arity="4" clause_i="1" since="OTP R15B02"/> + <name name="edit_config" arity="4" clause_i="1" since="OTP 18.0"/> <name name="edit_config" arity="4" clause_i="2" since="OTP R15B02"/> <name name="edit_config" arity="5" since="OTP 18.0"/> <fsummary>Edits configuration data.</fsummary> @@ -599,7 +599,7 @@ create_subscription(Client, Stream, Filter, StartTime, StopTime, Timeout)</pre> <func> <name name="get_event_streams" arity="1" since="OTP 20.0"/> <name name="get_event_streams" arity="2" clause_i="1" since="OTP R15B02"/> - <name name="get_event_streams" arity="2" clause_i="2" since="OTP R15B02"/> + <name name="get_event_streams" arity="2" clause_i="2" since="OTP 20.0"/> <name name="get_event_streams" arity="3" since="OTP R15B02"/> <fsummary>Sends a request to get the specified event streams.</fsummary> <desc> diff --git a/lib/common_test/test_server/ts_erl_config.erl b/lib/common_test/test_server/ts_erl_config.erl index 537628e39a..f3972bea4e 100644 --- a/lib/common_test/test_server/ts_erl_config.erl +++ b/lib/common_test/test_server/ts_erl_config.erl @@ -208,7 +208,11 @@ erl_interface(Vars,OsType) -> {filename:join(Dir, "lib"), filename:join([Dir, "src", "eidefs.mk"])}; {srctree, _Root, Target} -> - {filename:join([Dir, "obj", Target]), + Obj = case is_debug_build() of + true -> "obj.debug"; + false -> "obj" + end, + {filename:join([Dir, Obj, Target]), filename:join([Dir, "src", Target, "eidefs.mk"])} end} end, diff --git a/lib/erl_interface/configure.in b/lib/erl_interface/configure.in index a155ceef7e..747750c1fb 100644 --- a/lib/erl_interface/configure.in +++ b/lib/erl_interface/configure.in @@ -82,6 +82,15 @@ AC_ARG_ENABLE(threads, esac ], [ threads_disabled=maybe ]) +AC_ARG_ENABLE(mask-real-errno, +[ --disable-mask-real-errno do not mask real 'errno'], +[ case "$enableval" in + no) mask_real_errno=no ;; + *) mask_real_errno=yes ;; + esac ], +[ mask_real_errno=yes ]) + + dnl ---------------------------------------------------------------------- dnl Checks for programs dnl ---------------------------------------------------------------------- @@ -100,6 +109,10 @@ AC_CHECK_SIZEOF(long) AC_CHECK_SIZEOF(void *) AC_CHECK_SIZEOF(long long) +if test $mask_real_errno = yes; then + AC_DEFINE(EI_HIDE_REAL_ERRNO, 1, [Define if 'errno' should not be exposed as is in 'erl_errno']) +fi + dnl We set EI_64BIT mode when long is 8 bytes, this makes things dnl work on windows and unix correctly if test $ac_cv_sizeof_long = 8; then @@ -158,7 +171,7 @@ AC_CHECK_LIB([socket], [getpeername]) # Checks for header files. AC_HEADER_STDC AC_HEADER_SYS_WAIT -AC_CHECK_HEADERS([arpa/inet.h fcntl.h limits.h malloc.h netdb.h netinet/in.h stddef.h stdlib.h string.h sys/param.h sys/socket.h sys/select.h sys/time.h unistd.h sys/types.h]) +AC_CHECK_HEADERS([arpa/inet.h fcntl.h limits.h malloc.h netdb.h netinet/in.h stddef.h stdlib.h string.h sys/param.h sys/socket.h sys/select.h sys/time.h unistd.h sys/types.h sys/uio.h]) # Checks for typedefs, structures, and compiler characteristics. # fixme AC_C_CONST & AC_C_VOLATILE needed for Windows? @@ -193,7 +206,7 @@ AC_CHECK_FUNCS([dup2 gethostbyaddr gethostbyname \ gethostbyaddr_r \ gethostbyname_r gethostname writev \ gethrtime gettimeofday inet_ntoa memchr memmove memset select \ - socket strchr strerror strrchr strstr uname]) + socket strchr strerror strrchr strstr uname sysconf]) AC_CHECK_FUNC(res_gethostbyname, [], AC_CHECK_LIB(resolv, res_gethostbyname) ) @@ -255,6 +268,7 @@ AC_SUBST(EI_THREADS) case "$threads_disabled" in no|maybe) LM_CHECK_THR_LIB + ETHR_CHK_GCC_ATOMIC_OPS([]) case "$THR_LIB_NAME" in "") @@ -268,7 +282,7 @@ case "$threads_disabled" in EI_THREADS="true" THR_DEFS="$THR_DEFS -D_WIN32_WINNT=0x0600 -DWINVER=0x0600" ;; - pthread) + pthread) EI_THREADS="true" ;; *) diff --git a/lib/erl_interface/doc/src/ei_connect.xml b/lib/erl_interface/doc/src/ei_connect.xml index 6f16c0652e..e318dd6664 100644 --- a/lib/erl_interface/doc/src/ei_connect.xml +++ b/lib/erl_interface/doc/src/ei_connect.xml @@ -85,6 +85,273 @@ the <c>_tmo</c> suffix.</p> </section> + <section> + <marker id="ussi"/> + <title>User Supplied Socket Implementation</title> + <p>By default <c>ei</c> supplies a TCP/IPv4 socket interface + that is used when communicating. The user can however plug in + his/her own IPv4 socket implementation. This, for example, in order + to communicate over TLS. A user supplied socket implementation + is plugged in by passing a + <seealso marker="#ei_socket_callbacks">callback structure</seealso> + to either + <seealso marker="#ei_connect_init"><c>ei_connect_init_ussi()</c></seealso> + or + <seealso marker="#ei_connect_init"><c>ei_connect_xinit_ussi()</c></seealso>.</p> + + <p>All callbacks in the <c>ei_socket_callbacks</c> structure + <em>should</em> return zero on success; and a posix error + code on failure.</p> + + <p>The <c>addr</c> argument of the <c>listen</c>, <c>accept</c>, + and <c>connect</c> callbacks refer to appropriate address + structure for currently used protocol. Currently <c>ei</c> + only supports IPv4. That is, at this time <c>addr</c> always + points to a <c>struct sockaddr_in</c> structure.</p> + + <p>The <c>ei_socket_callbacks</c> structure may be enlarged in + the future. All fields not set, <em>needs</em> to be zeroed out.</p> + + <marker id="ei_socket_callbacks"/> + <code type="none"><![CDATA[ +typedef struct { + int flags; + int (*socket)(void **ctx, void *setup_ctx); + int (*close)(void *ctx); + int (*listen)(void *ctx, void *addr, int *len, int backlog); + int (*accept)(void **ctx, void *addr, int *len, unsigned tmo); + int (*connect)(void *ctx, void *addr, int len, unsigned tmo); + int (*writev)(void *ctx, const void *iov, int iovcnt, ssize_t *len, unsigned tmo); + int (*write)(void *ctx, const char *buf, ssize_t *len, unsigned tmo); + int (*read)(void *ctx, char *buf, ssize_t *len, unsigned tmo); + int (*handshake_packet_header_size)(void *ctx, int *sz); + int (*connect_handshake_complete)(void *ctx); + int (*accept_handshake_complete)(void *ctx); + int (*get_fd)(void *ctx, int *fd); +} ei_socket_callbacks; + ]]></code> + + <taglist> + + <tag><c>flags</c></tag> + <item> + <p>Flags informing <c>ei</c> about the behaviour of the + callbacks. Flags should be bitwise or:ed together. If no flag, + is set, the <c>flags</c> field should contain <c>0</c>. Currently, + supported flags:</p> + <taglist> + <tag><c>EI_SCLBK_FLG_FULL_IMPL</c></tag> + <item> + <p> + If set, the <c>accept()</c>, <c>connect()</c>, + <c>writev()</c>, <c>write()</c>, and <c>read()</c> callbacks + implements timeouts. The timeout is passed in the <c>tmo</c> + argument and is given in milli seconds. Note that the + <c>tmo</c> argument to these callbacks differ from the + timeout arguments in the <c>ei</c> API. Zero means a zero + timeout. That is, poll and timeout immediately unless the + operation is successful. <c>EI_SCLBK_INF_TMO</c> + (max <c>unsigned</c>) means infinite timeout. The file + descriptor is in blocking mode when a callback is called, + and it must be in blocking mode when the callback returns. + </p> + <p> + If not set, <c>ei</c> will implement the timeout using + <c>select()</c> in order to determine when to call the + callbacks and when to time out. The <c>tmo</c> arguments + of the <c>accept()</c>, <c>connect()</c>, <c>writev()</c>, + <c>write()</c>, and <c>read()</c> callbacks should be + ignored. The callbacks may be called in non-blocking mode. + The callbacks are not allowed to change between blocking + and non-blocking mode. In order for this to work, + <c>select()</c> needs to interact with the socket primitives + used the same way as it interacts with the ordinary socket + primitives. If this is not the case, the callbacks + <em>need</em> to implement timeouts and this flag should + be set. + </p> + </item> + </taglist> + <p>More flags may be introduced in the future.</p> + </item> + + <tag><c>int (*socket)(void **ctx, void *setup_ctx)</c></tag> + <item> + <p>Create a socket and a context for the socket.</p> + + <p>On success it should set <c>*ctx</c> to point to a context for + the created socket. This context will be passed to all other + socket callbacks. This function will be passed the same + <c>setup_context</c> as passed to the preceeding + <seealso marker="#ei_connect_init"><c>ei_connect_init_ussi()</c></seealso> + or + <seealso marker="#ei_connect_init"><c>ei_connect_xinit_ussi()</c></seealso> + call.</p> + + <note><p>During the lifetime of a socket, the pointer <c>*ctx</c> + <em>has</em> to remain the same. That is, it cannot later be + relocated.</p></note> + + <p>This callback is mandatory.</p> + </item> + + <tag><c>int (*close)(void *ctx)</c></tag> + <item> + <p>Close the socket identified by <c>ctx</c> and destroy the context.</p> + + <p>This callback is mandatory.</p> + </item> + + <tag><c>int (*listen)(void *ctx, void *addr, int *len, int backlog)</c></tag> + <item> + <p>Bind the socket identified by <c>ctx</c> to a local interface + and then listen on it.</p> + + <p>The <c>addr</c> and <c>len</c> arguments are both input and output + arguments. When called <c>addr</c> points to an address structure of + lenght <c>*len</c> containing information on how to bind the socket. + Uppon return this callback should have updated the structure referred + by <c>addr</c> with information on how the socket actually was bound. + <c>*len</c> should be updated to reflect the size of <c>*addr</c> + updated. <c>backlog</c> identifies the size of the backlog for the + listen socket.</p> + + <p>This callback is mandatory.</p> + </item> + + <tag><c>int (*accept)(void **ctx, void *addr, int *len, unsigned tmo)</c></tag> + <item> + <p>Accept connections on the listen socket identified by + <c>*ctx</c>.</p> + + <p>When a connection is accepted, a new context for the accepted + connection should be created and <c>*ctx</c> should be updated + to point to the new context for the accepted connection. When + called <c>addr</c> points to an uninitialized address structure + of lenght <c>*len</c>. Uppon return this callback should have + updated this structure with information about the client address. + <c>*len</c> should be updated to reflect the size of <c>*addr</c> + updated. + </p> + + <p>If the <c>EI_SCLBK_FLG_FULL_IMPL</c> flag has been set, + <c>tmo</c> contains timeout time in milliseconds.</p> + + <note><p>During the lifetime of a socket, the pointer <c>*ctx</c> + <em>has</em> to remain the same. That is, it cannot later be + relocated.</p></note> + + <p>This callback is mandatory.</p> + </item> + + <tag><c>int (*connect)(void *ctx, void *addr, int len, unsigned tmo)</c></tag> + <item> + <p>Connect the socket identified by <c>ctx</c> to the address + identified by <c>addr</c>.</p> + + <p>When called <c>addr</c> points to an address structure of + lenght <c>len</c> containing information on where to connect.</p> + + <p>If the <c>EI_SCLBK_FLG_FULL_IMPL</c> flag has been set, + <c>tmo</c> contains timeout time in milliseconds.</p> + + <p>This callback is mandatory.</p> + </item> + + <tag><c>int (*writev)(void *ctx, const void *iov, long iovcnt, ssize_t *len, unsigned tmo)</c></tag> + <item> + <p>Write data on the connected socket identified by <c>ctx</c>.</p> + + <p><c>iov</c> points to an array of <c>struct iovec</c> structures of + length <c>iovcnt</c> containing data to write to the socket. On success, + this callback should set <c>*len</c> to the amount of bytes successfully + written on the socket.</p> + + <p>If the <c>EI_SCLBK_FLG_FULL_IMPL</c> flag has been set, + <c>tmo</c> contains timeout time in milliseconds.</p> + + <p>This callback is optional. Set the <c>writev</c> field + in the the <c>ei_socket_callbacks</c> structure to <c>NULL</c> if not + implemented.</p> + </item> + + <tag><c>int (*write)(void *ctx, const char *buf, ssize_t *len, unsigned tmo)</c></tag> + <item> + <p>Write data on the connected socket identified by <c>ctx</c>.</p> + + <p>When called <c>buf</c> points to a buffer of length <c>*len</c> + containing the data to write on the socket. On success, this callback + should set <c>*len</c> to the amount of bytes successfully written on + the socket.</p> + + <p>If the <c>EI_SCLBK_FLG_FULL_IMPL</c> flag has been set, + <c>tmo</c> contains timeout time in milliseconds.</p> + + <p>This callback is mandatory.</p> + </item> + + <tag><c>int (*read)(void *ctx, char *buf, ssize_t *len, unsigned tmo)</c></tag> + <item> + <p>Read data on the connected socket identified by <c>ctx</c>.</p> + + <p><c>buf</c> points to a buffer of length <c>*len</c> where the + read data should be placed. On success, this callback should update + <c>*len</c> to the amount of bytes successfully read on the socket.</p> + + <p>If the <c>EI_SCLBK_FLG_FULL_IMPL</c> flag has been set, + <c>tmo</c> contains timeout time in milliseconds.</p> + + <p>This callback is mandatory.</p> + </item> + + <tag><c>int (*handshake_packet_header_size)(void *ctx, int *sz)</c></tag> + <item> + <p>Inform about handshake packet header size to use during the Erlang + distribution handshake.</p> + + <p>On success, <c>*sz</c> should be set to the handshake packet header + size to use. Valid values are <c>2</c> and <c>4</c>. Erlang TCP + distribution use a handshake packet size of <c>2</c> and Erlang TLS + distribution use a handshake packet size of <c>4</c>.</p> + + <p>This callback is mandatory.</p> + </item> + + <tag><c>int (*connect_handshake_complete)(void *ctx)</c></tag> + <item> + <p>Called when a locally started handshake has completed successfully.</p> + + <p>This callback is optional. Set the <c>connect_handshake_complete</c> field + in the <c>ei_socket_callbacks</c> structure to <c>NULL</c> if not implemented.</p> + </item> + + <tag><c>int (*accept_handshake_complete)(void *ctx)</c></tag> + <item> + <p>Called when a remotely started handshake has completed successfully.</p> + + <p>This callback is optional. Set the <c>accept_handshake_complete</c> field in + the <c>ei_socket_callbacks</c> structure to <c>NULL</c> if not implemented.</p> + </item> + + <tag><c>int (*get_fd)(void *ctx, int *fd)</c></tag> + <item> + <p>Inform about file descriptor used by the socket which is identified + by <c>ctx</c>.</p> + + <note><p>During the lifetime of a socket, the file descriptor + <em>has</em> to remain the same. That is, repeated calls to this + callback with the same context <c>should</c> always report the same + file descriptor.</p> + <p>The file descriptor <em>has</em> to be a real file descriptor. + That is, no other operation should be able to get the same file + descriptor until it has been released by the <c>close()</c> + callback.</p> + </note> + + <p>This callback is mandatory.</p> + </item> + </taglist> + </section> <funcs> <func> <name since=""><ret>struct hostent *</ret><nametext>ei_gethostbyaddr(const char *addr, int len, int type)</nametext></name> @@ -96,6 +363,7 @@ <p>Convenience functions for some common name lookup functions.</p> </desc> </func> + <func> <name since=""><ret>int</ret><nametext>ei_accept(ei_cnode *ec, int listensock, ErlConnect *conp)</nametext></name> @@ -141,6 +409,14 @@ typedef struct { </func> <func> + <name since="OTP @OTP-15442@"><ret>int</ret><nametext>ei_close_connection(int fd)</nametext></name> + <fsummary>Close a connection.</fsummary> + <desc> + <p>Closes a previously opened connection or listen socket.</p> + </desc> + </func> + + <func> <name since=""><ret>int</ret><nametext>ei_connect(ei_cnode* ec, char *nodename)</nametext></name> <name since=""><ret>int</ret><nametext>ei_xconnect(ei_cnode* ec, Erl_IpAddr adr, char *alivename)</nametext></name> <fsummary>Establish a connection to an Erlang node.</fsummary> @@ -193,7 +469,9 @@ fd = ei_xconnect(&ec, &addr, ALIVE); <func> <name since=""><ret>int</ret><nametext>ei_connect_init(ei_cnode* ec, const char* this_node_name, const char *cookie, short creation)</nametext></name> + <name since="OTP @OTP-15442@"><ret>int</ret><nametext>ei_connect_init_ussi(ei_cnode* ec, const char* this_node_name, const char *cookie, short creation, ei_socket_callbacks *cbs, int cbs_sz, void *setup_context)</nametext></name> <name since=""><ret>int</ret><nametext>ei_connect_xinit(ei_cnode* ec, const char *thishostname, const char *thisalivename, const char *thisnodename, Erl_IpAddr thisipaddr, const char *cookie, short creation)</nametext></name> + <name since="OTP @OTP-15442@"><ret>int</ret><nametext>ei_connect_xinit_ussi(ei_cnode* ec, const char *thishostname, const char *thisalivename, const char *thisnodename, Erl_IpAddr thisipaddr, const char *cookie, short creation, ei_socket_callbacks *cbs, int cbs_sz, void *setup_context)</nametext></name> <fsummary>Initialize for a connection.</fsummary> <desc> <p>Initializes the <c>ec</c> structure, to @@ -236,6 +514,21 @@ fd = ei_xconnect(&ec, &addr, ALIVE); <item> <p><c>thispaddr</c> if the IP address of the host.</p> </item> + <item> + <p><c>cbs</c> is a pointer to a + <seealso marker="#ei_socket_callbacks">callback structure</seealso> + implementing and alternative socket interface.</p> + </item> + <item> + <p><c>cbs_sz</c> is the size of the structure + pointed to by <c>cbs</c>.</p> + </item> + <item> + <p><c>setup_context</c> is a pointer to a structure that + will be passed as second argument to the <c>socket</c> callback + in the <c>cbs</c> structure.</p> + </item> + </list> <p>A C-node acting as a server is assigned a creation number when it calls <c>ei_publish()</c>.</p> @@ -299,6 +592,45 @@ if (ei_connect_init(&ec, "madonna", "cookie...", n++) < 0) { </func> <func> + <name since="OTP @OTP-15442@"><ret>int</ret><nametext>ei_listen(ei_cnode *ec, int *port, int backlog)</nametext></name> + <name since="OTP @OTP-15442@"><ret>int</ret><nametext>ei_xlisten(ei_cnode *ec, Erl_IpAddr adr, int *port, int backlog)</nametext></name> + <fsummary>Create a listen socket.</fsummary> + <desc> + <p>Used by a server process to setup a listen socket which + later can be used for accepting connections from client processes. + </p> + <list type="bulleted"> + <item> + <p><c>ec</c> is the C-node structure.</p> + </item> + <item> + <p><c>adr</c> is local interface to bind to.</p> + </item> + <item> + <p><c>port</c> is a pointer to an integer containing the + port number to bind to. If <c>*port</c> equals <c>0</c> + when calling <c>ei_listen()</c>, the socket will be bound to + an ephemeral port. On success, <c>ei_listen()</c> will update + the value of <c>*port</c> to the port actually bound to. + </p> + </item> + <item> + <p><c>backlog</c> is maximum backlog of pending connections.</p> + </item> + </list> + <p><c>ei_listen</c> will create a socket, bind to a port on the + local interface identified by <c>adr</c> (or all local interfaces if + <c>ei_listen()</c> is called), and mark the socket as a passive socket + (that is, a socket that will be used for accepting incoming connections). + </p> + <p> + On success, a file descriptor is returned which can be used in a call to + <c>ei_accept()</c>. On failure, <c>ERL_ERROR</c> is returned and + <c>erl_errno</c> is set to <c>EIO</c>.</p> + </desc> + </func> + + <func> <name since=""><ret>int</ret><nametext>ei_publish(ei_cnode *ec, int port)</nametext></name> <fsummary>Publish a node name.</fsummary> <desc> diff --git a/lib/erl_interface/include/ei.h b/lib/erl_interface/include/ei.h index 948f89be85..92674571e2 100644 --- a/lib/erl_interface/include/ei.h +++ b/lib/erl_interface/include/ei.h @@ -35,6 +35,7 @@ #include <winsock2.h> #include <windows.h> #include <winbase.h> +typedef LONG_PTR ssize_t; /* Sigh... */ #endif #include <stdio.h> /* Need type FILE */ @@ -286,6 +287,31 @@ typedef struct { char nodename[MAXNODELEN+1]; } ErlConnect; +#define EI_SCLBK_INF_TMO (~((unsigned) 0)) + +#define EI_SCLBK_FLG_FULL_IMPL (1 << 0) + +typedef struct { + int flags; + + int (*socket)(void **ctx, void *setup_ctx); + int (*close)(void *ctx); + int (*listen)(void *ctx, void *addr, int *len, int backlog); + int (*accept)(void **ctx, void *addr, int *len, unsigned tmo); + int (*connect)(void *ctx, void *addr, int len, unsigned tmo); + int (*writev)(void *ctx, const void *iov, int iovcnt, ssize_t *len, unsigned tmo); + int (*write)(void *ctx, const char *buf, ssize_t *len, unsigned tmo); + int (*read)(void *ctx, char *buf, ssize_t *len, unsigned tmo); + + int (*handshake_packet_header_size)(void *ctx, int *sz); + int (*connect_handshake_complete)(void *ctx); + int (*accept_handshake_complete)(void *ctx); + int (*get_fd)(void *ctx, int *fd); + + /* end of version 1 */ + +} ei_socket_callbacks; + typedef struct ei_cnode_s { char thishostname[EI_MAXHOSTNAMELEN+1]; char thisnodename[MAXNODELEN+1]; @@ -295,6 +321,8 @@ typedef struct ei_cnode_s { char ei_connect_cookie[EI_MAX_COOKIE_SIZE+1]; short creation; erlang_pid self; + ei_socket_callbacks *cbs; + void *setup_context; } ei_cnode; typedef struct in_addr *Erl_IpAddr; @@ -308,7 +336,6 @@ typedef struct ei_x_buff_TAG { int index; } ei_x_buff; - /* -------------------------------------------------------------------- */ /* Function definitions (listed in same order as documentation) */ /* -------------------------------------------------------------------- */ @@ -322,6 +349,16 @@ int ei_connect_xinit (ei_cnode* ec, const char *thishostname, Erl_IpAddr thisipaddr, const char *cookie, const short creation); +int ei_connect_init_ussi(ei_cnode* ec, const char* this_node_name, + const char *cookie, short creation, + ei_socket_callbacks *cbs, int cbs_sz, + void *setup_context); +int ei_connect_xinit_ussi(ei_cnode* ec, const char *thishostname, + const char *thisalivename, const char *thisnodename, + Erl_IpAddr thisipaddr, const char *cookie, + const short creation, ei_socket_callbacks *cbs, + int cbs_sz, void *setup_context); + int ei_connect(ei_cnode* ec, char *nodename); int ei_connect_tmo(ei_cnode* ec, char *nodename, unsigned ms); int ei_xconnect(ei_cnode* ec, Erl_IpAddr adr, char *alivename); @@ -348,11 +385,15 @@ int ei_rpc_from(ei_cnode* ec, int fd, int timeout, erlang_msg* msg, int ei_publish(ei_cnode* ec, int port); int ei_publish_tmo(ei_cnode* ec, int port, unsigned ms); +int ei_listen(ei_cnode *ec, int *port, int backlog); +int ei_xlisten(ei_cnode *ec, Erl_IpAddr adr, int *port, int backlog); int ei_accept(ei_cnode* ec, int lfd, ErlConnect *conp); int ei_accept_tmo(ei_cnode* ec, int lfd, ErlConnect *conp, unsigned ms); int ei_unpublish(ei_cnode* ec); int ei_unpublish_tmo(const char *alive, unsigned ms); +int ei_close_connection(int fd); + const char *ei_thisnodename(const ei_cnode* ec); const char *ei_thishostname(const ei_cnode* ec); const char *ei_thisalivename(const ei_cnode* ec); diff --git a/lib/erl_interface/src/connect/ei_connect.c b/lib/erl_interface/src/connect/ei_connect.c index 9df4fa3b6c..1132c9fc23 100644 --- a/lib/erl_interface/src/connect/ei_connect.c +++ b/lib/erl_interface/src/connect/ei_connect.c @@ -21,8 +21,6 @@ * Purpose: Connect to any node at any host. (EI version) */ -#include "eidef.h" - #include <stdlib.h> #include <sys/types.h> #include <fcntl.h> @@ -84,7 +82,9 @@ #include <string.h> #include <errno.h> #include <ctype.h> +#include <stddef.h> +#include "eidef.h" #include "eiext.h" #include "ei_portio.h" #include "ei_internal.h" @@ -103,6 +103,10 @@ int ei_tracelevel = 0; #define COOKIE_FILE "/.erlang.cookie" #define EI_MAX_HOME_PATH 1024 +#define EI_SOCKET_CALLBACKS_SZ_V1 \ + (offsetof(ei_socket_callbacks, get_fd) \ + + sizeof(int (*)(void *))) + /* FIXME why not macro? */ static char *null_cookie = ""; @@ -113,35 +117,51 @@ static int get_home(char *buf, int size); static unsigned gen_challenge(void); static void gen_digest(unsigned challenge, char cookie[], unsigned char digest[16]); -static int send_status(int fd, char *status, unsigned ms); -static int recv_status(int fd, unsigned ms); -static int send_challenge(int fd, char *nodename, - unsigned challenge, unsigned version, unsigned ms); -static int recv_challenge(int fd, unsigned *challenge, - unsigned *version, - unsigned *flags, ErlConnect *namebuf, unsigned ms); -static int send_challenge_reply(int fd, unsigned char digest[16], +static int send_status(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, char *status, unsigned ms); +static int recv_status(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned ms); +static int send_challenge(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, + char *nodename, unsigned challenge, + unsigned version, unsigned ms); +static int recv_challenge(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, + unsigned *challenge, unsigned *version, + unsigned *flags, char *namebuf, unsigned ms); +static int send_challenge_reply(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned char digest[16], unsigned challenge, unsigned ms); -static int recv_challenge_reply(int fd, - unsigned our_challenge, +static int recv_challenge_reply(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned our_challenge, char cookie[], unsigned *her_challenge, unsigned ms); -static int send_challenge_ack(int fd, unsigned char digest[16], unsigned ms); -static int recv_challenge_ack(int fd, - unsigned our_challenge, +static int send_challenge_ack(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned char digest[16], + unsigned ms); +static int recv_challenge_ack(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned our_challenge, char cookie[], unsigned ms); -static int send_name(int fd, char *nodename, - unsigned version, unsigned ms); +static int send_name(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, + char *nodename, unsigned version, unsigned ms); -/* Common for both handshake types */ -static int recv_name(int fd, - unsigned *version, - unsigned *flags, ErlConnect *namebuf, unsigned ms); +static int recv_name(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, + unsigned *version, unsigned *flags, char *namebuf, + unsigned ms); static struct hostent* dyn_gethostbyname_r(const char *name, struct hostent *hostp, char **buffer_p, int buflen, int *h_errnop); +static void abort_connection(ei_socket_callbacks *cbs, void *ctx); +static int close_connection(ei_socket_callbacks *cbs, void *ctx, int fd); + +static char * +estr(int e) +{ + char *str = strerror(e); + if (!str) + return "unknown error"; + return str; +} /*************************************************************************** @@ -154,25 +174,206 @@ dyn_gethostbyname_r(const char *name, struct hostent *hostp, char **buffer_p, typedef struct ei_socket_info_s { int socket; + ei_socket_callbacks *cbs; + void *ctx; int dist_version; ei_cnode cnode; /* A copy, not a pointer. We don't know when freed */ char cookie[EI_MAX_COOKIE_SIZE+1]; } ei_socket_info; +/*************************************************************************** + * + * XXX + * + ***************************************************************************/ + +#ifndef ETHR_HAVE___atomic_compare_exchange_n +# define ETHR_HAVE___atomic_compare_exchange_n 0 +#endif +#ifndef ETHR_HAVE___atomic_load_n +# define ETHR_HAVE___atomic_load_n 0 +#endif +#ifndef ETHR_HAVE___atomic_store_n +# define ETHR_HAVE___atomic_store_n 0 +#endif + +#if defined(_REENTRANT) \ + && (!(ETHR_HAVE___atomic_compare_exchange_n & SIZEOF_VOID_P) \ + || !(ETHR_HAVE___atomic_load_n & SIZEOF_VOID_P) \ + || !(ETHR_HAVE___atomic_store_n & SIZEOF_VOID_P)) +# undef EI_DISABLE_SEQ_SOCKET_INFO +# define EI_DISABLE_SEQ_SOCKET_INFO +#endif + +#ifdef __WIN32__ +# undef EI_DISABLE_SEQ_SOCKET_INFO +# define EI_DISABLE_SEQ_SOCKET_INFO +#endif + +#ifndef EI_DISABLE_SEQ_SOCKET_INFO + +#ifdef _REENTRANT + +#define EI_ATOMIC_CMPXCHG_ACQ_REL(VARP, XCHGP, NEW) \ + __atomic_compare_exchange_n((VARP), (XCHGP), (NEW), 0, \ + __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE) +#define EI_ATOMIC_LOAD_ACQ(VARP) \ + __atomic_load_n((VARP), __ATOMIC_ACQUIRE) +#define EI_ATOMIC_STORE_REL(VARP, NEW) \ + __atomic_store_n((VARP), (NEW), __ATOMIC_RELEASE) + +#else /* ! _REENTRANT */ + +#define EI_ATOMIC_CMPXCHG_ACQ_REL(VARP, XCHGP, NEW) \ + (*(VARP) == *(XCHGP) \ + ? ((*(VARP) = (NEW)), !0) \ + : ((*(XCHGP) = *(VARP)), 0)) +#define EI_ATOMIC_LOAD_ACQ(VARP) (*(VARP)) +#define EI_ATOMIC_STORE_REL(VARP, NEW) (*(VARP) = (NEW)) + +#endif /* ! _REENTRANT */ + +#define EI_SOCKET_INFO_SEG_BITS 5 +#define EI_SOCKET_INFO_SEG_SIZE (1 << EI_SOCKET_INFO_SEG_BITS) +#define EI_SOCKET_INFO_SEG_MASK (EI_SOCKET_INFO_SEG_SIZE - 1) + +typedef struct { + int max_fds; + ei_socket_info *segments[1]; /* Larger in reality... */ +} ei_socket_info_data__; + +static ei_socket_info_data__ *socket_info_data = NULL; + +static int init_socket_info(void) +{ + int max_fds; + int i; + size_t segments_len; + ei_socket_info_data__ *info_data, *xchg; + + if (EI_ATOMIC_LOAD_ACQ(&socket_info_data) != NULL) + return !0; /* Already initialized... */ + +#if defined(HAVE_SYSCONF) && defined(_SC_OPEN_MAX) + max_fds = sysconf(_SC_OPEN_MAX); +#else + max_fds = 1024; +#endif + + if (max_fds < 0) + return 0; + + segments_len = ((max_fds-1)/EI_SOCKET_INFO_SEG_SIZE + 1); + + info_data = malloc(sizeof(ei_socket_info_data__) + + (sizeof(ei_socket_info *)*(segments_len-1))); + if (!info_data) + return 0; + + info_data->max_fds = max_fds; + for (i = 0; i < segments_len; i++) + info_data->segments[i] = NULL; + + xchg = NULL; + if (!EI_ATOMIC_CMPXCHG_ACQ_REL(&socket_info_data, &xchg, info_data)) + free(info_data); /* Already initialized... */ + + return !0; +} + +static int put_ei_socket_info(int fd, int dist_version, char* cookie, ei_cnode *ec, + ei_socket_callbacks *cbs, void *ctx) +{ + int six; + ei_socket_info *seg, *si; + int socket; + + if (fd < 0 || socket_info_data->max_fds <= fd) + return -1; + + socket = fd; + six = fd >> EI_SOCKET_INFO_SEG_BITS; + seg = EI_ATOMIC_LOAD_ACQ(&socket_info_data->segments[six]); + + if (!seg) { + ei_socket_info *xchg; + int i; + seg = malloc(sizeof(ei_socket_info)*EI_SOCKET_INFO_SEG_SIZE); + if (!seg) + return -1; + for (i = 0; i < EI_SOCKET_INFO_SEG_SIZE; i++) { + seg[i].socket = -1; + } + + xchg = NULL; + if (!EI_ATOMIC_CMPXCHG_ACQ_REL(&socket_info_data->segments[six], &xchg, seg)) { + free(seg); + seg = xchg; + } + } + + si = &seg[fd & EI_SOCKET_INFO_SEG_MASK]; + + if (dist_version < 0) { + socket = -1; + si->cbs = NULL; + si->ctx = NULL; + } + else { + si->dist_version = dist_version; + si->cnode = *ec; + si->cbs = cbs; + si->ctx = ctx; + strcpy(si->cookie, cookie); + } + + EI_ATOMIC_STORE_REL(&si->socket, socket); + + return 0; +} + +static ei_socket_info* get_ei_socket_info(int fd) +{ + int six, socket; + ei_socket_info *seg, *si; + + if (fd < 0 || socket_info_data->max_fds <= fd) + return NULL; + + six = fd >> EI_SOCKET_INFO_SEG_BITS; + seg = EI_ATOMIC_LOAD_ACQ(&socket_info_data->segments[six]); + + if (!seg) + return NULL; + + si = &seg[fd & EI_SOCKET_INFO_SEG_MASK]; + socket = EI_ATOMIC_LOAD_ACQ(&si->socket); + if (socket != fd) + return NULL; + return si; +} + +#else /* EI_DISABLE_SEQ_SOCKET_INFO */ + int ei_n_sockets = 0, ei_sz_sockets = 0; ei_socket_info *ei_sockets = NULL; + #ifdef _REENTRANT ei_mutex_t* ei_sockets_lock = NULL; #endif /* _REENTRANT */ +static int init_socket_info(void) +{ +#ifdef _REENTRANT + if (ei_sockets_lock == NULL) { + ei_sockets_lock = ei_mutex_create(); + } +#endif /* _REENTRANT */ + return !0; +} -/*************************************************************************** - * - * XXX - * - ***************************************************************************/ - -static int put_ei_socket_info(int fd, int dist_version, char* cookie, ei_cnode *ec) +static int put_ei_socket_info(int fd, int dist_version, char* cookie, ei_cnode *ec, + ei_socket_callbacks *cbs, void *ctx) { int i; @@ -182,11 +383,13 @@ static int put_ei_socket_info(int fd, int dist_version, char* cookie, ei_cnode * for (i = 0; i < ei_n_sockets; ++i) { if (ei_sockets[i].socket == fd) { if (dist_version == -1) { - memmove(&ei_sockets[i], &ei_sockets[i+1], + memmove(&ei_sockets[i], &ei_sockets[i+1], sizeof(ei_sockets[0])*(ei_n_sockets-i-1)); } else { ei_sockets[i].dist_version = dist_version; /* Copy the content, see ei_socket_info */ + ei_sockets[i].cbs = cbs; + ei_sockets[i].ctx = ctx; ei_sockets[i].cnode = *ec; strcpy(ei_sockets[i].cookie, cookie); } @@ -209,7 +412,9 @@ static int put_ei_socket_info(int fd, int dist_version, char* cookie, ei_cnode * } ei_sockets[ei_n_sockets].socket = fd; ei_sockets[ei_n_sockets].dist_version = dist_version; - ei_sockets[i].cnode = *ec; + ei_sockets[ei_n_sockets].cnode = *ec; + ei_sockets[ei_n_sockets].cbs = cbs; + ei_sockets[ei_n_sockets].ctx = ctx; strcpy(ei_sockets[ei_n_sockets].cookie, cookie); ++ei_n_sockets; } @@ -219,14 +424,6 @@ static int put_ei_socket_info(int fd, int dist_version, char* cookie, ei_cnode * return 0; } -#if 0 -/* FIXME not used ?! */ -static int remove_ei_socket_info(int fd, int dist_version, char* cookie) -{ - return put_ei_socket_info(fd, -1, NULL); -} -#endif - static ei_socket_info* get_ei_socket_info(int fd) { int i; @@ -248,6 +445,13 @@ static ei_socket_info* get_ei_socket_info(int fd) return NULL; } +#endif /* EI_DISABLE_SEQ_SOCKET_INFO */ + +static int remove_ei_socket_info(int fd) +{ + return put_ei_socket_info(fd, -1, NULL, NULL, NULL, NULL); +} + ei_cnode *ei_fd_to_cnode(int fd) { ei_socket_info *sockinfo = get_ei_socket_info(fd); @@ -255,6 +459,19 @@ ei_cnode *ei_fd_to_cnode(int fd) return &sockinfo->cnode; } +int ei_get_cbs_ctx__(ei_socket_callbacks **cbs, void **ctx, int fd) +{ + ei_socket_info *sockinfo = get_ei_socket_info(fd); + if (sockinfo) { + *cbs = sockinfo->cbs; + *ctx = sockinfo->ctx; + return 0; + } + + *cbs = NULL; + *ctx = NULL; + return EBADF; +} /*************************************************************************** * Get/Set tracelevel @@ -333,21 +550,6 @@ const char *ei_getfdcookie(int fd) return r; } -/* call with cookie to set value to use on descriptor fd, -* or specify NULL to use default -*/ -/* FIXME why defined but not used? */ -#if 0 -static int ei_setfdcookie(ei_cnode* ec, int fd, char *cookie) -{ - int dist_version = ei_distversion(fd); - - if (cookie == NULL) - cookie = ec->ei_connect_cookie; - return put_ei_socket_info(fd, dist_version, cookie); -} -#endif - static int get_int32(unsigned char *s) { return ((s[0] << 24) | (s[1] << 16) | (s[2] << 8) | (s[3] )); @@ -405,12 +607,16 @@ static int initWinSock(void) * Initailize by setting: * thishostname, thisalivename, thisnodename and thisipaddr */ -int ei_connect_xinit(ei_cnode* ec, const char *thishostname, - const char *thisalivename, const char *thisnodename, - Erl_IpAddr thisipaddr, const char *cookie, - const short creation) +int ei_connect_xinit_ussi(ei_cnode* ec, const char *thishostname, + const char *thisalivename, const char *thisnodename, + Erl_IpAddr thisipaddr, const char *cookie, + const short creation, ei_socket_callbacks *cbs, + int cbs_sz, void *setup_context) { char *dbglevel; + + if (cbs != &ei_default_socket_callbacks) + EI_SET_HAVE_PLUGIN_SOCKET_IMPL__; /* FIXME this code was enabled for 'erl'_connect_xinit(), why not here? */ #if 0 @@ -422,12 +628,16 @@ int ei_connect_xinit(ei_cnode* ec, const char *thishostname, #endif #endif -#ifdef _REENTRANT - if (ei_sockets_lock == NULL) { - ei_sockets_lock = ei_mutex_create(); + if (!init_socket_info()) { + EI_TRACE_ERR0("ei_connect_xinit","can't initiate socket info"); + return ERL_ERROR; } -#endif /* _REENTRANT */ + if (cbs_sz < EI_SOCKET_CALLBACKS_SZ_V1) { + EI_TRACE_ERR0("ei_connect_xinit","invalid size of ei_socket_callbacks struct"); + return ERL_ERROR; + } + ec->creation = creation & 0x3; /* 2 bits */ if (cookie) { @@ -469,6 +679,9 @@ int ei_connect_xinit(ei_cnode* ec, const char *thishostname, ec->self.serial = 0; ec->self.creation = creation & 0x3; /* 2 bits */ + ec->cbs = cbs; + ec->setup_context = setup_context; + if ((dbglevel = getenv("EI_TRACELEVEL")) != NULL || (dbglevel = getenv("ERL_DEBUG_DIST")) != NULL) ei_tracelevel = atoi(dbglevel); @@ -476,14 +689,27 @@ int ei_connect_xinit(ei_cnode* ec, const char *thishostname, return 0; } +int ei_connect_xinit(ei_cnode* ec, const char *thishostname, + const char *thisalivename, const char *thisnodename, + Erl_IpAddr thisipaddr, const char *cookie, + const short creation) +{ + return ei_connect_xinit_ussi(ec, thishostname, thisalivename, thisnodename, + thisipaddr, cookie, creation, + &ei_default_socket_callbacks, + sizeof(ei_default_socket_callbacks), + NULL); +} /* * Initialize by set: thishostname, thisalivename, * thisnodename and thisipaddr. At success return 0, * otherwise return -1. */ -int ei_connect_init(ei_cnode* ec, const char* this_node_name, - const char *cookie, short creation) +int ei_connect_init_ussi(ei_cnode* ec, const char* this_node_name, + const char *cookie, short creation, + ei_socket_callbacks *cbs, int cbs_sz, + void *setup_context) { char thishostname[EI_MAXHOSTNAMELEN+1]; char thisnodename[MAXNODELEN+1]; @@ -500,12 +726,7 @@ int ei_connect_init(ei_cnode* ec, const char* this_node_name, return ERL_ERROR; } #endif /* win32 */ -#ifdef _REENTRANT - if (ei_sockets_lock == NULL) { - ei_sockets_lock = ei_mutex_create(); - } -#endif /* _REENTRANT */ - + /* gethostname requires len to be max(hostname) + 1 */ if (gethostname(thishostname, EI_MAXHOSTNAMELEN+1) == -1) { #ifdef __WIN32__ @@ -561,43 +782,22 @@ int ei_connect_init(ei_cnode* ec, const char* this_node_name, sprintf(thisnodename, "%s@%s", this_node_name, hp->h_name); } } - res = ei_connect_xinit(ec, thishostname, thisalivename, thisnodename, - (struct in_addr *)*hp->h_addr_list, cookie, creation); + res = ei_connect_xinit_ussi(ec, thishostname, thisalivename, thisnodename, + (struct in_addr *)*hp->h_addr_list, cookie, creation, + cbs, cbs_sz, setup_context); if (buf != buffer) free(buf); return res; } - -/* connects to port at ip-address ip_addr -* and returns fd to socket -* port has to be in host byte order -*/ -static int cnct(uint16 port, struct in_addr *ip_addr, int addr_len, unsigned ms) +int ei_connect_init(ei_cnode* ec, const char* this_node_name, + const char *cookie, short creation) { - int s, res; - struct sockaddr_in iserv_addr; - - if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - erl_errno = errno; - return ERL_ERROR; - } - - memset((char*)&iserv_addr, 0, sizeof(struct sockaddr_in)); - memcpy((char*)&iserv_addr.sin_addr, (char*)ip_addr, addr_len); - iserv_addr.sin_family = AF_INET; - iserv_addr.sin_port = htons(port); - - if ((res = ei_connect_t(s, (struct sockaddr*)&iserv_addr, - sizeof(iserv_addr),ms)) < 0) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - closesocket(s); - return ERL_ERROR; - } - - return s; -} /* cnct */ - + return ei_connect_init_ussi(ec, this_node_name, cookie, creation, + &ei_default_socket_callbacks, + sizeof(ei_default_socket_callbacks), + NULL); +} /* * Same as ei_gethostbyname_r, but also handles ERANGE error @@ -758,91 +958,218 @@ int ei_connect(ei_cnode* ec, char *nodename) * the node through epmd at that host * */ -int ei_xconnect_tmo(ei_cnode* ec, Erl_IpAddr adr, char *alivename, unsigned ms) +int ei_xconnect_tmo(ei_cnode* ec, Erl_IpAddr ip_addr, char *alivename, unsigned ms) { - struct in_addr *ip_addr=(struct in_addr *) adr; + ei_socket_callbacks *cbs = ec->cbs; + void *ctx; int rport = 0; /*uint16 rport = 0;*/ int sockd; - int one = 1; int dist = 0; - ErlConnect her_name; unsigned her_flags, her_version; - + unsigned our_challenge, her_challenge; + unsigned char our_digest[16]; + int err; + int pkt_sz; + struct sockaddr_in addr; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; + erl_errno = EIO; /* Default error code */ EI_TRACE_CONN1("ei_xconnect","-> CONNECT attempt to connect to %s", alivename); - if ((rport = ei_epmd_port_tmo(ip_addr,alivename,&dist, ms)) < 0) { + if ((rport = ei_epmd_port_tmo(ip_addr,alivename,&dist, tmo)) < 0) { EI_TRACE_ERR0("ei_xconnect","-> CONNECT can't get remote port"); /* ei_epmd_port_tmo() has set erl_errno */ return ERL_NO_PORT; } - - /* we now have port number to enode, try to connect */ - if((sockd = cnct((uint16)rport, ip_addr, sizeof(struct in_addr),ms)) < 0) { - EI_TRACE_ERR0("ei_xconnect","-> CONNECT socket connect failed"); - /* cnct() has set erl_errno */ - return ERL_CONNECT_FAIL; - } - - EI_TRACE_CONN0("ei_xconnect","-> CONNECT connected to remote"); - /* FIXME why connect before checking 'dist' output from ei_epmd_port() ?! */ if (dist <= 4) { EI_TRACE_ERR0("ei_xconnect","-> CONNECT remote version not compatible"); - goto error; + return ERL_ERROR; } - else { - unsigned our_challenge, her_challenge; - unsigned char our_digest[16]; - - if (send_name(sockd, ec->thisnodename, (unsigned) dist, ms)) - goto error; - if (recv_status(sockd, ms)) - goto error; - if (recv_challenge(sockd, &her_challenge, &her_version, - &her_flags, &her_name, ms)) - goto error; - our_challenge = gen_challenge(); - gen_digest(her_challenge, ec->ei_connect_cookie, our_digest); - if (send_challenge_reply(sockd, our_digest, our_challenge, ms)) - goto error; - if (recv_challenge_ack(sockd, our_challenge, - ec->ei_connect_cookie, ms)) - goto error; - put_ei_socket_info(sockd, dist, null_cookie, ec); /* FIXME check == 0 */ + + err = ei_socket_ctx__(cbs, &ctx, ec->setup_context); + if (err) { + EI_TRACE_ERR2("ei_xconnect","-> SOCKET failed: %s (%d)", + estr(err), err); + erl_errno = err; + return ERL_CONNECT_FAIL; + } + + memset((void *) &addr, 0, sizeof(struct sockaddr_in)); + memcpy((void *) &addr.sin_addr, (void *) ip_addr, sizeof(addr.sin_addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(rport); + + err = ei_connect_ctx_t__(cbs, ctx, (void *) &addr, sizeof(addr), tmo); + if (err) { + EI_TRACE_ERR2("ei_xconnect","-> CONNECT socket connect failed: %s (%d)", + estr(err), err); + abort_connection(cbs, ctx); + erl_errno = err; + return ERL_CONNECT_FAIL; } - setsockopt(sockd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); - setsockopt(sockd, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one)); + EI_TRACE_CONN0("ei_xconnect","-> CONNECT connected to remote"); - EI_TRACE_CONN1("ei_xconnect","-> CONNECT (ok) remote = %s",alivename); + err = EI_GET_FD__(cbs, ctx, &sockd); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + goto error; + } + + err = cbs->handshake_packet_header_size(ctx, &pkt_sz); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + goto error; + } + + if (send_name(cbs, ctx, pkt_sz, ec->thisnodename, (unsigned) dist, tmo)) + goto error; + if (recv_status(cbs, ctx, pkt_sz, tmo)) + goto error; + if (recv_challenge(cbs, ctx, pkt_sz, &her_challenge, + &her_version, &her_flags, NULL, tmo)) + goto error; + our_challenge = gen_challenge(); + gen_digest(her_challenge, ec->ei_connect_cookie, our_digest); + if (send_challenge_reply(cbs, ctx, pkt_sz, our_digest, our_challenge, tmo)) + goto error; + if (recv_challenge_ack(cbs, ctx, pkt_sz, our_challenge, + ec->ei_connect_cookie, tmo)) + goto error; + if (put_ei_socket_info(sockd, dist, null_cookie, ec, cbs, ctx) != 0) + goto error; + + if (cbs->connect_handshake_complete) { + err = cbs->connect_handshake_complete(ctx); + if (err) { + EI_TRACE_ERR2("ei_xconnect","-> CONNECT failed: %s (%d)", + estr(err), err); + close_connection(cbs, ctx, sockd); + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } + } + EI_TRACE_CONN1("ei_xconnect","-> CONNECT (ok) remote = %s",alivename); + erl_errno = 0; return sockd; error: EI_TRACE_ERR0("ei_xconnect","-> CONNECT failed"); - closesocket(sockd); + abort_connection(cbs, ctx); return ERL_ERROR; } /* ei_xconnect */ -int ei_xconnect(ei_cnode* ec, Erl_IpAddr adr, char *alivename) +int ei_xconnect(ei_cnode* ec, Erl_IpAddr ip_addr, char *alivename) { - return ei_xconnect_tmo(ec, adr, alivename, 0); + return ei_xconnect_tmo(ec, ip_addr, alivename, 0); } +int ei_listen(ei_cnode *ec, int *port, int backlog) +{ + struct in_addr ip_addr; + ip_addr.s_addr = htonl(INADDR_ANY); + return ei_xlisten(ec, &ip_addr, port, backlog); +} + +int ei_xlisten(ei_cnode *ec, struct in_addr *ip_addr, int *port, int backlog) +{ + ei_socket_callbacks *cbs = ec->cbs; + struct sockaddr_in sock_addr; + void *ctx; + int fd, err, len; + + err = ei_socket_ctx__(cbs, &ctx, ec->setup_context); + if (err) { + EI_TRACE_ERR2("ei_xlisten","-> SOCKET failed: %s (%d)", + estr(err), err); + erl_errno = err; + return ERL_ERROR; + } + + memset((void *) &sock_addr, 0, sizeof(struct sockaddr_in)); + memcpy((void *) &sock_addr.sin_addr, (void *) ip_addr, sizeof(*ip_addr)); + sock_addr.sin_family = AF_INET; + sock_addr.sin_port = htons((short) *port); + + len = sizeof(sock_addr); + err = ei_listen_ctx__(cbs, ctx, (void *) &sock_addr, &len, backlog); + if (err) { + EI_TRACE_ERR2("ei_xlisten","-> listen failed: %s (%d)", + estr(err), err); + erl_errno = err; + goto error; + } + + if (len != sizeof(sock_addr)) { + if (len < offsetof(struct sockaddr_in, sin_addr) + sizeof(sock_addr.sin_addr) + || len < offsetof(struct sockaddr_in, sin_port) + sizeof(sock_addr.sin_port)) { + erl_errno = EIO; + EI_TRACE_ERR0("ei_xlisten","-> get info failed"); + goto error; + } + } + + memcpy((void *) ip_addr, (void *) &sock_addr.sin_addr, sizeof(*ip_addr)); + *port = (int) ntohs(sock_addr.sin_port); + + err = EI_GET_FD__(cbs, ctx, &fd); + if (err) { + erl_errno = err; + goto error; + } + + if (put_ei_socket_info(fd, 0, null_cookie, ec, cbs, ctx) != 0) { + EI_TRACE_ERR0("ei_xlisten","-> save socket info failed"); + erl_errno = EIO; + goto error; + } + + erl_errno = 0; + + return fd; + +error: + abort_connection(cbs, ctx); + return ERL_ERROR; +} + +static int close_connection(ei_socket_callbacks *cbs, void *ctx, int fd) +{ + int err; + remove_ei_socket_info(fd); + err = ei_close_ctx__(cbs, ctx); + if (err) { + erl_errno = err; + return ERL_ERROR; + } + return 0; +} - /* - * For symmetry reasons -*/ -#if 0 int ei_close_connection(int fd) { - return closesocket(fd); + ei_socket_callbacks *cbs; + void *ctx; + int err = EI_GET_CBS_CTX__(&cbs, &ctx, fd); + if (err) + erl_errno = err; + else { + if (close_connection(cbs, ctx, fd) == 0) + return 0; + } + EI_TRACE_ERR2("ei_close_connection","<- CLOSE socket close failed: %s (%d)", + estr(erl_errno), erl_errno); + return ERL_ERROR; } /* ei_close_connection */ -#endif + +static void abort_connection(ei_socket_callbacks *cbs, void *ctx) +{ + (void) ei_close_ctx__(cbs, ctx); +} /* * Accept and initiate a connection from another @@ -857,25 +1184,71 @@ int ei_accept(ei_cnode* ec, int lfd, ErlConnect *conp) int ei_accept_tmo(ei_cnode* ec, int lfd, ErlConnect *conp, unsigned ms) { int fd; - struct sockaddr_in cli_addr; - int cli_addr_len=sizeof(struct sockaddr_in); unsigned her_version, her_flags; - ErlConnect her_name; + char tmp_nodename[MAXNODELEN+1]; + char *her_name; + int pkt_sz, err; + struct sockaddr_in addr; + int addr_len = sizeof(struct sockaddr_in); + ei_socket_callbacks *cbs; + void *ctx; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; erl_errno = EIO; /* Default error code */ + + err = EI_GET_CBS_CTX__(&cbs, &ctx, lfd); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } + EI_TRACE_CONN0("ei_accept","<- ACCEPT waiting for connection"); + + if (conp) { + her_name = &conp->nodename[0]; + } + else { + her_name = &tmp_nodename[0]; + } - if ((fd = ei_accept_t(lfd, (struct sockaddr*) &cli_addr, - &cli_addr_len, ms )) < 0) { - EI_TRACE_ERR0("ei_accept","<- ACCEPT socket accept failed"); - erl_errno = (fd == -2) ? ETIMEDOUT : EIO; - goto error; + /* + * ei_accept_ctx_t__() replaces the pointer to the listen context + * with a pointer to the accepted connection context on success. + */ + err = ei_accept_ctx_t__(cbs, &ctx, (void *) &addr, &addr_len, tmo); + if (err) { + EI_TRACE_ERR2("ei_accept","<- ACCEPT socket accept failed: %s (%d)", + estr(err), err); + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } + + err = EI_GET_FD__(cbs, ctx, &fd); + if (err) { + EI_TRACE_ERR2("ei_accept","<- ACCEPT get fd failed: %s (%d)", + estr(err), err); + EI_CONN_SAVE_ERRNO__(err); + } + + if (addr_len != sizeof(struct sockaddr_in)) { + if (addr_len < (offsetof(struct sockaddr_in, sin_addr) + + sizeof(addr.sin_addr))) { + EI_TRACE_ERR0("ei_accept","<- ACCEPT get addr failed"); + goto error; + } + } + + err = cbs->handshake_packet_header_size(ctx, &pkt_sz); + if (err) { + EI_TRACE_ERR2("ei_accept","<- ACCEPT get packet size failed: %s (%d)", + estr(err), err); + EI_CONN_SAVE_ERRNO__(err); } EI_TRACE_CONN0("ei_accept","<- ACCEPT connected to remote"); - if (recv_name(fd, &her_version, &her_flags, &her_name, ms)) { + if (recv_name(cbs, ctx, pkt_sz, &her_version, &her_flags, her_name, tmo)) { EI_TRACE_ERR0("ei_accept","<- ACCEPT initial ident failed"); goto error; } @@ -888,34 +1261,46 @@ int ei_accept_tmo(ei_cnode* ec, int lfd, ErlConnect *conp, unsigned ms) unsigned our_challenge; unsigned her_challenge; unsigned char our_digest[16]; - - if (send_status(fd,"ok", ms)) + + if (send_status(cbs, ctx, pkt_sz, "ok", tmo)) goto error; our_challenge = gen_challenge(); - if (send_challenge(fd, ec->thisnodename, - our_challenge, her_version, ms)) + if (send_challenge(cbs, ctx, pkt_sz, ec->thisnodename, + our_challenge, her_version, tmo)) goto error; - if (recv_challenge_reply(fd, our_challenge, - ec->ei_connect_cookie, - &her_challenge, ms)) + if (recv_challenge_reply(cbs, ctx, pkt_sz, our_challenge, + ec->ei_connect_cookie, &her_challenge, tmo)) goto error; gen_digest(her_challenge, ec->ei_connect_cookie, our_digest); - if (send_challenge_ack(fd, our_digest, ms)) + if (send_challenge_ack(cbs, ctx, pkt_sz, our_digest, tmo)) goto error; - put_ei_socket_info(fd, her_version, null_cookie, ec); + if (put_ei_socket_info(fd, her_version, null_cookie, ec, cbs, ctx) != 0) + goto error; + } + if (conp) { + memcpy((void *) conp->ipadr, (void *) &addr.sin_addr, sizeof(conp->ipadr)); + strcpy(&conp->nodename[0], her_name); + } + + if (cbs->accept_handshake_complete) { + err = cbs->accept_handshake_complete(ctx); + if (err) { + EI_TRACE_ERR2("ei_xconnect","-> ACCEPT handshake failed: %s (%d)", + estr(err), err); + close_connection(cbs, ctx, fd); + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } } - if (conp) - *conp = her_name; - EI_TRACE_CONN1("ei_accept","<- ACCEPT (ok) remote = %s",her_name.nodename); + EI_TRACE_CONN1("ei_accept","<- ACCEPT (ok) remote = %s",her_name); erl_errno = 0; /* No error */ return fd; error: EI_TRACE_ERR0("ei_accept","<- ACCEPT failed"); - if (fd>=0) - closesocket(fd); + abort_connection(cbs, ctx); return ERL_ERROR; } /* ei_accept */ @@ -927,36 +1312,57 @@ error: */ int ei_receive_tmo(int fd, unsigned char *bufp, int bufsize, unsigned ms) { - int len; + ssize_t len; unsigned char fourbyte[4]={0,0,0,0}; - int res; - - if ((res = ei_read_fill_t(fd, (char *) bufp, 4, ms)) != 4) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + int err; + ei_socket_callbacks *cbs; + void *ctx; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; + + err = EI_GET_CBS_CTX__(&cbs, &ctx, fd); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } + + len = (ssize_t) 4; + err = ei_read_fill_ctx_t__(cbs, ctx, (char *) bufp, &len, tmo); + if (!err && len != (ssize_t) 4) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); return ERL_ERROR; } /* Tick handling */ - if ((len = get_int32(bufp)) == ERL_TICK) - { - ei_write_fill_t(fd, (char *) fourbyte, 4, ms); + len = get_int32(bufp); + if (len == ERL_TICK) { + len = 4; + ei_write_fill_ctx_t__(cbs, ctx, (char *) fourbyte, &len, tmo); /* FIXME ok to ignore error or timeout? */ erl_errno = EAGAIN; return ERL_TICK; } - else if (len > bufsize) - { + + if (len > bufsize) { /* FIXME: We should drain the message. */ erl_errno = EMSGSIZE; return ERL_ERROR; } - else if ((res = ei_read_fill_t(fd, (char *) bufp, len, ms)) != len) - { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return ERL_ERROR; + else { + ssize_t need = len; + err = ei_read_fill_ctx_t__(cbs, ctx, (char *) bufp, &len, tmo); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } + if (len != need) { + erl_errno = EIO; + return ERL_ERROR; + } } - return len; + return (int) len; } @@ -1112,36 +1518,11 @@ int ei_rpc_to(ei_cnode *ec, int fd, char *mod, char *fun, int ei_rpc_from(ei_cnode *ec, int fd, int timeout, erlang_msg *msg, ei_x_buff *x) { - fd_set readmask; - struct timeval tv; - struct timeval *t = NULL; - - if (timeout >= 0) { - tv.tv_sec = timeout / 1000; - tv.tv_usec = (timeout % 1000) * 1000; - t = &tv; - } - - FD_ZERO(&readmask); - FD_SET(fd,&readmask); - - switch (select(fd+1, &readmask, NULL, NULL, t)) { - case -1: - erl_errno = EIO; - return ERL_ERROR; - - case 0: - erl_errno = ETIMEDOUT; - return ERL_TIMEOUT; - - default: - if (FD_ISSET(fd, &readmask)) { - return ei_xreceive_msg(fd, msg, x); - } else { - erl_errno = EIO; - return ERL_ERROR; - } - } + unsigned tmo = timeout < 0 ? EI_SCLBK_INF_TMO : (unsigned) timeout; + int res = ei_xreceive_msg_tmo(fd, msg, x, tmo); + if (res < 0 && erl_errno == ETIMEDOUT) + return ERL_TIMEOUT; + return res; } /* rpc_from */ /* @@ -1295,19 +1676,34 @@ static char *hex(char digest[16], char buff[33]) return buff; } -static int read_2byte_package(int fd, char **buf, int *buflen, - int *is_static, unsigned ms) +static int read_hs_package(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, char **buf, int *buflen, + int *is_static, unsigned ms) { - unsigned char nbuf[2]; + unsigned char nbuf[4]; unsigned char *x = nbuf; - unsigned len; - int res; - - if((res = ei_read_fill_t(fd, (char *)nbuf, 2, ms)) != 2) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + ssize_t len, need; + int err; + + len = (ssize_t) pkt_sz; + err = ei_read_fill_ctx_t__(cbs, ctx, (char *)nbuf, &len, ms); + if (!err && len != (ssize_t) pkt_sz) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); return -1; } - len = get16be(x); + + switch (pkt_sz) { + case 2: + len = get16be(x); + break; + case 4: + len = get32be(x); + break; + default: + return -1; + } if (len > *buflen) { if (*is_static) { @@ -1329,20 +1725,26 @@ static int read_2byte_package(int fd, char **buf, int *buflen, *buflen = len; } } - if ((res = ei_read_fill_t(fd, *buf, len, ms)) != len) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + need = len; + err = ei_read_fill_ctx_t__(cbs, ctx, *buf, &len, ms); + if (!err && len != need) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); return -1; } return len; } -static int send_status(int fd, char *status, unsigned ms) +static int send_status(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, char *status, unsigned ms) { char *buf, *s; char dbuf[DEFBUF_SIZ]; - int siz = strlen(status) + 1 + 2; - int res; + int siz = strlen(status) + 1 + pkt_sz; + int err; + ssize_t len; buf = (siz > DEFBUF_SIZ) ? malloc(siz) : dbuf; if (!buf) { @@ -1350,14 +1752,28 @@ static int send_status(int fd, char *status, unsigned ms) return -1; } s = buf; - put16be(s,siz - 2); + switch (pkt_sz) { + case 2: + put16be(s,siz - 2); + break; + case 4: + put32be(s,siz - 4); + break; + default: + return -1; + } put8(s, 's'); memcpy(s, status, strlen(status)); - if ((res = ei_write_fill_t(fd, buf, siz, ms)) != siz) { - EI_TRACE_ERR0("send_status","-> SEND_STATUS socket write failed"); + len = (ssize_t) siz; + err = ei_write_fill_ctx_t__(cbs, ctx, buf, &len, ms); + if (!err && len != (ssize_t) siz) + err = EIO; + if (err) { + EI_TRACE_ERR2("send_status","-> SEND_STATUS socket write failed: %s (%d)", + estr(err), err); if (buf != dbuf) - free(buf); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + free(buf); + EI_CONN_SAVE_ERRNO__(err); return -1; } EI_TRACE_CONN1("send_status","-> SEND_STATUS (%s)",status); @@ -1367,7 +1783,8 @@ static int send_status(int fd, char *status, unsigned ms) return 0; } -static int recv_status(int fd, unsigned ms) +static int recv_status(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned ms) { char dbuf[DEFBUF_SIZ]; char *buf = dbuf; @@ -1375,7 +1792,8 @@ static int recv_status(int fd, unsigned ms) int buflen = DEFBUF_SIZ; int rlen; - if ((rlen = read_2byte_package(fd, &buf, &buflen, &is_static, ms)) <= 0) { + if ((rlen = read_hs_package(cbs, ctx, pkt_sz, + &buf, &buflen, &is_static, ms)) <= 0) { EI_TRACE_ERR1("recv_status", "<- RECV_STATUS socket read failed (%d)", rlen); goto error; @@ -1396,7 +1814,10 @@ error: return -1; } -static int send_name_or_challenge(int fd, char *nodename, +static int send_name_or_challenge(ei_socket_callbacks *cbs, + void *ctx, + int pkt_sz, + char *nodename, int f_chall, unsigned challenge, unsigned version, @@ -1405,9 +1826,10 @@ static int send_name_or_challenge(int fd, char *nodename, char *buf; unsigned char *s; char dbuf[DEFBUF_SIZ]; - int siz = 2 + 1 + 2 + 4 + strlen(nodename); + int siz = pkt_sz + 1 + 2 + 4 + strlen(nodename); const char* function[] = {"SEND_NAME", "SEND_CHALLENGE"}; - int res; + int err; + ssize_t len; if (f_chall) siz += 4; @@ -1417,7 +1839,16 @@ static int send_name_or_challenge(int fd, char *nodename, return -1; } s = (unsigned char *)buf; - put16be(s,siz - 2); + switch (pkt_sz) { + case 2: + put16be(s,siz - 2); + break; + case 4: + put32be(s,siz - 4); + break; + default: + return -1; + } put8(s, 'n'); put16be(s, version); put32be(s, (DFLAG_EXTENDED_REFERENCES @@ -1433,13 +1864,16 @@ static int send_name_or_challenge(int fd, char *nodename, if (f_chall) put32be(s, challenge); memcpy(s, nodename, strlen(nodename)); - - if ((res = ei_write_fill_t(fd, buf, siz, ms)) != siz) { + len = (ssize_t) siz; + err = ei_write_fill_ctx_t__(cbs, ctx, buf, &len, ms); + if (!err && len != (ssize_t) siz) + err = EIO; + if (err) { EI_TRACE_ERR1("send_name_or_challenge", "-> %s socket write failed", function[f_chall]); if (buf != dbuf) free(buf); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + EI_CONN_SAVE_ERRNO__(err); return -1; } @@ -1448,9 +1882,9 @@ static int send_name_or_challenge(int fd, char *nodename, return 0; } -static int recv_challenge(int fd, unsigned *challenge, - unsigned *version, - unsigned *flags, ErlConnect *namebuf, unsigned ms) +static int recv_challenge(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned *challenge, unsigned *version, + unsigned *flags, char *namebuf, unsigned ms) { char dbuf[DEFBUF_SIZ]; char *buf = dbuf; @@ -1458,13 +1892,13 @@ static int recv_challenge(int fd, unsigned *challenge, int buflen = DEFBUF_SIZ; int rlen; char *s; - struct sockaddr_in sin; - socklen_t sin_len = sizeof(sin); char tag; - + char tmp_nodename[MAXNODELEN+1]; + erl_errno = EIO; /* Default */ - if ((rlen = read_2byte_package(fd, &buf, &buflen, &is_static, ms)) <= 0) { + if ((rlen = read_hs_package(cbs, ctx, pkt_sz, &buf, &buflen, + &is_static, ms)) <= 0) { EI_TRACE_ERR1("recv_challenge", "<- RECV_CHALLENGE socket read failed (%d)",rlen); goto error; @@ -1505,22 +1939,19 @@ static int recv_challenge(int fd, unsigned *challenge, goto error; } - if (getpeername(fd, (struct sockaddr *) &sin, &sin_len) < 0) { - EI_TRACE_ERR0("recv_challenge","<- RECV_CHALLENGE can't get peername"); - erl_errno = errno; - goto error; - } - memcpy(namebuf->ipadr, &(sin.sin_addr.s_addr), - sizeof(sin.sin_addr.s_addr)); - memcpy(namebuf->nodename, s, rlen - 11); - namebuf->nodename[rlen - 11] = '\0'; + if (!namebuf) + namebuf = &tmp_nodename[0]; + + memcpy(namebuf, s, rlen - 11); + namebuf[rlen - 11] = '\0'; + if (!is_static) free(buf); EI_TRACE_CONN4("recv_challenge","<- RECV_CHALLENGE (ok) node = %s, " "version = %u, " "flags = %u, " "challenge = %d", - namebuf->nodename, + namebuf, *version, *flags, *challenge @@ -1533,24 +1964,40 @@ error: return -1; } -static int send_challenge_reply(int fd, unsigned char digest[16], +static int send_challenge_reply(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned char digest[16], unsigned challenge, unsigned ms) { char *s; char buf[DEFBUF_SIZ]; - int siz = 2 + 1 + 4 + 16; - int res; + int siz = pkt_sz + 1 + 4 + 16; + int err; + ssize_t len; s = buf; - put16be(s,siz - 2); + switch (pkt_sz) { + case 2: + put16be(s,siz - 2); + break; + case 4: + put32be(s,siz - 4); + break; + default: + return -1; + } put8(s, 'r'); put32be(s, challenge); memcpy(s, digest, 16); - - if ((res = ei_write_fill_t(fd, buf, siz, ms)) != siz) { - EI_TRACE_ERR0("send_challenge_reply", - "-> SEND_CHALLENGE_REPLY socket write failed"); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + + len = (ssize_t) siz; + err = ei_write_fill_ctx_t__(cbs, ctx, buf, &len, ms); + if (!err && len != (ssize_t) siz) + err = EIO; + if (err) { + EI_TRACE_ERR2("send_challenge_reply", + "-> SEND_CHALLENGE_REPLY socket write failed: %s (%d)", + estr(err), err); + EI_CONN_SAVE_ERRNO__(err); return -1; } @@ -1563,11 +2010,13 @@ static int send_challenge_reply(int fd, unsigned char digest[16], return 0; } -static int recv_challenge_reply (int fd, - unsigned our_challenge, - char cookie[], - unsigned *her_challenge, - unsigned ms) +static int recv_challenge_reply(ei_socket_callbacks *cbs, + void *ctx, + int pkt_sz, + unsigned our_challenge, + char cookie[], + unsigned *her_challenge, + unsigned ms) { char dbuf[DEFBUF_SIZ]; char *buf = dbuf; @@ -1580,7 +2029,7 @@ static int recv_challenge_reply (int fd, erl_errno = EIO; /* Default */ - if ((rlen = read_2byte_package(fd, &buf, &buflen, &is_static, ms)) != 21) { + if ((rlen = read_hs_package(cbs, ctx, pkt_sz, &buf, &buflen, &is_static, ms)) != 21) { EI_TRACE_ERR1("recv_challenge_reply", "<- RECV_CHALLENGE_REPLY socket read failed (%d)",rlen); goto error; @@ -1620,23 +2069,38 @@ error: return -1; } -static int send_challenge_ack(int fd, unsigned char digest[16], unsigned ms) +static int send_challenge_ack(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, + unsigned char digest[16], unsigned ms) { char *s; char buf[DEFBUF_SIZ]; - int siz = 2 + 1 + 16; - int res; + int siz = pkt_sz + 1 + 16; + int err; + ssize_t len; s = buf; - - put16be(s,siz - 2); + switch (pkt_sz) { + case 2: + put16be(s,siz - 2); + break; + case 4: + put32be(s,siz - 4); + break; + default: + return -1; + } put8(s, 'a'); memcpy(s, digest, 16); - if ((res = ei_write_fill_t(fd, buf, siz, ms)) != siz) { - EI_TRACE_ERR0("recv_challenge_reply", - "-> SEND_CHALLENGE_ACK socket write failed"); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + len = (ssize_t) siz; + err = ei_write_fill_ctx_t__(cbs, ctx, buf, &len, ms); + if (!err && len != (ssize_t) siz) + err = EIO; + if (err) { + EI_TRACE_ERR2("recv_challenge_reply", + "-> SEND_CHALLENGE_ACK socket write failed: %s (%d)", + estr(err), err); + EI_CONN_SAVE_ERRNO__(err); return -1; } @@ -1649,8 +2113,8 @@ static int send_challenge_ack(int fd, unsigned char digest[16], unsigned ms) return 0; } -static int recv_challenge_ack(int fd, - unsigned our_challenge, +static int recv_challenge_ack(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned our_challenge, char cookie[], unsigned ms) { char dbuf[DEFBUF_SIZ]; @@ -1664,7 +2128,7 @@ static int recv_challenge_ack(int fd, erl_errno = EIO; /* Default */ - if ((rlen = read_2byte_package(fd, &buf, &buflen, &is_static, ms)) != 17) { + if ((rlen = read_hs_package(cbs, ctx, pkt_sz, &buf, &buflen, &is_static, ms)) != 17) { EI_TRACE_ERR1("recv_challenge_ack", "<- RECV_CHALLENGE_ACK socket read failed (%d)",rlen); goto error; @@ -1701,20 +2165,24 @@ error: return -1; } -static int send_name(int fd, char *nodename, unsigned version, unsigned ms) +static int send_name(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, + char *nodename, unsigned version, unsigned ms) { - return send_name_or_challenge(fd, nodename, 0, 0, version, ms); + return send_name_or_challenge(cbs, ctx, pkt_sz, nodename, 0, + 0, version, ms); } -static int send_challenge(int fd, char *nodename, - unsigned challenge, unsigned version, unsigned ms) +static int send_challenge(ei_socket_callbacks *cbs, void *ctx, int pkt_sz, + char *nodename, unsigned challenge, unsigned version, + unsigned ms) { - return send_name_or_challenge(fd, nodename, 1, challenge, version, ms); + return send_name_or_challenge(cbs, ctx, pkt_sz, nodename, 1, + challenge, version, ms); } -static int recv_name(int fd, - unsigned *version, - unsigned *flags, ErlConnect *namebuf, unsigned ms) +static int recv_name(ei_socket_callbacks *cbs, void *ctx, + int pkt_sz, unsigned *version, + unsigned *flags, char *namebuf, unsigned ms) { char dbuf[DEFBUF_SIZ]; char *buf = dbuf; @@ -1722,13 +2190,13 @@ static int recv_name(int fd, int buflen = DEFBUF_SIZ; int rlen; char *s; - struct sockaddr_in sin; - socklen_t sin_len = sizeof(sin); + char tmp_nodename[MAXNODELEN+1]; char tag; erl_errno = EIO; /* Default */ - if ((rlen = read_2byte_package(fd, &buf, &buflen, &is_static, ms)) <= 0) { + if ((rlen = read_hs_package(cbs, ctx, pkt_sz, &buf, &buflen, + &is_static, ms)) <= 0) { EI_TRACE_ERR1("recv_name","<- RECV_NAME socket read failed (%d)",rlen); goto error; } @@ -1759,21 +2227,18 @@ static int recv_name(int fd, erl_errno = EIO; goto error; } - - if (getpeername(fd, (struct sockaddr *) &sin, &sin_len) < 0) { - EI_TRACE_ERR0("recv_name","<- RECV_NAME can't get peername"); - erl_errno = errno; - goto error; - } - memcpy(namebuf->ipadr, &(sin.sin_addr.s_addr), - sizeof(sin.sin_addr.s_addr)); - memcpy(namebuf->nodename, s, rlen - 7); - namebuf->nodename[rlen - 7] = '\0'; + + if (!namebuf) + namebuf = &tmp_nodename[0]; + + memcpy(namebuf, s, rlen - 7); + namebuf[rlen - 7] = '\0'; + if (!is_static) free(buf); EI_TRACE_CONN3("recv_name", "<- RECV_NAME (ok) node = %s, version = %u, flags = %u", - namebuf->nodename,*version,*flags); + namebuf,*version,*flags); erl_errno = 0; return 0; @@ -1867,3 +2332,4 @@ static int get_cookie(char *buf, int bufsize) return 1; /* Success! */ } + diff --git a/lib/erl_interface/src/connect/eirecv.c b/lib/erl_interface/src/connect/eirecv.c index 7b9dbfc387..47eea06ced 100644 --- a/lib/erl_interface/src/connect/eirecv.c +++ b/lib/erl_interface/src/connect/eirecv.c @@ -60,22 +60,36 @@ ei_recv_internal (int fd, int arity; int version; int index = 0; - int i = 0; - int res; + int err; int show_this_msg = 0; + ei_socket_callbacks *cbs; + void *ctx; + ssize_t rlen; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; + + err = EI_GET_CBS_CTX__(&cbs, &ctx, fd); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; + } /* get length field */ - if ((res = ei_read_fill_t(fd, header, 4, ms)) != 4) - { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; + rlen = 4; + err = ei_read_fill_ctx_t__(cbs, ctx, header, &rlen, tmo); + if (!err && rlen != 4) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); return -1; } + len = get32be(s); /* got tick - respond and return */ if (!len) { char tock[] = {0,0,0,0}; - ei_write_fill_t(fd, tock, sizeof(tock), ms); /* Failure no problem */ + ssize_t wlen = sizeof(tock); + ei_write_fill_ctx_t__(cbs, ctx, tock, &wlen, tmo); /* Failure no problem */ *msglenp = 0; return 0; /* maybe flag ERL_EAGAIN [sverkerw] */ } @@ -86,9 +100,12 @@ ei_recv_internal (int fd, ei_trace(-1,NULL); /* read enough to get at least entire header */ - bytesread = (len > EIRECVBUF ? EIRECVBUF : len); - if ((i = ei_read_fill_t(fd,header,bytesread,ms)) != bytesread) { - erl_errno = (i == -2) ? ETIMEDOUT : EIO; + rlen = bytesread = (len > EIRECVBUF ? EIRECVBUF : len); + err = ei_read_fill_ctx_t__(cbs, ctx, header, &rlen, tmo); + if (!err && rlen != bytesread) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); return -1; } @@ -212,12 +229,17 @@ ei_recv_internal (int fd, */ if (msglen > *bufsz) { if (staticbufp) { - int sz = EIRECVBUF; /* flush in rest of packet */ while (remain > 0) { - if (remain < sz) sz = remain; - if ((i=ei_read_fill_t(fd,header,sz,ms)) <= 0) break; - remain -= i; + rlen = remain > EIRECVBUF ? EIRECVBUF : remain; + err = ei_read_fill_ctx_t__(cbs, ctx, header, &rlen, tmo); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; + } + if (rlen == 0) + break; + remain -= rlen; } erl_errno = EMSGSIZE; return -1; @@ -247,11 +269,15 @@ ei_recv_internal (int fd, /* read the rest of the message into callers buffer */ if (remain > 0) { - if ((i = ei_read_fill_t(fd,mbuf+bytesread-index,remain,ms)) != remain) { - *msglenp = bytesread-index+1; /* actual bytes in users buffer */ - erl_errno = (i == -2) ? ETIMEDOUT : EIO; - return -1; - } + rlen = remain; + err = ei_read_fill_ctx_t__(cbs, ctx, mbuf+bytesread-index, &rlen, tmo); + if (!err && rlen != remain) + err = EIO; + if (err) { + *msglenp = bytesread-index+1; /* actual bytes in users buffer */ + EI_CONN_SAVE_ERRNO__(err); + return -1; + } } if (show_this_msg) diff --git a/lib/erl_interface/src/connect/send.c b/lib/erl_interface/src/connect/send.c index 37d7db6d68..d97532d123 100644 --- a/lib/erl_interface/src/connect/send.c +++ b/lib/erl_interface/src/connect/send.c @@ -58,10 +58,17 @@ int ei_send_encoded_tmo(int fd, const erlang_pid *to, char *s, header[1200]; /* see size calculation below */ erlang_trace *token = NULL; int index = 5; /* reserve 5 bytes for control message */ - int res; -#ifdef HAVE_WRITEV - struct iovec v[2]; -#endif + int err; + ei_socket_callbacks *cbs; + void *ctx; + ssize_t len, tot_len; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; + + err = EI_GET_CBS_CTX__(&cbs, &ctx, fd); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } /* are we tracing? */ /* check that he can receive trace tokens first */ @@ -91,30 +98,47 @@ int ei_send_encoded_tmo(int fd, const erlang_pid *to, if (ei_tracelevel >= 4) ei_show_sendmsg(stderr,header,msg); -#ifdef HAVE_WRITEV - - v[0].iov_base = (char *)header; - v[0].iov_len = index; - v[1].iov_base = (char *)msg; - v[1].iov_len = msglen; - - if ((res = ei_writev_fill_t(fd,v,2,ms)) != index+msglen) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; - } - -#else /* !HAVE_WRITEV */ - - if ((res = ei_write_fill_t(fd,header,index,ms)) != index) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + +#ifdef EI_HAVE_STRUCT_IOVEC__ + if (ei_socket_callbacks_have_writev__(cbs)) { + struct iovec v[2]; + + v[0].iov_base = (char *)header; + v[0].iov_len = index; + v[1].iov_base = (char *)msg; + v[1].iov_len = msglen; + + len = tot_len = (ssize_t) index+msglen; + err = ei_writev_fill_ctx_t__(cbs, ctx, v, 2, &len, tmo); + if (!err && len != tot_len) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; + } + + return 0; } - if ((res = ei_write_fill_t(fd,msg,msglen,ms)) != msglen) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; +#endif /* EI_HAVE_STRUCT_IOVEC__ */ + + /* no writev() */ + len = tot_len = (ssize_t) index; + err = ei_write_fill_ctx_t__(cbs, ctx, header, &len, tmo); + if (!err && len != tot_len) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; } -#endif /* !HAVE_WRITEV */ + len = tot_len = (ssize_t) msglen; + err = ei_write_fill_ctx_t__(cbs, ctx, msg, &len, tmo); + if (!err && len != tot_len) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; + } return 0; } diff --git a/lib/erl_interface/src/connect/send_exit.c b/lib/erl_interface/src/connect/send_exit.c index 2e298e3221..b4f7e14c7f 100644 --- a/lib/erl_interface/src/connect/send_exit.c +++ b/lib/erl_interface/src/connect/send_exit.c @@ -55,6 +55,17 @@ int ei_send_exit_tmo(int fd, const erlang_pid *from, const erlang_pid *to, char *s; int index = 0; int len = strlen(reason) + 1080; /* see below */ + ei_socket_callbacks *cbs; + void *ctx; + int err; + ssize_t wlen; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; + + err = EI_GET_CBS_CTX__(&cbs, &ctx, fd); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } if (len > EISMALLBUF) if (!(dbuf = malloc(len))) @@ -92,10 +103,16 @@ int ei_send_exit_tmo(int fd, const erlang_pid *from, const erlang_pid *to, if (ei_tracelevel >= 4) ei_show_sendmsg(stderr,msgbuf,NULL); - ei_write_fill_t(fd,msgbuf,index,ms); - /* FIXME ignore timeout etc? erl_errno?! */ - - if (dbuf) free(dbuf); + wlen = (ssize_t) index; + err = ei_write_fill_ctx_t__(cbs, ctx, msgbuf, &wlen, tmo); + if (!err && wlen != (ssize_t) index) + err = EIO; + if (dbuf) + free(dbuf); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } return 0; } diff --git a/lib/erl_interface/src/connect/send_reg.c b/lib/erl_interface/src/connect/send_reg.c index 62478f042d..80d61e57b5 100644 --- a/lib/erl_interface/src/connect/send_reg.c +++ b/lib/erl_interface/src/connect/send_reg.c @@ -51,11 +51,17 @@ int ei_send_reg_encoded_tmo(int fd, const erlang_pid *from, char *s, header[1400]; /* see size calculation below */ erlang_trace *token = NULL; int index = 5; /* reserve 5 bytes for control message */ - int res; + int err; + ei_socket_callbacks *cbs; + void *ctx; + ssize_t len, tot_len; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; -#ifdef HAVE_WRITEV - struct iovec v[2]; -#endif + err = EI_GET_CBS_CTX__(&cbs, &ctx, fd); + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return ERL_ERROR; + } /* are we tracing? */ /* check that he can receive trace tokens first */ @@ -86,29 +92,45 @@ int ei_send_reg_encoded_tmo(int fd, const erlang_pid *from, if (ei_tracelevel >= 4) ei_show_sendmsg(stderr,header,msg); -#ifdef HAVE_WRITEV +#ifdef EI_HAVE_STRUCT_IOVEC__ + if (ei_socket_callbacks_have_writev__(cbs)) { + struct iovec v[2]; - v[0].iov_base = (char *)header; - v[0].iov_len = index; - v[1].iov_base = (char *)msg; - v[1].iov_len = msglen; + v[0].iov_base = (char *)header; + v[0].iov_len = index; + v[1].iov_base = (char *)msg; + v[1].iov_len = msglen; - if ((res = ei_writev_fill_t(fd,v,2,ms)) != index+msglen) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + len = tot_len = (ssize_t) index+msglen; + err = ei_writev_fill_ctx_t__(cbs, ctx, v, 2, &len, tmo); + if (!err && len != tot_len) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; + } + return 0; } -#else - +#endif /* EI_HAVE_STRUCT_IOVEC__ */ + /* no writev() */ - if ((res = ei_write_fill_t(fd,header,index,ms)) != index) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + len = tot_len = (ssize_t) index; + err = ei_write_fill_ctx_t__(cbs, ctx, header, &len, tmo); + if (!err && len != tot_len) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; } - if ((res = ei_write_fill_t(fd,msg,msglen,ms)) != msglen) { - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + + len = tot_len = (ssize_t) msglen; + err = ei_write_fill_ctx_t__(cbs, ctx, msg, &len, tmo); + if (!err && len != tot_len) + err = EIO; + if (err) { + EI_CONN_SAVE_ERRNO__(err); + return -1; } -#endif return 0; } diff --git a/lib/erl_interface/src/epmd/epmd_port.c b/lib/erl_interface/src/epmd/epmd_port.c index 2ec418b24a..492c3fb3aa 100644 --- a/lib/erl_interface/src/epmd/epmd_port.c +++ b/lib/erl_interface/src/epmd/epmd_port.c @@ -62,31 +62,38 @@ int ei_epmd_connect_tmo(struct in_addr *inaddr, unsigned ms) { static unsigned int epmd_port = 0; - struct sockaddr_in saddr; - int sd; - int res; + int port, sd, err; + struct in_addr ip_addr; + struct sockaddr_in addr; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; + + err = ei_socket__(&sd); + if (err) { + erl_errno = err; + return -1; + } if (epmd_port == 0) { char* port_str = getenv("ERL_EPMD_PORT"); epmd_port = (port_str != NULL) ? atoi(port_str) : EPMD_PORT; } - memset(&saddr, 0, sizeof(saddr)); - saddr.sin_port = htons(epmd_port); - saddr.sin_family = AF_INET; - if (!inaddr) saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - else memmove(&saddr.sin_addr,inaddr,sizeof(saddr.sin_addr)); + port = (int) epmd_port; - if (((sd = socket(PF_INET, SOCK_STREAM, 0)) < 0)) - { - erl_errno = errno; - return -1; + if (!inaddr) { + ip_addr.s_addr = htonl(INADDR_LOOPBACK); + inaddr = &ip_addr; } + + memset((void *) &addr, 0, sizeof(struct sockaddr_in)); + memcpy((void *) &addr.sin_addr, (void *) inaddr, sizeof(addr.sin_addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(port); - if ((res = ei_connect_t(sd,(struct sockaddr *)&saddr,sizeof(saddr),ms)) < 0) - { - erl_errno = (res == -2) ? ETIMEDOUT : errno; - closesocket(sd); + err = ei_connect_t__(sd, (void *) &addr, sizeof(addr), tmo); + if (err) { + erl_errno = err; + ei_close__(sd); return -1; } @@ -104,6 +111,9 @@ static int ei_epmd_r4_port (struct in_addr *addr, const char *alive, int port; int dist_high, dist_low, proto; int res; + int err; + ssize_t dlen; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; #if defined(VXWORKS) char ntoabuf[32]; #endif @@ -124,10 +134,14 @@ static int ei_epmd_r4_port (struct in_addr *addr, const char *alive, return -1; } - if ((res = ei_write_fill_t(fd, buf, len+2, ms)) != len+2) { - closesocket(fd); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + dlen = len + 2; + err = ei_write_fill_t__(fd, buf, &dlen, tmo); + if (!err && dlen != (ssize_t) len + 2) + erl_errno = EIO; + if (err) { + ei_close__(fd); + EI_CONN_SAVE_ERRNO__(err); + return -1; } #ifdef VXWORKS @@ -142,12 +156,15 @@ static int ei_epmd_r4_port (struct in_addr *addr, const char *alive, "-> PORT2_REQ alive=%s ip=%s",alive,inet_ntoa(*addr)); #endif - /* read first two bytes (response type, response) */ - if ((res = ei_read_fill_t(fd, buf, 2, ms)) != 2) { - EI_TRACE_ERR0("ei_epmd_r4_port","<- CLOSE"); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - closesocket(fd); - return -2; /* version mismatch */ + dlen = (ssize_t) 2; + err = ei_read_fill_t__(fd, buf, &dlen, tmo); + if (!err && dlen != (ssize_t) 2) + erl_errno = EIO; + if (err) { + EI_TRACE_ERR0("ei_epmd_r4_port","<- CLOSE"); + ei_close__(fd); + EI_CONN_SAVE_ERRNO__(err); + return -2; } s = buf; @@ -156,7 +173,7 @@ static int ei_epmd_r4_port (struct in_addr *addr, const char *alive, if (res != EI_EPMD_PORT2_RESP) { /* response type */ EI_TRACE_ERR1("ei_epmd_r4_port","<- unknown (%d)",res); EI_TRACE_ERR0("ei_epmd_r4_port","-> CLOSE"); - closesocket(fd); + ei_close__(fd); erl_errno = EIO; return -1; } @@ -167,7 +184,7 @@ static int ei_epmd_r4_port (struct in_addr *addr, const char *alive, if ((res = get8(s))) { /* got negative response */ EI_TRACE_ERR1("ei_epmd_r4_port","<- PORT2_RESP result=%d (failure)",res); - closesocket(fd); + ei_close__(fd); erl_errno = EIO; return -1; } @@ -175,14 +192,18 @@ static int ei_epmd_r4_port (struct in_addr *addr, const char *alive, EI_TRACE_CONN1("ei_epmd_r4_port","<- PORT2_RESP result=%d (ok)",res); /* expecting remaining 8 bytes */ - if ((res = ei_read_fill_t(fd,buf,8,ms)) != 8) { + dlen = (ssize_t) 8; + err = ei_read_fill_t__(fd, buf, &dlen, tmo); + if (!err && dlen != (ssize_t) 8) + err = EIO; + if (err) { EI_TRACE_ERR0("ei_epmd_r4_port","<- CLOSE"); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - closesocket(fd); + ei_close__(fd); + EI_CONN_SAVE_ERRNO__(err); return -1; } - closesocket(fd); + ei_close__(fd); s = buf; port = get16be(s); diff --git a/lib/erl_interface/src/epmd/epmd_publish.c b/lib/erl_interface/src/epmd/epmd_publish.c index 47d68a6db0..20b8e867e8 100644 --- a/lib/erl_interface/src/epmd/epmd_publish.c +++ b/lib/erl_interface/src/epmd/epmd_publish.c @@ -68,8 +68,10 @@ static int ei_epmd_r4_publish (int port, const char *alive, unsigned ms) int nlen = strlen(alive); int len = elen + nlen + 13; /* hard coded: be careful! */ int n; - int res, creation; - + int err, res, creation; + ssize_t dlen; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; + if (len > sizeof(buf)-2) { erl_errno = ERANGE; @@ -93,29 +95,39 @@ static int ei_epmd_r4_publish (int port, const char *alive, unsigned ms) if ((fd = ei_epmd_connect_tmo(NULL,ms)) < 0) return fd; - if ((res = ei_write_fill_t(fd, buf, len+2, ms)) != len+2) { - closesocket(fd); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + dlen = (ssize_t) len+2; + err = ei_write_fill_t__(fd, buf, &dlen, tmo); + if (!err && dlen != (ssize_t) len + 2) + erl_errno = EIO; + if (err) { + ei_close__(fd); + EI_CONN_SAVE_ERRNO__(err); + return -1; } EI_TRACE_CONN6("ei_epmd_r4_publish", "-> ALIVE2_REQ alive=%s port=%d ntype=%d " "proto=%d dist-high=%d dist-low=%d", alive,port,'H',EI_MYPROTO,EI_DIST_HIGH,EI_DIST_LOW); - - if ((n = ei_read_fill_t(fd, buf, 4, ms)) != 4) { + + dlen = (ssize_t) 4; + err = ei_read_fill_t__(fd, buf, &dlen, tmo); + n = (int) dlen; + if (!err && n != 4) + err = EIO; + if (err) { EI_TRACE_ERR0("ei_epmd_r4_publish","<- CLOSE"); - closesocket(fd); - erl_errno = (n == -2) ? ETIMEDOUT : EIO; + ei_close__(fd); + EI_CONN_SAVE_ERRNO__(err); return -2; /* version mismatch */ } + /* Don't close fd here! It keeps us registered with epmd */ s = buf; if (((res=get8(s)) != EI_EPMD_ALIVE2_RESP)) { /* response */ EI_TRACE_ERR1("ei_epmd_r4_publish","<- unknown (%d)",res); EI_TRACE_ERR0("ei_epmd_r4_publish","-> CLOSE"); - closesocket(fd); + ei_close__(fd); erl_errno = EIO; return -1; } @@ -124,7 +136,7 @@ static int ei_epmd_r4_publish (int port, const char *alive, unsigned ms) if (((res=get8(s)) != 0)) { /* 0 == success */ EI_TRACE_ERR1("ei_epmd_r4_publish"," result=%d (fail)",res); - closesocket(fd); + ei_close__(fd); erl_errno = EIO; return -1; } diff --git a/lib/erl_interface/src/epmd/epmd_unpublish.c b/lib/erl_interface/src/epmd/epmd_unpublish.c index 255d0ffb59..c112f74147 100644 --- a/lib/erl_interface/src/epmd/epmd_unpublish.c +++ b/lib/erl_interface/src/epmd/epmd_unpublish.c @@ -58,7 +58,9 @@ int ei_unpublish_tmo(const char *alive, unsigned ms) char buf[EPMDBUF]; char *s = (char*)buf; int len = 1 + strlen(alive); - int fd, res; + int fd, err; + ssize_t dlen; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; if (len > sizeof(buf)-3) { erl_errno = ERANGE; @@ -72,20 +74,29 @@ int ei_unpublish_tmo(const char *alive, unsigned ms) /* FIXME can't connect, return success?! At least commen whats up */ if ((fd = ei_epmd_connect_tmo(NULL,ms)) < 0) return fd; - if ((res = ei_write_fill_t(fd, buf, len+2,ms)) != len+2) { - closesocket(fd); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + dlen = (ssize_t) len+2; + err = ei_write_fill_t__(fd, buf, &dlen, tmo); + if (!err && dlen != (ssize_t) len + 2) + erl_errno = EIO; + if (err) { + ei_close__(fd); + EI_CONN_SAVE_ERRNO__(err); + return -1; } EI_TRACE_CONN1("ei_unpublish_tmo","-> STOP %s",alive); - - if ((res = ei_read_fill_t(fd, buf, 7, ms)) != 7) { - closesocket(fd); - erl_errno = (res == -2) ? ETIMEDOUT : EIO; - return -1; + + dlen = (ssize_t) 7; + err = ei_read_fill_t__(fd, buf, &dlen, tmo); + if (!err && dlen != (ssize_t) 7) + erl_errno = EIO; + if (err) { + ei_close__(fd); + EI_CONN_SAVE_ERRNO__(err); + return -1; } - closesocket(fd); + + ei_close__(fd); buf[7]=(char)0; /* terminate the string */ if (!strcmp("STOPPED",(char *)buf)) { diff --git a/lib/erl_interface/src/legacy/erl_connect.c b/lib/erl_interface/src/legacy/erl_connect.c index 7ffd545d3e..e2fd4611c0 100644 --- a/lib/erl_interface/src/legacy/erl_connect.c +++ b/lib/erl_interface/src/legacy/erl_connect.c @@ -179,15 +179,13 @@ int erl_xconnect(Erl_IpAddr addr, char *alivename) * * API: erl_close_connection() * - * Close a connection. FIXME call ei_close_connection() later. - * * Returns 0 on success and -1 on failure. * ***************************************************************************/ int erl_close_connection(int fd) { - return closesocket(fd); + return ei_close_connection(fd); } /* @@ -220,7 +218,10 @@ int erl_reg_send(int fd, char *server_name, ETERM *msg) ei_x_buff x; int r; - ei_x_new_with_version(&x); + if (ei_x_new_with_version(&x) < 0) { + erl_errno = ENOMEM; + return 0; + } if (ei_x_encode_term(&x, msg) < 0) { erl_errno = EINVAL; r = 0; diff --git a/lib/erl_interface/src/misc/ei_internal.h b/lib/erl_interface/src/misc/ei_internal.h index aa6aacd703..0c58245c0a 100644 --- a/lib/erl_interface/src/misc/ei_internal.h +++ b/lib/erl_interface/src/misc/ei_internal.h @@ -22,19 +22,20 @@ #ifndef _EI_INTERNAL_H #define _EI_INTERNAL_H +#ifdef EI_HIDE_REAL_ERRNO +# define EI_CONN_SAVE_ERRNO__(E) \ + ((E) == ETIMEDOUT ? (erl_errno = ETIMEDOUT) : (erl_errno = EIO)) +#else +# define EI_CONN_SAVE_ERRNO__(E) \ + (erl_errno = (E)) +#endif + /* * Some useful stuff not to be exported to users. */ #ifdef __WIN32__ #define MAXPATHLEN 256 -#define writesocket(sock,buf,nbyte) send(sock,buf,nbyte,0) -#define readsocket(sock,buf,nbyte) recv(sock,buf,nbyte,0) -#else /* not __WIN32__ */ -#define writesocket write -#define readsocket read -#define closesocket close -#define ioctlsocket ioctl #endif /* @@ -155,4 +156,7 @@ extern int ei_tracelevel; void ei_trace_printf(const char *name, int level, const char *format, ...); int ei_internal_use_r9_pids_ports(void); + +int ei_get_cbs_ctx__(ei_socket_callbacks **cbs, void **ctx, int fd); + #endif /* _EI_INTERNAL_H */ diff --git a/lib/erl_interface/src/misc/ei_portio.c b/lib/erl_interface/src/misc/ei_portio.c index 8cd35bf2e5..726b1af82d 100644 --- a/lib/erl_interface/src/misc/ei_portio.c +++ b/lib/erl_interface/src/misc/ei_portio.c @@ -22,6 +22,7 @@ #ifdef __WIN32__ #include <winsock2.h> #include <windows.h> +#include <winbase.h> #include <process.h> #include <stdio.h> #include <stdlib.h> @@ -35,10 +36,6 @@ static unsigned long param_one = 1; #define SET_BLOCKING(Sock) ioctlsocket((Sock),FIONBIO,¶m_zero) #define SET_NONBLOCKING(Sock) ioctlsocket((Sock),FIONBIO,¶m_one) -#define ERROR_WOULDBLOCK WSAEWOULDBLOCK -#define ERROR_TIMEDOUT WSAETIMEDOUT -#define ERROR_INPROGRESS WSAEINPROGRESS -#define GET_SOCKET_ERROR() WSAGetLastError() #define MEANS_SOCKET_ERROR(Ret) ((Ret == SOCKET_ERROR)) #define IS_INVALID_SOCKET(Sock) ((Sock) == INVALID_SOCKET) @@ -53,15 +50,16 @@ static unsigned long param_one = 1; #include <sys/types.h> #include <ioLib.h> #include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <timers.h> static unsigned long param_zero = 0; static unsigned long param_one = 1; #define SET_BLOCKING(Sock) ioctl((Sock),FIONBIO,(int)¶m_zero) #define SET_NONBLOCKING(Sock) ioctl((Sock),FIONBIO,(int)¶m_one) -#define ERROR_WOULDBLOCK EWOULDBLOCK -#define ERROR_TIMEDOUT ETIMEDOUT -#define ERROR_INPROGRESS EINPROGRESS -#define GET_SOCKET_ERROR() (errno) #define MEANS_SOCKET_ERROR(Ret) ((Ret) == ERROR) #define IS_INVALID_SOCKET(Sock) ((Sock) < 0) @@ -69,106 +67,394 @@ static unsigned long param_one = 1; #include <stdlib.h> #include <sys/types.h> #include <sys/socket.h> -#include <sys/uio.h> #include <unistd.h> #include <fcntl.h> #include <errno.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> +#include <netdb.h> -#ifndef EWOULDBLOCK -#define ERROR_WOULDBLOCK EAGAIN -#else -#define ERROR_WOULDBLOCK EWOULDBLOCK -#endif #define SET_BLOCKING(fd) fcntl((fd), F_SETFL, \ fcntl((fd), F_GETFL, 0) & ~O_NONBLOCK) #define SET_NONBLOCKING(fd) fcntl((fd), F_SETFL, \ fcntl((fd), F_GETFL, 0) | O_NONBLOCK) -#define ERROR_TIMEDOUT ETIMEDOUT -#define ERROR_INPROGRESS EINPROGRESS -#define GET_SOCKET_ERROR() (errno) #define MEANS_SOCKET_ERROR(Ret) ((Ret) < 0) #define IS_INVALID_SOCKET(Sock) ((Sock) < 0) #endif /* common includes */ -#include "eidef.h" #include <stdio.h> #include <stdlib.h> #include <string.h> -#include "ei_portio.h" -#include "ei_internal.h" - #ifdef HAVE_SYS_TIME_H #include <sys/time.h> #else #include <time.h> #endif +#include "eidef.h" +#include "ei_portio.h" +#include "ei_internal.h" + +#ifdef __WIN32__ + +#define writesocket(sock,buf,nbyte) send(sock,buf,nbyte,0) +#define readsocket(sock,buf,nbyte) recv(sock,buf,nbyte,0) -#ifdef HAVE_WRITEV -static int ei_writev_t(int fd, struct iovec *iov, int iovcnt, unsigned ms) +static int get_error(void) { - int res; - if (ms != 0) { - fd_set writemask; - struct timeval tv; - tv.tv_sec = (time_t) (ms / 1000U); - ms %= 1000U; - tv.tv_usec = (time_t) (ms * 1000U); - FD_ZERO(&writemask); - FD_SET(fd,&writemask); - switch (select(fd+1, NULL, &writemask, NULL, &tv)) { - case -1 : - return -1; /* i/o error */ - case 0: - return -2; /* timeout */ - default: - if (!FD_ISSET(fd, &writemask)) { - return -1; /* Other error */ - } - } + switch (WSAGetLastError()) { + case WSAEWOULDBLOCK: return EWOULDBLOCK; + case WSAETIMEDOUT: return ETIMEDOUT; + case WSAEINPROGRESS: return EINPROGRESS; + case WSA_NOT_ENOUGH_MEMORY: return ENOMEM; + case WSA_INVALID_PARAMETER: return EINVAL; + case WSAEBADF: return EBADF; + case WSAEINVAL: return EINVAL; + case WSAEADDRINUSE: return EADDRINUSE; + case WSAENETUNREACH: return ENETUNREACH; + case WSAECONNABORTED: return ECONNABORTED; + case WSAECONNRESET: return ECONNRESET; + case WSAECONNREFUSED: return ECONNREFUSED; + case WSAEHOSTUNREACH: return EHOSTUNREACH; + case WSAEMFILE: return EMFILE; + case WSAEALREADY: return EALREADY; + default: return EIO; } +} + +#else /* not __WIN32__ */ + +#define writesocket write +#define readsocket read +#define closesocket close +#define ioctlsocket ioctl + +static int get_error(void) +{ + int err = errno; + if (err == 0) + return EIO; /* Make sure never to return 0 as error code... */ + return err; +} + +#endif + +int ei_plugin_socket_impl__ = 0; + +/* + * Callbacks for communication over TCP/IPv4 + */ + +static int tcp_get_fd(void *ctx, int *fd) +{ + return EI_DFLT_CTX_TO_FD__(ctx, fd); +} + +static int tcp_hs_packet_header_size(void *ctx, int *sz) +{ + int fd; + *sz = 2; + return EI_DFLT_CTX_TO_FD__(ctx, &fd); +} + +static int tcp_handshake_complete(void *ctx) +{ + int res, fd, one = 1; + + res = EI_DFLT_CTX_TO_FD__(ctx, &fd); + if (res) + return res; + + res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + res = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one)); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + return 0; +} + +static int tcp_socket(void **ctx, void *setup_ctx) +{ + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (MEANS_SOCKET_ERROR(fd)) + return get_error(); + + *ctx = EI_FD_AS_CTX__(fd); + return 0; +} + +static int tcp_close(void *ctx) +{ + int fd, res; + + res = EI_DFLT_CTX_TO_FD__(ctx, &fd); + if (res) + return res; + + res = closesocket(fd); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + return 0; +} + +static int tcp_listen(void *ctx, void *addr, int *len, int backlog) +{ + int res, fd; + socklen_t sz = (socklen_t) *len; + int on = 1; + + res = EI_DFLT_CTX_TO_FD__(ctx, &fd); + if (res) + return res; + + res = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + res = bind(fd, (struct sockaddr *) addr, sz); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + res = getsockname(fd, (struct sockaddr *) addr, (socklen_t *) &sz); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + *len = (int) sz; + + res = listen(fd, backlog); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + return 0; +} + +static int tcp_accept(void **ctx, void *addr, int *len, unsigned unused) +{ + int fd, res; + socklen_t addr_len = (socklen_t) *len; + + if (!ctx) + return EINVAL; + + res = EI_DFLT_CTX_TO_FD__(*ctx, &fd); + if (res) + return res; + + res = accept(fd, (struct sockaddr*) &addr, &addr_len); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + *len = (int) addr_len; + + *ctx = EI_FD_AS_CTX__(res); + return 0; +} + +static int tcp_connect(void *ctx, void *addr, int len, unsigned unused) +{ + int res, fd; + + res = EI_DFLT_CTX_TO_FD__(ctx, &fd); + if (res) + return res; + + res = connect(fd, (struct sockaddr *) addr, len); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + + return 0; +} + +#if defined(EI_HAVE_STRUCT_IOVEC__) && defined(HAVE_WRITEV) + +static int tcp_writev(void *ctx, const void *viov, int iovcnt, ssize_t *len, unsigned unused) +{ + const struct iovec *iov = (const struct iovec *) viov; + int fd, error; + ssize_t res; + + error = EI_DFLT_CTX_TO_FD__(ctx, &fd); + if (error) + return error; + res = writev(fd, iov, iovcnt); - return (res < 0) ? -1 : res; + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + *len = res; + return 0; +} + +#endif + +static int tcp_write(void *ctx, const char* buf, ssize_t *len, unsigned unused) +{ + int error, fd; + ssize_t res; + + error = EI_DFLT_CTX_TO_FD__(ctx, &fd); + if (error) + return error; + + res = writesocket(fd, buf, *len); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + *len = res; + return 0; } -int ei_writev_fill_t(int fd, const struct iovec *iov, int iovcnt, unsigned ms) +static int tcp_read(void *ctx, char* buf, ssize_t *len, unsigned unused) { - int i; - int done; + int error, fd; + ssize_t res; + + error = EI_DFLT_CTX_TO_FD__(ctx, &fd); + if (error) + return error; + + res = readsocket(fd, buf, *len); + if (MEANS_SOCKET_ERROR(res)) + return get_error(); + *len = res; + return 0; +} + +ei_socket_callbacks ei_default_socket_callbacks = { + 0, /* flags */ + tcp_socket, + tcp_close, + tcp_listen, + tcp_accept, + tcp_connect, +#if defined(EI_HAVE_STRUCT_IOVEC__) && defined(HAVE_WRITEV) + tcp_writev, +#else + NULL, +#endif + tcp_write, + tcp_read, + + tcp_hs_packet_header_size, + tcp_handshake_complete, + tcp_handshake_complete, + tcp_get_fd + +}; + + +/* + * + */ + +#if defined(EI_HAVE_STRUCT_IOVEC__) + +int ei_socket_callbacks_have_writev__(ei_socket_callbacks *cbs) +{ + return !!cbs->writev; +} + +static int writev_ctx_t__(ei_socket_callbacks *cbs, void *ctx, + const struct iovec *iov, int iovcnt, + ssize_t *len, + unsigned ms) +{ + int error; + + if (!(cbs->flags & EI_SCLBK_FLG_FULL_IMPL) && ms != EI_SCLBK_INF_TMO) { + int fd; + + error = EI_GET_FD__(cbs, ctx, &fd); + if (error) + return error; + + do { + fd_set writemask; + struct timeval tv; + + tv.tv_sec = (time_t) (ms / 1000U); + ms %= 1000U; + tv.tv_usec = (time_t) (ms * 1000U); + FD_ZERO(&writemask); + FD_SET(fd,&writemask); + switch (select(fd+1, NULL, &writemask, NULL, &tv)) { + case -1 : + error = get_error(); + if (error != EINTR) + return error; + break; + case 0: + return ETIMEDOUT; /* timeout */ + default: + if (!FD_ISSET(fd, &writemask)) { + return EIO; /* Other error */ + } + error = 0; + break; + } + } while (error == EINTR); + } + do { + error = cbs->writev(ctx, (const void *) iov, iovcnt, len, ms); + } while (error == EINTR); + return error; +} + +int ei_writev_fill_ctx_t__(ei_socket_callbacks *cbs, void *ctx, + const struct iovec *iov, int iovcnt, + ssize_t *len, + unsigned ms) +{ + ssize_t i, done, sum; struct iovec *iov_base = NULL; struct iovec *current_iov; int current_iovcnt; - int sum; + int fd, error; + int basic; + + if (!cbs->writev) + return ENOTSUP; + + error = EI_GET_FD__(cbs, ctx, &fd); + if (error) + return error; + basic = !(cbs->flags & EI_SCLBK_FLG_FULL_IMPL); + for (sum = 0, i = 0; i < iovcnt; ++i) { sum += iov[i].iov_len; } - if (ms != 0U) { + if (basic && ms != 0U) { SET_NONBLOCKING(fd); } current_iovcnt = iovcnt; current_iov = (struct iovec *) iov; done = 0; for (;;) { - i = ei_writev_t(fd, current_iov, current_iovcnt, ms); - if (i <= 0) { /* ei_writev_t should always return at least 1 */ + + error = writev_ctx_t__(cbs, ctx, current_iov, current_iovcnt, &i, ms); + if (error) { + *len = done; if (ms != 0U) { SET_BLOCKING(fd); } if (iov_base != NULL) { free(iov_base); } - return (i); - } + return error; + } done += i; if (done < sum) { if (iov_base == NULL) { iov_base = malloc(sizeof(struct iovec) * iovcnt); if (iov_base == NULL) { - return -1; + *len = done; + return ENOMEM; } memcpy(iov_base, iov, sizeof(struct iovec) * iovcnt); current_iov = iov_base; @@ -189,195 +475,383 @@ int ei_writev_fill_t(int fd, const struct iovec *iov, int iovcnt, unsigned break; } } - if (ms != 0U) { + if (basic && ms != 0U) { SET_BLOCKING(fd); } if (iov_base != NULL) { free(iov_base); } - return (sum); + *len = done; + return 0; } +#endif /* defined(EI_HAVE_STRUCT_IOVEC__) */ -#endif - -int ei_connect_t(int fd, void *sinp, int sin_siz, unsigned ms) +int ei_socket_ctx__(ei_socket_callbacks *cbs, void **ctx, void *setup_ctx) { int res; - int error; - int s_res; - struct timeval tv; - fd_set writefds; - fd_set exceptfds; - - if (ms == 0) { - res = connect(fd, sinp, sin_siz); - return (res < 0) ? -1 : res; - } else { - SET_NONBLOCKING(fd); - res = connect(fd, sinp, sin_siz); - error = GET_SOCKET_ERROR(); - SET_BLOCKING(fd); - if (!MEANS_SOCKET_ERROR(res)) { - return (res < 0) ? -1 : res; - } else { - if (error != ERROR_WOULDBLOCK && - error != ERROR_INPROGRESS) { - return -1; - } else { - tv.tv_sec = (long) (ms/1000U); - ms %= 1000U; - tv.tv_usec = (long) (ms * 1000U); - FD_ZERO(&writefds); - FD_SET(fd,&writefds); - FD_ZERO(&exceptfds); - FD_SET(fd,&exceptfds); - s_res = select(fd + 1, NULL, &writefds, &exceptfds, &tv); - switch (s_res) { - case 0: - return -2; - case 1: - if (FD_ISSET(fd, &exceptfds)) { - return -1; - } else { - return 0; /* Connect completed */ - } - default: - return -1; - } - } - } - } + + do { + res = cbs->socket(ctx, setup_ctx); + } while (res == EINTR); + + return res; } -int ei_accept_t(int fd, void *addr, void *addrlen, unsigned ms) +int ei_close_ctx__(ei_socket_callbacks *cbs, void *ctx) { - int res; - if (ms != 0) { - fd_set readmask; - struct timeval tv; - tv.tv_sec = (time_t) (ms / 1000U); - ms %= 1000U; - tv.tv_usec = (time_t) (ms * 1000U); - FD_ZERO(&readmask); - FD_SET(fd,&readmask); - switch (select(fd+1, &readmask, NULL, NULL, &tv)) { - case -1 : - return -1; /* i/o error */ - case 0: - return -2; /* timeout */ - default: - if (!FD_ISSET(fd, &readmask)) { - return -1; /* Other error */ - } - } - } - res = (int) accept(fd,addr,addrlen); - return (res < 0) ? -1 : res; + return cbs->close(ctx); } + +int ei_connect_ctx_t__(ei_socket_callbacks *cbs, void *ctx, + void *addr, int len, unsigned ms) +{ + int res, fd; + + if ((cbs->flags & EI_SCLBK_FLG_FULL_IMPL) || ms == EI_SCLBK_INF_TMO) { + do { + res = cbs->connect(ctx, addr, len, ms); + } while (res == EINTR); + return res; + } + + res = EI_GET_FD__(cbs, ctx, &fd); + if (res) + return res; + SET_NONBLOCKING(fd); + do { + res = cbs->connect(ctx, addr, len, 0); + } while (res == EINTR); + SET_BLOCKING(fd); + + switch (res) { + case EINPROGRESS: + case EAGAIN: +#ifdef EWOULDBLOCK +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif +#endif + break; + default: + return res; + } + while (1) { + struct timeval tv; + fd_set writefds; + fd_set exceptfds; + + tv.tv_sec = (long) (ms/1000U); + ms %= 1000U; + tv.tv_usec = (long) (ms * 1000U); + FD_ZERO(&writefds); + FD_SET(fd,&writefds); + FD_ZERO(&exceptfds); + FD_SET(fd,&exceptfds); + res = select(fd + 1, NULL, &writefds, &exceptfds, &tv); + switch (res) { + case -1: + res = get_error(); + if (res != EINTR) + return res; + break; + case 0: + return ETIMEDOUT; + case 1: + if (!FD_ISSET(fd, &exceptfds)) + return 0; /* Connect completed */ + /* fall through... */ + default: + return EIO; + } + } +} -static int ei_read_t(int fd, char* buf, int len, unsigned ms) +int ei_listen_ctx__(ei_socket_callbacks *cbs, void *ctx, + void *adr, int *len, int backlog) { int res; - if (ms != 0) { - fd_set readmask; - struct timeval tv; - tv.tv_sec = (time_t) (ms / 1000U); - ms %= 1000U; - tv.tv_usec = (time_t) (ms * 1000U); - FD_ZERO(&readmask); - FD_SET(fd,&readmask); - switch (select(fd+1, &readmask, NULL, NULL, &tv)) { - case -1 : - return -1; /* i/o error */ - case 0: - return -2; /* timeout */ - default: - if (!FD_ISSET(fd, &readmask)) { - return -1; /* Other error */ - } - } + + do { + res = cbs->listen(ctx, adr, len, backlog); + } while (res == EINTR); + return res; +} + +int ei_accept_ctx_t__(ei_socket_callbacks *cbs, void **ctx, + void *addr, int *len, unsigned ms) +{ + int error; + + if (!(cbs->flags & EI_SCLBK_FLG_FULL_IMPL) && ms != EI_SCLBK_INF_TMO) { + int fd; + + error = EI_GET_FD__(cbs, *ctx, &fd); + if (error) + return error; + + do { + fd_set readmask; + struct timeval tv; + + tv.tv_sec = (time_t) (ms / 1000U); + ms %= 1000U; + tv.tv_usec = (time_t) (ms * 1000U); + FD_ZERO(&readmask); + FD_SET(fd,&readmask); + switch (select(fd+1, &readmask, NULL, NULL, &tv)) { + case -1 : + error = get_error(); + if (error != EINTR) + return error; + break; + case 0: + return ETIMEDOUT; /* timeout */ + default: + if (!FD_ISSET(fd, &readmask)) { + return EIO; /* Other error */ + } + error = 0; + break; + } + } while (error == EINTR); } - res = readsocket(fd, buf, len); - return (res < 0) ? -1 : res; + do { + error = cbs->accept(ctx, addr, len, ms); + } while (error == EINTR); + return error; } -static int ei_write_t(int fd, const char* buf, int len, unsigned ms) +static int read_ctx_t__(ei_socket_callbacks *cbs, void *ctx, + char* buf, ssize_t *len, unsigned ms) { - int res; - if (ms != 0) { - fd_set writemask; - struct timeval tv; - tv.tv_sec = (time_t) (ms / 1000U); - ms %= 1000U; - tv.tv_usec = (time_t) (ms * 1000U); - FD_ZERO(&writemask); - FD_SET(fd,&writemask); - switch (select(fd+1, NULL, &writemask, NULL, &tv)) { - case -1 : - return -1; /* i/o error */ - case 0: - return -2; /* timeout */ - default: - if (!FD_ISSET(fd, &writemask)) { - return -1; /* Other error */ - } - } + int error; + + if (!(cbs->flags & EI_SCLBK_FLG_FULL_IMPL) && ms != EI_SCLBK_INF_TMO) { + int fd; + + error = EI_GET_FD__(cbs, ctx, &fd); + if (error) + return error; + + do { + fd_set readmask; + struct timeval tv; + + tv.tv_sec = (time_t) (ms / 1000U); + ms %= 1000U; + tv.tv_usec = (time_t) (ms * 1000U); + FD_ZERO(&readmask); + FD_SET(fd,&readmask); + switch (select(fd+1, &readmask, NULL, NULL, &tv)) { + case -1 : + error = get_error(); + if (error != EINTR) + return error; + break; + case 0: + return ETIMEDOUT; /* timeout */ + default: + if (!FD_ISSET(fd, &readmask)) { + return EIO; /* Other error */ + } + error = 0; + break; + } + } while (error == EINTR); + } + do { + error = cbs->read(ctx, buf, len, ms); + } while (error == EINTR); + return error; +} + +static int write_ctx_t__(ei_socket_callbacks *cbs, void *ctx, const char* buf, ssize_t *len, unsigned ms) +{ + int error; + + if (!(cbs->flags & EI_SCLBK_FLG_FULL_IMPL) && ms != EI_SCLBK_INF_TMO) { + int fd; + + error = EI_GET_FD__(cbs, ctx, &fd); + if (error) + return error; + + do { + fd_set writemask; + struct timeval tv; + + tv.tv_sec = (time_t) (ms / 1000U); + ms %= 1000U; + tv.tv_usec = (time_t) (ms * 1000U); + FD_ZERO(&writemask); + FD_SET(fd,&writemask); + switch (select(fd+1, NULL, &writemask, NULL, &tv)) { + case -1 : + error = get_error(); + if (error != EINTR) + return error; + break; + case 0: + return ETIMEDOUT; /* timeout */ + default: + if (!FD_ISSET(fd, &writemask)) { + return EIO; /* Other error */ + } + error = 0; + break; + } + } while (error == EINTR); } - res = writesocket(fd, buf, len); - return (res < 0) ? -1 : res; + do { + error = cbs->write(ctx, buf, len, ms); + } while (error == EINTR); + return error; } /* * Fill buffer, return buffer length, 0 for EOF, < 0 (and sets errno) * for error. */ -int ei_read_fill_t(int fd, char* buf, int len, unsigned ms) +int ei_read_fill_ctx_t__(ei_socket_callbacks *cbs, void *ctx, char* buf, ssize_t *len, unsigned ms) { - int i,got=0; + ssize_t got = 0; + ssize_t want = *len; do { - i = ei_read_t(fd, buf+got, len-got, ms); - if (i <= 0) - return (i); - got += i; - } while (got < len); - return (len); - + ssize_t read_len = want-got; + int error; + + do { + error = read_ctx_t__(cbs, ctx, buf+got, &read_len, ms); + } while (error == EINTR); + if (error) + return error; + if (read_len == 0) { + *len = got; + return 0; + } + got += read_len; + } while (got < want); + + *len = got; + return 0; } /* read_fill */ -int ei_read_fill(int fd, char* buf, int len) +int ei_read_fill_ctx__(ei_socket_callbacks *cbs, void *ctx, char* buf, ssize_t *len) { - return ei_read_fill_t(fd, buf, len, 0); + return ei_read_fill_ctx_t__(cbs, ctx, buf, len, 0); } /* write entire buffer on fd or fail (setting errno) */ -int ei_write_fill_t(int fd, const char *buf, int len, unsigned ms) +int ei_write_fill_ctx_t__(ei_socket_callbacks *cbs, void *ctx, const char *buf, ssize_t *len, unsigned ms) { - int i,done=0; - if (ms != 0U) { + ssize_t tot = *len, done = 0; + int error, fd = -1, basic = !(cbs->flags & EI_SCLBK_FLG_FULL_IMPL); + + if (basic && ms != 0U) { + error = EI_GET_FD__(cbs, ctx, &fd); + if (error) + return error; SET_NONBLOCKING(fd); } do { - i = ei_write_t(fd, buf+done, len-done, ms); - if (i <= 0) { - if (ms != 0U) { + ssize_t write_len = tot-done; + error = write_ctx_t__(cbs, ctx, buf+done, &write_len, ms); + if (error) { + *len = done; + if (basic && ms != 0U) { SET_BLOCKING(fd); } - return (i); + return error; } - done += i; - } while (done < len); - if (ms != 0U) { + done += write_len; + } while (done < tot); + if (basic && ms != 0U) { SET_BLOCKING(fd); } - return (len); + *len = done; + return 0; +} + +int ei_write_fill_ctx__(ei_socket_callbacks *cbs, void *ctx, const char *buf, ssize_t *len) +{ + return ei_write_fill_ctx_t__(cbs, ctx, buf, len, 0); +} + +/* + * Internal API for TCP/IPv4 + */ + +int ei_connect_t__(int fd, void *addr, int len, unsigned ms) +{ + return ei_connect_ctx_t__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd), + addr, len, ms); } -int ei_write_fill(int fd, const char *buf, int len) +int ei_socket__(int *fd) { - return ei_write_fill_t(fd, buf, len, 0); + void *ctx; + int error = ei_socket_ctx__(&ei_default_socket_callbacks, &ctx, NULL); + if (error) + return error; + return EI_GET_FD__(&ei_default_socket_callbacks, ctx, fd); } +int ei_close__(int fd) +{ + return ei_close_ctx__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd)); +} + +int ei_listen__(int fd, void *adr, int *len, int backlog) +{ + return ei_listen_ctx__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd), + adr, len, backlog); +} + +int ei_accept_t__(int *fd, void *addr, int *len, unsigned ms) +{ + void *ctx = EI_FD_AS_CTX__(*fd); + int error = ei_accept_ctx_t__(&ei_default_socket_callbacks, &ctx, + addr, len, ms); + if (error) + return error; + return EI_GET_FD__(&ei_default_socket_callbacks, ctx, fd); +} + +int ei_read_fill_t__(int fd, char* buf, ssize_t *len, unsigned ms) +{ + return ei_read_fill_ctx_t__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd), + buf, len, ms); +} + +int ei_read_fill__(int fd, char* buf, ssize_t *len) +{ + return ei_read_fill_ctx_t__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd), + buf, len, 0); +} + +int ei_write_fill_t__(int fd, const char *buf, ssize_t *len, unsigned ms) +{ + return ei_write_fill_ctx_t__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd), + buf, len, ms); +} + +int ei_write_fill__(int fd, const char *buf, ssize_t *len) +{ + return ei_write_fill_ctx_t__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd), + buf, len, 0); +} + +#if defined(EI_HAVE_STRUCT_IOVEC__) && defined(HAVE_WRITEV) + +int ei_writev_fill_t__(int fd, const struct iovec *iov, int iovcnt, ssize_t *len, unsigned ms) +{ + return ei_writev_fill_ctx_t__(&ei_default_socket_callbacks, EI_FD_AS_CTX__(fd), + iov, iovcnt, len, ms); +} + +#endif + diff --git a/lib/erl_interface/src/misc/ei_portio.h b/lib/erl_interface/src/misc/ei_portio.h index bded811a35..a84b5ca09c 100644 --- a/lib/erl_interface/src/misc/ei_portio.h +++ b/lib/erl_interface/src/misc/ei_portio.h @@ -21,21 +21,94 @@ */ #ifndef _EI_PORTIO_H #define _EI_PORTIO_H -#if !defined(__WIN32__) && !defined(VXWORKS) -#ifdef HAVE_WRITEV + +#undef EI_HAVE_STRUCT_IOVEC__ +#if !defined(__WIN32__) && !defined(VXWORKS) && defined(HAVE_SYS_UIO_H) /* Declaration of struct iovec *iov should be visible in this scope. */ -#include <sys/uio.h> +# include <sys/uio.h> +# define EI_HAVE_STRUCT_IOVEC__ #endif + +/* + * Internal API. Should not be used outside of the erl_interface application... + */ + +int ei_socket_ctx__(ei_socket_callbacks *cbs, void **ctx, void *setup); +int ei_close_ctx__(ei_socket_callbacks *cbs, void *ctx); +int ei_listen_ctx__(ei_socket_callbacks *cbs, void *ctx, void *adr, int *len, int backlog); +int ei_accept_ctx_t__(ei_socket_callbacks *cbs, void **ctx, void *addr, int *len, unsigned ms); +int ei_connect_ctx_t__(ei_socket_callbacks *cbs, void *ctx, void *addr, int len, unsigned ms); +int ei_read_fill_ctx__(ei_socket_callbacks *cbs, void *ctx, char* buf, ssize_t *len); +int ei_write_fill_ctx__(ei_socket_callbacks *cbs, void *ctx, const char *buf, ssize_t *len); +int ei_read_fill_ctx_t__(ei_socket_callbacks *cbs, void *ctx, char* buf, ssize_t *len, unsigned ms); +int ei_write_fill_ctx_t__(ei_socket_callbacks *cbs, void *ctx, const char *buf, ssize_t *len, unsigned ms); +#if defined(EI_HAVE_STRUCT_IOVEC__) +int ei_writev_fill_ctx_t__(ei_socket_callbacks *cbs, void *ctx, const struct iovec *iov, int iovcnt, ssize_t *len, unsigned ms); +int ei_socket_callbacks_have_writev__(ei_socket_callbacks *cbs); #endif -int ei_accept_t(int fd, void *addr, void *addrlen, unsigned ms); -int ei_connect_t(int fd, void *sinp, int sin_siz, unsigned ms); -int ei_read_fill(int fd, char* buf, int len); -int ei_write_fill(int fd, const char *buf, int len); -int ei_read_fill_t(int fd, char* buf, int len, unsigned ms); -int ei_write_fill_t(int fd, const char *buf, int len, unsigned ms); -#ifdef HAVE_WRITEV -int ei_writev_fill_t(int fd, const struct iovec *iov, int iovcnt, unsigned ms); +ei_socket_callbacks ei_default_socket_callbacks; + +#define EI_FD_AS_CTX__(FD) \ + ((void *) (long) (FD)) + +#define EI_DFLT_CTX_TO_FD__(CTX, FD) \ + ((int) (long) (CTX) < 0 \ + ? EBADF \ + : (*(FD) = (int) (long) (CTX), 0)) + +#define EI_GET_FD__(CBS, CTX, FD) \ + ((CBS) == &ei_default_socket_callbacks \ + ? EI_DFLT_CTX_TO_FD__((CTX), FD) \ + : (CBS)->get_fd((CTX), (FD))) + +extern int ei_plugin_socket_impl__; + +#if !defined(_REENTRANT) + +#define EI_HAVE_PLUGIN_SOCKET_IMPL__ \ + ei_plugin_socket_impl__ +#define EI_SET_HAVE_PLUGIN_SOCKET_IMPL__ \ + ei_plugin_socket_impl__ = 1 + +#elif ((ETHR_HAVE___atomic_load_n & SIZEOF_INT) \ + && (ETHR_HAVE___atomic_store_n & SIZEOF_INT)) + +#define EI_HAVE_PLUGIN_SOCKET_IMPL__ \ + __atomic_load_n(&ei_plugin_socket_impl__, __ATOMIC_ACQUIRE) +#define EI_SET_HAVE_PLUGIN_SOCKET_IMPL__ \ + __atomic_store_n(&ei_plugin_socket_impl__, 1, __ATOMIC_RELEASE) + +#else + +/* No gcc atomics; always lookup using ei_get_cbs_ctx()... */ +#define EI_HAVE_PLUGIN_SOCKET_IMPL__ 0 +#define EI_SET_HAVE_PLUGIN_SOCKET_IMPL__ (void) 0 + +#endif + +#define EI_GET_CBS_CTX__(CBS, CTX, FD) \ + (EI_HAVE_PLUGIN_SOCKET_IMPL__ \ + ? ei_get_cbs_ctx__((CBS), (CTX), (FD)) \ + : ((FD) < 0 \ + ? EBADF \ + : (*(CBS) = &ei_default_socket_callbacks, \ + *(CTX) = EI_FD_AS_CTX__((FD)), \ + 0))) +/* + * The following uses our own TCP/IPv4 socket implementation... + */ +int ei_socket__(int *fd); +int ei_close__(int fd); +int ei_listen__(int fd, void *adr, int *len, int backlog); +int ei_accept_t__(int *fd, void *addr, int *len, unsigned ms); +int ei_connect_t__(int fd, void *addr, int len, unsigned ms); +int ei_read_fill__(int fd, char* buf, ssize_t *len); +int ei_write_fill__(int fd, const char *buf, ssize_t *len); +int ei_read_fill_t__(int fd, char* buf, ssize_t *len, unsigned ms); +int ei_write_fill_t__(int fd, const char *buf, ssize_t *len, unsigned ms); +#if defined(EI_HAVE_STRUCT_IOVEC__) && defined(HAVE_WRITEV) +int ei_writev_fill_t__(int fd, const struct iovec *iov, int iovcnt, ssize_t *len, unsigned ms); #endif #endif /* _EI_PORTIO_H */ diff --git a/lib/erl_interface/src/not_used/send_link.c b/lib/erl_interface/src/not_used/send_link.c index 7be476fd93..38fae27df4 100644 --- a/lib/erl_interface/src/not_used/send_link.c +++ b/lib/erl_interface/src/not_used/send_link.c @@ -50,6 +50,7 @@ static int link_unlink(int fd, const erlang_pid *from, const erlang_pid *to, char *s; int index = 0; int n; + unsigned tmo = ms == 0 ? EI_SCLBK_INF_TMO : ms; index = 5; /* max sizes: */ ei_encode_version(msgbuf,&index); /* 1 */ @@ -69,7 +70,7 @@ static int link_unlink(int fd, const erlang_pid *from, const erlang_pid *to, if (ei_trace_distribution > 1) ei_show_sendmsg(stderr,msgbuf,NULL); #endif - n = ei_write_fill_t(fd,msgbuf,index,ms); + n = ei_write_fill_t__(fd,msgbuf,index,tmo); return (n==index ? 0 : -1); } diff --git a/lib/erl_interface/test/ei_accept_SUITE.erl b/lib/erl_interface/test/ei_accept_SUITE.erl index 78a433d21b..9c9c3f86b6 100644 --- a/lib/erl_interface/test/ei_accept_SUITE.erl +++ b/lib/erl_interface/test/ei_accept_SUITE.erl @@ -81,12 +81,10 @@ ei_accept(Config) when is_list(Config) -> ei_threaded_accept(Config) when is_list(Config) -> Einode = filename:join(proplists:get_value(data_dir, Config), "eiaccnode"), - N = 1, % 3, + N = 3, Host = atom_to_list(node()), - Port = 6767, - start_einode(Einode, N, Host, Port), + start_einode(Einode, N, Host), io:format("started eiaccnode"), - %%spawn_link(fun() -> start_einode(Einode, N, Host, Port) end), TestServerPid = self(), [spawn_link(fun() -> send_rec_einode(I, TestServerPid) end) || I <- lists:seq(0, N-1)], [receive I -> ok end || I <- lists:seq(0, N-1) ], @@ -159,10 +157,9 @@ send_rec_einode(N, TestServerPid) -> ct:fail(EINode) end. -start_einode(Einode, N, Host, Port) -> +start_einode(Einode, N, Host) -> Einodecmd = Einode ++ " " ++ atom_to_list(erlang:get_cookie()) - ++ " " ++ integer_to_list(N) ++ " " ++ Host ++ " " - ++ integer_to_list(Port) ++ " nothreads", + ++ " " ++ integer_to_list(N) ++ " " ++ Host, io:format("Einodecmd ~p ~n", [Einodecmd]), open_port({spawn, Einodecmd}, []), ok. diff --git a/lib/erl_interface/test/ei_accept_SUITE_data/ei_accept_test.c b/lib/erl_interface/test/ei_accept_SUITE_data/ei_accept_test.c index 50df848b69..f41d741609 100644 --- a/lib/erl_interface/test/ei_accept_SUITE_data/ei_accept_test.c +++ b/lib/erl_interface/test/ei_accept_SUITE_data/ei_accept_test.c @@ -125,45 +125,26 @@ static void cmd_ei_connect_init(char* buf, int len) ei_x_free(&res); } -static int my_listen(int port) -{ - int listen_fd; - struct sockaddr_in addr; - const char *on = "1"; - - if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) - return -1; - - setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, on, sizeof(on)); - - memset((void*) &addr, 0, (size_t) sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = htonl(INADDR_ANY); - - if (bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr)) < 0) - return -1; - - listen(listen_fd, 5); - return listen_fd; -} - static void cmd_ei_publish(char* buf, int len) { int index = 0; - int listen, r; - long port; + int iport, lfd, r; + long lport; ei_x_buff x; int i; /* get port */ - if (ei_decode_long(buf, &index, &port) < 0) + if (ei_decode_long(buf, &index, &lport) < 0) fail("expected int (port)"); /* Make a listen socket */ - if ((listen = my_listen(port)) <= 0) + + iport = (int) lport; + lfd = ei_listen(&ec, &iport, 5); + if (lfd < 0) fail("listen"); + lport = (long) iport; - if ((i = ei_publish(&ec, port)) == -1) + if ((i = ei_publish(&ec, lport)) == -1) fail("ei_publish"); #ifdef VXWORKS save_fd(i); @@ -171,7 +152,7 @@ static void cmd_ei_publish(char* buf, int len) /* send listen-fd, result and errno */ ei_x_new_with_version(&x); ei_x_encode_tuple_header(&x, 3); - ei_x_encode_long(&x, listen); + ei_x_encode_long(&x, (long) lfd); ei_x_encode_long(&x, i); ei_x_encode_long(&x, erl_errno); send_bin_term(&x); diff --git a/lib/erl_interface/test/ei_accept_SUITE_data/eiaccnode.c b/lib/erl_interface/test/ei_accept_SUITE_data/eiaccnode.c index 308f843530..c850d20f3c 100644 --- a/lib/erl_interface/test/ei_accept_SUITE_data/eiaccnode.c +++ b/lib/erl_interface/test/ei_accept_SUITE_data/eiaccnode.c @@ -47,8 +47,6 @@ #define MAIN main #endif -static int my_listen(int port); - /* A small einode. To be called from the test case ei_accept_SUITE:multi_thread @@ -64,7 +62,6 @@ static int my_listen(int port); */ static const char* cookie, * desthost; -static int port; /* actually base port */ #ifndef SD_SEND #ifdef SHUTWR @@ -74,10 +71,6 @@ static int port; /* actually base port */ #endif #endif -#ifndef __WIN32__ -#define closesocket(fd) close(fd) -#endif - #ifdef __WIN32__ static DWORD WINAPI #else @@ -86,26 +79,32 @@ static void* einode_thread(void* num) { int n = (int)num; + int port; ei_cnode ec; - char myname[100], destname[100]; + char myname[100], destname[100], filename[100]; int r, fd, listen; ErlConnect conn; erlang_msg msg; -/* FILE* f;*/ + FILE* file; - sprintf(myname, "eiacc%d", n); - printf("thread %d (%s) listening\n", n, myname, destname); + sprintf(filename, "eiacc%d_trace.txt", n); + file = fopen(filename, "w"); + + sprintf(myname, "eiacc%d", n); fflush(file); r = ei_connect_init(&ec, myname, cookie, 0); - if ((listen = my_listen(port+n)) <= 0) { - printf("listen err\n"); + port = 0; + listen = ei_listen(&ec, &port, 5); + if (listen <= 0) { + fprintf(file, "listen err\n"); fflush(file); exit(7); } - if (ei_publish(&ec, port + n) == -1) { - printf("ei_publish port %d\n", port+n); + fprintf(file, "thread %d (%s:%s) listening on port %d\n", n, myname, destname, port); + if (ei_publish(&ec, port) == -1) { + fprintf(file, "ei_publish port %d\n", port+n); fflush(file); exit(8); } fd = ei_accept(&ec, listen, &conn); - printf("ei_accept %d\n", fd); + fprintf(file, "ei_accept %d\n", fd); fflush(file); if (fd >= 0) { ei_x_buff x, xs; int index, version; @@ -117,37 +116,38 @@ static void* if (got == ERL_TICK) continue; if (got == ERL_ERROR) { - printf("receive error %d\n", n); + fprintf(file, "receive error %d\n", n); fflush(file); return 0; } - printf("received %d\n", got); + fprintf(file, "received %d\n", got); fflush(file); break; } index = 0; if (ei_decode_version(x.buff, &index, &version) != 0) { - printf("ei_decode_version %d\n", n); + fprintf(file, "ei_decode_version %d\n", n); fflush(file); return 0; } if (ei_decode_pid(x.buff, &index, &pid) != 0) { - printf("ei_decode_pid %d\n", n); + fprintf(file, "ei_decode_pid %d\n", n); fflush(file); return 0; } -/* fprintf(f, "got pid from %s \n", pid.node);*/ + fprintf(file, "got pid from %s \n", pid.node); fflush(file); ei_x_new_with_version(&xs); ei_x_encode_tuple_header(&xs, 2); ei_x_encode_long(&xs, n); ei_x_encode_pid(&xs, &pid); r = ei_send(fd, &pid, xs.buff, xs.index); -/* fprintf(f, "sent %d bytes %d\n", xs.index, r);*/ + fprintf(file, "sent %d bytes %d\n", xs.index, r); fflush(file); shutdown(fd, SD_SEND); - closesocket(fd); + ei_close_connection(fd); ei_x_free(&x); ei_x_free(&xs); } else { - printf("coudn't connect fd %d r %d\n", fd, r); + fprintf(file, "coudn't connect fd %d r %d\n", fd, r); fflush(file); } - printf("done thread %d\n", n); -/* fclose(f);*/ + ei_close_connection(listen); + fprintf(file, "done thread %d\n", n); + fclose(file); return 0; } @@ -170,12 +170,14 @@ MAIN(int argc, char *argv[]) if (n > 100) exit(2); desthost = argv[3]; - port = atoi(argv[4]); -#ifndef VXWORKS - no_threads = argv[5] != NULL && strcmp(argv[5], "nothreads") == 0; -#else + if (argc == 3) + no_threads = 0; + else + no_threads = argv[4] != NULL && strcmp(argv[4], "nothreads") == 0; +#ifdef VXWORKS no_threads = 1; #endif + for (i = 0; i < n; ++i) { if (!no_threads) { #ifndef VXWORKS @@ -209,27 +211,3 @@ MAIN(int argc, char *argv[]) printf("ok\n"); return 0; } - -static int my_listen(int port) -{ - int listen_fd; - struct sockaddr_in addr; - const char *on = "1"; - - if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) - return -1; - - setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, on, sizeof(on)); - - memset((void*) &addr, 0, (size_t) sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = htonl(INADDR_ANY); - - if (bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr)) < 0) - return -1; - - listen(listen_fd, 5); - return listen_fd; -} - diff --git a/lib/erl_interface/test/erl_eterm_SUITE_data/cnode.c b/lib/erl_interface/test/erl_eterm_SUITE_data/cnode.c index bead0f8413..b87feb9dfc 100644 --- a/lib/erl_interface/test/erl_eterm_SUITE_data/cnode.c +++ b/lib/erl_interface/test/erl_eterm_SUITE_data/cnode.c @@ -20,7 +20,7 @@ #include <stdlib.h> #include <stdio.h> - +#include <string.h> #include "ei.h" #include "erl_interface.h" @@ -68,6 +68,7 @@ MAIN(int argc, char **argv) char host[80]; int number; ETERM *ref, *ref1, *ref2; + FILE *dfile = fopen("cnode_debug_printout", "w"); erl_init(NULL, 0); @@ -80,28 +81,30 @@ MAIN(int argc, char **argv) gethostname(host, sizeof(host)); sprintf(node, "c%d@%s", number, host); - printf("s = %d\n", s); + fprintf(dfile, "s = %d\n", s); fflush(dfile); sprintf(server, "test_server@%s", host); fd = erl_connect(server); - printf("fd = %d\n", fd); + fprintf(dfile, "fd = %d\n", fd); -/* printf("dist = %d\n", erl_distversion(fd)); */ +/* fprintf(dfile, "dist = %d\n", erl_distversion(fd)); */ #if 1 ref = erl_mk_long_ref(node, 4711, 113, 98, 0); #else ref = erl_mk_ref(node, 4711, 0); #endif - printf("ref = %d\n", ref); + fprintf(dfile, "ref = %p\n", ref); fflush(dfile); s = erl_reg_send(fd, "mip", ref); - printf("s = %d\n", s); + fprintf(dfile, "s = %d\n", s); fflush(dfile); { ETERM* emsg; emsg = SELF(fd); - erl_reg_send(fd,"mip",emsg); + fprintf(dfile, "pid = %p\n", emsg); fflush(dfile); + s = erl_reg_send(fd,"mip",emsg); + fprintf(dfile, "s2 = %d\n", s); fflush(dfile); erl_free_term(emsg); } @@ -116,28 +119,29 @@ MAIN(int argc, char **argv) #endif switch (s) { case ERL_TICK: - printf("tick\n"); + fprintf(dfile, "tick\n"); break; case ERL_ERROR: - printf("error\n"); + fprintf(dfile, "error: %s (%d)\n", strerror(erl_errno), erl_errno); break; case ERL_MSG: - printf("msg %d\n", msgsize); + fprintf(dfile, "msg %d\n", msgsize); break; default: - printf("unknown result %d\n", s); + fprintf(dfile, "unknown result %d\n", s); break; } + fflush(dfile); } while (s == ERL_TICK); s = erl_reg_send(fd, "mip", msg.msg); - printf("s = %d\n", s); + fprintf(dfile, "s = %d\n", s); fflush(dfile); s = erl_reg_send(fd, "mip", msg.to); - printf("s = %d\n", s); + fprintf(dfile, "s = %d\n", s); fflush(dfile); #if 0 /* from = NULL! */ s = erl_reg_send(fd, "mip", msg.from); - printf("s = %d\n", s); + fprintf(dfile, "s = %d\n", s); fflush(dfile); #endif #if 0 @@ -150,17 +154,19 @@ MAIN(int argc, char **argv) ref1 = erl_mk_long_ref(node, 4711, 113, 98, 0); ref2 = erl_mk_ref(node, 4711, 0); s = erl_encode(ref1, buf1); - printf("enc1 s = %d\n", s); + fprintf(dfile, "enc1 s = %d\n", s); fflush(dfile); s = erl_encode(ref2, buf2); - printf("enc2 s = %d\n", s); + fprintf(dfile, "enc2 s = %d\n", s); fflush(dfile); s = erl_compare_ext(buf1, buf2); - printf("comp s = %d\n", s); + fprintf(dfile, "comp s = %d\n", s); fflush(dfile); /* Compare, in another way */ s = erl_match(ref1, ref2); - printf("match s = %d\n", s); + fprintf(dfile, "match s = %d\n", s); fflush(dfile); #endif + fclose(dfile); + erl_close_connection(fd); return 0; diff --git a/lib/inets/src/http_client/httpc_handler.erl b/lib/inets/src/http_client/httpc_handler.erl index 1bf5d25c98..8b356d8026 100644 --- a/lib/inets/src/http_client/httpc_handler.erl +++ b/lib/inets/src/http_client/httpc_handler.erl @@ -805,11 +805,12 @@ handle_unix_socket_options(#request{unix_socket = UnixSocket}, error({badarg, [{ipfamily, Else}, {unix_socket, UnixSocket}]}) end. -connect_and_send_first_request(Address, Request, #state{options = Options0} = State) -> +connect_and_send_first_request(Address, #request{ipv6_host_with_brackets = HasBrackets} = Request, + #state{options = Options0} = State) -> SocketType = socket_type(Request), ConnTimeout = (Request#request.settings)#http_options.connect_timeout, Options = handle_unix_socket_options(Request, Options0), - case connect(SocketType, Address, Options, ConnTimeout) of + case connect(SocketType, format_address(Address, HasBrackets), Options, ConnTimeout) of {ok, Socket} -> ClientClose = httpc_request:is_client_closing( @@ -1739,3 +1740,8 @@ update_session(ProfileName, #session{id = SessionId} = Session, Pos, Value) -> end. +format_address({Host, Port}, true) when is_list(Host)-> + {ok, Address} = inet:parse_address(string:strip(string:strip(Host, right, $]), left, $[)), + {Address, Port}; +format_address(HostPort, _) -> + HostPort. diff --git a/lib/kernel/doc/src/gen_sctp.xml b/lib/kernel/doc/src/gen_sctp.xml index 1e7009b3a8..f70d6c24db 100644 --- a/lib/kernel/doc/src/gen_sctp.xml +++ b/lib/kernel/doc/src/gen_sctp.xml @@ -284,7 +284,7 @@ connect(Socket, Ip, Port>, <func> <name name="listen" arity="2" clause_i="1" since=""/> - <name name="listen" arity="2" clause_i="2" since=""/> + <name name="listen" arity="2" clause_i="2" since="OTP R15B"/> <fsummary>Set up a socket to listen.</fsummary> <desc> <p>Sets up a socket to listen on the IP address and port number diff --git a/lib/kernel/test/gen_tcp_misc_SUITE.erl b/lib/kernel/test/gen_tcp_misc_SUITE.erl index 244bd7e2a0..52edfaee29 100644 --- a/lib/kernel/test/gen_tcp_misc_SUITE.erl +++ b/lib/kernel/test/gen_tcp_misc_SUITE.erl @@ -53,7 +53,7 @@ active_once_closed/1, send_timeout/1, send_timeout_active/1, otp_7731/1, zombie_sockets/1, otp_7816/1, otp_8102/1, wrapping_oct/0, wrapping_oct/1, otp_9389/1, otp_13939/1, - otp_12242/1]). + otp_12242/1, delay_send_error/1]). %% Internal exports. -export([sender/3, not_owner/1, passive_sockets_server/2, priority_server/1, @@ -97,7 +97,7 @@ all() -> active_once_closed, send_timeout, send_timeout_active, otp_7731, wrapping_oct, zombie_sockets, otp_7816, otp_8102, otp_9389, - otp_12242]. + otp_12242, delay_send_error]. groups() -> []. @@ -3427,3 +3427,32 @@ otp_12242(Addr) when tuple_size(Addr) =:= 4 -> wait(Mref) -> receive {'DOWN',Mref,_,_,Reason} -> Reason end. + +%% OTP-15536 +%% Test that send error works correctly for delay_send +delay_send_error(Config) -> + {ok, LS} = gen_tcp:listen(0, [{reuseaddr, true}, {packet, 1}, {active, false}]), + {ok,{{0,0,0,0},PortNum}}=inet:sockname(LS), + P = spawn_link( + fun() -> + {ok, S} = gen_tcp:accept(LS), + receive die -> gen_tcp:close(S) end + end), + erlang:monitor(process, P), + {ok, S} = gen_tcp:connect("localhost", PortNum, + [{packet, 1}, {active, false}, {delay_send, true}]), + + %% Do a couple of sends first to see that it works + ok = gen_tcp:send(S, "hello"), + ok = gen_tcp:send(S, "hello"), + ok = gen_tcp:send(S, "hello"), + + %% Make the receiver close + P ! die, + receive _Down -> ok end, + + ok = gen_tcp:send(S, "hello"), + timer:sleep(500), %% Sleep in order for delay_send to have time to trigger + + %% This used to result in a double free + {error, closed} = gen_tcp:send(S, "hello"). diff --git a/lib/mnesia/doc/src/mnesia.xml b/lib/mnesia/doc/src/mnesia.xml index 94f1af34bf..11b0b8e987 100644 --- a/lib/mnesia/doc/src/mnesia.xml +++ b/lib/mnesia/doc/src/mnesia.xml @@ -2077,6 +2077,13 @@ mnesia:create_table(employee, <fsummary>Starts a local Mnesia system.</fsummary> <desc> <marker id="start"></marker> + <p>Mnesia startup is asynchronous. The function call + <c>mnesia:start()</c> returns the atom <c>ok</c> and then + starts to initialize the different tables. Depending on the + size of the database, this can take some time, and the + application programmer must wait for the tables that the + application needs before they can be used. This is achieved + by using the function <c>mnesia:wait_for_tables/2</c>.</p> <p>The startup procedure for a set of Mnesia nodes is a fairly complicated operation. A Mnesia system consists of a set of nodes, with Mnesia started locally on all diff --git a/lib/ssl/src/dtls_connection.erl b/lib/ssl/src/dtls_connection.erl index 2583667fa2..855cd0d123 100644 --- a/lib/ssl/src/dtls_connection.erl +++ b/lib/ssl/src/dtls_connection.erl @@ -107,9 +107,11 @@ pids(_) -> %%==================================================================== %% State transition handling %%==================================================================== -next_record(#state{unprocessed_handshake_events = N} = State) when N > 0 -> - {no_record, State#state{unprocessed_handshake_events = N-1}}; - +next_record(#state{handshake_env = + #handshake_env{unprocessed_handshake_events = N} = HsEnv} + = State) when N > 0 -> + {no_record, State#state{handshake_env = + HsEnv#handshake_env{unprocessed_handshake_events = N-1}}}; next_record(#state{protocol_buffers = #protocol_buffers{dtls_cipher_texts = [#ssl_tls{epoch = Epoch} = CT | Rest]} = Buffers, @@ -249,19 +251,22 @@ handle_protocol_record(#ssl_tls{type = ?HANDSHAKE, fragment = Data}, StateName, #state{protocol_buffers = Buffers0, - negotiated_version = Version} = State0) -> + negotiated_version = Version} = State) -> try case dtls_handshake:get_dtls_handshake(Version, Data, Buffers0) of {[], Buffers} -> - next_event(StateName, no_record, State0#state{protocol_buffers = Buffers}); + next_event(StateName, no_record, State#state{protocol_buffers = Buffers}); {Packets, Buffers} -> - State = State0#state{protocol_buffers = Buffers}, + HsEnv = State#state.handshake_env, Events = dtls_handshake_events(Packets), {next_state, StateName, - State#state{unprocessed_handshake_events = unprocessed_events(Events)}, Events} + State#state{protocol_buffers = Buffers, + handshake_env = + HsEnv#handshake_env{unprocessed_handshake_events + = unprocessed_events(Events)}}, Events} end catch throw:#alert{} = Alert -> - handle_own_alert(Alert, Version, StateName, State0) + handle_own_alert(Alert, Version, StateName, State) end; %%% DTLS record protocol level change cipher messages handle_protocol_record(#ssl_tls{type = ?CHANGE_CIPHER_SPEC, fragment = Data}, StateName, State) -> @@ -299,7 +304,7 @@ send_handshake(Handshake, #state{connection_states = ConnectionStates} = State) #{epoch := Epoch} = ssl_record:current_connection_state(ConnectionStates, write), send_handshake_flight(queue_handshake(Handshake, State), Epoch). -queue_handshake(Handshake0, #state{tls_handshake_history = Hist0, +queue_handshake(Handshake0, #state{handshake_env = #handshake_env{tls_handshake_history = Hist0} = HsEnv, negotiated_version = Version, flight_buffer = #{handshakes := HsBuffer0, change_cipher_spec := undefined, @@ -308,9 +313,9 @@ queue_handshake(Handshake0, #state{tls_handshake_history = Hist0, Hist = update_handshake_history(Handshake0, Handshake, Hist0), State#state{flight_buffer = Flight0#{handshakes => [Handshake | HsBuffer0], next_sequence => Seq +1}, - tls_handshake_history = Hist}; + handshake_env = HsEnv#handshake_env{tls_handshake_history = Hist}}; -queue_handshake(Handshake0, #state{tls_handshake_history = Hist0, +queue_handshake(Handshake0, #state{handshake_env = #handshake_env{tls_handshake_history = Hist0} = HsEnv, negotiated_version = Version, flight_buffer = #{handshakes_after_change_cipher_spec := Buffer0, next_sequence := Seq} = Flight0} = State) -> @@ -318,7 +323,7 @@ queue_handshake(Handshake0, #state{tls_handshake_history = Hist0, Hist = update_handshake_history(Handshake0, Handshake, Hist0), State#state{flight_buffer = Flight0#{handshakes_after_change_cipher_spec => [Handshake | Buffer0], next_sequence => Seq +1}, - tls_handshake_history = Hist}. + handshake_env = HsEnv#handshake_env{tls_handshake_history = Hist}}. queue_change_cipher(ChangeCipher, #state{flight_buffer = Flight, connection_states = ConnectionStates0} = State) -> @@ -330,10 +335,11 @@ queue_change_cipher(ChangeCipher, #state{flight_buffer = Flight, reinit(State) -> %% To be API compatible with TLS NOOP here reinit_handshake_data(State). -reinit_handshake_data(#state{protocol_buffers = Buffers} = State) -> +reinit_handshake_data(#state{protocol_buffers = Buffers, + handshake_env = HsEnv} = State) -> State#state{premaster_secret = undefined, public_key_info = undefined, - tls_handshake_history = ssl_handshake:init_handshake_history(), + handshake_env = HsEnv#handshake_env{tls_handshake_history = ssl_handshake:init_handshake_history()}, flight_state = {retransmit, ?INITIAL_RETRANSMIT_TIMEOUT}, flight_buffer = new_flight(), protocol_buffers = @@ -417,10 +423,10 @@ init({call, From}, {start, Timeout}, role = client, session_cache = Cache, session_cache_cb = CacheCb}, + handshake_env = #handshake_env{renegotiation = {Renegotiation, _}}, ssl_options = SslOpts, session = #session{own_certificate = Cert} = Session0, - connection_states = ConnectionStates0, - renegotiation = {Renegotiation, _} + connection_states = ConnectionStates0 } = State0) -> Timer = ssl_connection:start_or_recv_cancel_timer(Timeout, From), Hello = dtls_handshake:client_hello(Host, Port, ConnectionStates0, SslOpts, @@ -487,6 +493,7 @@ hello(internal, #client_hello{cookie = <<>>, #state{static_env = #static_env{role = server, transport_cb = Transport, socket = Socket}, + handshake_env = HsEnv, protocol_specific = #{current_cookie_secret := Secret}} = State0) -> {ok, {IP, Port}} = dtls_socket:peername(Transport, Socket), Cookie = dtls_handshake:cookie(Secret, IP, Port, Hello), @@ -500,24 +507,30 @@ hello(internal, #client_hello{cookie = <<>>, State1 = prepare_flight(State0#state{negotiated_version = Version}), {State2, Actions} = send_handshake(VerifyRequest, State1), {Record, State} = next_record(State2), - next_event(?FUNCTION_NAME, Record, State#state{tls_handshake_history = ssl_handshake:init_handshake_history()}, Actions); + next_event(?FUNCTION_NAME, Record, + State#state{handshake_env = HsEnv#handshake_env{ + tls_handshake_history = + ssl_handshake:init_handshake_history()}}, + Actions); hello(internal, #hello_verify_request{cookie = Cookie}, #state{static_env = #static_env{role = client, host = Host, port = Port, session_cache = Cache, session_cache_cb = CacheCb}, + handshake_env = #handshake_env{renegotiation = {Renegotiation, _}} = HsEnv, ssl_options = SslOpts, session = #session{own_certificate = OwnCert} = Session0, - connection_states = ConnectionStates0, - renegotiation = {Renegotiation, _} + connection_states = ConnectionStates0 } = State0) -> Hello = dtls_handshake:client_hello(Host, Port, Cookie, ConnectionStates0, SslOpts, Cache, CacheCb, Renegotiation, OwnCert), Version = Hello#client_hello.client_version, - State1 = prepare_flight(State0#state{tls_handshake_history = ssl_handshake:init_handshake_history()}), + State1 = prepare_flight(State0#state{handshake_env = + HsEnv#handshake_env{tls_handshake_history + = ssl_handshake:init_handshake_history()}}), {State2, Actions} = send_handshake(Hello, State1), State = State2#state{negotiated_version = Version, %% Requested version @@ -560,9 +573,9 @@ hello(internal, #client_hello{cookie = Cookie} = Hello, #state{static_env = #sta hello(internal, #server_hello{} = Hello, #state{ static_env = #static_env{role = client}, + handshake_env = #handshake_env{renegotiation = {Renegotiation, _}}, connection_states = ConnectionStates0, negotiated_version = ReqVersion, - renegotiation = {Renegotiation, _}, ssl_options = SslOptions} = State) -> case dtls_handshake:hello(Hello, SslOptions, ConnectionStates0, Renegotiation) of #alert{} = Alert -> @@ -676,11 +689,12 @@ connection(internal, #hello_request{}, #state{static_env = #static_env{host = Ho session_cache = Cache, session_cache_cb = CacheCb }, + handshake_env = #handshake_env{ renegotiation = {Renegotiation, _}}, session = #session{own_certificate = Cert} = Session0, ssl_options = SslOpts, - connection_states = ConnectionStates0, - renegotiation = {Renegotiation, _}} = State0) -> + connection_states = ConnectionStates0 + } = State0) -> Hello = dtls_handshake:client_hello(Host, Port, ConnectionStates0, SslOpts, Cache, CacheCb, Renegotiation, Cert), @@ -702,7 +716,8 @@ connection(internal, #client_hello{} = Hello, #state{static_env = #static_env{ro %% initiated renegotiation we will disallow many client initiated %% renegotiations immediately after each other. erlang:send_after(?WAIT_TO_ALLOW_RENEGOTIATION, self(), allow_renegotiate), - {next_state, hello, State#state{allow_renegotiate = false, renegotiation = {true, peer}}, + {next_state, hello, State#state{allow_renegotiate = false, + handshake_env = #handshake_env{renegotiation = {true, peer}}}, [{next_event, internal, Hello}]}; connection(internal, #client_hello{}, #state{static_env = #static_env{role = server}, allow_renegotiate = false} = State0) -> @@ -774,6 +789,10 @@ initial_state(Role, Host, Port, Socket, {SSLOptions, SocketOptions, _}, User, }, #state{static_env = InitStatEnv, + handshake_env = #handshake_env{ + tls_handshake_history = ssl_handshake:init_handshake_history(), + renegotiation = {false, first} + }, socket_options = SocketOptions, %% We do not want to save the password in the state so that %% could be written in the clear into error logs. @@ -783,7 +802,6 @@ initial_state(Role, Host, Port, Socket, {SSLOptions, SocketOptions, _}, User, protocol_buffers = #protocol_buffers{}, user_application = {Monitor, User}, user_data_buffer = <<>>, - renegotiation = {false, first}, allow_renegotiate = SSLOptions#ssl_options.client_renegotiation, start_or_recv_from = undefined, flight_buffer = new_flight(), @@ -836,9 +854,8 @@ handle_client_hello(#client_hello{client_version = ClientVersion} = Hello, static_env = #static_env{port = Port, session_cache = Cache, session_cache_cb = CacheCb}, + handshake_env = #handshake_env{renegotiation = {Renegotiation, _}} = HsEnv, session = #session{own_certificate = Cert} = Session0, - renegotiation = {Renegotiation, _}, - negotiated_protocol = CurrentProtocol, key_algorithm = KeyExAlg, ssl_options = SslOpts} = State0) -> @@ -857,7 +874,7 @@ handle_client_hello(#client_hello{client_version = ClientVersion} = Hello, State = prepare_flight(State0#state{connection_states = ConnectionStates, negotiated_version = Version, hashsign_algorithm = HashSign, - client_hello_version = ClientVersion, + handshake_env = HsEnv#handshake_env{client_hello_version = ClientVersion}, session = Session, negotiated_protocol = Protocol}), @@ -1146,13 +1163,14 @@ send_application_data(Data, From, _StateName, #state{static_env = #static_env{socket = Socket, protocol_cb = Connection, transport_cb = Transport}, + handshake_env = HsEnv, negotiated_version = Version, connection_states = ConnectionStates0, ssl_options = #ssl_options{renegotiate_at = RenegotiateAt}} = State0) -> case time_to_renegotiate(Data, ConnectionStates0, RenegotiateAt) of true -> - renegotiate(State0#state{renegotiation = {true, internal}}, + renegotiate(State0#state{handshake_env = HsEnv#handshake_env{renegotiation = {true, internal}}}, [{next_event, {call, From}, {application_data, Data}}]); false -> {Msgs, ConnectionStates} = diff --git a/lib/ssl/src/ssl_connection.erl b/lib/ssl/src/ssl_connection.erl index 7d7da2dcec..2bfa9a52cd 100644 --- a/lib/ssl/src/ssl_connection.erl +++ b/lib/ssl/src/ssl_connection.erl @@ -356,8 +356,8 @@ handle_normal_shutdown(Alert, _, #state{static_env = #static_env{role = Role, transport_cb = Transport, protocol_cb = Connection, tracker = Tracker}, - start_or_recv_from = StartFrom, - renegotiation = {false, first}} = State) -> + handshake_env = #handshake_env{renegotiation = {false, first}}, + start_or_recv_from = StartFrom} = State) -> Pids = Connection:pids(State), alert_user(Pids, Transport, Tracker,Socket, StartFrom, Alert, Role, Connection); @@ -401,8 +401,8 @@ handle_alert(#alert{level = ?WARNING, description = ?CLOSE_NOTIFY} = Alert, handle_alert(#alert{level = ?WARNING, description = ?NO_RENEGOTIATION} = Alert, StateName, #state{static_env = #static_env{role = Role, protocol_cb = Connection}, - ssl_options = SslOpts, - renegotiation = {true, internal}} = State) -> + handshake_env = #handshake_env{renegotiation = {true, internal}}, + ssl_options = SslOpts} = State) -> log_alert(SslOpts#ssl_options.log_alert, Role, Connection:protocol_name(), StateName, Alert#alert{role = opposite_role(Role)}), handle_normal_shutdown(Alert, StateName, State), @@ -411,26 +411,26 @@ handle_alert(#alert{level = ?WARNING, description = ?NO_RENEGOTIATION} = Alert, handle_alert(#alert{level = ?WARNING, description = ?NO_RENEGOTIATION} = Alert, connection = StateName, #state{static_env = #static_env{role = Role, protocol_cb = Connection}, - ssl_options = SslOpts, - renegotiation = {true, From} + handshake_env = #handshake_env{renegotiation = {true, From}} = HsEnv, + ssl_options = SslOpts } = State0) -> log_alert(SslOpts#ssl_options.log_alert, Role, Connection:protocol_name(), StateName, Alert#alert{role = opposite_role(Role)}), gen_statem:reply(From, {error, renegotiation_rejected}), State = Connection:reinit_handshake_data(State0), - Connection:next_event(connection, no_record, State#state{renegotiation = undefined}); + Connection:next_event(connection, no_record, State#state{handshake_env = HsEnv#handshake_env{renegotiation = undefined}}); handle_alert(#alert{level = ?WARNING, description = ?NO_RENEGOTIATION} = Alert, StateName, #state{static_env = #static_env{role = Role, protocol_cb = Connection}, - ssl_options = SslOpts, - renegotiation = {true, From} + handshake_env = #handshake_env{renegotiation = {true, From}} = HsEnv, + ssl_options = SslOpts } = State0) -> log_alert(SslOpts#ssl_options.log_alert, Role, Connection:protocol_name(), StateName, Alert#alert{role = opposite_role(Role)}), gen_statem:reply(From, {error, renegotiation_rejected}), %% Go back to connection! - State = Connection:reinit(State0#state{renegotiation = undefined}), + State = Connection:reinit(State0#state{handshake_env = HsEnv#handshake_env{renegotiation = undefined}}), Connection:next_event(connection, no_record, State); %% Gracefully log and ignore all other warning alerts @@ -607,7 +607,8 @@ handle_session(#server_hello{cipher_suite = CipherSuite, ssl_config(Opts, Role, State) -> ssl_config(Opts, Role, State, new). -ssl_config(Opts, Role, #state{static_env = InitStatEnv0} =State0, Type) -> +ssl_config(Opts, Role, #state{static_env = InitStatEnv0, + handshake_env = HsEnv} = State0, Type) -> {ok, #{cert_db_ref := Ref, cert_db_handle := CertDbHandle, fileref_db_handle := FileRefHandle, @@ -634,8 +635,8 @@ ssl_config(Opts, Role, #state{static_env = InitStatEnv0} =State0, Type) -> ssl_options = Opts}, case Type of new -> - Handshake = ssl_handshake:init_handshake_history(), - State#state{tls_handshake_history = Handshake}; + Hist = ssl_handshake:init_handshake_history(), + State#state{handshake_env = HsEnv#handshake_env{tls_handshake_history = Hist}}; continue -> State end. @@ -728,15 +729,15 @@ abbreviated({call, From}, Msg, State, Connection) -> handle_call(Msg, From, ?FUNCTION_NAME, State, Connection); abbreviated(internal, #finished{verify_data = Data} = Finished, #state{static_env = #static_env{role = server}, + handshake_env = #handshake_env{tls_handshake_history = Hist}, negotiated_version = Version, expecting_finished = true, - tls_handshake_history = Handshake, session = #session{master_secret = MasterSecret}, connection_states = ConnectionStates0} = State0, Connection) -> case ssl_handshake:verify_connection(ssl:tls_version(Version), Finished, client, get_current_prf(ConnectionStates0, write), - MasterSecret, Handshake) of + MasterSecret, Hist) of verified -> ConnectionStates = ssl_record:set_client_verify_data(current_both, Data, ConnectionStates0), @@ -748,13 +749,13 @@ abbreviated(internal, #finished{verify_data = Data} = Finished, end; abbreviated(internal, #finished{verify_data = Data} = Finished, #state{static_env = #static_env{role = client}, - tls_handshake_history = Handshake0, + handshake_env = #handshake_env{tls_handshake_history = Hist0}, session = #session{master_secret = MasterSecret}, negotiated_version = Version, connection_states = ConnectionStates0} = State0, Connection) -> case ssl_handshake:verify_connection(ssl:tls_version(Version), Finished, server, get_pending_prf(ConnectionStates0, write), - MasterSecret, Handshake0) of + MasterSecret, Hist0) of verified -> ConnectionStates1 = ssl_record:set_server_verify_data(current_read, Data, ConnectionStates0), @@ -1003,18 +1004,18 @@ cipher(info, Msg, State, _) -> cipher(internal, #certificate_verify{signature = Signature, hashsign_algorithm = CertHashSign}, #state{static_env = #static_env{role = server}, + handshake_env = #handshake_env{tls_handshake_history = Hist}, key_algorithm = KexAlg, public_key_info = PublicKeyInfo, negotiated_version = Version, - session = #session{master_secret = MasterSecret}, - tls_handshake_history = Handshake + session = #session{master_secret = MasterSecret} } = State, Connection) -> TLSVersion = ssl:tls_version(Version), %% Use negotiated value if TLS-1.2 otherwhise return default HashSign = negotiated_hashsign(CertHashSign, KexAlg, PublicKeyInfo, TLSVersion), case ssl_handshake:certificate_verify(Signature, PublicKeyInfo, - TLSVersion, HashSign, MasterSecret, Handshake) of + TLSVersion, HashSign, MasterSecret, Hist) of valid -> Connection:next_event(?FUNCTION_NAME, no_record, State#state{cert_hashsign_algorithm = HashSign}); @@ -1038,11 +1039,11 @@ cipher(internal, #finished{verify_data = Data} = Finished, = Session0, ssl_options = SslOpts, connection_states = ConnectionStates0, - tls_handshake_history = Handshake0} = State, Connection) -> + handshake_env = #handshake_env{tls_handshake_history = Hist}} = State, Connection) -> case ssl_handshake:verify_connection(ssl:tls_version(Version), Finished, opposite_role(Role), get_current_prf(ConnectionStates0, read), - MasterSecret, Handshake0) of + MasterSecret, Hist) of verified -> Session = handle_session(Role, SslOpts, Host, Port, Session0), cipher_role(Role, Data, Session, @@ -1084,9 +1085,10 @@ connection({call, RecvFrom}, {recv, N, Timeout}, start_or_recv_from = RecvFrom, timer = Timer}, ?FUNCTION_NAME, Connection); -connection({call, From}, renegotiate, #state{static_env = #static_env{protocol_cb = Connection}} = State, +connection({call, From}, renegotiate, #state{static_env = #static_env{protocol_cb = Connection}, + handshake_env = HsEnv} = State, Connection) -> - Connection:renegotiate(State#state{renegotiation = {true, From}}, []); + Connection:renegotiate(State#state{handshake_env = HsEnv#handshake_env{renegotiation = {true, From}}}, []); connection({call, From}, peer_certificate, #state{session = #session{peer_certificate = Cert}} = State, _) -> hibernate_after(?FUNCTION_NAME, State, [{reply, From, {ok, Cert}}]); @@ -1106,9 +1108,10 @@ connection({call, From}, negotiated_protocol, connection({call, From}, Msg, State, Connection) -> handle_call(Msg, From, ?FUNCTION_NAME, State, Connection); connection(cast, {internal_renegotiate, WriteState}, #state{static_env = #static_env{protocol_cb = Connection}, + handshake_env = HsEnv, connection_states = ConnectionStates} = State, Connection) -> - Connection:renegotiate(State#state{renegotiation = {true, internal}, + Connection:renegotiate(State#state{handshake_env = HsEnv#handshake_env{renegotiation = {true, internal}}, connection_states = ConnectionStates#{current_write => WriteState}}, []); connection(cast, {dist_handshake_complete, DHandle}, #state{ssl_options = #ssl_options{erl_dist = true}, @@ -1141,15 +1144,17 @@ downgrade(Type, Event, State, Connection) -> %% common or unexpected events for the state. %%-------------------------------------------------------------------- handle_common_event(internal, {handshake, {#hello_request{} = Handshake, _}}, connection = StateName, - #state{static_env = #static_env{role = client}} = State, _) -> + #state{static_env = #static_env{role = client}, + handshake_env = HsEnv} = State, _) -> %% Should not be included in handshake history - {next_state, StateName, State#state{renegotiation = {true, peer}}, [{next_event, internal, Handshake}]}; + {next_state, StateName, State#state{handshake_env = HsEnv#handshake_env{renegotiation = {true, peer}}}, + [{next_event, internal, Handshake}]}; handle_common_event(internal, {handshake, {#hello_request{}, _}}, StateName, #state{static_env = #static_env{role = client}}, _) when StateName =/= connection -> keep_state_and_data; handle_common_event(internal, {handshake, {Handshake, Raw}}, StateName, - #state{tls_handshake_history = Hs0} = State0, + #state{handshake_env = #handshake_env{tls_handshake_history = Hist0} = HsEnv} = State0, Connection) -> PossibleSNI = Connection:select_sni_extension(Handshake), @@ -1157,8 +1162,9 @@ handle_common_event(internal, {handshake, {Handshake, Raw}}, StateName, %% a client_hello, which needs to be determined by the connection callback. %% In other cases this is a noop State = handle_sni_extension(PossibleSNI, State0), - HsHist = ssl_handshake:update_handshake_history(Hs0, iolist_to_binary(Raw)), - {next_state, StateName, State#state{tls_handshake_history = HsHist}, + + Hist = ssl_handshake:update_handshake_history(Hist0, Raw), + {next_state, StateName, State#state{handshake_env = HsEnv#handshake_env{tls_handshake_history = Hist}}, [{next_event, internal, Handshake}]}; handle_common_event(internal, {protocol_record, TLSorDTLSRecord}, StateName, State, Connection) -> Connection:handle_protocol_record(TLSorDTLSRecord, StateName, State); @@ -1321,7 +1327,7 @@ handle_info(allow_renegotiate, StateName, State) -> {next_state, StateName, State#state{allow_renegotiate = true}}; handle_info({cancel_start_or_recv, StartFrom}, StateName, - #state{renegotiation = {false, first}} = State) when StateName =/= connection -> + #state{handshake_env = #handshake_env{renegotiation = {false, first}}} = State) when StateName =/= connection -> {stop_and_reply, {shutdown, user_timeout}, {reply, StartFrom, {error, timeout}}, @@ -1406,7 +1412,7 @@ format_status(terminate, [_, StateName, State]) -> [{data, [{"State", {StateName, State#state{connection_states = ?SECRET_PRINTOUT, protocol_buffers = ?SECRET_PRINTOUT, user_data_buffer = ?SECRET_PRINTOUT, - tls_handshake_history = ?SECRET_PRINTOUT, + handshake_env = ?SECRET_PRINTOUT, session = ?SECRET_PRINTOUT, private_key = ?SECRET_PRINTOUT, diffie_hellman_params = ?SECRET_PRINTOUT, @@ -1562,16 +1568,16 @@ certify_client(#state{client_certificate_requested = false} = State, _) -> State. verify_client_cert(#state{static_env = #static_env{role = client}, + handshake_env = #handshake_env{tls_handshake_history = Hist}, client_certificate_requested = true, negotiated_version = Version, private_key = PrivateKey, session = #session{master_secret = MasterSecret, own_certificate = OwnCert}, - cert_hashsign_algorithm = HashSign, - tls_handshake_history = Handshake0} = State, Connection) -> + cert_hashsign_algorithm = HashSign} = State, Connection) -> case ssl_handshake:client_certificate_verify(OwnCert, MasterSecret, - ssl:tls_version(Version), HashSign, PrivateKey, Handshake0) of + ssl:tls_version(Version), HashSign, PrivateKey, Hist) of #certificate_verify{} = Verified -> Connection:queue_handshake(Verified, State); ignore -> @@ -1607,7 +1613,9 @@ server_certify_and_key_exchange(State0, Connection) -> request_client_cert(State2, Connection). certify_client_key_exchange(#encrypted_premaster_secret{premaster_secret= EncPMS}, - #state{private_key = Key, client_hello_version = {Major, Minor} = Version} = State, Connection) -> + #state{private_key = Key, + handshake_env = #handshake_env{client_hello_version = {Major, Minor} = Version}} + = State, Connection) -> FakeSecret = make_premaster_secret(Version, rsa), %% Countermeasure for Bleichenbacher attack always provide some kind of premaster secret %% and fail handshake later.RFC 5246 section 7.4.7.1. @@ -2034,14 +2042,15 @@ cipher_protocol(State, Connection) -> Connection:queue_change_cipher(#change_cipher_spec{}, State). finished(#state{static_env = #static_env{role = Role}, + handshake_env = #handshake_env{tls_handshake_history = Hist}, negotiated_version = Version, session = Session, - connection_states = ConnectionStates0, - tls_handshake_history = Handshake0} = State0, StateName, Connection) -> + connection_states = ConnectionStates0} = State0, + StateName, Connection) -> MasterSecret = Session#session.master_secret, Finished = ssl_handshake:finished(ssl:tls_version(Version), Role, get_current_prf(ConnectionStates0, write), - MasterSecret, Handshake0), + MasterSecret, Hist), ConnectionStates = save_verify_data(Role, Finished, ConnectionStates0, StateName), Connection:send_handshake(Finished, State0#state{connection_states = ConnectionStates}). @@ -2369,7 +2378,7 @@ handle_trusted_certs_db(#state{static_env = #static_env{cert_db_ref = Ref, ok end. -prepare_connection(#state{renegotiation = Renegotiate, +prepare_connection(#state{handshake_env = #handshake_env{renegotiation = Renegotiate}, start_or_recv_from = RecvFrom} = State0, Connection) when Renegotiate =/= {false, first}, RecvFrom =/= undefined -> @@ -2379,18 +2388,18 @@ prepare_connection(State0, Connection) -> State = Connection:reinit(State0), {no_record, ack_connection(State)}. -ack_connection(#state{renegotiation = {true, Initiater}} = State) when Initiater == peer; - Initiater == internal -> - State#state{renegotiation = undefined}; -ack_connection(#state{renegotiation = {true, From}} = State) -> +ack_connection(#state{handshake_env = #handshake_env{renegotiation = {true, Initiater}} = HsEnv} = State) when Initiater == peer; + Initiater == internal -> + State#state{handshake_env = HsEnv#handshake_env{renegotiation = undefined}}; +ack_connection(#state{handshake_env = #handshake_env{renegotiation = {true, From}} = HsEnv} = State) -> gen_statem:reply(From, ok), - State#state{renegotiation = undefined}; -ack_connection(#state{renegotiation = {false, first}, + State#state{handshake_env = HsEnv#handshake_env{renegotiation = undefined}}; +ack_connection(#state{handshake_env = #handshake_env{renegotiation = {false, first}} = HsEnv, start_or_recv_from = StartFrom, timer = Timer} = State) when StartFrom =/= undefined -> gen_statem:reply(StartFrom, connected), cancel_timer(Timer), - State#state{renegotiation = undefined, + State#state{handshake_env = HsEnv#handshake_env{renegotiation = undefined}, start_or_recv_from = undefined, timer = undefined}; ack_connection(State) -> State. diff --git a/lib/ssl/src/ssl_connection.hrl b/lib/ssl/src/ssl_connection.hrl index dc8aa7619b..177fa37b83 100644 --- a/lib/ssl/src/ssl_connection.hrl +++ b/lib/ssl/src/ssl_connection.hrl @@ -51,8 +51,18 @@ cert_db_ref :: certdb_ref() | 'undefined', tracker :: pid() | 'undefined' %% Tracker process for listen socket }). + +-record(handshake_env, { + client_hello_version :: ssl_record:ssl_version() | 'undefined', + unprocessed_handshake_events = 0 :: integer(), + tls_handshake_history :: ssl_handshake:ssl_handshake_history() | secret_printout() + | 'undefined', + renegotiation :: undefined | {boolean(), From::term() | internal | peer} + }). + -record(state, { static_env :: #static_env{}, + handshake_env :: #handshake_env{} | secret_printout(), %% Change seldome user_application :: {Monitor::reference(), User::pid()}, ssl_options :: #ssl_options{}, @@ -68,12 +78,9 @@ connection_states :: ssl_record:connection_states() | secret_printout(), protocol_buffers :: term() | secret_printout() , %% #protocol_buffers{} from tls_record.hrl or dtls_recor.hr user_data_buffer :: undefined | binary() | secret_printout(), - + %% Used only in HS - unprocessed_handshake_events = 0 :: integer(), - tls_handshake_history :: ssl_handshake:ssl_handshake_history() | secret_printout() - | 'undefined', - client_hello_version :: ssl_record:ssl_version() | 'undefined', + client_certificate_requested = false :: boolean(), key_algorithm :: ssl_cipher_format:key_algo(), hashsign_algorithm = {undefined, undefined}, @@ -86,7 +93,6 @@ srp_params :: #srp_user{} | secret_printout() | 'undefined', srp_keys ::{PublicKey :: binary(), PrivateKey :: binary()} | secret_printout() | 'undefined', premaster_secret :: binary() | secret_printout() | 'undefined', - renegotiation :: undefined | {boolean(), From::term() | internal | peer}, start_or_recv_from :: term(), timer :: undefined | reference(), % start_or_recive_timer hello, %%:: #client_hello{} | #server_hello{}, diff --git a/lib/ssl/src/tls_connection.erl b/lib/ssl/src/tls_connection.erl index 8b24151d9f..e034cb20e9 100644 --- a/lib/ssl/src/tls_connection.erl +++ b/lib/ssl/src/tls_connection.erl @@ -143,22 +143,24 @@ pids(#state{protocol_specific = #{sender := Sender}}) -> %%==================================================================== %% State transition handling %%==================================================================== -next_record(#state{unprocessed_handshake_events = N} = State) when N > 0 -> - {no_record, State#state{unprocessed_handshake_events = N-1}}; - +next_record(#state{handshake_env = + #handshake_env{unprocessed_handshake_events = N} = HsEnv} + = State) when N > 0 -> + {no_record, State#state{handshake_env = + HsEnv#handshake_env{unprocessed_handshake_events = N-1}}}; next_record(#state{protocol_buffers = - #protocol_buffers{tls_packets = [], tls_cipher_texts = [CT | Rest]} - = Buffers, - connection_states = ConnStates0, - ssl_options = #ssl_options{padding_check = Check}} = State) -> - case tls_record:decode_cipher_text(CT, ConnStates0, Check) of - {Plain, ConnStates} -> - {Plain, State#state{protocol_buffers = - Buffers#protocol_buffers{tls_cipher_texts = Rest}, - connection_states = ConnStates}}; - #alert{} = Alert -> - {Alert, State} - end; + #protocol_buffers{tls_packets = [], tls_cipher_texts = [#ssl_tls{type = Type}| _] = CipherTexts0} + = Buffers, + connection_states = ConnectionStates0, + ssl_options = #ssl_options{padding_check = Check}} = State) -> + case decode_cipher_texts(Type, CipherTexts0, ConnectionStates0, Check, <<>>) of + {#ssl_tls{} = Record, ConnectionStates, CipherTexts} -> + {Record, State#state{protocol_buffers = Buffers#protocol_buffers{tls_cipher_texts = CipherTexts}, + connection_states = ConnectionStates}}; + {#alert{} = Alert, ConnectionStates, CipherTexts} -> + {Alert, State#state{protocol_buffers = Buffers#protocol_buffers{tls_cipher_texts = CipherTexts}, + connection_states = ConnectionStates}} + end; next_record(#state{protocol_buffers = #protocol_buffers{tls_packets = [], tls_cipher_texts = []}, protocol_specific = #{active_n_toggle := true, active_n := N} = ProtocolSpec, static_env = #static_env{socket = Socket, @@ -196,6 +198,22 @@ next_event(StateName, Record, State, Actions) -> {next_state, StateName, State, [{next_event, internal, Alert} | Actions]} end. +decode_cipher_texts(Type, [] = CipherTexts, ConnectionStates, _, Acc) -> + {#ssl_tls{type = Type, fragment = Acc}, ConnectionStates, CipherTexts}; +decode_cipher_texts(Type, + [#ssl_tls{type = Type} = CT | CipherTexts], ConnectionStates0, Check, Acc) -> + case tls_record:decode_cipher_text(CT, ConnectionStates0, Check) of + {#ssl_tls{type = ?APPLICATION_DATA, fragment = Plain}, ConnectionStates} -> + decode_cipher_texts(Type, CipherTexts, + ConnectionStates, Check, <<Acc/binary, Plain/binary>>); + {#ssl_tls{type = Type, fragment = Plain}, ConnectionStates} -> + {#ssl_tls{type = Type, fragment = Plain}, ConnectionStates, CipherTexts}; + #alert{} = Alert -> + {Alert, ConnectionStates0, CipherTexts} + end; +decode_cipher_texts(Type, CipherTexts, ConnectionStates, _, Acc) -> + {#ssl_tls{type = Type, fragment = Acc}, ConnectionStates, CipherTexts}. + %%% TLS record protocol level application data messages handle_protocol_record(#ssl_tls{type = ?APPLICATION_DATA, fragment = Data}, StateName, State0) -> @@ -227,8 +245,12 @@ handle_protocol_record(#ssl_tls{type = ?HANDSHAKE, fragment = Data}, connection -> ssl_connection:hibernate_after(StateName, State, Events); _ -> + HsEnv = State#state.handshake_env, {next_state, StateName, - State#state{unprocessed_handshake_events = unprocessed_events(Events)}, Events} + State#state{protocol_buffers = Buffers, + handshake_env = + HsEnv#handshake_env{unprocessed_handshake_events + = unprocessed_events(Events)}}, Events} end end catch throw:#alert{} = Alert -> @@ -263,15 +285,17 @@ handle_protocol_record(#ssl_tls{type = _Unknown}, StateName, State) -> renegotiation(Pid, WriteState) -> gen_statem:call(Pid, {user_renegotiate, WriteState}). -renegotiate(#state{static_env = #static_env{role = client}} = State, Actions) -> +renegotiate(#state{static_env = #static_env{role = client}, + handshake_env = HsEnv} = State, Actions) -> %% Handle same way as if server requested %% the renegotiation Hs0 = ssl_handshake:init_handshake_history(), - {next_state, connection, State#state{tls_handshake_history = Hs0}, + {next_state, connection, State#state{handshake_env = HsEnv#handshake_env{tls_handshake_history = Hs0}}, [{next_event, internal, #hello_request{}} | Actions]}; renegotiate(#state{static_env = #static_env{role = server, socket = Socket, transport_cb = Transport}, + handshake_env = HsEnv, negotiated_version = Version, connection_states = ConnectionStates0} = State0, Actions) -> HelloRequest = ssl_handshake:hello_request(), @@ -282,20 +306,20 @@ renegotiate(#state{static_env = #static_env{role = server, send(Transport, Socket, BinMsg), State = State0#state{connection_states = ConnectionStates, - tls_handshake_history = Hs0}, + handshake_env = HsEnv#handshake_env{tls_handshake_history = Hs0}}, next_event(hello, no_record, State, Actions). send_handshake(Handshake, State) -> send_handshake_flight(queue_handshake(Handshake, State)). queue_handshake(Handshake, #state{negotiated_version = Version, - tls_handshake_history = Hist0, + handshake_env = #handshake_env{tls_handshake_history = Hist0} = HsEnv, flight_buffer = Flight0, connection_states = ConnectionStates0} = State0) -> {BinHandshake, ConnectionStates, Hist} = encode_handshake(Handshake, Version, ConnectionStates0, Hist0), State0#state{connection_states = ConnectionStates, - tls_handshake_history = Hist, + handshake_env = HsEnv#handshake_env{tls_handshake_history = Hist}, flight_buffer = Flight0 ++ [BinHandshake]}. send_handshake_flight(#state{static_env = #static_env{socket = Socket, @@ -318,14 +342,14 @@ reinit(#state{protocol_specific = #{sender := Sender}, tls_sender:update_connection_state(Sender, Write, Version), reinit_handshake_data(State). -reinit_handshake_data(State) -> +reinit_handshake_data(#state{handshake_env = HsEnv} =State) -> %% premaster_secret, public_key_info and tls_handshake_info %% are only needed during the handshake phase. %% To reduce memory foot print of a connection reinitialize them. State#state{ premaster_secret = undefined, public_key_info = undefined, - tls_handshake_history = ssl_handshake:init_handshake_history() + handshake_env = HsEnv#handshake_env{tls_handshake_history = ssl_handshake:init_handshake_history()} }. select_sni_extension(#client_hello{extensions = HelloExtensions}) -> @@ -440,10 +464,10 @@ init({call, From}, {start, Timeout}, socket = Socket, session_cache = Cache, session_cache_cb = CacheCb}, + handshake_env = #handshake_env{renegotiation = {Renegotiation, _}} = HsEnv, ssl_options = SslOpts, session = #session{own_certificate = Cert} = Session0, - connection_states = ConnectionStates0, - renegotiation = {Renegotiation, _} + connection_states = ConnectionStates0 } = State0) -> Timer = ssl_connection:start_or_recv_cancel_timer(Timeout, From), Hello = tls_handshake:client_hello(Host, Port, ConnectionStates0, SslOpts, @@ -459,7 +483,7 @@ init({call, From}, {start, Timeout}, negotiated_version = Version, %% Requested version session = Session0#session{session_id = Hello#client_hello.session_id}, - tls_handshake_history = Handshake, + handshake_env = HsEnv#handshake_env{tls_handshake_history = Handshake}, start_or_recv_from = From, timer = Timer}, next_event(hello, no_record, State); @@ -505,8 +529,8 @@ hello(internal, #client_hello{client_version = ClientVersion} = Hello, port = Port, session_cache = Cache, session_cache_cb = CacheCb}, + handshake_env = #handshake_env{renegotiation = {Renegotiation, _}} = HsEnv, session = #session{own_certificate = Cert} = Session0, - renegotiation = {Renegotiation, _}, negotiated_protocol = CurrentProtocol, key_algorithm = KeyExAlg, ssl_options = SslOpts} = State) -> @@ -526,7 +550,7 @@ hello(internal, #client_hello{client_version = ClientVersion} = Hello, State#state{connection_states = ConnectionStates, negotiated_version = Version, hashsign_algorithm = HashSign, - client_hello_version = ClientVersion, + handshake_env = HsEnv#handshake_env{client_hello_version = ClientVersion}, session = Session, negotiated_protocol = Protocol}) end; @@ -534,7 +558,7 @@ hello(internal, #server_hello{} = Hello, #state{connection_states = ConnectionStates0, negotiated_version = ReqVersion, static_env = #static_env{role = client}, - renegotiation = {Renegotiation, _}, + handshake_env = #handshake_env{renegotiation = {Renegotiation, _}}, ssl_options = SslOptions} = State) -> case tls_handshake:hello(Hello, SslOptions, ConnectionStates0, Renegotiation) of #alert{} = Alert -> @@ -620,7 +644,7 @@ connection(internal, #hello_request{}, port = Port, session_cache = Cache, session_cache_cb = CacheCb}, - renegotiation = {Renegotiation, peer}, + handshake_env = #handshake_env{renegotiation = {Renegotiation, peer}}, session = #session{own_certificate = Cert} = Session0, ssl_options = SslOpts, protocol_specific = #{sender := Pid}, @@ -642,7 +666,7 @@ connection(internal, #hello_request{}, port = Port, session_cache = Cache, session_cache_cb = CacheCb}, - renegotiation = {Renegotiation, _}, + handshake_env = #handshake_env{renegotiation = {Renegotiation, _}}, session = #session{own_certificate = Cert} = Session0, ssl_options = SslOpts, connection_states = ConnectionStates} = State0) -> @@ -653,6 +677,7 @@ connection(internal, #hello_request{}, = Hello#client_hello.session_id}}, Actions); connection(internal, #client_hello{} = Hello, #state{static_env = #static_env{role = server}, + handshake_env = HsEnv, allow_renegotiate = true, connection_states = CS, protocol_specific = #{sender := Sender} @@ -666,7 +691,7 @@ connection(internal, #client_hello{} = Hello, {ok, Write} = tls_sender:renegotiate(Sender), next_event(hello, no_record, State#state{connection_states = CS#{current_write => Write}, allow_renegotiate = false, - renegotiation = {true, peer} + handshake_env = HsEnv#handshake_env{renegotiation = {true, peer}} }, [{next_event, internal, Hello}]); connection(internal, #client_hello{}, @@ -762,6 +787,10 @@ initial_state(Role, Sender, Host, Port, Socket, {SSLOptions, SocketOptions, Trac }, #state{ static_env = InitStatEnv, + handshake_env = #handshake_env{ + tls_handshake_history = ssl_handshake:init_handshake_history(), + renegotiation = {false, first} + }, socket_options = SocketOptions, ssl_options = SSLOptions, session = #session{is_resumable = new}, @@ -769,7 +798,6 @@ initial_state(Role, Sender, Host, Port, Socket, {SSLOptions, SocketOptions, Trac protocol_buffers = #protocol_buffers{}, user_application = {UserMonitor, User}, user_data_buffer = <<>>, - renegotiation = {false, first}, allow_renegotiate = SSLOptions#ssl_options.client_renegotiation, start_or_recv_from = undefined, flight_buffer = [], diff --git a/lib/ssl/test/ssl_basic_SUITE.erl b/lib/ssl/test/ssl_basic_SUITE.erl index 3778530a47..cfc4ec5770 100644 --- a/lib/ssl/test/ssl_basic_SUITE.erl +++ b/lib/ssl/test/ssl_basic_SUITE.erl @@ -4888,20 +4888,24 @@ run_suites(Ciphers, Config, Type) -> ssl_test_lib:ssl_options(server_psk_anon_hint, Config)]}; srp -> {ssl_test_lib:ssl_options(client_srp, Config), - ssl_test_lib:ssl_options(server_srp, Config)}; + [{ciphers, Ciphers} | + ssl_test_lib:ssl_options(server_srp, Config)]}; srp_anon -> {ssl_test_lib:ssl_options(client_srp, Config), - ssl_test_lib:ssl_options(server_srp_anon, Config)}; + [{ciphers, Ciphers} | + ssl_test_lib:ssl_options(server_srp_anon, Config)]}; srp_dsa -> {ssl_test_lib:ssl_options(client_srp_dsa, Config), - ssl_test_lib:ssl_options(server_srp_dsa, Config)}; + [{ciphers, Ciphers} | + ssl_test_lib:ssl_options(server_srp_dsa, Config)]}; ecdsa -> {ssl_test_lib:ssl_options(client_ecdsa_opts, Config), [{ciphers, Ciphers} | ssl_test_lib:ssl_options(server_ecdsa_opts, Config)]}; ecdh_rsa -> {ssl_test_lib:ssl_options(client_ecdh_rsa_opts, Config), - ssl_test_lib:ssl_options(server_ecdh_rsa_opts, Config)}; + [{ciphers, Ciphers} | + ssl_test_lib:ssl_options(server_ecdh_rsa_opts, Config)]}; rc4_rsa -> {ssl_test_lib:ssl_options(client_rsa_verify_opts, Config), [{ciphers, Ciphers} | diff --git a/lib/stdlib/doc/src/ets.xml b/lib/stdlib/doc/src/ets.xml index 70d1aaa74d..ccccf7de88 100644 --- a/lib/stdlib/doc/src/ets.xml +++ b/lib/stdlib/doc/src/ets.xml @@ -138,23 +138,56 @@ operation. In database terms the isolation level can be seen as "serializable", as if all isolated operations are carried out serially, one after the other in a strict order.</p> + </section> - <p>No other support is available within this module that would guarantee - consistency between objects. However, function - <seealso marker="#safe_fixtable/2"><c>safe_fixtable/2</c></seealso> - can be used to guarantee that a sequence of - <seealso marker="#first/1"><c>first/1</c></seealso> and - <seealso marker="#next/2"><c>next/2</c></seealso> calls traverse the - table without errors and that each existing object in the table is - visited exactly once, even if another (or the same) process - simultaneously deletes or inserts objects into the table. - Nothing else is guaranteed; in particular objects that are inserted - or deleted during such a traversal can be visited once or not at all. - Functions that internally traverse over a table, like - <seealso marker="#select/1"><c>select</c></seealso> and - <seealso marker="#match/1"><c>match</c></seealso>, - give the same guarantee as - <seealso marker="#safe_fixtable/2"><c>safe_fixtable</c></seealso>.</p> + <section><marker id="traversal"></marker> + <title>Table traversal</title> + <p>There are different ways to traverse through the objects of a table.</p> + <list type="bulleted"> + <item><p><em>Single-step</em> traversal one key at at time, using + <seealso marker="#first/1"><c>first/1</c></seealso>, + <seealso marker="#next/2"><c>next/2</c></seealso>, + <seealso marker="#last/1"><c>last/1</c></seealso> and + <seealso marker="#prev/2"><c>prev/2</c></seealso>.</p> + </item> + <item><p>Search with simple <em>match patterns</em>, using + <seealso marker="#match/1"><c>match/1/2/3</c></seealso>, + <seealso marker="#match_delete/2"><c>match_delete/2</c></seealso> and + <seealso marker="#match_object/1"><c>match_object/1/2/3</c></seealso>.</p> + </item> + <item><p>Search with more powerful <em>match specifications</em>, using + <seealso marker="#select/1"><c>select/1/2/3</c></seealso>, + <seealso marker="#select_count/2"><c>select_count/2</c></seealso>, + <seealso marker="#select_delete/2"><c>select_delete/2</c></seealso>, + <seealso marker="#select_replace/2"><c>select_replace/2</c></seealso> and + <seealso marker="#select_reverse/1"><c>select_reverse/1/2/3</c></seealso>.</p> + </item> + <item><p><em>Table conversions</em>, using + <seealso marker="#tab2file/2"><c>tab2file/2/3</c></seealso> and + <seealso marker="#tab2list/1"><c>tab2list/1</c></seealso>.</p> + </item> + </list> + <p>None of these ways of table traversal will guarantee a consistent table snapshot + if the table is also updated during the traversal. Moreover, traversals not + done in a <em>safe</em> way, on tables where keys are inserted or deleted + during the traversal, may yield the following undesired effects:</p> + <list type="bulleted"> + <item><p>Any key may be missed.</p></item> + <item><p>Any key may be found more than once.</p></item> + <item><p>The traversal may fail with <c>badarg</c> exception if keys are deleted.</p> + </item> + </list> + <p>A table traversal is <em>safe</em> if either</p> + <list type="bulleted"> + <item><p>the table is of type <c>ordered_set</c>.</p> + </item> + <item><p>the entire table traversal is done within one ETS function + call.</p> + </item> + <item><p>function <seealso marker="#safe_fixtable/2"><c>safe_fixtable/2</c></seealso> + is used to keep the table fixated during the entire traversal.</p> + </item> + </list> </section> <section> @@ -871,6 +904,9 @@ ets:is_compiled_ms(Broken).</code> <seealso marker="#first/1"><c>first/1</c></seealso> and <seealso marker="#next/2"><c>next/2</c></seealso>.</p> <p>If the table is empty, <c>'$end_of_table'</c> is returned.</p> + <p>Use <seealso marker="#safe_fixtable/2"><c>safe_fixtable/2</c></seealso> + to guarantee <seealso marker="#traversal">safe traversal</seealso> + for subsequent calls to <seealso marker="#match/1"><c>match/1</c></seealso>.</p> </desc> </func> @@ -936,6 +972,10 @@ ets:is_compiled_ms(Broken).</code> <seealso marker="#first/1"><c>first/1</c></seealso> and <seealso marker="#next/2"><c>next/2</c></seealso>.</p> <p>If the table is empty, <c>'$end_of_table'</c> is returned.</p> + <p>Use <seealso marker="#safe_fixtable/2"><c>safe_fixtable/2</c></seealso> + to guarantee <seealso marker="#traversal">safe traversal</seealso> + for subsequent calls to <seealso marker="#match_object/1"> + <c>match_object/1</c></seealso>.</p> </desc> </func> @@ -1192,12 +1232,13 @@ ets:select(Table, MatchSpec),</code> <p>To find the first key in the table, use <seealso marker="#first/1"><c>first/1</c></seealso>.</p> <p>Unless a table of type <c>set</c>, <c>bag</c>, or - <c>duplicate_bag</c> is protected using + <c>duplicate_bag</c> is fixated using <seealso marker="#safe_fixtable/2"><c>safe_fixtable/2</c></seealso>, - a traversal can fail if - concurrent updates are made to the table. For table - type <c>ordered_set</c>, the function returns the next key in - order, even if the object does no longer exist.</p> + a call to <c>next/2</c> will fail if <c><anno>Key1</anno></c> no longer + exists in the table. For table type <c>ordered_set</c>, the function + always returns the next key after <c><anno>Key1</anno></c> in term + order, regardless whether <c><anno>Key1</anno></c> ever existed in the + table.</p> </desc> </func> @@ -1212,7 +1253,7 @@ ets:select(Table, MatchSpec),</code> table types, the function is synonymous to <seealso marker="#next/2"><c>next/2</c></seealso>. If no previous key exists, <c>'$end_of_table'</c> is returned.</p> - <p>To find the last key in the table, use + <p>To find the last key in an <c>ordered_set</c> table, use <seealso marker="#last/1"><c>last/1</c></seealso>.</p> </desc> </func> @@ -1287,7 +1328,16 @@ ets:select(ets:repair_continuation(Broken,MS)).</code> <fsummary>Fix an ETS table for safe traversal.</fsummary> <desc> <p>Fixes a table of type <c>set</c>, <c>bag</c>, or - <c>duplicate_bag</c> for safe traversal.</p> + <c>duplicate_bag</c> for <seealso marker="#traversal"> + safe traversal</seealso> using + <seealso marker="#first/1"><c>first/1</c></seealso> & + <seealso marker="#next/2"><c>next/2</c></seealso>, + <seealso marker="#match/3"><c>match/3</c></seealso> & + <seealso marker="#match/1"><c>match/1</c></seealso>, + <seealso marker="#match_object/3"><c>match_object/3</c></seealso> & + <seealso marker="#match_object/1"><c>match_object/1</c></seealso>, or + <seealso marker="#select/3"><c>select/3</c></seealso> & + <seealso marker="#select/1"><c>select/1</c></seealso>.</p> <p>A process fixes a table by calling <c>safe_fixtable(<anno>Tab</anno>, true)</c>. The table remains fixed until the process releases it by calling @@ -1300,11 +1350,11 @@ ets:select(ets:repair_continuation(Broken,MS)).</code> <p>When a table is fixed, a sequence of <seealso marker="#first/1"><c>first/1</c></seealso> and <seealso marker="#next/2"><c>next/2</c></seealso> calls are - guaranteed to succeed, and each object in - the table is returned only once, even if objects - are removed or inserted during the traversal. The keys for new - objects inserted during the traversal <em>can</em> be returned by - <c>next/2</c> (it depends on the internal ordering of the keys).</p> + guaranteed to succeed even if keys are removed during the + traversal. The keys for objects inserted or deleted during a + traversal may or may not be returned by <c>next/2</c> depending on + the ordering of keys within the table and if the key exists at the time + <c>next/2</c> is called.</p> <p><em>Example:</em></p> <code type="none"> clean_all_with_value(Tab,X) -> @@ -1322,7 +1372,7 @@ clean_all_with_value(Tab,X,Key) -> true end, clean_all_with_value(Tab,X,ets:next(Tab,Key)).</code> - <p>Notice that no deleted objects are removed from a + <p>Notice that deleted objects are not freed from a fixed table until it has been released. If a process fixes a table but never releases it, the memory used by the deleted objects is never freed. The performance of operations on @@ -1332,9 +1382,9 @@ clean_all_with_value(Tab,X,Key) -> <c>info(Tab, safe_fixed_monotonic_time)</c></seealso>. A system with many processes fixing tables can need a monitor that sends alarms when tables have been fixed for too long.</p> - <p>Notice that for table type <c>ordered_set</c>, - <c>safe_fixtable/2</c> is not necessary, as calls to - <c>first/1</c> and <c>next/2</c> always succeed.</p> + <p>Notice that <c>safe_fixtable/2</c> is not necessary for table type + <c>ordered_set</c> and for traversals done by a single ETS function call, + like <seealso marker="#select/2"><c>select/2</c></seealso>.</p> </desc> </func> @@ -1462,7 +1512,10 @@ is_integer(X), is_integer(Y), X + Y < 4711]]></code> table, which is still faster than traversing the table object by object using <seealso marker="#first/1"><c>first/1</c></seealso> and <seealso marker="#next/2"><c>next/2</c></seealso>.</p> - <p>If the table is empty, <c>'$end_of_table'</c> is returned.</p> + <p>If the table is empty, <c>'$end_of_table'</c> is returned.</p> + <p>Use <seealso marker="#safe_fixtable/2"><c>safe_fixtable/2</c></seealso> + to guarantee <seealso marker="#traversal">safe traversal</seealso> + for subsequent calls to <seealso marker="#select/1"><c>select/1</c></seealso>.</p> </desc> </func> @@ -1519,7 +1572,7 @@ is_integer(X), is_integer(Y), X + Y < 4711]]></code> the match specification result.</p> <p>The match-and-replace operation for each individual object is guaranteed to be <seealso marker="#concurrency">atomic and isolated</seealso>. The - <c>select_replace</c> table iteration as a whole, like all other select functions, + <c>select_replace</c> table traversal as a whole, like all other select functions, does not give such guarantees.</p> <p>The match specifiction must be guaranteed to <em>retain the key</em> of any matched object. If not, <c>select_replace</c> will fail with <c>badarg</c> diff --git a/lib/stdlib/doc/src/io_lib.xml b/lib/stdlib/doc/src/io_lib.xml index cd4ca0a3a7..4d527f8ed3 100644 --- a/lib/stdlib/doc/src/io_lib.xml +++ b/lib/stdlib/doc/src/io_lib.xml @@ -385,7 +385,7 @@ <func> <name name="write" arity="1" since=""/> <name name="write" arity="2" clause_i="1" since=""/> - <name name="write" arity="2" clause_i="2" since=""/> + <name name="write" arity="2" clause_i="2" since="OTP 20.0"/> <fsummary>Write a term.</fsummary> <desc> <p>Returns a character list that represents <c><anno>Term</anno></c>. |