aboutsummaryrefslogtreecommitdiffstats
path: root/lib/inets/src/http_client/httpc_handler.erl
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2010-01-13 16:18:24 +0000
committerErlang/OTP <[email protected]>2010-01-13 16:18:24 +0000
commit6153ba7599f2ce1ab22959a40b6ca33b4238f0d0 (patch)
treea81d50b08c7828d3662dd50e48bcf55b72f507b2 /lib/inets/src/http_client/httpc_handler.erl
parent68c2f188c3446f53fad03d0f652207a9a8bb1946 (diff)
downloadotp-6153ba7599f2ce1ab22959a40b6ca33b4238f0d0.tar.gz
otp-6153ba7599f2ce1ab22959a40b6ca33b4238f0d0.tar.bz2
otp-6153ba7599f2ce1ab22959a40b6ca33b4238f0d0.zip
OTP-8016, OTP-8056, OTP-8103, OTP-8106, OTP-8312, OTP-8315, OTP-8327, OTP-8349,
OTP-8351, OTP-8359 & OTP-8371.
Diffstat (limited to 'lib/inets/src/http_client/httpc_handler.erl')
-rw-r--r--lib/inets/src/http_client/httpc_handler.erl767
1 files changed, 439 insertions, 328 deletions
diff --git a/lib/inets/src/http_client/httpc_handler.erl b/lib/inets/src/http_client/httpc_handler.erl
index 7b737c2f86..25f9b0777f 100644
--- a/lib/inets/src/http_client/httpc_handler.erl
+++ b/lib/inets/src/http_client/httpc_handler.erl
@@ -1,19 +1,19 @@
%%
%% %CopyrightBegin%
-%%
-%% Copyright Ericsson AB 2002-2009. All Rights Reserved.
-%%
+%%
+%% Copyright Ericsson AB 2002-2010. All Rights Reserved.
+%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
-%%
+%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
-%%
+%%
%% %CopyrightEnd%
%%
%%
@@ -28,7 +28,8 @@
%%--------------------------------------------------------------------
%% Internal Application API
--export([start_link/3, send/2, cancel/2, stream/3, stream_next/1]).
+-export([start_link/2, connect_and_send/2,
+ send/2, cancel/2, stream/3, stream_next/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -50,7 +51,7 @@
mfa, % {Moduel, Function, Args}
pipeline = queue:new(), % queue()
keep_alive = queue:new(), % queue()
- status = new, % new | pipeline | keep_alive | close | ssl_tunnel
+ status, % undefined | new | pipeline | keep_alive | close | ssl_tunnel
canceled = [], % [RequestId]
max_header_size = nolimit, % nolimit | integer()
max_body_size = nolimit, % nolimit | integer()
@@ -85,9 +86,13 @@
%% the reply or part of it has arrived.)
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
-start_link(Request, Options, ProfileName) ->
- {ok, proc_lib:spawn_link(?MODULE, init, [[Request, Options,
- ProfileName]])}.
+
+start_link(Options, ProfileName) ->
+ Args = [Options, ProfileName],
+ gen_server:start_link(?MODULE, Args, []).
+
+connect_and_send(Request, HandlerPid) ->
+ call({connect_and_send, Request}, HandlerPid).
%%--------------------------------------------------------------------
@@ -192,10 +197,9 @@ stream(BodyPart, Request,_) -> % only 200 and 206 responses can be streamed
%%====================================================================
%%--------------------------------------------------------------------
-%% Function: init([Request, Options, ProfileName]) -> {ok, State} |
-%% {ok, State, Timeout} | ignore |{stop, Reason}
+%% Function: init([Options, ProfileName]) -> {ok, State} |
+%% {ok, State, Timeout} | ignore | {stop, Reason}
%%
-%% Request = #request{}
%% Options = #options{}
%% ProfileName = atom() - id of httpc manager process
%%
@@ -206,30 +210,16 @@ stream(BodyPart, Request,_) -> % only 200 and 206 responses can be streamed
%% but we do not want that so errors will be handled by the process
%% sending an init_error message to itself.
%%--------------------------------------------------------------------
-init([Request, Options, ProfileName]) ->
+init([Options, ProfileName]) ->
+ ?hcrv("init - starting", [{options, Options}, {profile, ProfileName}]),
process_flag(trap_exit, true),
-
handle_verbose(Options#options.verbose),
- Address = handle_proxy(Request#request.address, Options#options.proxy),
- {ok, State} =
- case {Address /= Request#request.address, Request#request.scheme} of
- {true, https} ->
- Error = https_through_proxy_is_not_currently_supported,
- self() ! {init_error,
- Error, httpc_response:error(Request, Error)},
- {ok, #state{request = Request, options = Options,
- status = ssl_tunnel}};
- %% This is what we should do if and when ssl supports
- %% "socket upgrading"
- %%send_ssl_tunnel_request(Address, Request,
- %% #state{options = Options,
- %% status = ssl_tunnel});
- {_, _} ->
- send_first_request(Address, Request,
- #state{options = Options,
- profile_name = ProfileName})
- end,
- gen_server:enter_loop(?MODULE, [], State).
+ State = #state{status = undefined,
+ options = Options,
+ profile_name = ProfileName},
+ ?hcrd("init - started", []),
+ {ok, State}.
+
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -240,39 +230,85 @@ init([Request, Options, ProfileName]) ->
%% {stop, Reason, State} (terminate/2 is called)
%% Description: Handling call messages
%%--------------------------------------------------------------------
-handle_call(Request, _, State = #state{session = Session =
- #tcp_session{socket = Socket,
- type = pipeline},
- timers = Timers,
- options = Options,
- profile_name = ProfileName}) ->
+
+
+%% This is the first request, the reason the proc was started
+handle_call({connect_and_send, #request{address = Address0,
+ scheme = Scheme} = Request},
+ _From,
+ #state{options = #options{proxy = Proxy},
+ status = undefined,
+ session = undefined} = State) ->
+ ?hcrv("connect and send", [{address0, Address0}, {proxy, Proxy}]),
+ Address = handle_proxy(Address0, Proxy),
+ if
+ ((Address =/= Address0) andalso (Scheme =:= https)) ->
+ %% This is what we should do if and when ssl supports
+ %% "socket upgrading"
+ %%send_ssl_tunnel_request(Address, Request,
+ %% #state{options = Options,
+ %% status = ssl_tunnel});
+ Reason = https_through_proxy_is_not_currently_supported,
+ Error = {error, Reason},
+ {stop, Error, Error, State};
+ true ->
+ case connect_and_send_first_request(Address, Request, State) of
+ {ok, NewState} ->
+ {reply, ok, NewState};
+ {stop, Error, NewState} ->
+ {stop, Error, Error, NewState}
+ end
+ end;
+
+handle_call(Request, _,
+ #state{status = Status,
+ session = #tcp_session{socket = Socket,
+ type = pipeline} = Session,
+ timers = Timers,
+ options = Options,
+ profile_name = ProfileName} = State)
+ when Status =/= undefined ->
+
+ ?hcrv("new request", [{request, Request},
+ {profile, ProfileName},
+ {status, Status},
+ {session_type, pipeline},
+ {timers, Timers}]),
+
Address = handle_proxy(Request#request.address, Options#options.proxy),
case httpc_request:send(Address, Request, Socket) of
ok ->
+
+ ?hcrd("request sent", []),
+
%% Activate the request time out for the new request
- NewState = activate_request_timeout(State#state{request =
- Request}),
+ NewState =
+ activate_request_timeout(State#state{request = Request}),
+
+ ClientClose =
+ httpc_request:is_client_closing(Request#request.headers),
- ClientClose = httpc_request:is_client_closing(
- Request#request.headers),
case State#state.request of
- #request{} -> %% Old request no yet finished
+ #request{} -> %% Old request not yet finished
+ ?hcrd("old request still not finished", []),
%% Make sure to use the new value of timers in state
- NewTimers = NewState#state.timers,
+ NewTimers = NewState#state.timers,
NewPipeline = queue:in(Request, State#state.pipeline),
- NewSession =
+ NewSession =
Session#tcp_session{queue_length =
%% Queue + current
queue:len(NewPipeline) + 1,
client_close = ClientClose},
httpc_manager:insert_session(NewSession, ProfileName),
+ ?hcrd("session updated", []),
{reply, ok, State#state{pipeline = NewPipeline,
- session = NewSession,
- timers = NewTimers}};
+ session = NewSession,
+ timers = NewTimers}};
undefined ->
- %% Note: tcp-message reciving has already been
+ %% Note: tcp-message receiving has already been
%% activated by handle_pipeline/2.
+ ?hcrd("no current request", []),
cancel_timer(Timers#timers.queue_timer,
timeout_queue),
NewSession =
@@ -281,54 +317,67 @@ handle_call(Request, _, State = #state{session = Session =
httpc_manager:insert_session(NewSession, ProfileName),
Relaxed =
(Request#request.settings)#http_options.relaxed,
- {reply, ok,
- NewState#state{request = Request,
- session = NewSession,
- mfa = {httpc_response, parse,
- [State#state.max_header_size,
- Relaxed]},
- timers =
- Timers#timers{queue_timer =
- undefined}}}
+ MFA = {httpc_response, parse,
+ [State#state.max_header_size, Relaxed]},
+ NewTimers = Timers#timers{queue_timer = undefined},
+ ?hcrd("session created", []),
+ {reply, ok, NewState#state{request = Request,
+ session = NewSession,
+ mfa = MFA,
+ timers = NewTimers}}
end;
{error, Reason} ->
+ ?hcri("failed sending request", [{reason, Reason}]),
{reply, {pipeline_failed, Reason}, State}
end;
-handle_call(Request, _, #state{session = Session =
- #tcp_session{type = keep_alive,
- socket = Socket},
- timers = Timers,
- options = Options,
- profile_name = ProfileName} = State) ->
-
- ClientClose = httpc_request:is_client_closing(Request#request.headers),
+handle_call(Request, _,
+ #state{status = Status,
+ session = #tcp_session{socket = Socket,
+ type = keep_alive} = Session,
+ timers = Timers,
+ options = Options,
+ profile_name = ProfileName} = State)
+ when Status =/= undefined ->
- Address = handle_proxy(Request#request.address,
- Options#options.proxy),
+ ?hcrv("new request", [{request, Request},
+ {profile, ProfileName},
+ {status, Status},
+ {session_type, keep_alive}]),
+
+ Address = handle_proxy(Request#request.address, Options#options.proxy),
case httpc_request:send(Address, Request, Socket) of
ok ->
+
+ ?hcrd("request sent", []),
+
+ %% Activate the request time out for the new request
NewState =
- activate_request_timeout(State#state{request =
- Request}),
+ activate_request_timeout(State#state{request = Request}),
+
+ ClientClose =
+ httpc_request:is_client_closing(Request#request.headers),
case State#state.request of
#request{} -> %% Old request not yet finished
%% Make sure to use the new value of timers in state
- NewTimers = NewState#state.timers,
+ ?hcrd("old request still not finished", []),
+ NewTimers = NewState#state.timers,
NewKeepAlive = queue:in(Request, State#state.keep_alive),
- NewSession =
+ NewSession =
Session#tcp_session{queue_length =
%% Queue + current
queue:len(NewKeepAlive) + 1,
client_close = ClientClose},
httpc_manager:insert_session(NewSession, ProfileName),
+ ?hcrd("session updated", []),
{reply, ok, State#state{keep_alive = NewKeepAlive,
- session = NewSession,
- timers = NewTimers}};
+ session = NewSession,
+ timers = NewTimers}};
undefined ->
%% Note: tcp-message reciving has already been
%% activated by handle_pipeline/2.
+ ?hcrd("no current request", []),
cancel_timer(Timers#timers.queue_timer,
timeout_queue),
NewSession =
@@ -337,17 +386,19 @@ handle_call(Request, _, #state{session = Session =
httpc_manager:insert_session(NewSession, ProfileName),
Relaxed =
(Request#request.settings)#http_options.relaxed,
- {reply, ok,
- NewState#state{request = Request,
- session = NewSession,
- mfa = {httpc_response, parse,
- [State#state.max_header_size,
- Relaxed]}}}
+ MFA = {httpc_response, parse,
+ [State#state.max_header_size, Relaxed]},
+ {reply, ok, NewState#state{request = Request,
+ session = NewSession,
+ mfa = MFA}}
end;
- {error, Reason} ->
+
+ {error, Reason} ->
+ ?hcri("failed sending request", [{reason, Reason}]),
{reply, {request_failed, Reason}, State}
end.
+
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
@@ -367,16 +418,28 @@ handle_call(Request, _, #state{session = Session =
%% handle_keep_alive_queue/2 on the other hand will just skip the
%% request as if it was never issued as in this case the request will
%% not have been sent.
-handle_cast({cancel, RequestId}, State = #state{request = Request =
- #request{id = RequestId},
- profile_name = ProfileName}) ->
+handle_cast({cancel, RequestId},
+ #state{request = #request{id = RequestId} = Request,
+ profile_name = ProfileName,
+ canceled = Canceled} = State) ->
+ ?hcrv("cancel current request", [{request_id, RequestId},
+ {profile, ProfileName},
+ {canceled, Canceled}]),
httpc_manager:request_canceled(RequestId, ProfileName),
+ ?hcrv("canceled", []),
{stop, normal,
- State#state{canceled = [RequestId | State#state.canceled],
- request = Request#request{from = answer_sent}}};
-handle_cast({cancel, RequestId}, State = #state{profile_name = ProfileName}) ->
+ State#state{canceled = [RequestId | Canceled],
+ request = Request#request{from = answer_sent}}};
+handle_cast({cancel, RequestId},
+ #state{profile_name = ProfileName,
+ canceled = Canceled} = State) ->
+ ?hcrv("cancel", [{request_id, RequestId},
+ {profile, ProfileName},
+ {canceled, Canceled}]),
httpc_manager:request_canceled(RequestId, ProfileName),
- {noreply, State#state{canceled = [RequestId | State#state.canceled]}};
+ ?hcrv("canceled", []),
+ {noreply, State#state{canceled = [RequestId | Canceled]}};
+
handle_cast(stream_next, #state{session = Session} = State) ->
http_transport:setopts(socket_type(Session#tcp_session.scheme),
Session#tcp_session.socket, [{active, once}]),
@@ -399,7 +462,13 @@ handle_info({Proto, _Socket, Data},
(Proto =:= ssl) orelse
(Proto =:= httpc_handler) ->
- ?hcri("received data", [{proto, Proto}, {data, Data}, {mfa, MFA}, {method, Method}, {stream, Stream}, {session, Session}, {status_line, StatusLine}]),
+ ?hcri("received data", [{proto, Proto},
+ {data, Data},
+ {mfa, MFA},
+ {method, Method},
+ {stream, Stream},
+ {session, Session},
+ {status_line, StatusLine}]),
FinalResult =
try Module:Function([Data | Args]) of
@@ -410,22 +479,23 @@ handle_info({Proto, _Socket, Data},
?hcrd("data processed - whole body", []),
handle_response(State#state{body = <<>>});
{Module, whole_body, [Body, Length]} ->
- ?hcrd("data processed - whole body", [{module, Module}, {body, Body}, {length, Length}]),
+ ?hcrd("data processed - whole body",
+ [{module, Module}, {body, Body}, {length, Length}]),
{_, Code, _} = StatusLine,
{NewBody, NewRequest} = stream(Body, Request, Code),
%% When we stream we will not keep the already
%% streamed data, that would be a waste of memory.
- NewLength = case Stream of
- none ->
- Length;
- _ ->
- Length - size(Body)
- end,
+ NewLength =
+ case Stream of
+ none ->
+ Length;
+ _ ->
+ Length - size(Body)
+ end,
NewState = next_body_chunk(State),
-
- {noreply, NewState#state{mfa = {Module, whole_body,
- [NewBody, NewLength]},
+ NewMFA = {Module, whole_body, [NewBody, NewLength]},
+ {noreply, NewState#state{mfa = NewMFA,
request = NewRequest}};
NewMFA ->
?hcrd("data processed", [{new_mfa, NewMFA}]),
@@ -435,16 +505,14 @@ handle_info({Proto, _Socket, Data},
{noreply, State#state{mfa = NewMFA}}
catch
exit:_ ->
- ClientErrMsg = httpc_response:error(Request,
- {could_not_parse_as_http,
- Data}),
- NewState = answer_request(Request, ClientErrMsg, State),
+ ClientReason = {could_not_parse_as_http, Data},
+ ClientErrMsg = httpc_response:error(Request, ClientReason),
+ NewState = answer_request(Request, ClientErrMsg, State),
{stop, normal, NewState};
- error:_ ->
- ClientErrMsg = httpc_response:error(Request,
- {could_not_parse_as_http,
- Data}),
- NewState = answer_request(Request, ClientErrMsg, State),
+ error:_ ->
+ ClientReason = {could_not_parse_as_http, Data},
+ ClientErrMsg = httpc_response:error(Request, ClientReason),
+ NewState = answer_request(Request, ClientErrMsg, State),
{stop, normal, NewState}
end,
@@ -453,10 +521,10 @@ handle_info({Proto, _Socket, Data},
handle_info({Proto, Socket, Data},
- #state{mfa = MFA,
- request = Request,
- session = Session,
- status = Status,
+ #state{mfa = MFA,
+ request = Request,
+ session = Session,
+ status = Status,
status_line = StatusLine,
profile_name = Profile} = State)
when (Proto =:= tcp) orelse
@@ -474,6 +542,7 @@ handle_info({Proto, Socket, Data},
"~n",
[Proto, Socket, Data, MFA,
Request, Session, Status, StatusLine, Profile]),
+
{noreply, State};
@@ -513,28 +582,35 @@ handle_info({ssl_error, _, _} = Reason, State) ->
handle_info({timeout, RequestId},
#state{request = #request{id = RequestId} = Request,
canceled = Canceled} = State) ->
+ ?hcri("timeout of current request", [{id, RequestId}]),
httpc_response:send(Request#request.from,
- httpc_response:error(Request,timeout)),
+ httpc_response:error(Request, timeout)),
+ ?hcrv("response (timeout) sent - now terminate", []),
{stop, normal,
State#state{request = Request#request{from = answer_sent},
canceled = [RequestId | Canceled]}};
handle_info({timeout, RequestId}, #state{canceled = Canceled} = State) ->
+ ?hcri("timeout", [{id, RequestId}]),
Filter =
fun(#request{id = Id, from = From} = Request) when Id =:= RequestId ->
+ ?hcrv("found request", [{id, Id}, {from, From}]),
%% Notify the owner
Response = httpc_response:error(Request, timeout),
httpc_response:send(From, Response),
+ ?hcrv("response (timeout) sent", []),
[Request#request{from = answer_sent}];
(_) ->
true
end,
case State#state.status of
pipeline ->
+ ?hcrd("pipeline", []),
Pipeline = queue:filter(Filter, State#state.pipeline),
{noreply, State#state{canceled = [RequestId | Canceled],
pipeline = Pipeline}};
keep_alive ->
+ ?hcrd("keep_alive", []),
KeepAlive = queue:filter(Filter, State#state.keep_alive),
{noreply, State#state{canceled = [RequestId | Canceled],
keep_alive = KeepAlive}}
@@ -577,9 +653,10 @@ terminate(normal, #state{session = undefined}) ->
%% Init error sending, no session information has been setup but
%% there is a socket that needs closing.
-terminate(normal, #state{request = Request,
- session = #tcp_session{id = undefined,
- socket = Socket}}) ->
+terminate(normal,
+ #state{request = Request,
+ session = #tcp_session{id = undefined,
+ socket = Socket}}) ->
http_transport:close(socket_type(Request), Socket);
%% Socket closed remotely
@@ -605,23 +682,28 @@ terminate(normal,
%% And, just in case, close our side (**really** overkill)
http_transport:close(socket_type(Request), Socket);
-terminate(_, State = #state{session = Session,
- request = undefined,
- profile_name = ProfileName,
- timers = Timers,
- pipeline = Pipeline,
- keep_alive = KeepAlive}) ->
- catch httpc_manager:delete_session(Session#tcp_session.id,
- ProfileName),
+terminate(_, #state{session = #tcp_session{id = Id,
+ socket = Socket,
+ scheme = Scheme},
+ request = undefined,
+ profile_name = ProfileName,
+ timers = Timers,
+ pipeline = Pipeline,
+ keep_alive = KeepAlive} = State) ->
+ (catch httpc_manager:delete_session(Id, ProfileName)),
maybe_retry_queue(Pipeline, State),
maybe_retry_queue(KeepAlive, State),
cancel_timer(Timers#timers.queue_timer, timeout_queue),
- Socket = Session#tcp_session.socket,
- http_transport:close(socket_type(Session#tcp_session.scheme), Socket);
+ http_transport:close(socket_type(Scheme), Socket);
-terminate(Reason, State = #state{request = Request}) ->
+terminate(Reason, #state{request = undefined}) ->
+ ?hcrt("terminate", [{reason, Reason}]),
+ ok;
+
+terminate(Reason, #state{request = Request} = State) ->
+ ?hcrd("terminate", [{reason, Reason}, {request, Request}]),
NewState = maybe_send_answer(Request,
httpc_response:error(Request, Reason),
State),
@@ -641,13 +723,16 @@ maybe_send_answer(Request, Answer, State) ->
answer_request(Request, Answer, State).
deliver_answers([]) ->
+ ?hcrd("deliver answer done", []),
ok;
-deliver_answers([#request{from = From} = Request | Requests])
+deliver_answers([#request{id = Id, from = From} = Request | Requests])
when is_pid(From) ->
Response = httpc_response:error(Request, socket_closed_remotely),
+ ?hcrd("deliver answer", [{id, Id}, {from, From}, {response, Response}]),
httpc_response:send(From, Response),
deliver_answers(Requests);
-deliver_answers([_|Requests]) ->
+deliver_answers([Request|Requests]) ->
+ ?hcrd("skip deliver answer", [{request, Request}]),
deliver_answers(Requests).
@@ -728,77 +813,58 @@ connect(SocketType, ToAddress, #options{ipfamily = IpFamily,
http_transport:connect(SocketType, ToAddress, Opts3, Timeout)
end.
-
-send_first_request(Address, Request, #state{options = Options} = State) ->
- SocketType = socket_type(Request),
- ConnTimeout = (Request#request.settings)#http_options.connect_timeout,
- ?hcri("connect",
+connect_and_send_first_request(Address,
+ #request{settings = Settings,
+ headers = Headers,
+ address = OrigAddress,
+ scheme = Scheme} = Request,
+ #state{options = Options} = State) ->
+
+ ?hcrd("connect",
[{address, Address}, {request, Request}, {options, Options}]),
+
+ SocketType = socket_type(Request),
+ ConnTimeout = Settings#http_options.connect_timeout,
case connect(SocketType, Address, Options, ConnTimeout) of
{ok, Socket} ->
- ?hcri("connected - now send first request", [{socket, Socket}]),
+ ?hcrd("connected - now send first request", [{socket, Socket}]),
case httpc_request:send(Address, Request, Socket) of
ok ->
- ?hcri("first request sent", []),
+ ?hcrd("first request sent", []),
ClientClose =
- httpc_request:is_client_closing(
- Request#request.headers),
+ httpc_request:is_client_closing(Headers),
SessionType = httpc_manager:session_type(Options),
Session =
- #tcp_session{id = {Request#request.address, self()},
- scheme = Request#request.scheme,
- socket = Socket,
+ #tcp_session{id = {OrigAddress, self()},
+ scheme = Scheme,
+ socket = Socket,
client_close = ClientClose,
- type = SessionType},
- TmpState = State#state{request = Request,
- session = Session,
- mfa = init_mfa(Request, State),
- status_line =
- init_status_line(Request),
- headers = undefined,
- body = undefined,
- status = new},
- http_transport:setopts(SocketType,
- Socket, [{active, once}]),
+ type = SessionType},
+ TmpState =
+ State#state{request = Request,
+ session = Session,
+ mfa = init_mfa(Request, State),
+ status_line = init_status_line(Request),
+ headers = undefined,
+ body = undefined,
+ status = new},
+ ?hcrt("activate socket", []),
+ activate_once(Session),
NewState = activate_request_timeout(TmpState),
{ok, NewState};
- {error, Reason} ->
- %% Commented out in wait of ssl support to avoid
- %% dialyzer warning
- %%case State#state.status of
- %% new -> % Called from init/1
- self() ! {init_error, error_sending,
- httpc_response:error(Request, Reason)},
- {ok, State#state{request = Request,
- session =
- #tcp_session{socket = Socket}}}
- %%ssl_tunnel -> % Not called from init/1
- %% NewState =
- %% answer_request(Request,
- %%httpc_response:error(Request,
- %%Reason),
- %% State),
- %% {stop, normal, NewState}
- %% end
+ {error, Reason} ->
+ ?hcrv("failed sending request", [{reason, Reason}]),
+ Error = {error, {send_failed,
+ httpc_response:error(Request, Reason)}},
+ {stop, Error, State#state{request = Request}}
end;
- {error, Reason} ->
- %% Commented out in wait of ssl support to avoid
- %% dialyzer warning
- %% case State#state.status of
- %% new -> % Called from init/1
- self() ! {init_error, error_connecting,
- httpc_response:error(Request, Reason)},
- {ok, State#state{request = Request}}
- %% ssl_tunnel -> % Not called from init/1
- %% NewState =
- %% answer_request(Request,
- %% httpc_response:error(Request,
- %% Reason),
- %% State),
- %% {stop, normal, NewState}
- %%end
+ {error, Reason} ->
+ ?hcri("connect failed", [{reason, Reason}]),
+ Error = {error, {connect_failed,
+ httpc_response:error(Request, Reason)}},
+ {stop, Error, State#state{request = Request}}
end.
handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body},
@@ -806,23 +872,23 @@ handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body},
?hcrt("handle_http_msg", [{body, Body}]),
case Headers#http_response_h.'content-type' of
"multipart/byteranges" ++ _Param ->
- exit(not_yet_implemented);
+ exit({not_yet_implemented, multypart_nyteranges});
_ ->
- StatusLine = {Version, StatusCode, ReasonPharse},
+ StatusLine = {Version, StatusCode, ReasonPharse},
{ok, NewRequest} = start_stream(StatusLine, Headers, Request),
handle_http_body(Body,
State#state{request = NewRequest,
status_line = StatusLine,
headers = Headers})
end;
-handle_http_msg({ChunkedHeaders, Body},
- State = #state{headers = Headers}) ->
- ?hcrt("handle_http_msg", [{chunked_headers, ChunkedHeaders}, {body, Body}]),
+handle_http_msg({ChunkedHeaders, Body}, #state{headers = Headers} = State) ->
+ ?hcrt("handle_http_msg",
+ [{chunked_headers, ChunkedHeaders}, {body, Body}]),
NewHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders),
handle_response(State#state{headers = NewHeaders, body = Body});
-handle_http_msg(Body, State = #state{status_line = {_,Code, _}}) ->
+handle_http_msg(Body, #state{status_line = {_,Code, _}} = State) ->
?hcrt("handle_http_msg", [{body, Body}, {code, Code}]),
- {NewBody, NewRequest}= stream(Body, State#state.request, Code),
+ {NewBody, NewRequest} = stream(Body, State#state.request, Code),
handle_response(State#state{body = NewBody, request = NewRequest}).
handle_http_body(<<>>, State = #state{status_line = {_,304, _}}) ->
@@ -837,11 +903,15 @@ handle_http_body(<<>>, State = #state{request = #request{method = head}}) ->
?hcrt("handle_http_body - head", []),
handle_response(State#state{body = <<>>});
-handle_http_body(Body, State = #state{headers = Headers,
+handle_http_body(Body, State = #state{headers = Headers,
max_body_size = MaxBodySize,
- status_line = {_,Code, _},
- request = Request}) ->
- ?hcrt("handle_http_body", [{body, Body}, {max_body_size, MaxBodySize}, {code, Code}]),
+ status_line = {_,Code, _},
+ request = Request}) ->
+ ?hcrt("handle_http_body",
+ [{headers, Headers},
+ {body, Body},
+ {max_body_size, MaxBodySize},
+ {code, Code}]),
TransferEnc = Headers#http_response_h.'transfer-encoding',
case case_insensitive_header(TransferEnc) of
"chunked" ->
@@ -850,12 +920,17 @@ handle_http_body(Body, State = #state{headers = Headers,
State#state.max_header_size,
{Code, Request}) of
{Module, Function, Args} ->
- ?hcrt("handle_http_body - new mfa", [{module, Module}, {function, Function}, {args, Args}]),
+ ?hcrt("handle_http_body - new mfa",
+ [{module, Module},
+ {function, Function},
+ {args, Args}]),
NewState = next_body_chunk(State),
{noreply, NewState#state{mfa =
{Module, Function, Args}}};
{ok, {ChunkedHeaders, NewBody}} ->
- ?hcrt("handle_http_body - nyew body", [{chunked_headers, ChunkedHeaders}, {new_body, NewBody}]),
+ ?hcrt("handle_http_body - new body",
+ [{chunked_headers, ChunkedHeaders},
+ {new_body, NewBody}]),
NewHeaders = http_chunk:handle_headers(Headers,
ChunkedHeaders),
handle_response(State#state{headers = NewHeaders,
@@ -872,12 +947,13 @@ handle_http_body(Body, State = #state{headers = Headers,
?hcrt("handle_http_body - other", []),
Length =
list_to_integer(Headers#http_response_h.'content-length'),
- case ((Length =< MaxBodySize) or (MaxBodySize == nolimit)) of
+ case ((Length =< MaxBodySize) orelse (MaxBodySize =:= nolimit)) of
true ->
case httpc_response:whole_body(Body, Length) of
{ok, Body} ->
- {NewBody, NewRequest}= stream(Body, Request, Code),
- handle_response(State#state{body = NewBody,
+ {NewBody, NewRequest} =
+ stream(Body, Request, Code),
+ handle_response(State#state{body = NewBody,
request = NewRequest});
MFA ->
NewState = next_body_chunk(State),
@@ -893,42 +969,31 @@ handle_http_body(Body, State = #state{headers = Headers,
end
end.
-%%% Normaly I do not comment out code, I throw it away. But this might
-%%% actually be used on day if ssl is improved.
-%% handle_response(State = #state{status = ssl_tunnel,
-%% request = Request,
-%% options = Options,
-%% session = #tcp_session{socket = Socket,
-%% scheme = Scheme},
-%% status_line = {_, 200, _}}) ->
-%% %%% Insert code for upgrading the socket if and when ssl supports this.
-%% Address = handle_proxy(Request#request.address, Options#options.proxy),
-%% send_first_request(Address, Request, State);
-%% handle_response(State = #state{status = ssl_tunnel,
-%% request = Request}) ->
-%% NewState = answer_request(Request,
-%% httpc_response:error(Request,
-%% ssl_proxy_tunnel_failed),
-%% State),
-%% {stop, normal, NewState};
-
-handle_response(State = #state{status = new}) ->
- handle_response(try_to_enable_pipeline_or_keep_alive(State));
-
-handle_response(State =
- #state{request = Request,
+handle_response(#state{status = new} = State) ->
+ ?hcrd("handle response - status = new", []),
+ handle_response(try_to_enable_pipeline_or_keep_alive(State));
+
+handle_response(#state{request = Request,
status = Status,
session = Session,
status_line = StatusLine,
headers = Headers,
body = Body,
options = Options,
- profile_name = ProfileName}) when Status =/= new ->
- ?hcrt("handle response", [{status, Status}, {session, Session}, {status_line, StatusLine}, {profile_name, ProfileName}]),
+ profile_name = ProfileName} = State)
+ when Status =/= new ->
+
+ ?hcrd("handle response", [{profile, ProfileName},
+ {status, Status},
+ {request, Request},
+ {session, Session},
+ {status_line, StatusLine}]),
+
handle_cookies(Headers, Request, Options, ProfileName),
case httpc_response:result({StatusLine, Headers, Body}, Request) of
%% 100-continue
continue ->
+ ?hcrd("handle response - continue", []),
%% Send request body
{_, RequestBody} = Request#request.content,
http_transport:send(socket_type(Session#tcp_session.scheme),
@@ -939,44 +1004,46 @@ handle_response(State =
Session#tcp_session.socket,
[{active, once}]),
Relaxed = (Request#request.settings)#http_options.relaxed,
- {noreply,
- State#state{mfa = {httpc_response, parse,
- [State#state.max_header_size,
- Relaxed]},
- status_line = undefined,
- headers = undefined,
- body = undefined
- }};
+ MFA = {httpc_response, parse,
+ [State#state.max_header_size, Relaxed]},
+ {noreply, State#state{mfa = MFA,
+ status_line = undefined,
+ headers = undefined,
+ body = undefined}};
+
%% Ignore unexpected 100-continue response and receive the
%% actual response that the server will send right away.
{ignore, Data} ->
+ ?hcrd("handle response - ignore", [{data, Data}]),
Relaxed = (Request#request.settings)#http_options.relaxed,
- NewState = State#state{mfa =
- {httpc_response, parse,
- [State#state.max_header_size,
- Relaxed]},
+ MFA = {httpc_response, parse,
+ [State#state.max_header_size, Relaxed]},
+ NewState = State#state{mfa = MFA,
status_line = undefined,
- headers = undefined,
- body = undefined},
+ headers = undefined,
+ body = undefined},
handle_info({httpc_handler, dummy, Data}, NewState);
+
%% On a redirect or retry the current request becomes
%% obsolete and the manager will create a new request
%% with the same id as the current.
{redirect, NewRequest, Data} ->
- ?hcrt("handle response - redirect", [{new_request, NewRequest}, {data, Data}]),
+ ?hcrt("handle response - redirect",
+ [{new_request, NewRequest}, {data, Data}]),
ok = httpc_manager:redirect_request(NewRequest, ProfileName),
handle_queue(State#state{request = undefined}, Data);
{retry, TimeNewRequest, Data} ->
- ?hcrt("handle response - retry", [{time_new_request, TimeNewRequest}, {data, Data}]),
+ ?hcrt("handle response - retry",
+ [{time_new_request, TimeNewRequest}, {data, Data}]),
ok = httpc_manager:retry_request(TimeNewRequest, ProfileName),
handle_queue(State#state{request = undefined}, Data);
{ok, Msg, Data} ->
- ?hcrt("handle response - result ok", [{msg, Msg}, {data, Data}]),
+ ?hcrd("handle response - ok", [{msg, Msg}, {data, Data}]),
end_stream(StatusLine, Request),
NewState = answer_request(Request, Msg, State),
handle_queue(NewState, Data);
{stop, Msg} ->
- ?hcrt("handle response - result stop", [{msg, Msg}]),
+ ?hcrd("handle response - stop", [{msg, Msg}]),
end_stream(StatusLine, Request),
NewState = answer_request(Request, Msg, State),
{stop, normal, NewState}
@@ -990,60 +1057,67 @@ handle_cookies(_,_, #options{cookies = verify}, _) ->
ok;
handle_cookies(Headers, Request, #options{cookies = enabled}, ProfileName) ->
{Host, _ } = Request#request.address,
- Cookies = http_cookie:cookies(Headers#http_response_h.other,
+ Cookies = httpc_cookie:cookies(Headers#http_response_h.other,
Request#request.path, Host),
httpc_manager:store_cookies(Cookies, Request#request.address,
ProfileName).
%% This request could not be pipelined or used as sequential keept alive
%% queue
-handle_queue(State = #state{status = close}, _) ->
+handle_queue(#state{status = close} = State, _) ->
{stop, normal, State};
-handle_queue(State = #state{status = keep_alive}, Data) ->
+handle_queue(#state{status = keep_alive} = State, Data) ->
handle_keep_alive_queue(State, Data);
-handle_queue(State = #state{status = pipeline}, Data) ->
+handle_queue(#state{status = pipeline} = State, Data) ->
handle_pipeline(State, Data).
-handle_pipeline(State =
- #state{status = pipeline, session = Session,
+handle_pipeline(#state{status = pipeline,
+ session = Session,
profile_name = ProfileName,
- options = #options{pipeline_timeout = TimeOut}},
- Data) ->
+ options = #options{pipeline_timeout = TimeOut}} =
+ State,
+ Data) ->
+
+ ?hcrd("handle pipeline", [{profile, ProfileName},
+ {session, Session},
+ {timeout, TimeOut}]),
+
case queue:out(State#state.pipeline) of
{empty, _} ->
+ ?hcrd("epmty pipeline queue", []),
+
%% The server may choose too teminate an idle pipeline
%% in this case we want to receive the close message
%% at once and not when trying to pipeline the next
%% request.
- http_transport:setopts(socket_type(Session#tcp_session.scheme),
- Session#tcp_session.socket,
- [{active, once}]),
+ activate_once(Session),
+
%% If a pipeline that has been idle for some time is not
%% closed by the server, the client may want to close it.
- NewState = activate_queue_timeout(TimeOut, State),
+ NewState = activate_queue_timeout(TimeOut, State),
NewSession = Session#tcp_session{queue_length = 0},
httpc_manager:insert_session(NewSession, ProfileName),
%% Note mfa will be initilized when a new request
%% arrives.
{noreply,
- NewState#state{request = undefined,
- mfa = undefined,
+ NewState#state{request = undefined,
+ mfa = undefined,
status_line = undefined,
- headers = undefined,
- body = undefined
- }
- };
+ headers = undefined,
+ body = undefined}};
{{value, NextRequest}, Pipeline} ->
case lists:member(NextRequest#request.id,
State#state.canceled) of
true ->
+ ?hcrv("next request had been cancelled", []),
%% See comment for handle_cast({cancel, RequestId})
{stop, normal,
State#state{request =
NextRequest#request{from = answer_sent}}};
false ->
+ ?hcrv("next request", [{request, NextRequest}]),
NewSession =
Session#tcp_session{queue_length =
%% Queue + current
@@ -1051,15 +1125,16 @@ handle_pipeline(State =
httpc_manager:insert_session(NewSession, ProfileName),
Relaxed =
(NextRequest#request.settings)#http_options.relaxed,
+ MFA = {httpc_response,
+ parse,
+ [State#state.max_header_size, Relaxed]},
NewState =
- State#state{pipeline = Pipeline,
- request = NextRequest,
- mfa = {httpc_response, parse,
- [State#state.max_header_size,
- Relaxed]},
+ State#state{pipeline = Pipeline,
+ request = NextRequest,
+ mfa = MFA,
status_line = undefined,
- headers = undefined,
- body = undefined},
+ headers = undefined,
+ body = undefined},
case Data of
<<>> ->
http_transport:setopts(
@@ -1076,15 +1151,20 @@ handle_pipeline(State =
end
end.
-handle_keep_alive_queue(State = #state{status = keep_alive,
- session = Session,
- profile_name = ProfileName,
- options = #options{keep_alive_timeout
- = TimeOut}
- },
- Data) ->
+handle_keep_alive_queue(
+ #state{status = keep_alive,
+ session = Session,
+ profile_name = ProfileName,
+ options = #options{keep_alive_timeout = TimeOut}} = State,
+ Data) ->
+
+ ?hcrd("handle keep_alive", [{profile, ProfileName},
+ {session, Session},
+ {timeout, TimeOut}]),
+
case queue:out(State#state.keep_alive) of
{empty, _} ->
+ ?hcrd("epmty keep_alive queue", []),
%% The server may choose too terminate an idle keep_alive session
%% in this case we want to receive the close message
%% at once and not when trying to send the next
@@ -1111,25 +1191,25 @@ handle_keep_alive_queue(State = #state{status = keep_alive,
case lists:member(NextRequest#request.id,
State#state.canceled) of
true ->
- handle_keep_alive_queue(State#state{keep_alive =
- KeepAlive}, Data);
+ ?hcrv("next request has already been canceled", []),
+ handle_keep_alive_queue(
+ State#state{keep_alive = KeepAlive}, Data);
false ->
+ ?hcrv("next request", [{request, NextRequest}]),
Relaxed =
(NextRequest#request.settings)#http_options.relaxed,
+ MFA = {httpc_response, parse,
+ [State#state.max_header_size, Relaxed]},
NewState =
- State#state{request = NextRequest,
- keep_alive = KeepAlive,
- mfa = {httpc_response, parse,
- [State#state.max_header_size,
- Relaxed]},
+ State#state{request = NextRequest,
+ keep_alive = KeepAlive,
+ mfa = MFA,
status_line = undefined,
- headers = undefined,
- body = undefined},
+ headers = undefined,
+ body = undefined},
case Data of
<<>> ->
- http_transport:setopts(
- socket_type(Session#tcp_session.scheme),
- Session#tcp_session.socket, [{active, once}]),
+ activate_once(Session),
{noreply, NewState};
_ ->
%% If we already received some bytes of
@@ -1140,11 +1220,6 @@ handle_keep_alive_queue(State = #state{status = keep_alive,
end
end.
-call(Msg, Pid, Timeout) ->
- gen_server:call(Pid, Msg, Timeout).
-
-cast(Msg, Pid) ->
- gen_server:cast(Pid, Msg).
case_insensitive_header(Str) when is_list(Str) ->
http_util:to_lower(Str);
@@ -1152,20 +1227,34 @@ case_insensitive_header(Str) when is_list(Str) ->
case_insensitive_header(Str) ->
Str.
-activate_request_timeout(State = #state{request = Request}) ->
- Time = (Request#request.settings)#http_options.timeout,
- case Time of
+activate_once(#tcp_session{scheme = Scheme, socket = Socket}) ->
+ SocketType = socket_type(Scheme),
+ http_transport:setopts(SocketType, Socket, [{active, once}]).
+
+activate_request_timeout(
+ #state{request = #request{timer = undefined} = Request} = State) ->
+ Timeout = (Request#request.settings)#http_options.timeout,
+ case Timeout of
infinity ->
State;
_ ->
- Ref = erlang:send_after(Time, self(),
- {timeout, Request#request.id}),
- State#state
- {timers =
- #timers{request_timers =
- [{Request#request.id, Ref}|
- (State#state.timers)#timers.request_timers]}}
- end.
+ ReqId = Request#request.id,
+ ?hcrt("activate request timer",
+ [{request_id, ReqId},
+ {time_consumed, t() - Request#request.started},
+ {timeout, Timeout}]),
+ Msg = {timeout, ReqId},
+ Ref = erlang:send_after(Timeout, self(), Msg),
+ Request2 = Request#request{timer = Ref},
+ ReqTimers = [{Request#request.id, Ref} |
+ (State#state.timers)#timers.request_timers],
+ Timers = #timers{request_timers = ReqTimers},
+ State#state{request = Request2, timers = Timers}
+ end;
+
+%% Timer is already running! This is the case for a redirect or retry
+activate_request_timeout(State) ->
+ State.
activate_queue_timeout(infinity, State) ->
State;
@@ -1191,12 +1280,12 @@ is_keep_alive_connection(Headers, Session) ->
(not ((Session#tcp_session.client_close) or
httpc_response:is_server_closing(Headers))).
-try_to_enable_pipeline_or_keep_alive(State =
- #state{session = Session,
- request = #request{method = Method},
- status_line = {Version, _, _},
- headers = Headers,
- profile_name = ProfileName}) ->
+try_to_enable_pipeline_or_keep_alive(
+ #state{session = Session,
+ request = #request{method = Method},
+ status_line = {Version, _, _},
+ headers = Headers,
+ profile_name = ProfileName} = State) ->
case (is_keep_alive_enabled_server(Version, Headers) andalso
is_keep_alive_connection(Headers, Session)) of
true ->
@@ -1209,15 +1298,16 @@ try_to_enable_pipeline_or_keep_alive(State =
httpc_manager:insert_session(Session, ProfileName),
%% Make sure type is keep_alive in session
%% as it in this case might be pipeline
- State#state{status = keep_alive,
- session =
- Session#tcp_session{type = keep_alive}}
+ NewSession = Session#tcp_session{type = keep_alive},
+ State#state{status = keep_alive,
+ session = NewSession}
end;
false ->
State#state{status = close}
end.
-answer_request(Request, Msg, #state{timers = Timers} = State) ->
+answer_request(Request, Msg, #state{timers = Timers} = State) ->
+ ?hcrt("answer request", [{request, Request}, {msg, Msg}]),
httpc_response:send(Request#request.from, Msg),
RequestTimers = Timers#timers.request_timers,
TimerRef =
@@ -1253,14 +1343,14 @@ retry_pipeline([Request | PipeLine],
case (catch httpc_manager:retry_request(Request, ProfileName)) of
ok ->
RequestTimers = Timers#timers.request_timers,
+ ReqId = Request#request.id,
TimerRef =
- proplists:get_value(Request#request.id, RequestTimers,
- undefined),
- cancel_timer(TimerRef, {timeout, Request#request.id}),
- State#state{timers = Timers#timers{request_timers =
- lists:delete({Request#request.id,
- TimerRef},
- RequestTimers)}};
+ proplists:get_value(ReqId, RequestTimers, undefined),
+ cancel_timer(TimerRef, {timeout, ReqId}),
+ NewReqsTimers = lists:delete({ReqId, TimerRef}, RequestTimers),
+ NewTimers = Timers#timers{request_timers = NewReqsTimers},
+ State#state{timers = NewTimers};
+
Error ->
answer_request(Request#request.from,
httpc_response:error(Request, Error), State)
@@ -1347,10 +1437,12 @@ socket_type(http) ->
socket_type(https) ->
{ssl, []}. %% Dummy value ok for ex setops that does not use this value
-start_stream({_Version, _Code, _ReasonPhrase}, _Headers, #request{stream = none} = Request) ->
+start_stream({_Version, _Code, _ReasonPhrase}, _Headers,
+ #request{stream = none} = Request) ->
?hcrt("start stream - none", []),
{ok, Request};
-start_stream({_Version, Code, _ReasonPhrase}, Headers, #request{stream = self} = Request)
+start_stream({_Version, Code, _ReasonPhrase}, Headers,
+ #request{stream = self} = Request)
when (Code =:= 200) orelse (Code =:= 206) ->
?hcrt("start stream - self", [{code, Code}]),
Msg = httpc_response:stream_start(Headers, Request, ignore),
@@ -1363,7 +1455,8 @@ start_stream({_Version, Code, _ReasonPhrase}, Headers,
Msg = httpc_response:stream_start(Headers, Request, self()),
httpc_response:send(Request#request.from, Msg),
{ok, Request};
-start_stream({_Version, Code, _ReasonPhrase}, _Headers, #request{stream = Filename} = Request)
+start_stream({_Version, Code, _ReasonPhrase}, _Headers,
+ #request{stream = Filename} = Request)
when ((Code =:= 200) orelse (Code =:= 206)) andalso is_list(Filename) ->
?hcrt("start stream", [{code, Code}, {filename, Filename}]),
case file:open(Filename, [write, raw, append, delayed_write]) of
@@ -1497,3 +1590,21 @@ handle_verbose(_) ->
%% d(_, _, _) ->
%% ok.
+
+call(Msg, Pid) ->
+ Timeout = infinity,
+ call(Msg, Pid, Timeout).
+call(Msg, Pid, Timeout) ->
+ gen_server:call(Pid, Msg, Timeout).
+
+cast(Msg, Pid) ->
+ gen_server:cast(Pid, Msg).
+
+
+%% to(To, Start) when is_integer(Start) andalso (Start >= 0) ->
+%% http_util:timeout(To, Start);
+%% to(To, _Start) ->
+%% http_util:timeout(To, t()).
+
+t() ->
+ http_util:timestamp().