aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIngela Anderton Andin <[email protected]>2015-03-23 12:00:58 +0100
committerIngela Anderton Andin <[email protected]>2015-04-07 23:00:17 +0200
commit29a483d4f1eb42e23e30372d14ad69db2e9f33b9 (patch)
tree8f2c6f44a43ef4f1079196e156ffe0677291c21a
parent0f7e72999e65c293ea421cd931aff4fc60aa647e (diff)
downloadotp-29a483d4f1eb42e23e30372d14ad69db2e9f33b9.tar.gz
otp-29a483d4f1eb42e23e30372d14ad69db2e9f33b9.tar.bz2
otp-29a483d4f1eb42e23e30372d14ad69db2e9f33b9.zip
ssh: Change send_buf implementation from list to queue
A queue is the behaviour that we want, so this makes the code easier to understand and more effective.
-rw-r--r--lib/ssh/src/ssh_connection.erl105
-rw-r--r--lib/ssh/src/ssh_connection_handler.erl4
2 files changed, 60 insertions, 49 deletions
diff --git a/lib/ssh/src/ssh_connection.erl b/lib/ssh/src/ssh_connection.erl
index c66f810948..e9b4547aef 100644
--- a/lib/ssh/src/ssh_connection.erl
+++ b/lib/ssh/src/ssh_connection.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2008-2014. All Rights Reserved.
+%% Copyright Ericsson AB 2008-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -326,9 +326,7 @@ channel_data(ChannelId, DataType, Data,
SendDataType,
SendData)}
end, SendList),
- FlowCtrlMsgs = flow_control(Replies,
- Channel,
- Cache),
+ FlowCtrlMsgs = flow_control(Replies, Channel, Cache),
{{replies, Replies ++ FlowCtrlMsgs}, Connection};
_ ->
gen_fsm:reply(From, {error, closed}),
@@ -523,7 +521,9 @@ handle_msg(#ssh_msg_channel_open{channel_type = "forwarded-tcpip" = Type,
recv_window_size = LWindowSz,
recv_packet_size = LPacketSz,
send_window_size = RWindowSz,
- send_packet_size = RPacketSz},
+ send_packet_size = RPacketSz,
+ send_buf = queue:new()
+ },
ssh_channel:cache_update(Cache, Channel),
OpenConfMsg = channel_open_confirmation_msg(RemoteId, ChannelId,
LWindowSz, LPacketSz),
@@ -929,6 +929,7 @@ setup_session(#connection{channel_cache = Cache} = Connection0,
recv_packet_size = ?DEFAULT_PACKET_SIZE,
send_window_size = WindowSize,
send_packet_size = PacketSize,
+ send_buf = queue:new(),
remote_id = RemoteId
},
ssh_channel:cache_update(Cache, Channel),
@@ -1024,63 +1025,71 @@ request_reply_or_data(#channel{local_id = ChannelId, user = ChannelPid},
update_send_window(Channel, _, undefined,
#connection{channel_cache = Cache}) ->
- do_update_send_window(Channel, Channel#channel.send_buf, Cache);
+ do_update_send_window(Channel, Cache);
-update_send_window(Channel, DataType, Data,
+update_send_window(#channel{send_buf = SendBuffer} = Channel, DataType, Data,
#connection{channel_cache = Cache}) ->
- do_update_send_window(Channel, Channel#channel.send_buf ++ [{DataType, Data}], Cache).
+ do_update_send_window(Channel#channel{send_buf = queue:in({DataType, Data}, SendBuffer)},
+ Cache).
-do_update_send_window(Channel0, Buf0, Cache) ->
- {Buf1, NewSz, Buf2} = get_window(Buf0,
- Channel0#channel.send_packet_size,
- Channel0#channel.send_window_size),
-
- Channel = Channel0#channel{send_window_size = NewSz, send_buf = Buf2},
+do_update_send_window(Channel0, Cache) ->
+ {SendMsgs, Channel} = get_window(Channel0, []),
ssh_channel:cache_update(Cache, Channel),
- {Buf1, Channel}.
-
-get_window(Bs, PSz, WSz) ->
- get_window(Bs, PSz, WSz, []).
-
-get_window(Bs, _PSz, 0, Acc) ->
- {lists:reverse(Acc), 0, Bs};
-get_window([B0 = {DataType, Bin} | Bs], PSz, WSz, Acc) ->
- BSz = size(Bin),
- if BSz =< WSz -> %% will fit into window
- if BSz =< PSz -> %% will fit into a packet
- get_window(Bs, PSz, WSz-BSz, [B0|Acc]);
- true -> %% split into packet size
- <<Bin1:PSz/binary, Bin2/binary>> = Bin,
- get_window([setelement(2, B0, Bin2) | Bs],
- PSz, WSz-PSz,
- [{DataType, Bin1}|Acc])
+ {SendMsgs, Channel}.
+
+get_window(#channel{send_window_size = 0
+ } = Channel, Acc) ->
+ {lists:reverse(Acc), Channel};
+get_window(#channel{send_buf = Buffer,
+ send_packet_size = PacketSize,
+ send_window_size = WindowSize0
+ } = Channel, Acc0) ->
+ case queue:out(Buffer) of
+ {{value, {_, Data} = Msg}, NewBuffer} ->
+ case handle_send_window(Msg, size(Data), PacketSize, WindowSize0, Acc0) of
+ {WindowSize, Acc, {_, <<>>}} ->
+ {lists:reverse(Acc), Channel#channel{send_window_size = WindowSize,
+ send_buf = NewBuffer}};
+ {WindowSize, Acc, Rest} ->
+ get_window(Channel#channel{send_window_size = WindowSize,
+ send_buf = queue:in_r(Rest, NewBuffer)}, Acc)
end;
- WSz =< PSz -> %% use rest of window
- <<Bin1:WSz/binary, Bin2/binary>> = Bin,
- get_window([setelement(2, B0, Bin2) | Bs],
- PSz, WSz-WSz,
- [{DataType, Bin1}|Acc]);
- true -> %% use packet size
- <<Bin1:PSz/binary, Bin2/binary>> = Bin,
- get_window([setelement(2, B0, Bin2) | Bs],
- PSz, WSz-PSz,
- [{DataType, Bin1}|Acc])
+ {empty, NewBuffer} ->
+ {[], Channel#channel{send_buf = NewBuffer}}
+ end.
+
+handle_send_window(Msg = {Type, Data}, Size, PacketSize, WindowSize, Acc) when Size =< WindowSize ->
+ case Size =< PacketSize of
+ true ->
+ {WindowSize - Size, [Msg | Acc], {Type, <<>>}};
+ false ->
+ <<Msg1:PacketSize/binary, Msg2/binary>> = Data,
+ {WindowSize - PacketSize, [{Type, Msg1} | Acc], {Type, Msg2}}
end;
-get_window([], _PSz, WSz, Acc) ->
- {lists:reverse(Acc), WSz, []}.
+handle_send_window({Type, Data}, _, PacketSize, WindowSize, Acc) when WindowSize =< PacketSize ->
+ <<Msg1:WindowSize/binary, Msg2/binary>> = Data,
+ {WindowSize - WindowSize, [{Type, Msg1} | Acc], {Type, Msg2}};
+handle_send_window({Type, Data}, _, PacketSize, WindowSize, Acc) ->
+ <<Msg1:PacketSize/binary, Msg2/binary>> = Data,
+ {WindowSize - PacketSize, [{Type, Msg1} | Acc], {Type, Msg2}}.
flow_control(Channel, Cache) ->
flow_control([window_adjusted], Channel, Cache).
-
+
flow_control([], Channel, Cache) ->
ssh_channel:cache_update(Cache, Channel),
[];
-
flow_control([_|_], #channel{flow_control = From,
- send_buf = []} = Channel, Cache) when From =/= undefined ->
- [{flow_control, Cache, Channel, From, ok}];
+ send_buf = Buffer} = Channel, Cache) when From =/= undefined ->
+ case queue:is_empty(Buffer) of
+ true ->
+ ssh_channel:cache_update(Cache, Channel#channel{flow_control = undefined}),
+ [{flow_control, Cache, Channel, From, ok}];
+ false ->
+ []
+ end;
flow_control(_,_,_) ->
- [].
+ [].
pty_req(ConnectionHandler, Channel, Term, Width, Height,
PixWidth, PixHeight, PtyOpts, TimeOut) ->
diff --git a/lib/ssh/src/ssh_connection_handler.erl b/lib/ssh/src/ssh_connection_handler.erl
index 1610364287..e1f2e059e8 100644
--- a/lib/ssh/src/ssh_connection_handler.erl
+++ b/lib/ssh/src/ssh_connection_handler.erl
@@ -751,7 +751,9 @@ handle_sync_event({open, ChannelPid, Type, InitialWindowSize, MaxPacketSize, Dat
user = ChannelPid,
local_id = ChannelId,
recv_window_size = InitialWindowSize,
- recv_packet_size = MaxPacketSize},
+ recv_packet_size = MaxPacketSize,
+ send_buf = queue:new()
+ },
ssh_channel:cache_update(Cache, Channel),
State = add_request(true, ChannelId, From, State2),
start_timeout(ChannelId, From, Timeout),