From 47369e456a6ffb4652ccb11f7a9e64d34444fd01 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Mon, 10 Dec 2018 16:03:30 +0100 Subject: Tighten dist app data receive --- lib/ssl/src/ssl_connection.erl | 228 +++++++++++++++++++++++------------------ 1 file changed, 127 insertions(+), 101 deletions(-) (limited to 'lib/ssl/src/ssl_connection.erl') diff --git a/lib/ssl/src/ssl_connection.erl b/lib/ssl/src/ssl_connection.erl index 619824a7f2..e8c8120f50 100644 --- a/lib/ssl/src/ssl_connection.erl +++ b/lib/ssl/src/ssl_connection.erl @@ -459,89 +459,106 @@ passive_receive(State0 = #state{user_data_buffer = Buffer}, StateName, Connectio end end. -read_application_data(Data, #state{static_env = #static_env{socket = Socket, - protocol_cb = Connection, - transport_cb = Transport, - tracker = Tracker}, - user_application = {_Mon, Pid}, - socket_options = SOpts, - bytes_to_read = BytesToRead, - start_or_recv_from = RecvFrom, - timer = Timer, - user_data_buffer = Buffer0} = State0) -> - Buffer1 = if - Buffer0 =:= <<>> -> Data; - Data =:= <<>> -> Buffer0; - true -> <> - end, - case get_data(SOpts, BytesToRead, Buffer1) of +read_application_data( + Data, + #state{ + user_data_buffer = Buffer0, + erl_dist_handle = DHandle} = State) -> + %% + Buffer = bincat(Buffer0, Data), + case DHandle of + undefined -> + #state{ + socket_options = SocketOpts, + bytes_to_read = BytesToRead, + start_or_recv_from = RecvFrom, + timer = Timer} = State, + read_application_data( + Buffer, State, SocketOpts, RecvFrom, Timer, BytesToRead); + _ -> + try read_application_dist_data(Buffer, State, DHandle) + catch error:_ -> + {stop,disconnect, + State#state{ + user_data_buffer = Buffer, + bytes_to_read = undefined}} + end + end. + +read_application_dist_data(Buffer, State, DHandle) -> + case Buffer of + <> -> + erlang:dist_ctrl_put_data(DHandle, Data), + {no_record, + State#state{ + user_data_buffer = <<>>, + bytes_to_read = undefined}}; + <> -> + erlang:dist_ctrl_put_data(DHandle, Data), + read_application_dist_data(Rest, State, DHandle); + _ -> + {no_record, + State#state{ + user_data_buffer = Buffer, + bytes_to_read = undefined}} + end. + +read_application_data( + Buffer0, State, SocketOpts0, RecvFrom, Timer, BytesToRead) -> + %% + case get_data(SocketOpts0, BytesToRead, Buffer0) of {ok, ClientData, Buffer} -> % Send data - #state{ssl_options = #ssl_options{erl_dist = Dist}, - erl_dist_data = DistData} = State0, - case Dist andalso is_dist_up(DistData) of - true -> - dist_app_data(ClientData, State0#state{user_data_buffer = Buffer, - bytes_to_read = undefined}); - _ -> - SocketOpt = - deliver_app_data(Connection:pids(State0), - Transport, Socket, SOpts, - ClientData, Pid, RecvFrom, Tracker, Connection), - cancel_timer(Timer), - State = - State0#state{ - user_data_buffer = Buffer, - start_or_recv_from = undefined, - timer = undefined, - bytes_to_read = undefined, - socket_options = SocketOpt - }, - if - SocketOpt#socket_options.active =:= false; - Buffer =:= <<>> -> - %% Passive mode, wait for active once or recv - %% Active and empty, get more data - {no_record, State}; - true -> %% We have more data - read_application_data(<<>>, State) - end - end; + #state{ + static_env = + #static_env{ + socket = Socket, + protocol_cb = Connection, + transport_cb = Transport, + tracker = Tracker}, + user_application = {_Mon, Pid}} = State, + SocketOpts = + deliver_app_data( + Connection:pids(State), + Transport, Socket, SocketOpts0, + ClientData, Pid, RecvFrom, Tracker, Connection), + cancel_timer(Timer), + if + SocketOpts#socket_options.active =:= false; + Buffer =:= <<>> -> + %% Passive mode, wait for active once or recv + %% Active and empty, get more data + {no_record, + State#state{ + user_data_buffer = Buffer, + start_or_recv_from = undefined, + timer = undefined, + bytes_to_read = undefined, + socket_options = SocketOpts + }}; + true -> %% We have more data + read_application_data( + Buffer, State, SocketOpts, + undefined, undefined, undefined) + end; {more, Buffer} -> % no reply, we need more data - {no_record, State0#state{user_data_buffer = Buffer}}; + {no_record, State#state{user_data_buffer = Buffer}}; {passive, Buffer} -> - {no_record, State0#state{user_data_buffer = Buffer}}; + {no_record, State#state{user_data_buffer = Buffer}}; {error,_Reason} -> %% Invalid packet in packet mode - deliver_packet_error(Connection:pids(State0), - Transport, Socket, SOpts, Buffer1, Pid, RecvFrom, Tracker, Connection), - {stop, {shutdown, normal}, State0} - end. - -dist_app_data(ClientData, #state{erl_dist_data = #{dist_handle := undefined, - dist_buffer := DistBuff} = DistData} = State) -> - {no_record, State#state{erl_dist_data = DistData#{dist_buffer => [ClientData, DistBuff]}}}; -dist_app_data(ClientData, #state{erl_dist_data = #{dist_handle := DHandle, - dist_buffer := DistBuff} = ErlDistData, - user_data_buffer = Buffer, - socket_options = SOpts} = State) -> - Data = merge_dist_data(DistBuff, ClientData), - try erlang:dist_ctrl_put_data(DHandle, Data) of - _ when SOpts#socket_options.active =:= false; - Buffer =:= <<>> -> - %% Passive mode, wait for active once or recv - %% Active and empty, get more data - {no_record, State#state{erl_dist_data = ErlDistData#{dist_buffer => <<>>}}}; - _ -> %% We have more data - read_application_data(<<>>, State) - catch error:_ -> - {stop, State, disconnect} + #state{ + static_env = + #static_env{ + socket = Socket, + protocol_cb = Connection, + transport_cb = Transport, + tracker = Tracker}, + user_application = {_Mon, Pid}} = State, + deliver_packet_error( + Connection:pids(State), Transport, Socket, SocketOpts0, + Buffer0, Pid, RecvFrom, Tracker, Connection), + {stop, {shutdown, normal}, State} end. -merge_dist_data(<<>>, ClientData) -> - ClientData; -merge_dist_data(DistBuff, <<>>) -> - DistBuff; -merge_dist_data(DistBuff, ClientData) -> - [DistBuff, ClientData]. %%==================================================================== %% Help functions for tls|dtls_connection.erl %%==================================================================== @@ -1095,15 +1112,14 @@ connection(cast, {internal_renegotiate, WriteState}, #state{static_env = #static connection_states = ConnectionStates#{current_write => WriteState}}, []); connection(cast, {dist_handshake_complete, DHandle}, #state{ssl_options = #ssl_options{erl_dist = true}, - erl_dist_data = ErlDistData, socket_options = SockOpts} = State0, Connection) -> process_flag(priority, normal), State1 = State0#state{ - socket_options = - SockOpts#socket_options{active = true}, - erl_dist_data = ErlDistData#{dist_handle => DHandle}}, - {Record, State} = dist_app_data(<<>>, State1), + socket_options = SockOpts#socket_options{active = true}, + erl_dist_handle = DHandle, + bytes_to_read = undefined}, + {Record, State} = read_application_data(<<>>, State1), Connection:next_event(connection, Record, State); connection(info, Msg, State, _) -> handle_info(Msg, ?FUNCTION_NAME, State); @@ -2557,21 +2573,28 @@ decode_packet(Type, Buffer, PacketOpts) -> %% Note that if the user has explicitly configured the socket to expect %% HTTP headers using the {packet, httph} option, we don't do any automatic %% switching of states. -deliver_app_data(CPids, Transport, Socket, SOpts = #socket_options{active=Active, packet=Type}, - Data, Pid, From, Tracker, Connection) -> - send_or_reply(Active, Pid, From, - format_reply(CPids, Transport, Socket, SOpts, Data, Tracker, Connection)), - SO = case Data of - {P, _, _, _} when ((P =:= http_request) or (P =:= http_response)), - ((Type =:= http) or (Type =:= http_bin)) -> - SOpts#socket_options{packet={Type, headers}}; - http_eoh when tuple_size(Type) =:= 2 -> - % End of headers - expect another Request/Response line - {Type1, headers} = Type, - SOpts#socket_options{packet=Type1}; - _ -> - SOpts - end, +deliver_app_data( + CPids, Transport, Socket, + #socket_options{active=Active, packet=Type} = SOpts, + Data, Pid, From, Tracker, Connection) -> + %% + send_or_reply( + Active, Pid, From, + format_reply( + CPids, Transport, Socket, SOpts, Data, Tracker, Connection)), + SO = + case Data of + {P, _, _, _} + when ((P =:= http_request) or (P =:= http_response)), + ((Type =:= http) or (Type =:= http_bin)) -> + SOpts#socket_options{packet={Type, headers}}; + http_eoh when tuple_size(Type) =:= 2 -> + %% End of headers - expect another Request/Response line + {Type1, headers} = Type, + SOpts#socket_options{packet=Type1}; + _ -> + SOpts + end, case Active of once -> SO#socket_options{active=false}; @@ -2728,7 +2751,10 @@ new_emulated([], EmOpts) -> new_emulated(NewEmOpts, _) -> NewEmOpts. -is_dist_up(#{dist_handle := Handle}) when Handle =/= undefined -> - true; -is_dist_up(_) -> - false. +-compile({inline, [bincat/2]}). +bincat(<<>>, B) -> + B; +bincat(A, <<>>) -> + A; +bincat(A, B) -> + <>. -- cgit v1.2.3