diff options
author | Micael Karlberg <[email protected]> | 2010-01-13 16:18:24 +0000 |
---|---|---|
committer | Erlang/OTP <[email protected]> | 2010-01-13 16:18:24 +0000 |
commit | 6153ba7599f2ce1ab22959a40b6ca33b4238f0d0 (patch) | |
tree | a81d50b08c7828d3662dd50e48bcf55b72f507b2 /lib/inets/src/http_client/httpc_handler.erl | |
parent | 68c2f188c3446f53fad03d0f652207a9a8bb1946 (diff) | |
download | otp-6153ba7599f2ce1ab22959a40b6ca33b4238f0d0.tar.gz otp-6153ba7599f2ce1ab22959a40b6ca33b4238f0d0.tar.bz2 otp-6153ba7599f2ce1ab22959a40b6ca33b4238f0d0.zip |
OTP-8016, OTP-8056, OTP-8103, OTP-8106, OTP-8312, OTP-8315, OTP-8327, OTP-8349,
OTP-8351, OTP-8359 & OTP-8371.
Diffstat (limited to 'lib/inets/src/http_client/httpc_handler.erl')
-rw-r--r-- | lib/inets/src/http_client/httpc_handler.erl | 767 |
1 files changed, 439 insertions, 328 deletions
diff --git a/lib/inets/src/http_client/httpc_handler.erl b/lib/inets/src/http_client/httpc_handler.erl index 7b737c2f86..25f9b0777f 100644 --- a/lib/inets/src/http_client/httpc_handler.erl +++ b/lib/inets/src/http_client/httpc_handler.erl @@ -1,19 +1,19 @@ %% %% %CopyrightBegin% -%% -%% Copyright Ericsson AB 2002-2009. All Rights Reserved. -%% +%% +%% Copyright Ericsson AB 2002-2010. All Rights Reserved. +%% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. -%% +%% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. -%% +%% %% %CopyrightEnd% %% %% @@ -28,7 +28,8 @@ %%-------------------------------------------------------------------- %% Internal Application API --export([start_link/3, send/2, cancel/2, stream/3, stream_next/1]). +-export([start_link/2, connect_and_send/2, + send/2, cancel/2, stream/3, stream_next/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -50,7 +51,7 @@ mfa, % {Moduel, Function, Args} pipeline = queue:new(), % queue() keep_alive = queue:new(), % queue() - status = new, % new | pipeline | keep_alive | close | ssl_tunnel + status, % undefined | new | pipeline | keep_alive | close | ssl_tunnel canceled = [], % [RequestId] max_header_size = nolimit, % nolimit | integer() max_body_size = nolimit, % nolimit | integer() @@ -85,9 +86,13 @@ %% the reply or part of it has arrived.) %%-------------------------------------------------------------------- %%-------------------------------------------------------------------- -start_link(Request, Options, ProfileName) -> - {ok, proc_lib:spawn_link(?MODULE, init, [[Request, Options, - ProfileName]])}. + +start_link(Options, ProfileName) -> + Args = [Options, ProfileName], + gen_server:start_link(?MODULE, Args, []). + +connect_and_send(Request, HandlerPid) -> + call({connect_and_send, Request}, HandlerPid). %%-------------------------------------------------------------------- @@ -192,10 +197,9 @@ stream(BodyPart, Request,_) -> % only 200 and 206 responses can be streamed %%==================================================================== %%-------------------------------------------------------------------- -%% Function: init([Request, Options, ProfileName]) -> {ok, State} | -%% {ok, State, Timeout} | ignore |{stop, Reason} +%% Function: init([Options, ProfileName]) -> {ok, State} | +%% {ok, State, Timeout} | ignore | {stop, Reason} %% -%% Request = #request{} %% Options = #options{} %% ProfileName = atom() - id of httpc manager process %% @@ -206,30 +210,16 @@ stream(BodyPart, Request,_) -> % only 200 and 206 responses can be streamed %% but we do not want that so errors will be handled by the process %% sending an init_error message to itself. %%-------------------------------------------------------------------- -init([Request, Options, ProfileName]) -> +init([Options, ProfileName]) -> + ?hcrv("init - starting", [{options, Options}, {profile, ProfileName}]), process_flag(trap_exit, true), - handle_verbose(Options#options.verbose), - Address = handle_proxy(Request#request.address, Options#options.proxy), - {ok, State} = - case {Address /= Request#request.address, Request#request.scheme} of - {true, https} -> - Error = https_through_proxy_is_not_currently_supported, - self() ! {init_error, - Error, httpc_response:error(Request, Error)}, - {ok, #state{request = Request, options = Options, - status = ssl_tunnel}}; - %% This is what we should do if and when ssl supports - %% "socket upgrading" - %%send_ssl_tunnel_request(Address, Request, - %% #state{options = Options, - %% status = ssl_tunnel}); - {_, _} -> - send_first_request(Address, Request, - #state{options = Options, - profile_name = ProfileName}) - end, - gen_server:enter_loop(?MODULE, [], State). + State = #state{status = undefined, + options = Options, + profile_name = ProfileName}, + ?hcrd("init - started", []), + {ok, State}. + %%-------------------------------------------------------------------- %% Function: handle_call(Request, From, State) -> {reply, Reply, State} | @@ -240,39 +230,85 @@ init([Request, Options, ProfileName]) -> %% {stop, Reason, State} (terminate/2 is called) %% Description: Handling call messages %%-------------------------------------------------------------------- -handle_call(Request, _, State = #state{session = Session = - #tcp_session{socket = Socket, - type = pipeline}, - timers = Timers, - options = Options, - profile_name = ProfileName}) -> + + +%% This is the first request, the reason the proc was started +handle_call({connect_and_send, #request{address = Address0, + scheme = Scheme} = Request}, + _From, + #state{options = #options{proxy = Proxy}, + status = undefined, + session = undefined} = State) -> + ?hcrv("connect and send", [{address0, Address0}, {proxy, Proxy}]), + Address = handle_proxy(Address0, Proxy), + if + ((Address =/= Address0) andalso (Scheme =:= https)) -> + %% This is what we should do if and when ssl supports + %% "socket upgrading" + %%send_ssl_tunnel_request(Address, Request, + %% #state{options = Options, + %% status = ssl_tunnel}); + Reason = https_through_proxy_is_not_currently_supported, + Error = {error, Reason}, + {stop, Error, Error, State}; + true -> + case connect_and_send_first_request(Address, Request, State) of + {ok, NewState} -> + {reply, ok, NewState}; + {stop, Error, NewState} -> + {stop, Error, Error, NewState} + end + end; + +handle_call(Request, _, + #state{status = Status, + session = #tcp_session{socket = Socket, + type = pipeline} = Session, + timers = Timers, + options = Options, + profile_name = ProfileName} = State) + when Status =/= undefined -> + + ?hcrv("new request", [{request, Request}, + {profile, ProfileName}, + {status, Status}, + {session_type, pipeline}, + {timers, Timers}]), + Address = handle_proxy(Request#request.address, Options#options.proxy), case httpc_request:send(Address, Request, Socket) of ok -> + + ?hcrd("request sent", []), + %% Activate the request time out for the new request - NewState = activate_request_timeout(State#state{request = - Request}), + NewState = + activate_request_timeout(State#state{request = Request}), + + ClientClose = + httpc_request:is_client_closing(Request#request.headers), - ClientClose = httpc_request:is_client_closing( - Request#request.headers), case State#state.request of - #request{} -> %% Old request no yet finished + #request{} -> %% Old request not yet finished + ?hcrd("old request still not finished", []), %% Make sure to use the new value of timers in state - NewTimers = NewState#state.timers, + NewTimers = NewState#state.timers, NewPipeline = queue:in(Request, State#state.pipeline), - NewSession = + NewSession = Session#tcp_session{queue_length = %% Queue + current queue:len(NewPipeline) + 1, client_close = ClientClose}, httpc_manager:insert_session(NewSession, ProfileName), + ?hcrd("session updated", []), {reply, ok, State#state{pipeline = NewPipeline, - session = NewSession, - timers = NewTimers}}; + session = NewSession, + timers = NewTimers}}; undefined -> - %% Note: tcp-message reciving has already been + %% Note: tcp-message receiving has already been %% activated by handle_pipeline/2. + ?hcrd("no current request", []), cancel_timer(Timers#timers.queue_timer, timeout_queue), NewSession = @@ -281,54 +317,67 @@ handle_call(Request, _, State = #state{session = Session = httpc_manager:insert_session(NewSession, ProfileName), Relaxed = (Request#request.settings)#http_options.relaxed, - {reply, ok, - NewState#state{request = Request, - session = NewSession, - mfa = {httpc_response, parse, - [State#state.max_header_size, - Relaxed]}, - timers = - Timers#timers{queue_timer = - undefined}}} + MFA = {httpc_response, parse, + [State#state.max_header_size, Relaxed]}, + NewTimers = Timers#timers{queue_timer = undefined}, + ?hcrd("session created", []), + {reply, ok, NewState#state{request = Request, + session = NewSession, + mfa = MFA, + timers = NewTimers}} end; {error, Reason} -> + ?hcri("failed sending request", [{reason, Reason}]), {reply, {pipeline_failed, Reason}, State} end; -handle_call(Request, _, #state{session = Session = - #tcp_session{type = keep_alive, - socket = Socket}, - timers = Timers, - options = Options, - profile_name = ProfileName} = State) -> - - ClientClose = httpc_request:is_client_closing(Request#request.headers), +handle_call(Request, _, + #state{status = Status, + session = #tcp_session{socket = Socket, + type = keep_alive} = Session, + timers = Timers, + options = Options, + profile_name = ProfileName} = State) + when Status =/= undefined -> - Address = handle_proxy(Request#request.address, - Options#options.proxy), + ?hcrv("new request", [{request, Request}, + {profile, ProfileName}, + {status, Status}, + {session_type, keep_alive}]), + + Address = handle_proxy(Request#request.address, Options#options.proxy), case httpc_request:send(Address, Request, Socket) of ok -> + + ?hcrd("request sent", []), + + %% Activate the request time out for the new request NewState = - activate_request_timeout(State#state{request = - Request}), + activate_request_timeout(State#state{request = Request}), + + ClientClose = + httpc_request:is_client_closing(Request#request.headers), case State#state.request of #request{} -> %% Old request not yet finished %% Make sure to use the new value of timers in state - NewTimers = NewState#state.timers, + ?hcrd("old request still not finished", []), + NewTimers = NewState#state.timers, NewKeepAlive = queue:in(Request, State#state.keep_alive), - NewSession = + NewSession = Session#tcp_session{queue_length = %% Queue + current queue:len(NewKeepAlive) + 1, client_close = ClientClose}, httpc_manager:insert_session(NewSession, ProfileName), + ?hcrd("session updated", []), {reply, ok, State#state{keep_alive = NewKeepAlive, - session = NewSession, - timers = NewTimers}}; + session = NewSession, + timers = NewTimers}}; undefined -> %% Note: tcp-message reciving has already been %% activated by handle_pipeline/2. + ?hcrd("no current request", []), cancel_timer(Timers#timers.queue_timer, timeout_queue), NewSession = @@ -337,17 +386,19 @@ handle_call(Request, _, #state{session = Session = httpc_manager:insert_session(NewSession, ProfileName), Relaxed = (Request#request.settings)#http_options.relaxed, - {reply, ok, - NewState#state{request = Request, - session = NewSession, - mfa = {httpc_response, parse, - [State#state.max_header_size, - Relaxed]}}} + MFA = {httpc_response, parse, + [State#state.max_header_size, Relaxed]}, + {reply, ok, NewState#state{request = Request, + session = NewSession, + mfa = MFA}} end; - {error, Reason} -> + + {error, Reason} -> + ?hcri("failed sending request", [{reason, Reason}]), {reply, {request_failed, Reason}, State} end. + %%-------------------------------------------------------------------- %% Function: handle_cast(Msg, State) -> {noreply, State} | %% {noreply, State, Timeout} | @@ -367,16 +418,28 @@ handle_call(Request, _, #state{session = Session = %% handle_keep_alive_queue/2 on the other hand will just skip the %% request as if it was never issued as in this case the request will %% not have been sent. -handle_cast({cancel, RequestId}, State = #state{request = Request = - #request{id = RequestId}, - profile_name = ProfileName}) -> +handle_cast({cancel, RequestId}, + #state{request = #request{id = RequestId} = Request, + profile_name = ProfileName, + canceled = Canceled} = State) -> + ?hcrv("cancel current request", [{request_id, RequestId}, + {profile, ProfileName}, + {canceled, Canceled}]), httpc_manager:request_canceled(RequestId, ProfileName), + ?hcrv("canceled", []), {stop, normal, - State#state{canceled = [RequestId | State#state.canceled], - request = Request#request{from = answer_sent}}}; -handle_cast({cancel, RequestId}, State = #state{profile_name = ProfileName}) -> + State#state{canceled = [RequestId | Canceled], + request = Request#request{from = answer_sent}}}; +handle_cast({cancel, RequestId}, + #state{profile_name = ProfileName, + canceled = Canceled} = State) -> + ?hcrv("cancel", [{request_id, RequestId}, + {profile, ProfileName}, + {canceled, Canceled}]), httpc_manager:request_canceled(RequestId, ProfileName), - {noreply, State#state{canceled = [RequestId | State#state.canceled]}}; + ?hcrv("canceled", []), + {noreply, State#state{canceled = [RequestId | Canceled]}}; + handle_cast(stream_next, #state{session = Session} = State) -> http_transport:setopts(socket_type(Session#tcp_session.scheme), Session#tcp_session.socket, [{active, once}]), @@ -399,7 +462,13 @@ handle_info({Proto, _Socket, Data}, (Proto =:= ssl) orelse (Proto =:= httpc_handler) -> - ?hcri("received data", [{proto, Proto}, {data, Data}, {mfa, MFA}, {method, Method}, {stream, Stream}, {session, Session}, {status_line, StatusLine}]), + ?hcri("received data", [{proto, Proto}, + {data, Data}, + {mfa, MFA}, + {method, Method}, + {stream, Stream}, + {session, Session}, + {status_line, StatusLine}]), FinalResult = try Module:Function([Data | Args]) of @@ -410,22 +479,23 @@ handle_info({Proto, _Socket, Data}, ?hcrd("data processed - whole body", []), handle_response(State#state{body = <<>>}); {Module, whole_body, [Body, Length]} -> - ?hcrd("data processed - whole body", [{module, Module}, {body, Body}, {length, Length}]), + ?hcrd("data processed - whole body", + [{module, Module}, {body, Body}, {length, Length}]), {_, Code, _} = StatusLine, {NewBody, NewRequest} = stream(Body, Request, Code), %% When we stream we will not keep the already %% streamed data, that would be a waste of memory. - NewLength = case Stream of - none -> - Length; - _ -> - Length - size(Body) - end, + NewLength = + case Stream of + none -> + Length; + _ -> + Length - size(Body) + end, NewState = next_body_chunk(State), - - {noreply, NewState#state{mfa = {Module, whole_body, - [NewBody, NewLength]}, + NewMFA = {Module, whole_body, [NewBody, NewLength]}, + {noreply, NewState#state{mfa = NewMFA, request = NewRequest}}; NewMFA -> ?hcrd("data processed", [{new_mfa, NewMFA}]), @@ -435,16 +505,14 @@ handle_info({Proto, _Socket, Data}, {noreply, State#state{mfa = NewMFA}} catch exit:_ -> - ClientErrMsg = httpc_response:error(Request, - {could_not_parse_as_http, - Data}), - NewState = answer_request(Request, ClientErrMsg, State), + ClientReason = {could_not_parse_as_http, Data}, + ClientErrMsg = httpc_response:error(Request, ClientReason), + NewState = answer_request(Request, ClientErrMsg, State), {stop, normal, NewState}; - error:_ -> - ClientErrMsg = httpc_response:error(Request, - {could_not_parse_as_http, - Data}), - NewState = answer_request(Request, ClientErrMsg, State), + error:_ -> + ClientReason = {could_not_parse_as_http, Data}, + ClientErrMsg = httpc_response:error(Request, ClientReason), + NewState = answer_request(Request, ClientErrMsg, State), {stop, normal, NewState} end, @@ -453,10 +521,10 @@ handle_info({Proto, _Socket, Data}, handle_info({Proto, Socket, Data}, - #state{mfa = MFA, - request = Request, - session = Session, - status = Status, + #state{mfa = MFA, + request = Request, + session = Session, + status = Status, status_line = StatusLine, profile_name = Profile} = State) when (Proto =:= tcp) orelse @@ -474,6 +542,7 @@ handle_info({Proto, Socket, Data}, "~n", [Proto, Socket, Data, MFA, Request, Session, Status, StatusLine, Profile]), + {noreply, State}; @@ -513,28 +582,35 @@ handle_info({ssl_error, _, _} = Reason, State) -> handle_info({timeout, RequestId}, #state{request = #request{id = RequestId} = Request, canceled = Canceled} = State) -> + ?hcri("timeout of current request", [{id, RequestId}]), httpc_response:send(Request#request.from, - httpc_response:error(Request,timeout)), + httpc_response:error(Request, timeout)), + ?hcrv("response (timeout) sent - now terminate", []), {stop, normal, State#state{request = Request#request{from = answer_sent}, canceled = [RequestId | Canceled]}}; handle_info({timeout, RequestId}, #state{canceled = Canceled} = State) -> + ?hcri("timeout", [{id, RequestId}]), Filter = fun(#request{id = Id, from = From} = Request) when Id =:= RequestId -> + ?hcrv("found request", [{id, Id}, {from, From}]), %% Notify the owner Response = httpc_response:error(Request, timeout), httpc_response:send(From, Response), + ?hcrv("response (timeout) sent", []), [Request#request{from = answer_sent}]; (_) -> true end, case State#state.status of pipeline -> + ?hcrd("pipeline", []), Pipeline = queue:filter(Filter, State#state.pipeline), {noreply, State#state{canceled = [RequestId | Canceled], pipeline = Pipeline}}; keep_alive -> + ?hcrd("keep_alive", []), KeepAlive = queue:filter(Filter, State#state.keep_alive), {noreply, State#state{canceled = [RequestId | Canceled], keep_alive = KeepAlive}} @@ -577,9 +653,10 @@ terminate(normal, #state{session = undefined}) -> %% Init error sending, no session information has been setup but %% there is a socket that needs closing. -terminate(normal, #state{request = Request, - session = #tcp_session{id = undefined, - socket = Socket}}) -> +terminate(normal, + #state{request = Request, + session = #tcp_session{id = undefined, + socket = Socket}}) -> http_transport:close(socket_type(Request), Socket); %% Socket closed remotely @@ -605,23 +682,28 @@ terminate(normal, %% And, just in case, close our side (**really** overkill) http_transport:close(socket_type(Request), Socket); -terminate(_, State = #state{session = Session, - request = undefined, - profile_name = ProfileName, - timers = Timers, - pipeline = Pipeline, - keep_alive = KeepAlive}) -> - catch httpc_manager:delete_session(Session#tcp_session.id, - ProfileName), +terminate(_, #state{session = #tcp_session{id = Id, + socket = Socket, + scheme = Scheme}, + request = undefined, + profile_name = ProfileName, + timers = Timers, + pipeline = Pipeline, + keep_alive = KeepAlive} = State) -> + (catch httpc_manager:delete_session(Id, ProfileName)), maybe_retry_queue(Pipeline, State), maybe_retry_queue(KeepAlive, State), cancel_timer(Timers#timers.queue_timer, timeout_queue), - Socket = Session#tcp_session.socket, - http_transport:close(socket_type(Session#tcp_session.scheme), Socket); + http_transport:close(socket_type(Scheme), Socket); -terminate(Reason, State = #state{request = Request}) -> +terminate(Reason, #state{request = undefined}) -> + ?hcrt("terminate", [{reason, Reason}]), + ok; + +terminate(Reason, #state{request = Request} = State) -> + ?hcrd("terminate", [{reason, Reason}, {request, Request}]), NewState = maybe_send_answer(Request, httpc_response:error(Request, Reason), State), @@ -641,13 +723,16 @@ maybe_send_answer(Request, Answer, State) -> answer_request(Request, Answer, State). deliver_answers([]) -> + ?hcrd("deliver answer done", []), ok; -deliver_answers([#request{from = From} = Request | Requests]) +deliver_answers([#request{id = Id, from = From} = Request | Requests]) when is_pid(From) -> Response = httpc_response:error(Request, socket_closed_remotely), + ?hcrd("deliver answer", [{id, Id}, {from, From}, {response, Response}]), httpc_response:send(From, Response), deliver_answers(Requests); -deliver_answers([_|Requests]) -> +deliver_answers([Request|Requests]) -> + ?hcrd("skip deliver answer", [{request, Request}]), deliver_answers(Requests). @@ -728,77 +813,58 @@ connect(SocketType, ToAddress, #options{ipfamily = IpFamily, http_transport:connect(SocketType, ToAddress, Opts3, Timeout) end. - -send_first_request(Address, Request, #state{options = Options} = State) -> - SocketType = socket_type(Request), - ConnTimeout = (Request#request.settings)#http_options.connect_timeout, - ?hcri("connect", +connect_and_send_first_request(Address, + #request{settings = Settings, + headers = Headers, + address = OrigAddress, + scheme = Scheme} = Request, + #state{options = Options} = State) -> + + ?hcrd("connect", [{address, Address}, {request, Request}, {options, Options}]), + + SocketType = socket_type(Request), + ConnTimeout = Settings#http_options.connect_timeout, case connect(SocketType, Address, Options, ConnTimeout) of {ok, Socket} -> - ?hcri("connected - now send first request", [{socket, Socket}]), + ?hcrd("connected - now send first request", [{socket, Socket}]), case httpc_request:send(Address, Request, Socket) of ok -> - ?hcri("first request sent", []), + ?hcrd("first request sent", []), ClientClose = - httpc_request:is_client_closing( - Request#request.headers), + httpc_request:is_client_closing(Headers), SessionType = httpc_manager:session_type(Options), Session = - #tcp_session{id = {Request#request.address, self()}, - scheme = Request#request.scheme, - socket = Socket, + #tcp_session{id = {OrigAddress, self()}, + scheme = Scheme, + socket = Socket, client_close = ClientClose, - type = SessionType}, - TmpState = State#state{request = Request, - session = Session, - mfa = init_mfa(Request, State), - status_line = - init_status_line(Request), - headers = undefined, - body = undefined, - status = new}, - http_transport:setopts(SocketType, - Socket, [{active, once}]), + type = SessionType}, + TmpState = + State#state{request = Request, + session = Session, + mfa = init_mfa(Request, State), + status_line = init_status_line(Request), + headers = undefined, + body = undefined, + status = new}, + ?hcrt("activate socket", []), + activate_once(Session), NewState = activate_request_timeout(TmpState), {ok, NewState}; - {error, Reason} -> - %% Commented out in wait of ssl support to avoid - %% dialyzer warning - %%case State#state.status of - %% new -> % Called from init/1 - self() ! {init_error, error_sending, - httpc_response:error(Request, Reason)}, - {ok, State#state{request = Request, - session = - #tcp_session{socket = Socket}}} - %%ssl_tunnel -> % Not called from init/1 - %% NewState = - %% answer_request(Request, - %%httpc_response:error(Request, - %%Reason), - %% State), - %% {stop, normal, NewState} - %% end + {error, Reason} -> + ?hcrv("failed sending request", [{reason, Reason}]), + Error = {error, {send_failed, + httpc_response:error(Request, Reason)}}, + {stop, Error, State#state{request = Request}} end; - {error, Reason} -> - %% Commented out in wait of ssl support to avoid - %% dialyzer warning - %% case State#state.status of - %% new -> % Called from init/1 - self() ! {init_error, error_connecting, - httpc_response:error(Request, Reason)}, - {ok, State#state{request = Request}} - %% ssl_tunnel -> % Not called from init/1 - %% NewState = - %% answer_request(Request, - %% httpc_response:error(Request, - %% Reason), - %% State), - %% {stop, normal, NewState} - %%end + {error, Reason} -> + ?hcri("connect failed", [{reason, Reason}]), + Error = {error, {connect_failed, + httpc_response:error(Request, Reason)}}, + {stop, Error, State#state{request = Request}} end. handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body}, @@ -806,23 +872,23 @@ handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body}, ?hcrt("handle_http_msg", [{body, Body}]), case Headers#http_response_h.'content-type' of "multipart/byteranges" ++ _Param -> - exit(not_yet_implemented); + exit({not_yet_implemented, multypart_nyteranges}); _ -> - StatusLine = {Version, StatusCode, ReasonPharse}, + StatusLine = {Version, StatusCode, ReasonPharse}, {ok, NewRequest} = start_stream(StatusLine, Headers, Request), handle_http_body(Body, State#state{request = NewRequest, status_line = StatusLine, headers = Headers}) end; -handle_http_msg({ChunkedHeaders, Body}, - State = #state{headers = Headers}) -> - ?hcrt("handle_http_msg", [{chunked_headers, ChunkedHeaders}, {body, Body}]), +handle_http_msg({ChunkedHeaders, Body}, #state{headers = Headers} = State) -> + ?hcrt("handle_http_msg", + [{chunked_headers, ChunkedHeaders}, {body, Body}]), NewHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders), handle_response(State#state{headers = NewHeaders, body = Body}); -handle_http_msg(Body, State = #state{status_line = {_,Code, _}}) -> +handle_http_msg(Body, #state{status_line = {_,Code, _}} = State) -> ?hcrt("handle_http_msg", [{body, Body}, {code, Code}]), - {NewBody, NewRequest}= stream(Body, State#state.request, Code), + {NewBody, NewRequest} = stream(Body, State#state.request, Code), handle_response(State#state{body = NewBody, request = NewRequest}). handle_http_body(<<>>, State = #state{status_line = {_,304, _}}) -> @@ -837,11 +903,15 @@ handle_http_body(<<>>, State = #state{request = #request{method = head}}) -> ?hcrt("handle_http_body - head", []), handle_response(State#state{body = <<>>}); -handle_http_body(Body, State = #state{headers = Headers, +handle_http_body(Body, State = #state{headers = Headers, max_body_size = MaxBodySize, - status_line = {_,Code, _}, - request = Request}) -> - ?hcrt("handle_http_body", [{body, Body}, {max_body_size, MaxBodySize}, {code, Code}]), + status_line = {_,Code, _}, + request = Request}) -> + ?hcrt("handle_http_body", + [{headers, Headers}, + {body, Body}, + {max_body_size, MaxBodySize}, + {code, Code}]), TransferEnc = Headers#http_response_h.'transfer-encoding', case case_insensitive_header(TransferEnc) of "chunked" -> @@ -850,12 +920,17 @@ handle_http_body(Body, State = #state{headers = Headers, State#state.max_header_size, {Code, Request}) of {Module, Function, Args} -> - ?hcrt("handle_http_body - new mfa", [{module, Module}, {function, Function}, {args, Args}]), + ?hcrt("handle_http_body - new mfa", + [{module, Module}, + {function, Function}, + {args, Args}]), NewState = next_body_chunk(State), {noreply, NewState#state{mfa = {Module, Function, Args}}}; {ok, {ChunkedHeaders, NewBody}} -> - ?hcrt("handle_http_body - nyew body", [{chunked_headers, ChunkedHeaders}, {new_body, NewBody}]), + ?hcrt("handle_http_body - new body", + [{chunked_headers, ChunkedHeaders}, + {new_body, NewBody}]), NewHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders), handle_response(State#state{headers = NewHeaders, @@ -872,12 +947,13 @@ handle_http_body(Body, State = #state{headers = Headers, ?hcrt("handle_http_body - other", []), Length = list_to_integer(Headers#http_response_h.'content-length'), - case ((Length =< MaxBodySize) or (MaxBodySize == nolimit)) of + case ((Length =< MaxBodySize) orelse (MaxBodySize =:= nolimit)) of true -> case httpc_response:whole_body(Body, Length) of {ok, Body} -> - {NewBody, NewRequest}= stream(Body, Request, Code), - handle_response(State#state{body = NewBody, + {NewBody, NewRequest} = + stream(Body, Request, Code), + handle_response(State#state{body = NewBody, request = NewRequest}); MFA -> NewState = next_body_chunk(State), @@ -893,42 +969,31 @@ handle_http_body(Body, State = #state{headers = Headers, end end. -%%% Normaly I do not comment out code, I throw it away. But this might -%%% actually be used on day if ssl is improved. -%% handle_response(State = #state{status = ssl_tunnel, -%% request = Request, -%% options = Options, -%% session = #tcp_session{socket = Socket, -%% scheme = Scheme}, -%% status_line = {_, 200, _}}) -> -%% %%% Insert code for upgrading the socket if and when ssl supports this. -%% Address = handle_proxy(Request#request.address, Options#options.proxy), -%% send_first_request(Address, Request, State); -%% handle_response(State = #state{status = ssl_tunnel, -%% request = Request}) -> -%% NewState = answer_request(Request, -%% httpc_response:error(Request, -%% ssl_proxy_tunnel_failed), -%% State), -%% {stop, normal, NewState}; - -handle_response(State = #state{status = new}) -> - handle_response(try_to_enable_pipeline_or_keep_alive(State)); - -handle_response(State = - #state{request = Request, +handle_response(#state{status = new} = State) -> + ?hcrd("handle response - status = new", []), + handle_response(try_to_enable_pipeline_or_keep_alive(State)); + +handle_response(#state{request = Request, status = Status, session = Session, status_line = StatusLine, headers = Headers, body = Body, options = Options, - profile_name = ProfileName}) when Status =/= new -> - ?hcrt("handle response", [{status, Status}, {session, Session}, {status_line, StatusLine}, {profile_name, ProfileName}]), + profile_name = ProfileName} = State) + when Status =/= new -> + + ?hcrd("handle response", [{profile, ProfileName}, + {status, Status}, + {request, Request}, + {session, Session}, + {status_line, StatusLine}]), + handle_cookies(Headers, Request, Options, ProfileName), case httpc_response:result({StatusLine, Headers, Body}, Request) of %% 100-continue continue -> + ?hcrd("handle response - continue", []), %% Send request body {_, RequestBody} = Request#request.content, http_transport:send(socket_type(Session#tcp_session.scheme), @@ -939,44 +1004,46 @@ handle_response(State = Session#tcp_session.socket, [{active, once}]), Relaxed = (Request#request.settings)#http_options.relaxed, - {noreply, - State#state{mfa = {httpc_response, parse, - [State#state.max_header_size, - Relaxed]}, - status_line = undefined, - headers = undefined, - body = undefined - }}; + MFA = {httpc_response, parse, + [State#state.max_header_size, Relaxed]}, + {noreply, State#state{mfa = MFA, + status_line = undefined, + headers = undefined, + body = undefined}}; + %% Ignore unexpected 100-continue response and receive the %% actual response that the server will send right away. {ignore, Data} -> + ?hcrd("handle response - ignore", [{data, Data}]), Relaxed = (Request#request.settings)#http_options.relaxed, - NewState = State#state{mfa = - {httpc_response, parse, - [State#state.max_header_size, - Relaxed]}, + MFA = {httpc_response, parse, + [State#state.max_header_size, Relaxed]}, + NewState = State#state{mfa = MFA, status_line = undefined, - headers = undefined, - body = undefined}, + headers = undefined, + body = undefined}, handle_info({httpc_handler, dummy, Data}, NewState); + %% On a redirect or retry the current request becomes %% obsolete and the manager will create a new request %% with the same id as the current. {redirect, NewRequest, Data} -> - ?hcrt("handle response - redirect", [{new_request, NewRequest}, {data, Data}]), + ?hcrt("handle response - redirect", + [{new_request, NewRequest}, {data, Data}]), ok = httpc_manager:redirect_request(NewRequest, ProfileName), handle_queue(State#state{request = undefined}, Data); {retry, TimeNewRequest, Data} -> - ?hcrt("handle response - retry", [{time_new_request, TimeNewRequest}, {data, Data}]), + ?hcrt("handle response - retry", + [{time_new_request, TimeNewRequest}, {data, Data}]), ok = httpc_manager:retry_request(TimeNewRequest, ProfileName), handle_queue(State#state{request = undefined}, Data); {ok, Msg, Data} -> - ?hcrt("handle response - result ok", [{msg, Msg}, {data, Data}]), + ?hcrd("handle response - ok", [{msg, Msg}, {data, Data}]), end_stream(StatusLine, Request), NewState = answer_request(Request, Msg, State), handle_queue(NewState, Data); {stop, Msg} -> - ?hcrt("handle response - result stop", [{msg, Msg}]), + ?hcrd("handle response - stop", [{msg, Msg}]), end_stream(StatusLine, Request), NewState = answer_request(Request, Msg, State), {stop, normal, NewState} @@ -990,60 +1057,67 @@ handle_cookies(_,_, #options{cookies = verify}, _) -> ok; handle_cookies(Headers, Request, #options{cookies = enabled}, ProfileName) -> {Host, _ } = Request#request.address, - Cookies = http_cookie:cookies(Headers#http_response_h.other, + Cookies = httpc_cookie:cookies(Headers#http_response_h.other, Request#request.path, Host), httpc_manager:store_cookies(Cookies, Request#request.address, ProfileName). %% This request could not be pipelined or used as sequential keept alive %% queue -handle_queue(State = #state{status = close}, _) -> +handle_queue(#state{status = close} = State, _) -> {stop, normal, State}; -handle_queue(State = #state{status = keep_alive}, Data) -> +handle_queue(#state{status = keep_alive} = State, Data) -> handle_keep_alive_queue(State, Data); -handle_queue(State = #state{status = pipeline}, Data) -> +handle_queue(#state{status = pipeline} = State, Data) -> handle_pipeline(State, Data). -handle_pipeline(State = - #state{status = pipeline, session = Session, +handle_pipeline(#state{status = pipeline, + session = Session, profile_name = ProfileName, - options = #options{pipeline_timeout = TimeOut}}, - Data) -> + options = #options{pipeline_timeout = TimeOut}} = + State, + Data) -> + + ?hcrd("handle pipeline", [{profile, ProfileName}, + {session, Session}, + {timeout, TimeOut}]), + case queue:out(State#state.pipeline) of {empty, _} -> + ?hcrd("epmty pipeline queue", []), + %% The server may choose too teminate an idle pipeline %% in this case we want to receive the close message %% at once and not when trying to pipeline the next %% request. - http_transport:setopts(socket_type(Session#tcp_session.scheme), - Session#tcp_session.socket, - [{active, once}]), + activate_once(Session), + %% If a pipeline that has been idle for some time is not %% closed by the server, the client may want to close it. - NewState = activate_queue_timeout(TimeOut, State), + NewState = activate_queue_timeout(TimeOut, State), NewSession = Session#tcp_session{queue_length = 0}, httpc_manager:insert_session(NewSession, ProfileName), %% Note mfa will be initilized when a new request %% arrives. {noreply, - NewState#state{request = undefined, - mfa = undefined, + NewState#state{request = undefined, + mfa = undefined, status_line = undefined, - headers = undefined, - body = undefined - } - }; + headers = undefined, + body = undefined}}; {{value, NextRequest}, Pipeline} -> case lists:member(NextRequest#request.id, State#state.canceled) of true -> + ?hcrv("next request had been cancelled", []), %% See comment for handle_cast({cancel, RequestId}) {stop, normal, State#state{request = NextRequest#request{from = answer_sent}}}; false -> + ?hcrv("next request", [{request, NextRequest}]), NewSession = Session#tcp_session{queue_length = %% Queue + current @@ -1051,15 +1125,16 @@ handle_pipeline(State = httpc_manager:insert_session(NewSession, ProfileName), Relaxed = (NextRequest#request.settings)#http_options.relaxed, + MFA = {httpc_response, + parse, + [State#state.max_header_size, Relaxed]}, NewState = - State#state{pipeline = Pipeline, - request = NextRequest, - mfa = {httpc_response, parse, - [State#state.max_header_size, - Relaxed]}, + State#state{pipeline = Pipeline, + request = NextRequest, + mfa = MFA, status_line = undefined, - headers = undefined, - body = undefined}, + headers = undefined, + body = undefined}, case Data of <<>> -> http_transport:setopts( @@ -1076,15 +1151,20 @@ handle_pipeline(State = end end. -handle_keep_alive_queue(State = #state{status = keep_alive, - session = Session, - profile_name = ProfileName, - options = #options{keep_alive_timeout - = TimeOut} - }, - Data) -> +handle_keep_alive_queue( + #state{status = keep_alive, + session = Session, + profile_name = ProfileName, + options = #options{keep_alive_timeout = TimeOut}} = State, + Data) -> + + ?hcrd("handle keep_alive", [{profile, ProfileName}, + {session, Session}, + {timeout, TimeOut}]), + case queue:out(State#state.keep_alive) of {empty, _} -> + ?hcrd("epmty keep_alive queue", []), %% The server may choose too terminate an idle keep_alive session %% in this case we want to receive the close message %% at once and not when trying to send the next @@ -1111,25 +1191,25 @@ handle_keep_alive_queue(State = #state{status = keep_alive, case lists:member(NextRequest#request.id, State#state.canceled) of true -> - handle_keep_alive_queue(State#state{keep_alive = - KeepAlive}, Data); + ?hcrv("next request has already been canceled", []), + handle_keep_alive_queue( + State#state{keep_alive = KeepAlive}, Data); false -> + ?hcrv("next request", [{request, NextRequest}]), Relaxed = (NextRequest#request.settings)#http_options.relaxed, + MFA = {httpc_response, parse, + [State#state.max_header_size, Relaxed]}, NewState = - State#state{request = NextRequest, - keep_alive = KeepAlive, - mfa = {httpc_response, parse, - [State#state.max_header_size, - Relaxed]}, + State#state{request = NextRequest, + keep_alive = KeepAlive, + mfa = MFA, status_line = undefined, - headers = undefined, - body = undefined}, + headers = undefined, + body = undefined}, case Data of <<>> -> - http_transport:setopts( - socket_type(Session#tcp_session.scheme), - Session#tcp_session.socket, [{active, once}]), + activate_once(Session), {noreply, NewState}; _ -> %% If we already received some bytes of @@ -1140,11 +1220,6 @@ handle_keep_alive_queue(State = #state{status = keep_alive, end end. -call(Msg, Pid, Timeout) -> - gen_server:call(Pid, Msg, Timeout). - -cast(Msg, Pid) -> - gen_server:cast(Pid, Msg). case_insensitive_header(Str) when is_list(Str) -> http_util:to_lower(Str); @@ -1152,20 +1227,34 @@ case_insensitive_header(Str) when is_list(Str) -> case_insensitive_header(Str) -> Str. -activate_request_timeout(State = #state{request = Request}) -> - Time = (Request#request.settings)#http_options.timeout, - case Time of +activate_once(#tcp_session{scheme = Scheme, socket = Socket}) -> + SocketType = socket_type(Scheme), + http_transport:setopts(SocketType, Socket, [{active, once}]). + +activate_request_timeout( + #state{request = #request{timer = undefined} = Request} = State) -> + Timeout = (Request#request.settings)#http_options.timeout, + case Timeout of infinity -> State; _ -> - Ref = erlang:send_after(Time, self(), - {timeout, Request#request.id}), - State#state - {timers = - #timers{request_timers = - [{Request#request.id, Ref}| - (State#state.timers)#timers.request_timers]}} - end. + ReqId = Request#request.id, + ?hcrt("activate request timer", + [{request_id, ReqId}, + {time_consumed, t() - Request#request.started}, + {timeout, Timeout}]), + Msg = {timeout, ReqId}, + Ref = erlang:send_after(Timeout, self(), Msg), + Request2 = Request#request{timer = Ref}, + ReqTimers = [{Request#request.id, Ref} | + (State#state.timers)#timers.request_timers], + Timers = #timers{request_timers = ReqTimers}, + State#state{request = Request2, timers = Timers} + end; + +%% Timer is already running! This is the case for a redirect or retry +activate_request_timeout(State) -> + State. activate_queue_timeout(infinity, State) -> State; @@ -1191,12 +1280,12 @@ is_keep_alive_connection(Headers, Session) -> (not ((Session#tcp_session.client_close) or httpc_response:is_server_closing(Headers))). -try_to_enable_pipeline_or_keep_alive(State = - #state{session = Session, - request = #request{method = Method}, - status_line = {Version, _, _}, - headers = Headers, - profile_name = ProfileName}) -> +try_to_enable_pipeline_or_keep_alive( + #state{session = Session, + request = #request{method = Method}, + status_line = {Version, _, _}, + headers = Headers, + profile_name = ProfileName} = State) -> case (is_keep_alive_enabled_server(Version, Headers) andalso is_keep_alive_connection(Headers, Session)) of true -> @@ -1209,15 +1298,16 @@ try_to_enable_pipeline_or_keep_alive(State = httpc_manager:insert_session(Session, ProfileName), %% Make sure type is keep_alive in session %% as it in this case might be pipeline - State#state{status = keep_alive, - session = - Session#tcp_session{type = keep_alive}} + NewSession = Session#tcp_session{type = keep_alive}, + State#state{status = keep_alive, + session = NewSession} end; false -> State#state{status = close} end. -answer_request(Request, Msg, #state{timers = Timers} = State) -> +answer_request(Request, Msg, #state{timers = Timers} = State) -> + ?hcrt("answer request", [{request, Request}, {msg, Msg}]), httpc_response:send(Request#request.from, Msg), RequestTimers = Timers#timers.request_timers, TimerRef = @@ -1253,14 +1343,14 @@ retry_pipeline([Request | PipeLine], case (catch httpc_manager:retry_request(Request, ProfileName)) of ok -> RequestTimers = Timers#timers.request_timers, + ReqId = Request#request.id, TimerRef = - proplists:get_value(Request#request.id, RequestTimers, - undefined), - cancel_timer(TimerRef, {timeout, Request#request.id}), - State#state{timers = Timers#timers{request_timers = - lists:delete({Request#request.id, - TimerRef}, - RequestTimers)}}; + proplists:get_value(ReqId, RequestTimers, undefined), + cancel_timer(TimerRef, {timeout, ReqId}), + NewReqsTimers = lists:delete({ReqId, TimerRef}, RequestTimers), + NewTimers = Timers#timers{request_timers = NewReqsTimers}, + State#state{timers = NewTimers}; + Error -> answer_request(Request#request.from, httpc_response:error(Request, Error), State) @@ -1347,10 +1437,12 @@ socket_type(http) -> socket_type(https) -> {ssl, []}. %% Dummy value ok for ex setops that does not use this value -start_stream({_Version, _Code, _ReasonPhrase}, _Headers, #request{stream = none} = Request) -> +start_stream({_Version, _Code, _ReasonPhrase}, _Headers, + #request{stream = none} = Request) -> ?hcrt("start stream - none", []), {ok, Request}; -start_stream({_Version, Code, _ReasonPhrase}, Headers, #request{stream = self} = Request) +start_stream({_Version, Code, _ReasonPhrase}, Headers, + #request{stream = self} = Request) when (Code =:= 200) orelse (Code =:= 206) -> ?hcrt("start stream - self", [{code, Code}]), Msg = httpc_response:stream_start(Headers, Request, ignore), @@ -1363,7 +1455,8 @@ start_stream({_Version, Code, _ReasonPhrase}, Headers, Msg = httpc_response:stream_start(Headers, Request, self()), httpc_response:send(Request#request.from, Msg), {ok, Request}; -start_stream({_Version, Code, _ReasonPhrase}, _Headers, #request{stream = Filename} = Request) +start_stream({_Version, Code, _ReasonPhrase}, _Headers, + #request{stream = Filename} = Request) when ((Code =:= 200) orelse (Code =:= 206)) andalso is_list(Filename) -> ?hcrt("start stream", [{code, Code}, {filename, Filename}]), case file:open(Filename, [write, raw, append, delayed_write]) of @@ -1497,3 +1590,21 @@ handle_verbose(_) -> %% d(_, _, _) -> %% ok. + +call(Msg, Pid) -> + Timeout = infinity, + call(Msg, Pid, Timeout). +call(Msg, Pid, Timeout) -> + gen_server:call(Pid, Msg, Timeout). + +cast(Msg, Pid) -> + gen_server:cast(Pid, Msg). + + +%% to(To, Start) when is_integer(Start) andalso (Start >= 0) -> +%% http_util:timeout(To, Start); +%% to(To, _Start) -> +%% http_util:timeout(To, t()). + +t() -> + http_util:timestamp(). |