diff options
-rw-r--r-- | lib/inets/src/http_client/httpc_handler.erl | 55 | ||||
-rw-r--r-- | lib/inets/src/http_lib/http_chunk.erl | 53 | ||||
-rw-r--r-- | lib/inets/test/httpc_SUITE.erl | 9 |
3 files changed, 72 insertions, 45 deletions
diff --git a/lib/inets/src/http_client/httpc_handler.erl b/lib/inets/src/http_client/httpc_handler.erl index f6b13c2998..55794f57dc 100644 --- a/lib/inets/src/http_client/httpc_handler.erl +++ b/lib/inets/src/http_client/httpc_handler.erl @@ -33,7 +33,6 @@ %% connect_and_send/2, send/2, cancel/3, - stream/3, stream_next/1, info/1 ]). @@ -65,7 +64,7 @@ options, % #options{} timers = #timers{}, % #timers{} profile_name, % atom() - id of httpc_manager process. - once % send | undefined + once = inactive % inactive | once }). @@ -231,6 +230,8 @@ init([Parent, Request, Options, ProfileName]) -> ProxyOptions = handle_proxy_options(Request#request.scheme, Options), Address = handle_proxy(Request#request.address, ProxyOptions), {ok, State} = + %% #state.once should initially be 'inactive' because we + %% activate the socket at first regardless of the state. case {Address /= Request#request.address, Request#request.scheme} of {true, https} -> connect_and_send_upgrade_request(Address, Request, @@ -425,7 +426,9 @@ handle_cast({cancel, RequestId, From}, handle_cast(stream_next, #state{session = Session} = State) -> activate_once(Session), - {noreply, State#state{once = once}}. + %% Inactivate the #state.once here because we don't want + %% next_body_chunk/1 to activate the socket twice. + {noreply, State#state{once = inactive}}. %%-------------------------------------------------------------------- @@ -478,6 +481,41 @@ handle_info({Proto, _Socket, Data}, NewMFA = {Module, whole_body, [NewBody, NewLength]}, {noreply, NewState#state{mfa = NewMFA, request = NewRequest}}; + {Module, decode_size, + [TotalChunk, HexList, + {MaxBodySize, BodySoFar, AccLength, MaxHeaderSize}]} + when BodySoFar =/= <<>> -> + ?hcrd("data processed - decode_size", []), + %% The response body is chunk-encoded. Steal decoded + %% chunks as much as possible to stream. + {_, Code, _} = StatusLine, + {NewBody, NewRequest} = stream(BodySoFar, Request, Code), + NewState = next_body_chunk(State), + NewMFA = {Module, decode_size, + [TotalChunk, HexList, + {MaxBodySize, NewBody, AccLength, MaxHeaderSize}]}, + {noreply, NewState#state{mfa = NewMFA, + request = NewRequest}}; + {Module, decode_data, + [ChunkSize, TotalChunk, + {MaxBodySize, BodySoFar, AccLength, MaxHeaderSize}]} + when TotalChunk =/= <<>> orelse BodySoFar =/= <<>> -> + ?hcrd("data processed - decode_data", []), + %% The response body is chunk-encoded. Steal decoded + %% chunks as much as possible to stream. + ChunkSizeToSteal = min(ChunkSize, byte_size(TotalChunk)), + <<StolenChunk:ChunkSizeToSteal/binary, NewTotalChunk/binary>> = TotalChunk, + StolenBody = <<BodySoFar/binary, StolenChunk/binary>>, + NewChunkSize = ChunkSize - ChunkSizeToSteal, + {_, Code, _} = StatusLine, + + {NewBody, NewRequest} = stream(StolenBody, Request, Code), + NewState = next_body_chunk(State), + NewMFA = {Module, decode_data, + [NewChunkSize, NewTotalChunk, + {MaxBodySize, NewBody, AccLength, MaxHeaderSize}]}, + {noreply, NewState#state{mfa = NewMFA, + request = NewRequest}}; NewMFA -> ?hcrd("data processed - new mfa", []), activate_once(Session), @@ -1027,11 +1065,15 @@ handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body}, status_line = StatusLine, headers = Headers}) end; -handle_http_msg({ChunkedHeaders, Body}, #state{headers = Headers} = State) -> +handle_http_msg({ChunkedHeaders, Body}, + #state{status_line = {_, Code, _}, headers = Headers} = State) -> ?hcrt("handle_http_msg", [{chunked_headers, ChunkedHeaders}, {headers, Headers}]), NewHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders), - handle_response(State#state{headers = NewHeaders, body = Body}); + {NewBody, NewRequest} = stream(Body, State#state.request, Code), + handle_response(State#state{headers = NewHeaders, + body = NewBody, + request = NewRequest}); handle_http_msg(Body, #state{status_line = {_,Code, _}} = State) -> ?hcrt("handle_http_msg", [{code, Code}]), {NewBody, NewRequest} = stream(Body, State#state.request, Code), @@ -1070,8 +1112,7 @@ handle_http_body(Body, #state{headers = Headers, "chunked" -> ?hcrt("handle_http_body - chunked", []), case http_chunk:decode(Body, State#state.max_body_size, - State#state.max_header_size, - {Code, Request}) of + State#state.max_header_size) of {Module, Function, Args} -> ?hcrt("handle_http_body - new mfa", [{module, Module}, diff --git a/lib/inets/src/http_lib/http_chunk.erl b/lib/inets/src/http_lib/http_chunk.erl index 57647438e9..24c939e80c 100644 --- a/lib/inets/src/http_lib/http_chunk.erl +++ b/lib/inets/src/http_lib/http_chunk.erl @@ -24,7 +24,7 @@ -include("http_internal.hrl"). %% API --export([decode/3, decode/4, encode/1, encode_last/0, handle_headers/2]). +-export([decode/3, encode/1, encode_last/0, handle_headers/2]). %% Callback API - used for example if the chunkedbody is received a %% little at a time on a socket. -export([decode_size/1, ignore_extensions/1, decode_data/1, decode_trailer/1]). @@ -34,20 +34,14 @@ %%% API %%%========================================================================= %%------------------------------------------------------------------------- -%% decode(ChunkedBody, MaxBodySize, MaxHeaderSize, <Stream>) -> +%% decode(ChunkedBody, MaxBodySize, MaxHeaderSize) -> %% {ok, {Headers, Body}} | {Module, Function, Args} %% %% Headers = ["Header:Value"] %% ChunkedBody = binary() %% MaxBodySize = integer() %% MaxHeaderSize = integer() -%% Stream = {Code, Request} - if Request#request.stream =/= none -%% and Code == 200 the side effect of sending each decode chunk to the -%% client/file before the whole body is received will take place. %% -%% Note: decode/4 should only be used from httpc_handler module. -%% Otherwhise use the side effect free decode/3. -%% %% Description: Decodes a body encoded by the chunked transfer %% encoding. If the ChunkedBody is not compleate it returns {Module, %% Function, Args} so that decoding can be continued when more of the @@ -61,12 +55,9 @@ %% the next pass in the loop. %%------------------------------------------------------------------------- decode(ChunkedBody, MaxBodySize, MaxHeaderSize) -> - decode(ChunkedBody, MaxBodySize, MaxHeaderSize, false). - -decode(ChunkedBody, MaxBodySize, MaxHeaderSize, Stream) -> %% Note decode_size will call decode_data. - decode_size([ChunkedBody, <<>>, [], - {MaxBodySize, <<>>, 0, MaxHeaderSize, Stream}]). + decode_size([ChunkedBody, <<>>, [], + {MaxBodySize, <<>>, 0, MaxHeaderSize}]). %%------------------------------------------------------------------------- %% encode(Chunk) -> EncodedChunk @@ -150,7 +141,7 @@ decode_size(<<>>, HexList, Info) -> decode_size(Data = <<?CR, ?LF, ChunkRest/binary>>, HexList, {MaxBodySize, Body, AccLength, - MaxHeaderSize, Stream}) -> + MaxHeaderSize}) -> ChunkSize = http_util:hexlist_to_integer(lists:reverse(HexList)), case ChunkSize of 0 -> % Last chunk, there was no data @@ -164,7 +155,7 @@ decode_size(Data = <<?CR, ?LF, ChunkRest/binary>>, HexList, %% to this function comes in. decode_data(ChunkSize, ChunkRest, {MaxBodySize, Body, ChunkSize + AccLength , - MaxHeaderSize, Stream}) + MaxHeaderSize}) end; decode_size(<<";", Rest/binary>>, HexList, Info) -> %% Note ignore_extensions will call decode_size/1 again when @@ -189,50 +180,42 @@ ignore_extensions(<<_Octet, Rest/binary>>, NextFunction) -> ignore_extensions(Rest, NextFunction). decode_data(ChunkSize, TotalChunk, - Info = {MaxBodySize, BodySoFar, AccLength, MaxHeaderSize, Stream}) + Info = {MaxBodySize, BodySoFar, AccLength, MaxHeaderSize}) when ChunkSize =< size(TotalChunk) -> case TotalChunk of %% Last chunk <<Data:ChunkSize/binary, ?CR, ?LF, "0", ";">> -> %% Note ignore_extensions will call decode_trailer/1 %% once it ignored all extensions. - {NewBody, _} = - stream(<<BodySoFar/binary, Data/binary>>, Stream), {?MODULE, ignore_extensions, [<<>>, {?MODULE, decode_trailer, [<<>>, [],[], MaxHeaderSize, - NewBody, + <<BodySoFar/binary, Data/binary>>, integer_to_list(AccLength)]}]}; <<Data:ChunkSize/binary, ?CR, ?LF, "0", ";", Rest/binary>> -> %% Note ignore_extensions will call decode_trailer/1 %% once it ignored all extensions. - {NewBody, _} = stream(<<BodySoFar/binary, Data/binary>>, Stream), ignore_extensions(Rest, {?MODULE, decode_trailer, [<<>>, [],[], MaxHeaderSize, - NewBody, + <<BodySoFar/binary, Data/binary>>, integer_to_list(AccLength)]}); <<Data:ChunkSize/binary, ?CR, ?LF, "0", ?CR, ?LF>> -> - {NewBody, _} = stream(<<BodySoFar/binary, Data/binary>>, Stream), {?MODULE, decode_trailer, [<<?CR, ?LF>>, [],[], MaxHeaderSize, - NewBody, + <<BodySoFar/binary, Data/binary>>, integer_to_list(AccLength)]}; <<Data:ChunkSize/binary, ?CR, ?LF, "0", ?CR, ?LF, Rest/binary>> -> - {NewBody,_}= stream(<<BodySoFar/binary, Data/binary>>, Stream), decode_trailer(<<?CR, ?LF, Rest/binary>>, [],[], MaxHeaderSize, - NewBody, + <<BodySoFar/binary, Data/binary>>, integer_to_list(AccLength)); %% There are more chunks, so here we go agin... <<Data:ChunkSize/binary, ?CR, ?LF>> -> - {NewBody, NewStream} = - stream(<<BodySoFar/binary, Data/binary>>, Stream), - {?MODULE, decode_size, [<<>>, [], {MaxBodySize, NewBody, AccLength, MaxHeaderSize, NewStream}]}; + NewBody = <<BodySoFar/binary, Data/binary>>, + {?MODULE, decode_size, [<<>>, [], {MaxBodySize, NewBody, AccLength, MaxHeaderSize}]}; <<Data:ChunkSize/binary, ?CR, ?LF, Rest/binary>> when (AccLength < MaxBodySize) or (MaxBodySize == nolimit) -> - {NewBody, NewStream} = - stream(<<BodySoFar/binary, Data/binary>>, Stream), decode_size(Rest, [], - {MaxBodySize, NewBody, - AccLength, MaxHeaderSize, NewStream}); + {MaxBodySize, <<BodySoFar/binary, Data/binary>>, + AccLength, MaxHeaderSize}); <<_:ChunkSize/binary, ?CR, ?LF, _/binary>> -> throw({error, body_too_big}); _ -> @@ -286,9 +269,3 @@ decode_trailer(<<Octet, Rest/binary>>, Header, Headers, MaxHeaderSize, Body, BodyLength) -> decode_trailer(Rest, [Octet | Header], Headers, MaxHeaderSize, Body, BodyLength). - -stream(BodyPart, false) -> - {BodyPart, false}; -stream(BodyPart, {Code, Request}) -> - {NewBody, NewRequest} = httpc_handler:stream(BodyPart, Request, Code), - {NewBody, {Code, NewRequest}}. diff --git a/lib/inets/test/httpc_SUITE.erl b/lib/inets/test/httpc_SUITE.erl index 350192464e..0c35f284f7 100644 --- a/lib/inets/test/httpc_SUITE.erl +++ b/lib/inets/test/httpc_SUITE.erl @@ -1693,6 +1693,15 @@ receive_streamed_body(RequestId, Body, Pid) -> ct:print("~p:receive_streamed_body -> requested next stream ~n", [?MODULE]), receive {http, {RequestId, stream, BinBodyPart}} -> + %% Make sure the httpc hasn't sent us the next 'stream' + %% without our request. + receive + {http, {RequestId, stream, _}} = Msg -> + ct:fail({unexpected_flood_of_stream, Msg}) + after + 1000 -> + ok + end, receive_streamed_body(RequestId, <<Body/binary, BinBodyPart/binary>>, Pid); |