From 10f031e7e6b497430918a29db97d12ffe37a5b2d Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Tue, 19 Jan 2010 17:04:23 +0000 Subject: OTP-8016, OTP-8056, OTP-8103, OTP-8106, OTP-8312, OTP-8315, OTP-8327, OTP-8349, OTP-8351, OTP-8352, OTP-8359 & OTP-8371. --- lib/inets/src/http_client/httpc_handler.erl | 271 +++++++++++++++++++--------- 1 file changed, 184 insertions(+), 87 deletions(-) (limited to 'lib/inets/src/http_client/httpc_handler.erl') diff --git a/lib/inets/src/http_client/httpc_handler.erl b/lib/inets/src/http_client/httpc_handler.erl index 25f9b0777f..fec74932a2 100644 --- a/lib/inets/src/http_client/httpc_handler.erl +++ b/lib/inets/src/http_client/httpc_handler.erl @@ -28,8 +28,15 @@ %%-------------------------------------------------------------------- %% Internal Application API --export([start_link/2, connect_and_send/2, - send/2, cancel/2, stream/3, stream_next/1]). +-export([ + start_link/2, + connect_and_send/2, + send/2, + cancel/2, + stream/3, + stream_next/1, + info/1 + ]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -130,6 +137,18 @@ stream_next(Pid) -> cast(stream_next, Pid). +%%-------------------------------------------------------------------- +%% Function: info(Pid) -> [{Key, Val}] +%% Pid = pid() - the pid of the http-request handler process. +%% +%% Description: +%% Returns various information related to this handler +%% Used for debugging and testing +%%-------------------------------------------------------------------- +info(Pid) -> + call(info, Pid). + + %%-------------------------------------------------------------------- %% Function: stream(BodyPart, Request, Code) -> _ %% BodyPart = binary() @@ -143,21 +162,21 @@ stream_next(Pid) -> %%-------------------------------------------------------------------- %% Request should not be streamed stream(BodyPart, Request = #request{stream = none}, _) -> - ?hcrt("stream - none", [{body_part, BodyPart}]), + ?hcrt("stream - none", []), {BodyPart, Request}; %% Stream to caller stream(BodyPart, Request = #request{stream = Self}, Code) when ((Code =:= 200) orelse (Code =:= 206)) andalso ((Self =:= self) orelse (Self =:= {self, once})) -> - ?hcrt("stream - self", [{stream, Self}, {code, Code}, {body_part, BodyPart}]), + ?hcrt("stream - self", [{stream, Self}, {code, Code}]), httpc_response:send(Request#request.from, {Request#request.id, stream, BodyPart}), {<<>>, Request}; stream(BodyPart, Request = #request{stream = Self}, 404) when (Self =:= self) orelse (Self =:= {self, once}) -> - ?hcrt("stream - self with 404", [{stream, Self}, {body_part, BodyPart}]), + ?hcrt("stream - self with 404", [{stream, Self}]), httpc_response:send(Request#request.from, {Request#request.id, stream, BodyPart}), {<<>>, Request}; @@ -167,7 +186,7 @@ stream(BodyPart, Request = #request{stream = Self}, 404) %% We keep this for backward compatibillity... stream(BodyPart, Request = #request{stream = Filename}, Code) when ((Code =:= 200) orelse (Code =:= 206)) andalso is_list(Filename) -> - ?hcrt("stream - filename", [{stream, Filename}, {code, Code}, {body_part, BodyPart}]), + ?hcrt("stream - filename", [{stream, Filename}, {code, Code}]), case file:open(Filename, [write, raw, append, delayed_write]) of {ok, Fd} -> ?hcrt("stream - file open ok", [{fd, Fd}]), @@ -179,7 +198,7 @@ stream(BodyPart, Request = #request{stream = Filename}, Code) %% Stream to file stream(BodyPart, Request = #request{stream = Fd}, Code) when ((Code =:= 200) orelse (Code =:= 206)) -> - ?hcrt("stream to file", [{stream, Fd}, {code, Code}, {body_part, BodyPart}]), + ?hcrt("stream to file", [{stream, Fd}, {code, Code}]), case file:write(Fd, BodyPart) of ok -> {<<>>, Request}; @@ -188,7 +207,7 @@ stream(BodyPart, Request = #request{stream = Fd}, Code) end; stream(BodyPart, Request,_) -> % only 200 and 206 responses can be streamed - ?hcrt("stream - ignore", [{request, Request}, {body_part, BodyPart}]), + ?hcrt("stream - ignore", [{request, Request}]), {BodyPart, Request}. @@ -260,22 +279,22 @@ handle_call({connect_and_send, #request{address = Address0, end end; -handle_call(Request, _, +handle_call(#request{address = Addr} = Request, _, #state{status = Status, session = #tcp_session{socket = Socket, type = pipeline} = Session, timers = Timers, - options = Options, + options = #options{proxy = Proxy} = _Options, profile_name = ProfileName} = State) when Status =/= undefined -> - ?hcrv("new request", [{request, Request}, - {profile, ProfileName}, - {status, Status}, - {session_type, pipeline}, - {timers, Timers}]), + ?hcrv("new request on a pipeline session", + [{request, Request}, + {profile, ProfileName}, + {status, Status}, + {timers, Timers}]), - Address = handle_proxy(Request#request.address, Options#options.proxy), + Address = handle_proxy(Addr, Proxy), case httpc_request:send(Address, Request, Socket) of ok -> @@ -331,21 +350,21 @@ handle_call(Request, _, {reply, {pipeline_failed, Reason}, State} end; -handle_call(Request, _, +handle_call(#request{address = Addr} = Request, _, #state{status = Status, session = #tcp_session{socket = Socket, type = keep_alive} = Session, timers = Timers, - options = Options, + options = #options{proxy = Proxy} = _Options, profile_name = ProfileName} = State) when Status =/= undefined -> - ?hcrv("new request", [{request, Request}, - {profile, ProfileName}, - {status, Status}, - {session_type, keep_alive}]), + ?hcrv("new request on a keep-alive session", + [{request, Request}, + {profile, ProfileName}, + {status, Status}]), - Address = handle_proxy(Request#request.address, Options#options.proxy), + Address = handle_proxy(Addr, Proxy), case httpc_request:send(Address, Request, Socket) of ok -> @@ -396,7 +415,12 @@ handle_call(Request, _, {error, Reason} -> ?hcri("failed sending request", [{reason, Reason}]), {reply, {request_failed, Reason}, State} - end. + end; + + +handle_call(info, _, State) -> + Info = handler_info(State), + {reply, Info, State}. %%-------------------------------------------------------------------- @@ -441,8 +465,7 @@ handle_cast({cancel, RequestId}, {noreply, State#state{canceled = [RequestId | Canceled]}}; handle_cast(stream_next, #state{session = Session} = State) -> - http_transport:setopts(socket_type(Session#tcp_session.scheme), - Session#tcp_session.socket, [{active, once}]), + activate_once(Session), {noreply, State#state{once = once}}. @@ -453,7 +476,7 @@ handle_cast(stream_next, #state{session = Session} = State) -> %% Description: Handling all non call/cast messages %%-------------------------------------------------------------------- handle_info({Proto, _Socket, Data}, - #state{mfa = {Module, Function, Args} = MFA, + #state{mfa = {Module, Function, Args}, request = #request{method = Method, stream = Stream} = Request, session = Session, @@ -463,8 +486,8 @@ handle_info({Proto, _Socket, Data}, (Proto =:= httpc_handler) -> ?hcri("received data", [{proto, Proto}, - {data, Data}, - {mfa, MFA}, + {module, Module}, + {function, Function}, {method, Method}, {stream, Stream}, {session, Session}, @@ -473,14 +496,13 @@ handle_info({Proto, _Socket, Data}, FinalResult = try Module:Function([Data | Args]) of {ok, Result} -> - ?hcrd("data processed - ok", [{result, Result}]), + ?hcrd("data processed - ok", []), handle_http_msg(Result, State); {_, whole_body, _} when Method =:= head -> ?hcrd("data processed - whole body", []), handle_response(State#state{body = <<>>}); {Module, whole_body, [Body, Length]} -> - ?hcrd("data processed - whole body", - [{module, Module}, {body, Body}, {length, Length}]), + ?hcrd("data processed - whole body", [{length, Length}]), {_, Code, _} = StatusLine, {NewBody, NewRequest} = stream(Body, Request, Code), %% When we stream we will not keep the already @@ -498,25 +520,25 @@ handle_info({Proto, _Socket, Data}, {noreply, NewState#state{mfa = NewMFA, request = NewRequest}}; NewMFA -> - ?hcrd("data processed", [{new_mfa, NewMFA}]), - http_transport:setopts(socket_type(Session#tcp_session.scheme), - Session#tcp_session.socket, - [{active, once}]), + ?hcrd("data processed - new mfa", []), + activate_once(Session), {noreply, State#state{mfa = NewMFA}} catch - exit:_ -> + exit:_Exit -> + ?hcrd("data processing exit", [{exit, _Exit}]), ClientReason = {could_not_parse_as_http, Data}, ClientErrMsg = httpc_response:error(Request, ClientReason), NewState = answer_request(Request, ClientErrMsg, State), {stop, normal, NewState}; - error:_ -> + error:_Error -> + ?hcrd("data processing error", [{error, _Error}]), ClientReason = {could_not_parse_as_http, Data}, ClientErrMsg = httpc_response:error(Request, ClientReason), NewState = answer_request(Request, ClientErrMsg, State), {stop, normal, NewState} end, - ?hcri("data processed", [{result, FinalResult}]), + ?hcri("data processed", []), FinalResult; @@ -667,6 +689,9 @@ terminate(normal, request = Request, timers = Timers, pipeline = Pipeline}) -> + ?hcrt("terminate(normal) - remote close", + [{id, Id}, {profile, ProfileName}]), + %% Clobber session (catch httpc_manager:delete_session(Id, ProfileName)), @@ -776,19 +801,22 @@ new_queue(Queue, Fun) -> end, List), queue:from_list(NewList). -%%-------------------------------------------------------------------- + +%%%-------------------------------------------------------------------- %%% Internal functions -%%-------------------------------------------------------------------- +%%%-------------------------------------------------------------------- -connect(SocketType, ToAddress, #options{ipfamily = IpFamily, - ip = FromAddress, - port = FromPort}, Timeout) -> +connect(SocketType, ToAddress, + #options{ipfamily = IpFamily, + ip = FromAddress, + port = FromPort, + socket_opts = Opts0}, Timeout) -> Opts1 = case FromPort of default -> - []; + Opts0; _ -> - [{port, FromPort}] + [{port, FromPort} | Opts0] end, Opts2 = case FromAddress of @@ -814,10 +842,10 @@ connect(SocketType, ToAddress, #options{ipfamily = IpFamily, end. connect_and_send_first_request(Address, - #request{settings = Settings, - headers = Headers, - address = OrigAddress, - scheme = Scheme} = Request, + #request{settings = Settings, + headers = Headers, + address = OrigAddress, + scheme = Scheme} = Request, #state{options = Options} = State) -> ?hcrd("connect", @@ -841,13 +869,13 @@ connect_and_send_first_request(Address, client_close = ClientClose, type = SessionType}, TmpState = - State#state{request = Request, - session = Session, - mfa = init_mfa(Request, State), + State#state{request = Request, + session = Session, + mfa = init_mfa(Request, State), status_line = init_status_line(Request), - headers = undefined, - body = undefined, - status = new}, + headers = undefined, + body = undefined, + status = new}, ?hcrt("activate socket", []), activate_once(Session), NewState = activate_request_timeout(TmpState), @@ -867,12 +895,87 @@ connect_and_send_first_request(Address, {stop, Error, State#state{request = Request}} end. + +handler_info(#state{request = Request, + session = Session, + status_line = _StatusLine, + pipeline = Pipeline, + keep_alive = KeepAlive, + status = Status, + canceled = _Canceled, + options = _Options, + timers = _Timers} = _State) -> + + ?hcrt("handler info", [{request, Request}, + {session, Session}, + {pipeline, Pipeline}, + {keep_alive, KeepAlive}, + {status, Status}]), + + %% Info about the current request + RequestInfo = + case Request of + undefined -> + []; + #request{id = Id, + started = ReqStarted} -> + [{id, Id}, {started, ReqStarted}] + end, + + ?hcrt("handler info", [{request_info, RequestInfo}]), + + %% Info about the current session/socket + SessionType = Session#tcp_session.type, + QueueLen = case Session#tcp_session.type of + pipeline -> + queue:len(Pipeline); + keep_alive -> + queue:len(KeepAlive) + end, + Socket = Session#tcp_session.socket, + Scheme = Session#tcp_session.scheme, + SocketType = socket_type(Scheme), + + ?hcrt("handler info", [{session_type, SessionType}, + {queue_length, QueueLen}, + {scheme, Scheme}, + {socket_type, SocketType}, + {socket, Socket}]), + + SocketOpts = http_transport:getopts(SocketType, Socket), + SocketStats = http_transport:getstat(SocketType, Socket), + + Remote = http_transport:peername(SocketType, Socket), + Local = http_transport:sockname(SocketType, Socket), + + ?hcrt("handler info", [{remote, Remote}, + {local, Local}, + {socket_opts, SocketOpts}, + {socket_stats, SocketStats}]), + + SocketInfo = [{remote, Remote}, + {local, Local}, + {socket_opts, SocketOpts}, + {socket_stats, SocketStats}], + + SessionInfo = + [{type, SessionType}, + {queue_length, QueueLen}, + {scheme, Scheme}, + {socket_info, SocketInfo}], + + [{status, Status}, + {current_request, RequestInfo}, + {session, SessionInfo}]. + + + handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body}, State = #state{request = Request}) -> - ?hcrt("handle_http_msg", [{body, Body}]), + ?hcrt("handle_http_msg", [{headers, Headers}]), case Headers#http_response_h.'content-type' of "multipart/byteranges" ++ _Param -> - exit({not_yet_implemented, multypart_nyteranges}); + exit({not_yet_implemented, multypart_byteranges}); _ -> StatusLine = {Version, StatusCode, ReasonPharse}, {ok, NewRequest} = start_stream(StatusLine, Headers, Request), @@ -883,11 +986,11 @@ handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body}, end; handle_http_msg({ChunkedHeaders, Body}, #state{headers = Headers} = State) -> ?hcrt("handle_http_msg", - [{chunked_headers, ChunkedHeaders}, {body, Body}]), + [{chunked_headers, ChunkedHeaders}, {headers, Headers}]), NewHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders), handle_response(State#state{headers = NewHeaders, body = Body}); handle_http_msg(Body, #state{status_line = {_,Code, _}} = State) -> - ?hcrt("handle_http_msg", [{body, Body}, {code, Code}]), + ?hcrt("handle_http_msg", [{code, Code}]), {NewBody, NewRequest} = stream(Body, State#state.request, Code), handle_response(State#state{body = NewBody, request = NewRequest}). @@ -903,15 +1006,12 @@ handle_http_body(<<>>, State = #state{request = #request{method = head}}) -> ?hcrt("handle_http_body - head", []), handle_response(State#state{body = <<>>}); -handle_http_body(Body, State = #state{headers = Headers, - max_body_size = MaxBodySize, - status_line = {_,Code, _}, - request = Request}) -> +handle_http_body(Body, #state{headers = Headers, + max_body_size = MaxBodySize, + status_line = {_,Code, _}, + request = Request} = State) -> ?hcrt("handle_http_body", - [{headers, Headers}, - {body, Body}, - {max_body_size, MaxBodySize}, - {code, Code}]), + [{max_body_size, MaxBodySize}, {headers, Headers}, {code, Code}]), TransferEnc = Headers#http_response_h.'transfer-encoding', case case_insensitive_header(TransferEnc) of "chunked" -> @@ -1000,9 +1100,7 @@ handle_response(#state{request = Request, Session#tcp_session.socket, RequestBody), %% Wait for next response - http_transport:setopts(socket_type(Session#tcp_session.scheme), - Session#tcp_session.socket, - [{active, once}]), + activate_once(Session), Relaxed = (Request#request.settings)#http_options.relaxed, MFA = {httpc_response, parse, [State#state.max_header_size, Relaxed]}, @@ -1038,7 +1136,7 @@ handle_response(#state{request = Request, ok = httpc_manager:retry_request(TimeNewRequest, ProfileName), handle_queue(State#state{request = undefined}, Data); {ok, Msg, Data} -> - ?hcrd("handle response - ok", [{msg, Msg}, {data, Data}]), + ?hcrd("handle response - ok", []), end_stream(StatusLine, Request), NewState = answer_request(Request, Msg, State), handle_queue(NewState, Data); @@ -1137,10 +1235,7 @@ handle_pipeline(#state{status = pipeline, body = undefined}, case Data of <<>> -> - http_transport:setopts( - socket_type(Session#tcp_session.scheme), - Session#tcp_session.socket, - [{active, once}]), + activate_once(Session), {noreply, NewState}; _ -> %% If we already received some bytes of @@ -1164,14 +1259,12 @@ handle_keep_alive_queue( case queue:out(State#state.keep_alive) of {empty, _} -> - ?hcrd("epmty keep_alive queue", []), + ?hcrd("empty keep_alive queue", []), %% The server may choose too terminate an idle keep_alive session %% in this case we want to receive the close message %% at once and not when trying to send the next %% request. - http_transport:setopts(socket_type(Session#tcp_session.scheme), - Session#tcp_session.socket, - [{active, once}]), + activate_once(Session), %% If a keep_alive session has been idle for some time is not %% closed by the server, the client may want to close it. NewState = activate_queue_timeout(TimeOut, State), @@ -1276,9 +1369,8 @@ is_keep_alive_enabled_server("HTTP/1.0", is_keep_alive_enabled_server(_,_) -> false. -is_keep_alive_connection(Headers, Session) -> - (not ((Session#tcp_session.client_close) or - httpc_response:is_server_closing(Headers))). +is_keep_alive_connection(Headers, #tcp_session{client_close = ClientClose}) -> + (not ((ClientClose) orelse httpc_response:is_server_closing(Headers))). try_to_enable_pipeline_or_keep_alive( #state{session = Session, @@ -1286,8 +1378,12 @@ try_to_enable_pipeline_or_keep_alive( status_line = {Version, _, _}, headers = Headers, profile_name = ProfileName} = State) -> - case (is_keep_alive_enabled_server(Version, Headers) andalso - is_keep_alive_connection(Headers, Session)) of + ?hcrd("try to enable pipeline or keep-alive", + [{version, Version}, + {headers, Headers}, + {session, Session}]), + case is_keep_alive_enabled_server(Version, Headers) andalso + is_keep_alive_connection(Headers, Session) of true -> case (is_pipeline_enabled_client(Session) andalso httpc_request:is_idempotent(Method)) of @@ -1307,7 +1403,7 @@ try_to_enable_pipeline_or_keep_alive( end. answer_request(Request, Msg, #state{timers = Timers} = State) -> - ?hcrt("answer request", [{request, Request}, {msg, Msg}]), + ?hcrt("answer request", [{request, Request}]), httpc_response:send(Request#request.from, Msg), RequestTimers = Timers#timers.request_timers, TimerRef = @@ -1435,7 +1531,7 @@ socket_type(#request{scheme = https, settings = Settings}) -> socket_type(http) -> ip_comm; socket_type(https) -> - {ssl, []}. %% Dummy value ok for ex setops that does not use this value + {ssl, []}. %% Dummy value ok for ex setopts that does not use this value start_stream({_Version, _Code, _ReasonPhrase}, _Headers, #request{stream = none} = Request) -> @@ -1529,6 +1625,7 @@ handle_verbose(trace) -> handle_verbose(_) -> ok. + %%% Normaly I do not comment out code, I throw it away. But this might %%% actually be used one day if ssl is improved. %% send_ssl_tunnel_request(Address, Request = #request{address = {Host, Port}}, -- cgit v1.2.3