From 47369e456a6ffb4652ccb11f7a9e64d34444fd01 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Mon, 10 Dec 2018 16:03:30 +0100 Subject: Tighten dist app data receive --- lib/ssl/src/ssl_connection.erl | 228 +++++++++++++++++++++++------------------ lib/ssl/src/ssl_connection.hrl | 2 +- lib/ssl/src/tls_connection.erl | 8 -- 3 files changed, 128 insertions(+), 110 deletions(-) (limited to 'lib') diff --git a/lib/ssl/src/ssl_connection.erl b/lib/ssl/src/ssl_connection.erl index 619824a7f2..e8c8120f50 100644 --- a/lib/ssl/src/ssl_connection.erl +++ b/lib/ssl/src/ssl_connection.erl @@ -459,89 +459,106 @@ passive_receive(State0 = #state{user_data_buffer = Buffer}, StateName, Connectio end end. -read_application_data(Data, #state{static_env = #static_env{socket = Socket, - protocol_cb = Connection, - transport_cb = Transport, - tracker = Tracker}, - user_application = {_Mon, Pid}, - socket_options = SOpts, - bytes_to_read = BytesToRead, - start_or_recv_from = RecvFrom, - timer = Timer, - user_data_buffer = Buffer0} = State0) -> - Buffer1 = if - Buffer0 =:= <<>> -> Data; - Data =:= <<>> -> Buffer0; - true -> <> - end, - case get_data(SOpts, BytesToRead, Buffer1) of +read_application_data( + Data, + #state{ + user_data_buffer = Buffer0, + erl_dist_handle = DHandle} = State) -> + %% + Buffer = bincat(Buffer0, Data), + case DHandle of + undefined -> + #state{ + socket_options = SocketOpts, + bytes_to_read = BytesToRead, + start_or_recv_from = RecvFrom, + timer = Timer} = State, + read_application_data( + Buffer, State, SocketOpts, RecvFrom, Timer, BytesToRead); + _ -> + try read_application_dist_data(Buffer, State, DHandle) + catch error:_ -> + {stop,disconnect, + State#state{ + user_data_buffer = Buffer, + bytes_to_read = undefined}} + end + end. + +read_application_dist_data(Buffer, State, DHandle) -> + case Buffer of + <> -> + erlang:dist_ctrl_put_data(DHandle, Data), + {no_record, + State#state{ + user_data_buffer = <<>>, + bytes_to_read = undefined}}; + <> -> + erlang:dist_ctrl_put_data(DHandle, Data), + read_application_dist_data(Rest, State, DHandle); + _ -> + {no_record, + State#state{ + user_data_buffer = Buffer, + bytes_to_read = undefined}} + end. + +read_application_data( + Buffer0, State, SocketOpts0, RecvFrom, Timer, BytesToRead) -> + %% + case get_data(SocketOpts0, BytesToRead, Buffer0) of {ok, ClientData, Buffer} -> % Send data - #state{ssl_options = #ssl_options{erl_dist = Dist}, - erl_dist_data = DistData} = State0, - case Dist andalso is_dist_up(DistData) of - true -> - dist_app_data(ClientData, State0#state{user_data_buffer = Buffer, - bytes_to_read = undefined}); - _ -> - SocketOpt = - deliver_app_data(Connection:pids(State0), - Transport, Socket, SOpts, - ClientData, Pid, RecvFrom, Tracker, Connection), - cancel_timer(Timer), - State = - State0#state{ - user_data_buffer = Buffer, - start_or_recv_from = undefined, - timer = undefined, - bytes_to_read = undefined, - socket_options = SocketOpt - }, - if - SocketOpt#socket_options.active =:= false; - Buffer =:= <<>> -> - %% Passive mode, wait for active once or recv - %% Active and empty, get more data - {no_record, State}; - true -> %% We have more data - read_application_data(<<>>, State) - end - end; + #state{ + static_env = + #static_env{ + socket = Socket, + protocol_cb = Connection, + transport_cb = Transport, + tracker = Tracker}, + user_application = {_Mon, Pid}} = State, + SocketOpts = + deliver_app_data( + Connection:pids(State), + Transport, Socket, SocketOpts0, + ClientData, Pid, RecvFrom, Tracker, Connection), + cancel_timer(Timer), + if + SocketOpts#socket_options.active =:= false; + Buffer =:= <<>> -> + %% Passive mode, wait for active once or recv + %% Active and empty, get more data + {no_record, + State#state{ + user_data_buffer = Buffer, + start_or_recv_from = undefined, + timer = undefined, + bytes_to_read = undefined, + socket_options = SocketOpts + }}; + true -> %% We have more data + read_application_data( + Buffer, State, SocketOpts, + undefined, undefined, undefined) + end; {more, Buffer} -> % no reply, we need more data - {no_record, State0#state{user_data_buffer = Buffer}}; + {no_record, State#state{user_data_buffer = Buffer}}; {passive, Buffer} -> - {no_record, State0#state{user_data_buffer = Buffer}}; + {no_record, State#state{user_data_buffer = Buffer}}; {error,_Reason} -> %% Invalid packet in packet mode - deliver_packet_error(Connection:pids(State0), - Transport, Socket, SOpts, Buffer1, Pid, RecvFrom, Tracker, Connection), - {stop, {shutdown, normal}, State0} - end. - -dist_app_data(ClientData, #state{erl_dist_data = #{dist_handle := undefined, - dist_buffer := DistBuff} = DistData} = State) -> - {no_record, State#state{erl_dist_data = DistData#{dist_buffer => [ClientData, DistBuff]}}}; -dist_app_data(ClientData, #state{erl_dist_data = #{dist_handle := DHandle, - dist_buffer := DistBuff} = ErlDistData, - user_data_buffer = Buffer, - socket_options = SOpts} = State) -> - Data = merge_dist_data(DistBuff, ClientData), - try erlang:dist_ctrl_put_data(DHandle, Data) of - _ when SOpts#socket_options.active =:= false; - Buffer =:= <<>> -> - %% Passive mode, wait for active once or recv - %% Active and empty, get more data - {no_record, State#state{erl_dist_data = ErlDistData#{dist_buffer => <<>>}}}; - _ -> %% We have more data - read_application_data(<<>>, State) - catch error:_ -> - {stop, State, disconnect} + #state{ + static_env = + #static_env{ + socket = Socket, + protocol_cb = Connection, + transport_cb = Transport, + tracker = Tracker}, + user_application = {_Mon, Pid}} = State, + deliver_packet_error( + Connection:pids(State), Transport, Socket, SocketOpts0, + Buffer0, Pid, RecvFrom, Tracker, Connection), + {stop, {shutdown, normal}, State} end. -merge_dist_data(<<>>, ClientData) -> - ClientData; -merge_dist_data(DistBuff, <<>>) -> - DistBuff; -merge_dist_data(DistBuff, ClientData) -> - [DistBuff, ClientData]. %%==================================================================== %% Help functions for tls|dtls_connection.erl %%==================================================================== @@ -1095,15 +1112,14 @@ connection(cast, {internal_renegotiate, WriteState}, #state{static_env = #static connection_states = ConnectionStates#{current_write => WriteState}}, []); connection(cast, {dist_handshake_complete, DHandle}, #state{ssl_options = #ssl_options{erl_dist = true}, - erl_dist_data = ErlDistData, socket_options = SockOpts} = State0, Connection) -> process_flag(priority, normal), State1 = State0#state{ - socket_options = - SockOpts#socket_options{active = true}, - erl_dist_data = ErlDistData#{dist_handle => DHandle}}, - {Record, State} = dist_app_data(<<>>, State1), + socket_options = SockOpts#socket_options{active = true}, + erl_dist_handle = DHandle, + bytes_to_read = undefined}, + {Record, State} = read_application_data(<<>>, State1), Connection:next_event(connection, Record, State); connection(info, Msg, State, _) -> handle_info(Msg, ?FUNCTION_NAME, State); @@ -2557,21 +2573,28 @@ decode_packet(Type, Buffer, PacketOpts) -> %% Note that if the user has explicitly configured the socket to expect %% HTTP headers using the {packet, httph} option, we don't do any automatic %% switching of states. -deliver_app_data(CPids, Transport, Socket, SOpts = #socket_options{active=Active, packet=Type}, - Data, Pid, From, Tracker, Connection) -> - send_or_reply(Active, Pid, From, - format_reply(CPids, Transport, Socket, SOpts, Data, Tracker, Connection)), - SO = case Data of - {P, _, _, _} when ((P =:= http_request) or (P =:= http_response)), - ((Type =:= http) or (Type =:= http_bin)) -> - SOpts#socket_options{packet={Type, headers}}; - http_eoh when tuple_size(Type) =:= 2 -> - % End of headers - expect another Request/Response line - {Type1, headers} = Type, - SOpts#socket_options{packet=Type1}; - _ -> - SOpts - end, +deliver_app_data( + CPids, Transport, Socket, + #socket_options{active=Active, packet=Type} = SOpts, + Data, Pid, From, Tracker, Connection) -> + %% + send_or_reply( + Active, Pid, From, + format_reply( + CPids, Transport, Socket, SOpts, Data, Tracker, Connection)), + SO = + case Data of + {P, _, _, _} + when ((P =:= http_request) or (P =:= http_response)), + ((Type =:= http) or (Type =:= http_bin)) -> + SOpts#socket_options{packet={Type, headers}}; + http_eoh when tuple_size(Type) =:= 2 -> + %% End of headers - expect another Request/Response line + {Type1, headers} = Type, + SOpts#socket_options{packet=Type1}; + _ -> + SOpts + end, case Active of once -> SO#socket_options{active=false}; @@ -2728,7 +2751,10 @@ new_emulated([], EmOpts) -> new_emulated(NewEmOpts, _) -> NewEmOpts. -is_dist_up(#{dist_handle := Handle}) when Handle =/= undefined -> - true; -is_dist_up(_) -> - false. +-compile({inline, [bincat/2]}). +bincat(<<>>, B) -> + B; +bincat(A, <<>>) -> + A; +bincat(A, B) -> + <>. diff --git a/lib/ssl/src/ssl_connection.hrl b/lib/ssl/src/ssl_connection.hrl index 2f4dfefdda..dc8aa7619b 100644 --- a/lib/ssl/src/ssl_connection.hrl +++ b/lib/ssl/src/ssl_connection.hrl @@ -101,7 +101,7 @@ %% The mecahnism is also usefull in TLS although we do not %% need to worry about packet loss in TLS. In DTLS we need to track DTLS handshake seqnr flight_state = reliable, %% reliable | {retransmit, integer()}| {waiting, ref(), integer()} - last two is used in DTLS over udp. - erl_dist_data = #{} :: map(), + erl_dist_handle = undefined :: erlang:dist_handle() | undefined, protocol_specific = #{} :: map() }). diff --git a/lib/ssl/src/tls_connection.erl b/lib/ssl/src/tls_connection.erl index 9aeca8c589..009ba0600d 100644 --- a/lib/ssl/src/tls_connection.erl +++ b/lib/ssl/src/tls_connection.erl @@ -711,7 +711,6 @@ initial_state(Role, Sender, Host, Port, Socket, {SSLOptions, SocketOptions, Trac erl_dist = IsErlDist} = SSLOptions, ConnectionStates = tls_record:init_connection_states(Role, BeastMitigation), - ErlDistData = erl_dist_data(IsErlDist), SessionCacheCb = case application:get_env(ssl, session_cb) of {ok, Cb} when is_atom(Cb) -> Cb; @@ -743,7 +742,6 @@ initial_state(Role, Sender, Host, Port, Socket, {SSLOptions, SocketOptions, Trac socket_options = SocketOptions, ssl_options = SSLOptions, session = #session{is_resumable = new}, - erl_dist_data = ErlDistData, connection_states = ConnectionStates, protocol_buffers = #protocol_buffers{}, user_application = {UserMonitor, User}, @@ -758,12 +756,6 @@ initial_state(Role, Sender, Host, Port, Socket, {SSLOptions, SocketOptions, Trac } }. -erl_dist_data(true) -> - #{dist_handle => undefined, - dist_buffer => <<>>}; -erl_dist_data(false) -> - #{}. - initialize_tls_sender(#state{static_env = #static_env{ role = Role, transport_cb = Transport, -- cgit v1.2.3