diff options
Diffstat (limited to 'lib/inets/src/http_client/httpc_handler.erl')
-rw-r--r-- | lib/inets/src/http_client/httpc_handler.erl | 954 |
1 files changed, 581 insertions, 373 deletions
diff --git a/lib/inets/src/http_client/httpc_handler.erl b/lib/inets/src/http_client/httpc_handler.erl index 7b737c2f86..fec74932a2 100644 --- a/lib/inets/src/http_client/httpc_handler.erl +++ b/lib/inets/src/http_client/httpc_handler.erl @@ -1,19 +1,19 @@ %% %% %CopyrightBegin% -%% -%% Copyright Ericsson AB 2002-2009. All Rights Reserved. -%% +%% +%% Copyright Ericsson AB 2002-2010. All Rights Reserved. +%% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. -%% +%% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. -%% +%% %% %CopyrightEnd% %% %% @@ -28,7 +28,15 @@ %%-------------------------------------------------------------------- %% Internal Application API --export([start_link/3, send/2, cancel/2, stream/3, stream_next/1]). +-export([ + start_link/2, + connect_and_send/2, + send/2, + cancel/2, + stream/3, + stream_next/1, + info/1 + ]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -50,7 +58,7 @@ mfa, % {Moduel, Function, Args} pipeline = queue:new(), % queue() keep_alive = queue:new(), % queue() - status = new, % new | pipeline | keep_alive | close | ssl_tunnel + status, % undefined | new | pipeline | keep_alive | close | ssl_tunnel canceled = [], % [RequestId] max_header_size = nolimit, % nolimit | integer() max_body_size = nolimit, % nolimit | integer() @@ -85,9 +93,13 @@ %% the reply or part of it has arrived.) %%-------------------------------------------------------------------- %%-------------------------------------------------------------------- -start_link(Request, Options, ProfileName) -> - {ok, proc_lib:spawn_link(?MODULE, init, [[Request, Options, - ProfileName]])}. + +start_link(Options, ProfileName) -> + Args = [Options, ProfileName], + gen_server:start_link(?MODULE, Args, []). + +connect_and_send(Request, HandlerPid) -> + call({connect_and_send, Request}, HandlerPid). %%-------------------------------------------------------------------- @@ -126,6 +138,18 @@ stream_next(Pid) -> %%-------------------------------------------------------------------- +%% Function: info(Pid) -> [{Key, Val}] +%% Pid = pid() - the pid of the http-request handler process. +%% +%% Description: +%% Returns various information related to this handler +%% Used for debugging and testing +%%-------------------------------------------------------------------- +info(Pid) -> + call(info, Pid). + + +%%-------------------------------------------------------------------- %% Function: stream(BodyPart, Request, Code) -> _ %% BodyPart = binary() %% Request = #request{} @@ -138,21 +162,21 @@ stream_next(Pid) -> %%-------------------------------------------------------------------- %% Request should not be streamed stream(BodyPart, Request = #request{stream = none}, _) -> - ?hcrt("stream - none", [{body_part, BodyPart}]), + ?hcrt("stream - none", []), {BodyPart, Request}; %% Stream to caller stream(BodyPart, Request = #request{stream = Self}, Code) when ((Code =:= 200) orelse (Code =:= 206)) andalso ((Self =:= self) orelse (Self =:= {self, once})) -> - ?hcrt("stream - self", [{stream, Self}, {code, Code}, {body_part, BodyPart}]), + ?hcrt("stream - self", [{stream, Self}, {code, Code}]), httpc_response:send(Request#request.from, {Request#request.id, stream, BodyPart}), {<<>>, Request}; stream(BodyPart, Request = #request{stream = Self}, 404) when (Self =:= self) orelse (Self =:= {self, once}) -> - ?hcrt("stream - self with 404", [{stream, Self}, {body_part, BodyPart}]), + ?hcrt("stream - self with 404", [{stream, Self}]), httpc_response:send(Request#request.from, {Request#request.id, stream, BodyPart}), {<<>>, Request}; @@ -162,7 +186,7 @@ stream(BodyPart, Request = #request{stream = Self}, 404) %% We keep this for backward compatibillity... stream(BodyPart, Request = #request{stream = Filename}, Code) when ((Code =:= 200) orelse (Code =:= 206)) andalso is_list(Filename) -> - ?hcrt("stream - filename", [{stream, Filename}, {code, Code}, {body_part, BodyPart}]), + ?hcrt("stream - filename", [{stream, Filename}, {code, Code}]), case file:open(Filename, [write, raw, append, delayed_write]) of {ok, Fd} -> ?hcrt("stream - file open ok", [{fd, Fd}]), @@ -174,7 +198,7 @@ stream(BodyPart, Request = #request{stream = Filename}, Code) %% Stream to file stream(BodyPart, Request = #request{stream = Fd}, Code) when ((Code =:= 200) orelse (Code =:= 206)) -> - ?hcrt("stream to file", [{stream, Fd}, {code, Code}, {body_part, BodyPart}]), + ?hcrt("stream to file", [{stream, Fd}, {code, Code}]), case file:write(Fd, BodyPart) of ok -> {<<>>, Request}; @@ -183,7 +207,7 @@ stream(BodyPart, Request = #request{stream = Fd}, Code) end; stream(BodyPart, Request,_) -> % only 200 and 206 responses can be streamed - ?hcrt("stream - ignore", [{request, Request}, {body_part, BodyPart}]), + ?hcrt("stream - ignore", [{request, Request}]), {BodyPart, Request}. @@ -192,10 +216,9 @@ stream(BodyPart, Request,_) -> % only 200 and 206 responses can be streamed %%==================================================================== %%-------------------------------------------------------------------- -%% Function: init([Request, Options, ProfileName]) -> {ok, State} | -%% {ok, State, Timeout} | ignore |{stop, Reason} +%% Function: init([Options, ProfileName]) -> {ok, State} | +%% {ok, State, Timeout} | ignore | {stop, Reason} %% -%% Request = #request{} %% Options = #options{} %% ProfileName = atom() - id of httpc manager process %% @@ -206,30 +229,16 @@ stream(BodyPart, Request,_) -> % only 200 and 206 responses can be streamed %% but we do not want that so errors will be handled by the process %% sending an init_error message to itself. %%-------------------------------------------------------------------- -init([Request, Options, ProfileName]) -> +init([Options, ProfileName]) -> + ?hcrv("init - starting", [{options, Options}, {profile, ProfileName}]), process_flag(trap_exit, true), - handle_verbose(Options#options.verbose), - Address = handle_proxy(Request#request.address, Options#options.proxy), - {ok, State} = - case {Address /= Request#request.address, Request#request.scheme} of - {true, https} -> - Error = https_through_proxy_is_not_currently_supported, - self() ! {init_error, - Error, httpc_response:error(Request, Error)}, - {ok, #state{request = Request, options = Options, - status = ssl_tunnel}}; - %% This is what we should do if and when ssl supports - %% "socket upgrading" - %%send_ssl_tunnel_request(Address, Request, - %% #state{options = Options, - %% status = ssl_tunnel}); - {_, _} -> - send_first_request(Address, Request, - #state{options = Options, - profile_name = ProfileName}) - end, - gen_server:enter_loop(?MODULE, [], State). + State = #state{status = undefined, + options = Options, + profile_name = ProfileName}, + ?hcrd("init - started", []), + {ok, State}. + %%-------------------------------------------------------------------- %% Function: handle_call(Request, From, State) -> {reply, Reply, State} | @@ -240,39 +249,85 @@ init([Request, Options, ProfileName]) -> %% {stop, Reason, State} (terminate/2 is called) %% Description: Handling call messages %%-------------------------------------------------------------------- -handle_call(Request, _, State = #state{session = Session = - #tcp_session{socket = Socket, - type = pipeline}, - timers = Timers, - options = Options, - profile_name = ProfileName}) -> - Address = handle_proxy(Request#request.address, Options#options.proxy), + + +%% This is the first request, the reason the proc was started +handle_call({connect_and_send, #request{address = Address0, + scheme = Scheme} = Request}, + _From, + #state{options = #options{proxy = Proxy}, + status = undefined, + session = undefined} = State) -> + ?hcrv("connect and send", [{address0, Address0}, {proxy, Proxy}]), + Address = handle_proxy(Address0, Proxy), + if + ((Address =/= Address0) andalso (Scheme =:= https)) -> + %% This is what we should do if and when ssl supports + %% "socket upgrading" + %%send_ssl_tunnel_request(Address, Request, + %% #state{options = Options, + %% status = ssl_tunnel}); + Reason = https_through_proxy_is_not_currently_supported, + Error = {error, Reason}, + {stop, Error, Error, State}; + true -> + case connect_and_send_first_request(Address, Request, State) of + {ok, NewState} -> + {reply, ok, NewState}; + {stop, Error, NewState} -> + {stop, Error, Error, NewState} + end + end; + +handle_call(#request{address = Addr} = Request, _, + #state{status = Status, + session = #tcp_session{socket = Socket, + type = pipeline} = Session, + timers = Timers, + options = #options{proxy = Proxy} = _Options, + profile_name = ProfileName} = State) + when Status =/= undefined -> + + ?hcrv("new request on a pipeline session", + [{request, Request}, + {profile, ProfileName}, + {status, Status}, + {timers, Timers}]), + + Address = handle_proxy(Addr, Proxy), case httpc_request:send(Address, Request, Socket) of ok -> + + ?hcrd("request sent", []), + %% Activate the request time out for the new request - NewState = activate_request_timeout(State#state{request = - Request}), + NewState = + activate_request_timeout(State#state{request = Request}), + + ClientClose = + httpc_request:is_client_closing(Request#request.headers), - ClientClose = httpc_request:is_client_closing( - Request#request.headers), case State#state.request of - #request{} -> %% Old request no yet finished + #request{} -> %% Old request not yet finished + ?hcrd("old request still not finished", []), %% Make sure to use the new value of timers in state - NewTimers = NewState#state.timers, + NewTimers = NewState#state.timers, NewPipeline = queue:in(Request, State#state.pipeline), - NewSession = + NewSession = Session#tcp_session{queue_length = %% Queue + current queue:len(NewPipeline) + 1, client_close = ClientClose}, httpc_manager:insert_session(NewSession, ProfileName), + ?hcrd("session updated", []), {reply, ok, State#state{pipeline = NewPipeline, - session = NewSession, - timers = NewTimers}}; + session = NewSession, + timers = NewTimers}}; undefined -> - %% Note: tcp-message reciving has already been + %% Note: tcp-message receiving has already been %% activated by handle_pipeline/2. + ?hcrd("no current request", []), cancel_timer(Timers#timers.queue_timer, timeout_queue), NewSession = @@ -281,54 +336,67 @@ handle_call(Request, _, State = #state{session = Session = httpc_manager:insert_session(NewSession, ProfileName), Relaxed = (Request#request.settings)#http_options.relaxed, - {reply, ok, - NewState#state{request = Request, - session = NewSession, - mfa = {httpc_response, parse, - [State#state.max_header_size, - Relaxed]}, - timers = - Timers#timers{queue_timer = - undefined}}} + MFA = {httpc_response, parse, + [State#state.max_header_size, Relaxed]}, + NewTimers = Timers#timers{queue_timer = undefined}, + ?hcrd("session created", []), + {reply, ok, NewState#state{request = Request, + session = NewSession, + mfa = MFA, + timers = NewTimers}} end; {error, Reason} -> + ?hcri("failed sending request", [{reason, Reason}]), {reply, {pipeline_failed, Reason}, State} end; -handle_call(Request, _, #state{session = Session = - #tcp_session{type = keep_alive, - socket = Socket}, - timers = Timers, - options = Options, - profile_name = ProfileName} = State) -> - - ClientClose = httpc_request:is_client_closing(Request#request.headers), +handle_call(#request{address = Addr} = Request, _, + #state{status = Status, + session = #tcp_session{socket = Socket, + type = keep_alive} = Session, + timers = Timers, + options = #options{proxy = Proxy} = _Options, + profile_name = ProfileName} = State) + when Status =/= undefined -> - Address = handle_proxy(Request#request.address, - Options#options.proxy), + ?hcrv("new request on a keep-alive session", + [{request, Request}, + {profile, ProfileName}, + {status, Status}]), + + Address = handle_proxy(Addr, Proxy), case httpc_request:send(Address, Request, Socket) of ok -> + + ?hcrd("request sent", []), + + %% Activate the request time out for the new request NewState = - activate_request_timeout(State#state{request = - Request}), + activate_request_timeout(State#state{request = Request}), + + ClientClose = + httpc_request:is_client_closing(Request#request.headers), case State#state.request of #request{} -> %% Old request not yet finished %% Make sure to use the new value of timers in state - NewTimers = NewState#state.timers, + ?hcrd("old request still not finished", []), + NewTimers = NewState#state.timers, NewKeepAlive = queue:in(Request, State#state.keep_alive), - NewSession = + NewSession = Session#tcp_session{queue_length = %% Queue + current queue:len(NewKeepAlive) + 1, client_close = ClientClose}, httpc_manager:insert_session(NewSession, ProfileName), + ?hcrd("session updated", []), {reply, ok, State#state{keep_alive = NewKeepAlive, - session = NewSession, - timers = NewTimers}}; + session = NewSession, + timers = NewTimers}}; undefined -> %% Note: tcp-message reciving has already been %% activated by handle_pipeline/2. + ?hcrd("no current request", []), cancel_timer(Timers#timers.queue_timer, timeout_queue), NewSession = @@ -337,16 +405,23 @@ handle_call(Request, _, #state{session = Session = httpc_manager:insert_session(NewSession, ProfileName), Relaxed = (Request#request.settings)#http_options.relaxed, - {reply, ok, - NewState#state{request = Request, - session = NewSession, - mfa = {httpc_response, parse, - [State#state.max_header_size, - Relaxed]}}} + MFA = {httpc_response, parse, + [State#state.max_header_size, Relaxed]}, + {reply, ok, NewState#state{request = Request, + session = NewSession, + mfa = MFA}} end; - {error, Reason} -> + + {error, Reason} -> + ?hcri("failed sending request", [{reason, Reason}]), {reply, {request_failed, Reason}, State} - end. + end; + + +handle_call(info, _, State) -> + Info = handler_info(State), + {reply, Info, State}. + %%-------------------------------------------------------------------- %% Function: handle_cast(Msg, State) -> {noreply, State} | @@ -367,19 +442,30 @@ handle_call(Request, _, #state{session = Session = %% handle_keep_alive_queue/2 on the other hand will just skip the %% request as if it was never issued as in this case the request will %% not have been sent. -handle_cast({cancel, RequestId}, State = #state{request = Request = - #request{id = RequestId}, - profile_name = ProfileName}) -> +handle_cast({cancel, RequestId}, + #state{request = #request{id = RequestId} = Request, + profile_name = ProfileName, + canceled = Canceled} = State) -> + ?hcrv("cancel current request", [{request_id, RequestId}, + {profile, ProfileName}, + {canceled, Canceled}]), httpc_manager:request_canceled(RequestId, ProfileName), + ?hcrv("canceled", []), {stop, normal, - State#state{canceled = [RequestId | State#state.canceled], - request = Request#request{from = answer_sent}}}; -handle_cast({cancel, RequestId}, State = #state{profile_name = ProfileName}) -> + State#state{canceled = [RequestId | Canceled], + request = Request#request{from = answer_sent}}}; +handle_cast({cancel, RequestId}, + #state{profile_name = ProfileName, + canceled = Canceled} = State) -> + ?hcrv("cancel", [{request_id, RequestId}, + {profile, ProfileName}, + {canceled, Canceled}]), httpc_manager:request_canceled(RequestId, ProfileName), - {noreply, State#state{canceled = [RequestId | State#state.canceled]}}; + ?hcrv("canceled", []), + {noreply, State#state{canceled = [RequestId | Canceled]}}; + handle_cast(stream_next, #state{session = Session} = State) -> - http_transport:setopts(socket_type(Session#tcp_session.scheme), - Session#tcp_session.socket, [{active, once}]), + activate_once(Session), {noreply, State#state{once = once}}. @@ -390,7 +476,7 @@ handle_cast(stream_next, #state{session = Session} = State) -> %% Description: Handling all non call/cast messages %%-------------------------------------------------------------------- handle_info({Proto, _Socket, Data}, - #state{mfa = {Module, Function, Args} = MFA, + #state{mfa = {Module, Function, Args}, request = #request{method = Method, stream = Stream} = Request, session = Session, @@ -399,64 +485,68 @@ handle_info({Proto, _Socket, Data}, (Proto =:= ssl) orelse (Proto =:= httpc_handler) -> - ?hcri("received data", [{proto, Proto}, {data, Data}, {mfa, MFA}, {method, Method}, {stream, Stream}, {session, Session}, {status_line, StatusLine}]), + ?hcri("received data", [{proto, Proto}, + {module, Module}, + {function, Function}, + {method, Method}, + {stream, Stream}, + {session, Session}, + {status_line, StatusLine}]), FinalResult = try Module:Function([Data | Args]) of {ok, Result} -> - ?hcrd("data processed - ok", [{result, Result}]), + ?hcrd("data processed - ok", []), handle_http_msg(Result, State); {_, whole_body, _} when Method =:= head -> ?hcrd("data processed - whole body", []), handle_response(State#state{body = <<>>}); {Module, whole_body, [Body, Length]} -> - ?hcrd("data processed - whole body", [{module, Module}, {body, Body}, {length, Length}]), + ?hcrd("data processed - whole body", [{length, Length}]), {_, Code, _} = StatusLine, {NewBody, NewRequest} = stream(Body, Request, Code), %% When we stream we will not keep the already %% streamed data, that would be a waste of memory. - NewLength = case Stream of - none -> - Length; - _ -> - Length - size(Body) - end, + NewLength = + case Stream of + none -> + Length; + _ -> + Length - size(Body) + end, NewState = next_body_chunk(State), - - {noreply, NewState#state{mfa = {Module, whole_body, - [NewBody, NewLength]}, + NewMFA = {Module, whole_body, [NewBody, NewLength]}, + {noreply, NewState#state{mfa = NewMFA, request = NewRequest}}; NewMFA -> - ?hcrd("data processed", [{new_mfa, NewMFA}]), - http_transport:setopts(socket_type(Session#tcp_session.scheme), - Session#tcp_session.socket, - [{active, once}]), + ?hcrd("data processed - new mfa", []), + activate_once(Session), {noreply, State#state{mfa = NewMFA}} catch - exit:_ -> - ClientErrMsg = httpc_response:error(Request, - {could_not_parse_as_http, - Data}), - NewState = answer_request(Request, ClientErrMsg, State), + exit:_Exit -> + ?hcrd("data processing exit", [{exit, _Exit}]), + ClientReason = {could_not_parse_as_http, Data}, + ClientErrMsg = httpc_response:error(Request, ClientReason), + NewState = answer_request(Request, ClientErrMsg, State), {stop, normal, NewState}; - error:_ -> - ClientErrMsg = httpc_response:error(Request, - {could_not_parse_as_http, - Data}), - NewState = answer_request(Request, ClientErrMsg, State), + error:_Error -> + ?hcrd("data processing error", [{error, _Error}]), + ClientReason = {could_not_parse_as_http, Data}, + ClientErrMsg = httpc_response:error(Request, ClientReason), + NewState = answer_request(Request, ClientErrMsg, State), {stop, normal, NewState} end, - ?hcri("data processed", [{result, FinalResult}]), + ?hcri("data processed", []), FinalResult; handle_info({Proto, Socket, Data}, - #state{mfa = MFA, - request = Request, - session = Session, - status = Status, + #state{mfa = MFA, + request = Request, + session = Session, + status = Status, status_line = StatusLine, profile_name = Profile} = State) when (Proto =:= tcp) orelse @@ -474,6 +564,7 @@ handle_info({Proto, Socket, Data}, "~n", [Proto, Socket, Data, MFA, Request, Session, Status, StatusLine, Profile]), + {noreply, State}; @@ -513,28 +604,35 @@ handle_info({ssl_error, _, _} = Reason, State) -> handle_info({timeout, RequestId}, #state{request = #request{id = RequestId} = Request, canceled = Canceled} = State) -> + ?hcri("timeout of current request", [{id, RequestId}]), httpc_response:send(Request#request.from, - httpc_response:error(Request,timeout)), + httpc_response:error(Request, timeout)), + ?hcrv("response (timeout) sent - now terminate", []), {stop, normal, State#state{request = Request#request{from = answer_sent}, canceled = [RequestId | Canceled]}}; handle_info({timeout, RequestId}, #state{canceled = Canceled} = State) -> + ?hcri("timeout", [{id, RequestId}]), Filter = fun(#request{id = Id, from = From} = Request) when Id =:= RequestId -> + ?hcrv("found request", [{id, Id}, {from, From}]), %% Notify the owner Response = httpc_response:error(Request, timeout), httpc_response:send(From, Response), + ?hcrv("response (timeout) sent", []), [Request#request{from = answer_sent}]; (_) -> true end, case State#state.status of pipeline -> + ?hcrd("pipeline", []), Pipeline = queue:filter(Filter, State#state.pipeline), {noreply, State#state{canceled = [RequestId | Canceled], pipeline = Pipeline}}; keep_alive -> + ?hcrd("keep_alive", []), KeepAlive = queue:filter(Filter, State#state.keep_alive), {noreply, State#state{canceled = [RequestId | Canceled], keep_alive = KeepAlive}} @@ -577,9 +675,10 @@ terminate(normal, #state{session = undefined}) -> %% Init error sending, no session information has been setup but %% there is a socket that needs closing. -terminate(normal, #state{request = Request, - session = #tcp_session{id = undefined, - socket = Socket}}) -> +terminate(normal, + #state{request = Request, + session = #tcp_session{id = undefined, + socket = Socket}}) -> http_transport:close(socket_type(Request), Socket); %% Socket closed remotely @@ -590,6 +689,9 @@ terminate(normal, request = Request, timers = Timers, pipeline = Pipeline}) -> + ?hcrt("terminate(normal) - remote close", + [{id, Id}, {profile, ProfileName}]), + %% Clobber session (catch httpc_manager:delete_session(Id, ProfileName)), @@ -605,23 +707,28 @@ terminate(normal, %% And, just in case, close our side (**really** overkill) http_transport:close(socket_type(Request), Socket); -terminate(_, State = #state{session = Session, - request = undefined, - profile_name = ProfileName, - timers = Timers, - pipeline = Pipeline, - keep_alive = KeepAlive}) -> - catch httpc_manager:delete_session(Session#tcp_session.id, - ProfileName), +terminate(_, #state{session = #tcp_session{id = Id, + socket = Socket, + scheme = Scheme}, + request = undefined, + profile_name = ProfileName, + timers = Timers, + pipeline = Pipeline, + keep_alive = KeepAlive} = State) -> + (catch httpc_manager:delete_session(Id, ProfileName)), maybe_retry_queue(Pipeline, State), maybe_retry_queue(KeepAlive, State), cancel_timer(Timers#timers.queue_timer, timeout_queue), - Socket = Session#tcp_session.socket, - http_transport:close(socket_type(Session#tcp_session.scheme), Socket); + http_transport:close(socket_type(Scheme), Socket); + +terminate(Reason, #state{request = undefined}) -> + ?hcrt("terminate", [{reason, Reason}]), + ok; -terminate(Reason, State = #state{request = Request}) -> +terminate(Reason, #state{request = Request} = State) -> + ?hcrd("terminate", [{reason, Reason}, {request, Request}]), NewState = maybe_send_answer(Request, httpc_response:error(Request, Reason), State), @@ -641,13 +748,16 @@ maybe_send_answer(Request, Answer, State) -> answer_request(Request, Answer, State). deliver_answers([]) -> + ?hcrd("deliver answer done", []), ok; -deliver_answers([#request{from = From} = Request | Requests]) +deliver_answers([#request{id = Id, from = From} = Request | Requests]) when is_pid(From) -> Response = httpc_response:error(Request, socket_closed_remotely), + ?hcrd("deliver answer", [{id, Id}, {from, From}, {response, Response}]), httpc_response:send(From, Response), deliver_answers(Requests); -deliver_answers([_|Requests]) -> +deliver_answers([Request|Requests]) -> + ?hcrd("skip deliver answer", [{request, Request}]), deliver_answers(Requests). @@ -691,19 +801,22 @@ new_queue(Queue, Fun) -> end, List), queue:from_list(NewList). -%%-------------------------------------------------------------------- + +%%%-------------------------------------------------------------------- %%% Internal functions -%%-------------------------------------------------------------------- +%%%-------------------------------------------------------------------- -connect(SocketType, ToAddress, #options{ipfamily = IpFamily, - ip = FromAddress, - port = FromPort}, Timeout) -> +connect(SocketType, ToAddress, + #options{ipfamily = IpFamily, + ip = FromAddress, + port = FromPort, + socket_opts = Opts0}, Timeout) -> Opts1 = case FromPort of default -> - []; + Opts0; _ -> - [{port, FromPort}] + [{port, FromPort} | Opts0] end, Opts2 = case FromAddress of @@ -728,101 +841,157 @@ connect(SocketType, ToAddress, #options{ipfamily = IpFamily, http_transport:connect(SocketType, ToAddress, Opts3, Timeout) end. - -send_first_request(Address, Request, #state{options = Options} = State) -> - SocketType = socket_type(Request), - ConnTimeout = (Request#request.settings)#http_options.connect_timeout, - ?hcri("connect", +connect_and_send_first_request(Address, + #request{settings = Settings, + headers = Headers, + address = OrigAddress, + scheme = Scheme} = Request, + #state{options = Options} = State) -> + + ?hcrd("connect", [{address, Address}, {request, Request}, {options, Options}]), + + SocketType = socket_type(Request), + ConnTimeout = Settings#http_options.connect_timeout, case connect(SocketType, Address, Options, ConnTimeout) of {ok, Socket} -> - ?hcri("connected - now send first request", [{socket, Socket}]), + ?hcrd("connected - now send first request", [{socket, Socket}]), case httpc_request:send(Address, Request, Socket) of ok -> - ?hcri("first request sent", []), + ?hcrd("first request sent", []), ClientClose = - httpc_request:is_client_closing( - Request#request.headers), + httpc_request:is_client_closing(Headers), SessionType = httpc_manager:session_type(Options), Session = - #tcp_session{id = {Request#request.address, self()}, - scheme = Request#request.scheme, - socket = Socket, + #tcp_session{id = {OrigAddress, self()}, + scheme = Scheme, + socket = Socket, client_close = ClientClose, - type = SessionType}, - TmpState = State#state{request = Request, - session = Session, - mfa = init_mfa(Request, State), - status_line = - init_status_line(Request), - headers = undefined, - body = undefined, - status = new}, - http_transport:setopts(SocketType, - Socket, [{active, once}]), + type = SessionType}, + TmpState = + State#state{request = Request, + session = Session, + mfa = init_mfa(Request, State), + status_line = init_status_line(Request), + headers = undefined, + body = undefined, + status = new}, + ?hcrt("activate socket", []), + activate_once(Session), NewState = activate_request_timeout(TmpState), {ok, NewState}; - {error, Reason} -> - %% Commented out in wait of ssl support to avoid - %% dialyzer warning - %%case State#state.status of - %% new -> % Called from init/1 - self() ! {init_error, error_sending, - httpc_response:error(Request, Reason)}, - {ok, State#state{request = Request, - session = - #tcp_session{socket = Socket}}} - %%ssl_tunnel -> % Not called from init/1 - %% NewState = - %% answer_request(Request, - %%httpc_response:error(Request, - %%Reason), - %% State), - %% {stop, normal, NewState} - %% end + {error, Reason} -> + ?hcrv("failed sending request", [{reason, Reason}]), + Error = {error, {send_failed, + httpc_response:error(Request, Reason)}}, + {stop, Error, State#state{request = Request}} end; - {error, Reason} -> - %% Commented out in wait of ssl support to avoid - %% dialyzer warning - %% case State#state.status of - %% new -> % Called from init/1 - self() ! {init_error, error_connecting, - httpc_response:error(Request, Reason)}, - {ok, State#state{request = Request}} - %% ssl_tunnel -> % Not called from init/1 - %% NewState = - %% answer_request(Request, - %% httpc_response:error(Request, - %% Reason), - %% State), - %% {stop, normal, NewState} - %%end + {error, Reason} -> + ?hcri("connect failed", [{reason, Reason}]), + Error = {error, {connect_failed, + httpc_response:error(Request, Reason)}}, + {stop, Error, State#state{request = Request}} end. + +handler_info(#state{request = Request, + session = Session, + status_line = _StatusLine, + pipeline = Pipeline, + keep_alive = KeepAlive, + status = Status, + canceled = _Canceled, + options = _Options, + timers = _Timers} = _State) -> + + ?hcrt("handler info", [{request, Request}, + {session, Session}, + {pipeline, Pipeline}, + {keep_alive, KeepAlive}, + {status, Status}]), + + %% Info about the current request + RequestInfo = + case Request of + undefined -> + []; + #request{id = Id, + started = ReqStarted} -> + [{id, Id}, {started, ReqStarted}] + end, + + ?hcrt("handler info", [{request_info, RequestInfo}]), + + %% Info about the current session/socket + SessionType = Session#tcp_session.type, + QueueLen = case Session#tcp_session.type of + pipeline -> + queue:len(Pipeline); + keep_alive -> + queue:len(KeepAlive) + end, + Socket = Session#tcp_session.socket, + Scheme = Session#tcp_session.scheme, + SocketType = socket_type(Scheme), + + ?hcrt("handler info", [{session_type, SessionType}, + {queue_length, QueueLen}, + {scheme, Scheme}, + {socket_type, SocketType}, + {socket, Socket}]), + + SocketOpts = http_transport:getopts(SocketType, Socket), + SocketStats = http_transport:getstat(SocketType, Socket), + + Remote = http_transport:peername(SocketType, Socket), + Local = http_transport:sockname(SocketType, Socket), + + ?hcrt("handler info", [{remote, Remote}, + {local, Local}, + {socket_opts, SocketOpts}, + {socket_stats, SocketStats}]), + + SocketInfo = [{remote, Remote}, + {local, Local}, + {socket_opts, SocketOpts}, + {socket_stats, SocketStats}], + + SessionInfo = + [{type, SessionType}, + {queue_length, QueueLen}, + {scheme, Scheme}, + {socket_info, SocketInfo}], + + [{status, Status}, + {current_request, RequestInfo}, + {session, SessionInfo}]. + + + handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body}, State = #state{request = Request}) -> - ?hcrt("handle_http_msg", [{body, Body}]), + ?hcrt("handle_http_msg", [{headers, Headers}]), case Headers#http_response_h.'content-type' of "multipart/byteranges" ++ _Param -> - exit(not_yet_implemented); + exit({not_yet_implemented, multypart_byteranges}); _ -> - StatusLine = {Version, StatusCode, ReasonPharse}, + StatusLine = {Version, StatusCode, ReasonPharse}, {ok, NewRequest} = start_stream(StatusLine, Headers, Request), handle_http_body(Body, State#state{request = NewRequest, status_line = StatusLine, headers = Headers}) end; -handle_http_msg({ChunkedHeaders, Body}, - State = #state{headers = Headers}) -> - ?hcrt("handle_http_msg", [{chunked_headers, ChunkedHeaders}, {body, Body}]), +handle_http_msg({ChunkedHeaders, Body}, #state{headers = Headers} = State) -> + ?hcrt("handle_http_msg", + [{chunked_headers, ChunkedHeaders}, {headers, Headers}]), NewHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders), handle_response(State#state{headers = NewHeaders, body = Body}); -handle_http_msg(Body, State = #state{status_line = {_,Code, _}}) -> - ?hcrt("handle_http_msg", [{body, Body}, {code, Code}]), - {NewBody, NewRequest}= stream(Body, State#state.request, Code), +handle_http_msg(Body, #state{status_line = {_,Code, _}} = State) -> + ?hcrt("handle_http_msg", [{code, Code}]), + {NewBody, NewRequest} = stream(Body, State#state.request, Code), handle_response(State#state{body = NewBody, request = NewRequest}). handle_http_body(<<>>, State = #state{status_line = {_,304, _}}) -> @@ -837,11 +1006,12 @@ handle_http_body(<<>>, State = #state{request = #request{method = head}}) -> ?hcrt("handle_http_body - head", []), handle_response(State#state{body = <<>>}); -handle_http_body(Body, State = #state{headers = Headers, - max_body_size = MaxBodySize, - status_line = {_,Code, _}, - request = Request}) -> - ?hcrt("handle_http_body", [{body, Body}, {max_body_size, MaxBodySize}, {code, Code}]), +handle_http_body(Body, #state{headers = Headers, + max_body_size = MaxBodySize, + status_line = {_,Code, _}, + request = Request} = State) -> + ?hcrt("handle_http_body", + [{max_body_size, MaxBodySize}, {headers, Headers}, {code, Code}]), TransferEnc = Headers#http_response_h.'transfer-encoding', case case_insensitive_header(TransferEnc) of "chunked" -> @@ -850,12 +1020,17 @@ handle_http_body(Body, State = #state{headers = Headers, State#state.max_header_size, {Code, Request}) of {Module, Function, Args} -> - ?hcrt("handle_http_body - new mfa", [{module, Module}, {function, Function}, {args, Args}]), + ?hcrt("handle_http_body - new mfa", + [{module, Module}, + {function, Function}, + {args, Args}]), NewState = next_body_chunk(State), {noreply, NewState#state{mfa = {Module, Function, Args}}}; {ok, {ChunkedHeaders, NewBody}} -> - ?hcrt("handle_http_body - nyew body", [{chunked_headers, ChunkedHeaders}, {new_body, NewBody}]), + ?hcrt("handle_http_body - new body", + [{chunked_headers, ChunkedHeaders}, + {new_body, NewBody}]), NewHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders), handle_response(State#state{headers = NewHeaders, @@ -872,12 +1047,13 @@ handle_http_body(Body, State = #state{headers = Headers, ?hcrt("handle_http_body - other", []), Length = list_to_integer(Headers#http_response_h.'content-length'), - case ((Length =< MaxBodySize) or (MaxBodySize == nolimit)) of + case ((Length =< MaxBodySize) orelse (MaxBodySize =:= nolimit)) of true -> case httpc_response:whole_body(Body, Length) of {ok, Body} -> - {NewBody, NewRequest}= stream(Body, Request, Code), - handle_response(State#state{body = NewBody, + {NewBody, NewRequest} = + stream(Body, Request, Code), + handle_response(State#state{body = NewBody, request = NewRequest}); MFA -> NewState = next_body_chunk(State), @@ -893,90 +1069,79 @@ handle_http_body(Body, State = #state{headers = Headers, end end. -%%% Normaly I do not comment out code, I throw it away. But this might -%%% actually be used on day if ssl is improved. -%% handle_response(State = #state{status = ssl_tunnel, -%% request = Request, -%% options = Options, -%% session = #tcp_session{socket = Socket, -%% scheme = Scheme}, -%% status_line = {_, 200, _}}) -> -%% %%% Insert code for upgrading the socket if and when ssl supports this. -%% Address = handle_proxy(Request#request.address, Options#options.proxy), -%% send_first_request(Address, Request, State); -%% handle_response(State = #state{status = ssl_tunnel, -%% request = Request}) -> -%% NewState = answer_request(Request, -%% httpc_response:error(Request, -%% ssl_proxy_tunnel_failed), -%% State), -%% {stop, normal, NewState}; - -handle_response(State = #state{status = new}) -> - handle_response(try_to_enable_pipeline_or_keep_alive(State)); - -handle_response(State = - #state{request = Request, +handle_response(#state{status = new} = State) -> + ?hcrd("handle response - status = new", []), + handle_response(try_to_enable_pipeline_or_keep_alive(State)); + +handle_response(#state{request = Request, status = Status, session = Session, status_line = StatusLine, headers = Headers, body = Body, options = Options, - profile_name = ProfileName}) when Status =/= new -> - ?hcrt("handle response", [{status, Status}, {session, Session}, {status_line, StatusLine}, {profile_name, ProfileName}]), + profile_name = ProfileName} = State) + when Status =/= new -> + + ?hcrd("handle response", [{profile, ProfileName}, + {status, Status}, + {request, Request}, + {session, Session}, + {status_line, StatusLine}]), + handle_cookies(Headers, Request, Options, ProfileName), case httpc_response:result({StatusLine, Headers, Body}, Request) of %% 100-continue continue -> + ?hcrd("handle response - continue", []), %% Send request body {_, RequestBody} = Request#request.content, http_transport:send(socket_type(Session#tcp_session.scheme), Session#tcp_session.socket, RequestBody), %% Wait for next response - http_transport:setopts(socket_type(Session#tcp_session.scheme), - Session#tcp_session.socket, - [{active, once}]), + activate_once(Session), Relaxed = (Request#request.settings)#http_options.relaxed, - {noreply, - State#state{mfa = {httpc_response, parse, - [State#state.max_header_size, - Relaxed]}, - status_line = undefined, - headers = undefined, - body = undefined - }}; + MFA = {httpc_response, parse, + [State#state.max_header_size, Relaxed]}, + {noreply, State#state{mfa = MFA, + status_line = undefined, + headers = undefined, + body = undefined}}; + %% Ignore unexpected 100-continue response and receive the %% actual response that the server will send right away. {ignore, Data} -> + ?hcrd("handle response - ignore", [{data, Data}]), Relaxed = (Request#request.settings)#http_options.relaxed, - NewState = State#state{mfa = - {httpc_response, parse, - [State#state.max_header_size, - Relaxed]}, + MFA = {httpc_response, parse, + [State#state.max_header_size, Relaxed]}, + NewState = State#state{mfa = MFA, status_line = undefined, - headers = undefined, - body = undefined}, + headers = undefined, + body = undefined}, handle_info({httpc_handler, dummy, Data}, NewState); + %% On a redirect or retry the current request becomes %% obsolete and the manager will create a new request %% with the same id as the current. {redirect, NewRequest, Data} -> - ?hcrt("handle response - redirect", [{new_request, NewRequest}, {data, Data}]), + ?hcrt("handle response - redirect", + [{new_request, NewRequest}, {data, Data}]), ok = httpc_manager:redirect_request(NewRequest, ProfileName), handle_queue(State#state{request = undefined}, Data); {retry, TimeNewRequest, Data} -> - ?hcrt("handle response - retry", [{time_new_request, TimeNewRequest}, {data, Data}]), + ?hcrt("handle response - retry", + [{time_new_request, TimeNewRequest}, {data, Data}]), ok = httpc_manager:retry_request(TimeNewRequest, ProfileName), handle_queue(State#state{request = undefined}, Data); {ok, Msg, Data} -> - ?hcrt("handle response - result ok", [{msg, Msg}, {data, Data}]), + ?hcrd("handle response - ok", []), end_stream(StatusLine, Request), NewState = answer_request(Request, Msg, State), handle_queue(NewState, Data); {stop, Msg} -> - ?hcrt("handle response - result stop", [{msg, Msg}]), + ?hcrd("handle response - stop", [{msg, Msg}]), end_stream(StatusLine, Request), NewState = answer_request(Request, Msg, State), {stop, normal, NewState} @@ -990,60 +1155,67 @@ handle_cookies(_,_, #options{cookies = verify}, _) -> ok; handle_cookies(Headers, Request, #options{cookies = enabled}, ProfileName) -> {Host, _ } = Request#request.address, - Cookies = http_cookie:cookies(Headers#http_response_h.other, + Cookies = httpc_cookie:cookies(Headers#http_response_h.other, Request#request.path, Host), httpc_manager:store_cookies(Cookies, Request#request.address, ProfileName). %% This request could not be pipelined or used as sequential keept alive %% queue -handle_queue(State = #state{status = close}, _) -> +handle_queue(#state{status = close} = State, _) -> {stop, normal, State}; -handle_queue(State = #state{status = keep_alive}, Data) -> +handle_queue(#state{status = keep_alive} = State, Data) -> handle_keep_alive_queue(State, Data); -handle_queue(State = #state{status = pipeline}, Data) -> +handle_queue(#state{status = pipeline} = State, Data) -> handle_pipeline(State, Data). -handle_pipeline(State = - #state{status = pipeline, session = Session, +handle_pipeline(#state{status = pipeline, + session = Session, profile_name = ProfileName, - options = #options{pipeline_timeout = TimeOut}}, - Data) -> + options = #options{pipeline_timeout = TimeOut}} = + State, + Data) -> + + ?hcrd("handle pipeline", [{profile, ProfileName}, + {session, Session}, + {timeout, TimeOut}]), + case queue:out(State#state.pipeline) of {empty, _} -> + ?hcrd("epmty pipeline queue", []), + %% The server may choose too teminate an idle pipeline %% in this case we want to receive the close message %% at once and not when trying to pipeline the next %% request. - http_transport:setopts(socket_type(Session#tcp_session.scheme), - Session#tcp_session.socket, - [{active, once}]), + activate_once(Session), + %% If a pipeline that has been idle for some time is not %% closed by the server, the client may want to close it. - NewState = activate_queue_timeout(TimeOut, State), + NewState = activate_queue_timeout(TimeOut, State), NewSession = Session#tcp_session{queue_length = 0}, httpc_manager:insert_session(NewSession, ProfileName), %% Note mfa will be initilized when a new request %% arrives. {noreply, - NewState#state{request = undefined, - mfa = undefined, + NewState#state{request = undefined, + mfa = undefined, status_line = undefined, - headers = undefined, - body = undefined - } - }; + headers = undefined, + body = undefined}}; {{value, NextRequest}, Pipeline} -> case lists:member(NextRequest#request.id, State#state.canceled) of true -> + ?hcrv("next request had been cancelled", []), %% See comment for handle_cast({cancel, RequestId}) {stop, normal, State#state{request = NextRequest#request{from = answer_sent}}}; false -> + ?hcrv("next request", [{request, NextRequest}]), NewSession = Session#tcp_session{queue_length = %% Queue + current @@ -1051,21 +1223,19 @@ handle_pipeline(State = httpc_manager:insert_session(NewSession, ProfileName), Relaxed = (NextRequest#request.settings)#http_options.relaxed, + MFA = {httpc_response, + parse, + [State#state.max_header_size, Relaxed]}, NewState = - State#state{pipeline = Pipeline, - request = NextRequest, - mfa = {httpc_response, parse, - [State#state.max_header_size, - Relaxed]}, + State#state{pipeline = Pipeline, + request = NextRequest, + mfa = MFA, status_line = undefined, - headers = undefined, - body = undefined}, + headers = undefined, + body = undefined}, case Data of <<>> -> - http_transport:setopts( - socket_type(Session#tcp_session.scheme), - Session#tcp_session.socket, - [{active, once}]), + activate_once(Session), {noreply, NewState}; _ -> %% If we already received some bytes of @@ -1076,22 +1246,25 @@ handle_pipeline(State = end end. -handle_keep_alive_queue(State = #state{status = keep_alive, - session = Session, - profile_name = ProfileName, - options = #options{keep_alive_timeout - = TimeOut} - }, - Data) -> +handle_keep_alive_queue( + #state{status = keep_alive, + session = Session, + profile_name = ProfileName, + options = #options{keep_alive_timeout = TimeOut}} = State, + Data) -> + + ?hcrd("handle keep_alive", [{profile, ProfileName}, + {session, Session}, + {timeout, TimeOut}]), + case queue:out(State#state.keep_alive) of {empty, _} -> + ?hcrd("empty keep_alive queue", []), %% The server may choose too terminate an idle keep_alive session %% in this case we want to receive the close message %% at once and not when trying to send the next %% request. - http_transport:setopts(socket_type(Session#tcp_session.scheme), - Session#tcp_session.socket, - [{active, once}]), + activate_once(Session), %% If a keep_alive session has been idle for some time is not %% closed by the server, the client may want to close it. NewState = activate_queue_timeout(TimeOut, State), @@ -1111,25 +1284,25 @@ handle_keep_alive_queue(State = #state{status = keep_alive, case lists:member(NextRequest#request.id, State#state.canceled) of true -> - handle_keep_alive_queue(State#state{keep_alive = - KeepAlive}, Data); + ?hcrv("next request has already been canceled", []), + handle_keep_alive_queue( + State#state{keep_alive = KeepAlive}, Data); false -> + ?hcrv("next request", [{request, NextRequest}]), Relaxed = (NextRequest#request.settings)#http_options.relaxed, + MFA = {httpc_response, parse, + [State#state.max_header_size, Relaxed]}, NewState = - State#state{request = NextRequest, - keep_alive = KeepAlive, - mfa = {httpc_response, parse, - [State#state.max_header_size, - Relaxed]}, + State#state{request = NextRequest, + keep_alive = KeepAlive, + mfa = MFA, status_line = undefined, - headers = undefined, - body = undefined}, + headers = undefined, + body = undefined}, case Data of <<>> -> - http_transport:setopts( - socket_type(Session#tcp_session.scheme), - Session#tcp_session.socket, [{active, once}]), + activate_once(Session), {noreply, NewState}; _ -> %% If we already received some bytes of @@ -1140,11 +1313,6 @@ handle_keep_alive_queue(State = #state{status = keep_alive, end end. -call(Msg, Pid, Timeout) -> - gen_server:call(Pid, Msg, Timeout). - -cast(Msg, Pid) -> - gen_server:cast(Pid, Msg). case_insensitive_header(Str) when is_list(Str) -> http_util:to_lower(Str); @@ -1152,20 +1320,34 @@ case_insensitive_header(Str) when is_list(Str) -> case_insensitive_header(Str) -> Str. -activate_request_timeout(State = #state{request = Request}) -> - Time = (Request#request.settings)#http_options.timeout, - case Time of +activate_once(#tcp_session{scheme = Scheme, socket = Socket}) -> + SocketType = socket_type(Scheme), + http_transport:setopts(SocketType, Socket, [{active, once}]). + +activate_request_timeout( + #state{request = #request{timer = undefined} = Request} = State) -> + Timeout = (Request#request.settings)#http_options.timeout, + case Timeout of infinity -> State; _ -> - Ref = erlang:send_after(Time, self(), - {timeout, Request#request.id}), - State#state - {timers = - #timers{request_timers = - [{Request#request.id, Ref}| - (State#state.timers)#timers.request_timers]}} - end. + ReqId = Request#request.id, + ?hcrt("activate request timer", + [{request_id, ReqId}, + {time_consumed, t() - Request#request.started}, + {timeout, Timeout}]), + Msg = {timeout, ReqId}, + Ref = erlang:send_after(Timeout, self(), Msg), + Request2 = Request#request{timer = Ref}, + ReqTimers = [{Request#request.id, Ref} | + (State#state.timers)#timers.request_timers], + Timers = #timers{request_timers = ReqTimers}, + State#state{request = Request2, timers = Timers} + end; + +%% Timer is already running! This is the case for a redirect or retry +activate_request_timeout(State) -> + State. activate_queue_timeout(infinity, State) -> State; @@ -1187,18 +1369,21 @@ is_keep_alive_enabled_server("HTTP/1.0", is_keep_alive_enabled_server(_,_) -> false. -is_keep_alive_connection(Headers, Session) -> - (not ((Session#tcp_session.client_close) or - httpc_response:is_server_closing(Headers))). - -try_to_enable_pipeline_or_keep_alive(State = - #state{session = Session, - request = #request{method = Method}, - status_line = {Version, _, _}, - headers = Headers, - profile_name = ProfileName}) -> - case (is_keep_alive_enabled_server(Version, Headers) andalso - is_keep_alive_connection(Headers, Session)) of +is_keep_alive_connection(Headers, #tcp_session{client_close = ClientClose}) -> + (not ((ClientClose) orelse httpc_response:is_server_closing(Headers))). + +try_to_enable_pipeline_or_keep_alive( + #state{session = Session, + request = #request{method = Method}, + status_line = {Version, _, _}, + headers = Headers, + profile_name = ProfileName} = State) -> + ?hcrd("try to enable pipeline or keep-alive", + [{version, Version}, + {headers, Headers}, + {session, Session}]), + case is_keep_alive_enabled_server(Version, Headers) andalso + is_keep_alive_connection(Headers, Session) of true -> case (is_pipeline_enabled_client(Session) andalso httpc_request:is_idempotent(Method)) of @@ -1209,15 +1394,16 @@ try_to_enable_pipeline_or_keep_alive(State = httpc_manager:insert_session(Session, ProfileName), %% Make sure type is keep_alive in session %% as it in this case might be pipeline - State#state{status = keep_alive, - session = - Session#tcp_session{type = keep_alive}} + NewSession = Session#tcp_session{type = keep_alive}, + State#state{status = keep_alive, + session = NewSession} end; false -> State#state{status = close} end. -answer_request(Request, Msg, #state{timers = Timers} = State) -> +answer_request(Request, Msg, #state{timers = Timers} = State) -> + ?hcrt("answer request", [{request, Request}]), httpc_response:send(Request#request.from, Msg), RequestTimers = Timers#timers.request_timers, TimerRef = @@ -1253,14 +1439,14 @@ retry_pipeline([Request | PipeLine], case (catch httpc_manager:retry_request(Request, ProfileName)) of ok -> RequestTimers = Timers#timers.request_timers, + ReqId = Request#request.id, TimerRef = - proplists:get_value(Request#request.id, RequestTimers, - undefined), - cancel_timer(TimerRef, {timeout, Request#request.id}), - State#state{timers = Timers#timers{request_timers = - lists:delete({Request#request.id, - TimerRef}, - RequestTimers)}}; + proplists:get_value(ReqId, RequestTimers, undefined), + cancel_timer(TimerRef, {timeout, ReqId}), + NewReqsTimers = lists:delete({ReqId, TimerRef}, RequestTimers), + NewTimers = Timers#timers{request_timers = NewReqsTimers}, + State#state{timers = NewTimers}; + Error -> answer_request(Request#request.from, httpc_response:error(Request, Error), State) @@ -1345,12 +1531,14 @@ socket_type(#request{scheme = https, settings = Settings}) -> socket_type(http) -> ip_comm; socket_type(https) -> - {ssl, []}. %% Dummy value ok for ex setops that does not use this value + {ssl, []}. %% Dummy value ok for ex setopts that does not use this value -start_stream({_Version, _Code, _ReasonPhrase}, _Headers, #request{stream = none} = Request) -> +start_stream({_Version, _Code, _ReasonPhrase}, _Headers, + #request{stream = none} = Request) -> ?hcrt("start stream - none", []), {ok, Request}; -start_stream({_Version, Code, _ReasonPhrase}, Headers, #request{stream = self} = Request) +start_stream({_Version, Code, _ReasonPhrase}, Headers, + #request{stream = self} = Request) when (Code =:= 200) orelse (Code =:= 206) -> ?hcrt("start stream - self", [{code, Code}]), Msg = httpc_response:stream_start(Headers, Request, ignore), @@ -1363,7 +1551,8 @@ start_stream({_Version, Code, _ReasonPhrase}, Headers, Msg = httpc_response:stream_start(Headers, Request, self()), httpc_response:send(Request#request.from, Msg), {ok, Request}; -start_stream({_Version, Code, _ReasonPhrase}, _Headers, #request{stream = Filename} = Request) +start_stream({_Version, Code, _ReasonPhrase}, _Headers, + #request{stream = Filename} = Request) when ((Code =:= 200) orelse (Code =:= 206)) andalso is_list(Filename) -> ?hcrt("start stream", [{code, Code}, {filename, Filename}]), case file:open(Filename, [write, raw, append, delayed_write]) of @@ -1436,6 +1625,7 @@ handle_verbose(trace) -> handle_verbose(_) -> ok. + %%% Normaly I do not comment out code, I throw it away. But this might %%% actually be used one day if ssl is improved. %% send_ssl_tunnel_request(Address, Request = #request{address = {Host, Port}}, @@ -1497,3 +1687,21 @@ handle_verbose(_) -> %% d(_, _, _) -> %% ok. + +call(Msg, Pid) -> + Timeout = infinity, + call(Msg, Pid, Timeout). +call(Msg, Pid, Timeout) -> + gen_server:call(Pid, Msg, Timeout). + +cast(Msg, Pid) -> + gen_server:cast(Pid, Msg). + + +%% to(To, Start) when is_integer(Start) andalso (Start >= 0) -> +%% http_util:timeout(To, Start); +%% to(To, _Start) -> +%% http_util:timeout(To, t()). + +t() -> + http_util:timestamp(). |