diff options
author | Micael Karlberg <[email protected]> | 2012-01-23 14:10:54 +0100 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2012-01-23 14:10:54 +0100 |
commit | d01767b8145982659bb24cd101ec097a9566e4b4 (patch) | |
tree | 72000239d903e3da2ac1bbb96127e18dd36777fd /lib/inets/src/http_client | |
parent | 87f506ebc6d0caca687b5cd67a844db2a8f8f7b1 (diff) | |
parent | 7e6ac31a0c918f42e747628db8fec610f5bd424c (diff) | |
download | otp-d01767b8145982659bb24cd101ec097a9566e4b4.tar.gz otp-d01767b8145982659bb24cd101ec097a9566e4b4.tar.bz2 otp-d01767b8145982659bb24cd101ec097a9566e4b4.zip |
Merge branch 'maint'
Diffstat (limited to 'lib/inets/src/http_client')
-rw-r--r-- | lib/inets/src/http_client/httpc_handler.erl | 140 | ||||
-rw-r--r-- | lib/inets/src/http_client/httpc_internal.hrl | 36 | ||||
-rw-r--r-- | lib/inets/src/http_client/httpc_manager.erl | 90 |
3 files changed, 212 insertions, 54 deletions
diff --git a/lib/inets/src/http_client/httpc_handler.erl b/lib/inets/src/http_client/httpc_handler.erl index 587e24cc8d..bfe9b14ef6 100644 --- a/lib/inets/src/http_client/httpc_handler.erl +++ b/lib/inets/src/http_client/httpc_handler.erl @@ -157,12 +157,12 @@ info(Pid) -> %% memory in vain.) %%-------------------------------------------------------------------- %% Request should not be streamed -stream(BodyPart, Request = #request{stream = none}, _) -> +stream(BodyPart, #request{stream = none} = Request, _) -> ?hcrt("stream - none", []), {BodyPart, Request}; %% Stream to caller -stream(BodyPart, Request = #request{stream = Self}, Code) +stream(BodyPart, #request{stream = Self} = Request, Code) when ((Code =:= 200) orelse (Code =:= 206)) andalso ((Self =:= self) orelse (Self =:= {self, once})) -> ?hcrt("stream - self", [{stream, Self}, {code, Code}]), @@ -170,17 +170,10 @@ stream(BodyPart, Request = #request{stream = Self}, Code) {Request#request.id, stream, BodyPart}), {<<>>, Request}; -stream(BodyPart, Request = #request{stream = Self}, 404) - when (Self =:= self) orelse (Self =:= {self, once}) -> - ?hcrt("stream - self with 404", [{stream, Self}]), - httpc_response:send(Request#request.from, - {Request#request.id, stream, BodyPart}), - {<<>>, Request}; - %% Stream to file %% This has been moved to start_stream/3 %% We keep this for backward compatibillity... -stream(BodyPart, Request = #request{stream = Filename}, Code) +stream(BodyPart, #request{stream = Filename} = Request, Code) when ((Code =:= 200) orelse (Code =:= 206)) andalso is_list(Filename) -> ?hcrt("stream - filename", [{stream, Filename}, {code, Code}]), case file:open(Filename, [write, raw, append, delayed_write]) of @@ -192,7 +185,7 @@ stream(BodyPart, Request = #request{stream = Filename}, Code) end; %% Stream to file -stream(BodyPart, Request = #request{stream = Fd}, Code) +stream(BodyPart, #request{stream = Fd} = Request, Code) when ((Code =:= 200) orelse (Code =:= 206)) -> ?hcrt("stream to file", [{stream, Fd}, {code, Code}]), case file:write(Fd, BodyPart) of @@ -295,7 +288,7 @@ handle_call(#request{address = Addr} = Request, _, %% Queue + current queue:len(NewPipeline) + 1, client_close = ClientClose}, - httpc_manager:insert_session(NewSession, ProfileName), + insert_session(NewSession, ProfileName), ?hcrd("session updated", []), {reply, ok, State#state{pipeline = NewPipeline, session = NewSession, @@ -363,7 +356,7 @@ handle_call(#request{address = Addr} = Request, _, %% Queue + current queue:len(NewKeepAlive) + 1, client_close = ClientClose}, - httpc_manager:insert_session(NewSession, ProfileName), + insert_session(NewSession, ProfileName), ?hcrd("session updated", []), {reply, ok, State#state{keep_alive = NewKeepAlive, session = NewSession, @@ -377,7 +370,7 @@ handle_call(#request{address = Addr} = Request, _, NewSession = Session#session{queue_length = 1, client_close = ClientClose}, - httpc_manager:insert_session(NewSession, ProfileName), + insert_session(NewSession, ProfileName), Relaxed = (Request#request.settings)#http_options.relaxed, MFA = {httpc_response, parse, @@ -766,23 +759,52 @@ deliver_answer(Request) -> %% Func: code_change(_OldVsn, State, Extra) -> {ok, NewState} %% Purpose: Convert process state when code is changed %%-------------------------------------------------------------------- -%% code_change(_, #state{request = Request, pipeline = Queue} = State, -%% [{from, '5.0.1'}, {to, '5.0.2'}]) -> -%% Settings = new_http_options(Request#request.settings), -%% NewRequest = Request#request{settings = Settings}, -%% NewQueue = new_queue(Queue, fun new_http_options/1), -%% {ok, State#state{request = NewRequest, pipeline = NewQueue}}; - -%% code_change(_, #state{request = Request, pipeline = Queue} = State, -%% [{from, '5.0.2'}, {to, '5.0.1'}]) -> -%% Settings = old_http_options(Request#request.settings), -%% NewRequest = Request#request{settings = Settings}, -%% NewQueue = new_queue(Queue, fun old_http_options/1), -%% {ok, State#state{request = NewRequest, pipeline = NewQueue}}; + +code_change(_, + #state{session = OldSession, + profile_name = ProfileName} = State, + upgrade_from_pre_5_8_1) -> + case OldSession of + {session, + Id, ClientClose, Scheme, Socket, SocketType, QueueLen, Type} -> + NewSession = #session{id = Id, + client_close = ClientClose, + scheme = Scheme, + socket = Socket, + socket_type = SocketType, + queue_length = QueueLen, + type = Type}, + insert_session(NewSession, ProfileName), + {ok, State#state{session = NewSession}}; + _ -> + {ok, State} + end; + +code_change(_, + #state{session = OldSession, + profile_name = ProfileName} = State, + downgrade_to_pre_5_8_1) -> + case OldSession of + #session{id = Id, + client_close = ClientClose, + scheme = Scheme, + socket = Socket, + socket_type = SocketType, + queue_length = QueueLen, + type = Type} -> + NewSession = {session, + Id, ClientClose, Scheme, Socket, SocketType, + QueueLen, Type}, + insert_session(NewSession, ProfileName), + {ok, State#state{session = NewSession}}; + _ -> + {ok, State} + end; code_change(_, State, _) -> {ok, State}. + %% new_http_options({http_options, TimeOut, AutoRedirect, SslOpts, %% Auth, Relaxed}) -> %% {http_options, "HTTP/1.1", TimeOut, AutoRedirect, SslOpts, @@ -1181,7 +1203,7 @@ handle_pipeline(#state{status = pipeline, case queue:out(State#state.pipeline) of {empty, _} -> - ?hcrd("epmty pipeline queue", []), + ?hcrd("pipeline queue empty", []), %% The server may choose too teminate an idle pipeline %% in this case we want to receive the close message @@ -1191,9 +1213,8 @@ handle_pipeline(#state{status = pipeline, %% If a pipeline that has been idle for some time is not %% closed by the server, the client may want to close it. - NewState = activate_queue_timeout(TimeOut, State), - NewSession = Session#session{queue_length = 0}, - httpc_manager:insert_session(NewSession, ProfileName), + NewState = activate_queue_timeout(TimeOut, State), + update_session(ProfileName, Session, #session.queue_length, 0), %% Note mfa will be initilized when a new request %% arrives. {noreply, @@ -1203,6 +1224,7 @@ handle_pipeline(#state{status = pipeline, headers = undefined, body = undefined}}; {{value, NextRequest}, Pipeline} -> + ?hcrd("pipeline queue non-empty", []), case lists:member(NextRequest#request.id, State#state.canceled) of true -> @@ -1218,7 +1240,7 @@ handle_pipeline(#state{status = pipeline, Session#session{queue_length = %% Queue + current queue:len(Pipeline) + 1}, - httpc_manager:insert_session(NewSession, ProfileName), + insert_session(NewSession, ProfileName), Relaxed = (NextRequest#request.settings)#http_options.relaxed, MFA = {httpc_response, @@ -1257,7 +1279,7 @@ handle_keep_alive_queue( case queue:out(State#state.keep_alive) of {empty, _} -> - ?hcrd("empty keep_alive queue", []), + ?hcrd("keep_alive queue empty", []), %% The server may choose too terminate an idle keep_alive session %% in this case we want to receive the close message %% at once and not when trying to send the next @@ -1266,8 +1288,7 @@ handle_keep_alive_queue( %% If a keep_alive session has been idle for some time is not %% closed by the server, the client may want to close it. NewState = activate_queue_timeout(TimeOut, State), - NewSession = Session#session{queue_length = 0}, - httpc_manager:insert_session(NewSession, ProfileName), + update_session(ProfileName, Session, #session.queue_length, 0), %% Note mfa will be initilized when a new request %% arrives. {noreply, @@ -1279,6 +1300,7 @@ handle_keep_alive_queue( } }; {{value, NextRequest}, KeepAlive} -> + ?hcrd("keep_alive queue non-empty", []), case lists:member(NextRequest#request.id, State#state.canceled) of true -> @@ -1388,10 +1410,10 @@ try_to_enable_pipeline_or_keep_alive( case (is_pipeline_enabled_client(Session) andalso httpc_request:is_idempotent(Method)) of true -> - httpc_manager:insert_session(Session, ProfileName), + insert_session(Session, ProfileName), State#state{status = pipeline}; false -> - httpc_manager:insert_session(Session, ProfileName), + insert_session(Session, ProfileName), %% Make sure type is keep_alive in session %% as it in this case might be pipeline NewSession = Session#session{type = keep_alive}, @@ -1403,7 +1425,9 @@ try_to_enable_pipeline_or_keep_alive( end. answer_request(#request{id = RequestId, from = From} = Request, Msg, - #state{timers = Timers, profile_name = ProfileName} = State) -> + #state{session = Session, + timers = Timers, + profile_name = ProfileName} = State) -> ?hcrt("answer request", [{request, Request}, {msg, Msg}]), httpc_response:send(From, Msg), RequestTimers = Timers#timers.request_timers, @@ -1412,12 +1436,20 @@ answer_request(#request{id = RequestId, from = From} = Request, Msg, Timer = {RequestId, TimerRef}, cancel_timer(TimerRef, {timeout, Request#request.id}), httpc_manager:request_done(RequestId, ProfileName), - + NewSession = maybe_make_session_available(ProfileName, Session), + Timers2 = Timers#timers{request_timers = lists:delete(Timer, + RequestTimers)}, State#state{request = Request#request{from = answer_sent}, - timers = - Timers#timers{request_timers = - lists:delete(Timer, RequestTimers)}}. - + session = NewSession, + timers = Timers2}. + +maybe_make_session_available(ProfileName, + #session{available = false} = Session) -> + update_session(ProfileName, Session, #session.available, true), + Session#session{available = true}; +maybe_make_session_available(_ProfileName, Session) -> + Session. + cancel_timers(#timers{request_timers = ReqTmrs, queue_timer = QTmr}) -> cancel_timer(QTmr, timeout_queue), CancelTimer = fun({_, Timer}) -> cancel_timer(Timer, timeout) end, @@ -1656,6 +1688,28 @@ send_raw(SocketType, Socket, ProcessBody, Acc) -> end. +%% --------------------------------------------------------------------- +%% Session wrappers +%% --------------------------------------------------------------------- + +insert_session(Session, ProfileName) -> + httpc_manager:insert_session(Session, ProfileName). + + +update_session(ProfileName, #session{id = SessionId} = Session, Pos, Value) -> + try + begin + httpc_manager:update_session(ProfileName, SessionId, Pos, Value) + end + catch + error:undef -> % This could happen during code upgrade + Session2 = erlang:setelement(Pos, Session, Value), + insert_session(Session2, ProfileName) + end. + + +%% --------------------------------------------------------------------- + call(Msg, Pid) -> Timeout = infinity, call(Msg, Pid, Timeout). diff --git a/lib/inets/src/http_client/httpc_internal.hrl b/lib/inets/src/http_client/httpc_internal.hrl index 1fbbaa8d13..8af752546c 100644 --- a/lib/inets/src/http_client/httpc_internal.hrl +++ b/lib/inets/src/http_client/httpc_internal.hrl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2005-2011. All Rights Reserved. +%% Copyright Ericsson AB 2005-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -115,17 +115,37 @@ } ). + -record(session, { - id, % {{Host, Port}, HandlerPid} - client_close, % true | false - scheme, % http (HTTP/TCP) | https (HTTP/SSL/TCP) - socket, % Open socket, used by connection - socket_type, % socket-type, used by connection - queue_length = 1, % Current length of pipeline or keep-alive queue - type % pipeline | keep_alive (wait for response before sending new request) + %% {{Host, Port}, HandlerPid} + id, + + %% true | false + client_close, + + %% http (HTTP/TCP) | https (HTTP/SSL/TCP) + scheme, + + %% Open socket, used by connection + socket, + + %% socket-type, used by connection + socket_type, + + %% Current length of pipeline or keep-alive queue + queue_length = 1, + + %% pipeline | keep_alive (wait for response before sending new request) + type, + + %% true | false + %% This will be true, when a response has been received for + %% the first request. See type above. + available = false }). + -record(http_cookie, { domain, diff --git a/lib/inets/src/http_client/httpc_manager.erl b/lib/inets/src/http_client/httpc_manager.erl index a97cbb83f1..453081da21 100644 --- a/lib/inets/src/http_client/httpc_manager.erl +++ b/lib/inets/src/http_client/httpc_manager.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2002-2011. All Rights Reserved. +%% Copyright Ericsson AB 2002-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -34,6 +34,7 @@ retry_request/2, redirect_request/2, insert_session/2, + update_session/4, delete_session/2, set_options/2, store_cookies/3, @@ -193,6 +194,27 @@ insert_session(Session, ProfileName) -> %%-------------------------------------------------------------------- +%% Function: update_session(ProfileName, SessionId, Pos, Value) -> _ +%% Session - #session{} +%% ProfileName - atom() +%% +%% Description: Update, only one field (Pos) of the session record +%% identified by the SessionId, the session information +%% of the httpc manager table <ProfileName>_session_db. +%% Intended to be called by the httpc request handler process. +%%-------------------------------------------------------------------- + +update_session(ProfileName, SessionId, Pos, Value) -> + SessionDbName = session_db_name(ProfileName), + ?hcrt("update session", + [{id, SessionId}, + {pos, Pos}, + {value, Value}, + {profile, ProfileName}]), + ets:update_element(SessionDbName, SessionId, {Pos, Value}). + + +%%-------------------------------------------------------------------- %% Function: delete_session(SessionId, ProfileName) -> _ %% SessionId - {{Host, Port}, HandlerPid} %% ProfileName - atom() @@ -559,9 +581,70 @@ terminate(_, State) -> %% Func: code_change(_OldVsn, State, Extra) -> {ok, NewState} %% Purpose: Convert process state when code is changed %%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> +code_change(_, + #state{session_db = SessionDB} = State, + upgrade_from_pre_5_8_1) -> + Upgrade = + fun({session, + Id, ClientClose, Scheme, Socket, SocketType, QueueLen, Type}) -> + {ok, #session{id = Id, + client_close = ClientClose, + scheme = Scheme, + socket = Socket, + socket_type = SocketType, + queue_length = QueueLen, + type = Type}}; + (_) -> % Already upgraded (by handler) + ignore + end, + (catch update_session_table(SessionDB, Upgrade)), + {ok, State}; + +code_change(_, + #state{session_db = SessionDB} = State, + downgrade_to_pre_5_8_1) -> + Downgrade = + fun(#session{id = Id, + client_close = ClientClose, + scheme = Scheme, + socket = Socket, + socket_type = SocketType, + queue_length = QueueLen, + type = Type}) -> + {ok, {session, + Id, ClientClose, Scheme, Socket, SocketType, + QueueLen, Type}}; + (_) -> % Already downgraded (by handler) + ignore + end, + (catch update_session_table(SessionDB, Downgrade)), + {ok, State}; + +code_change(_, State, _) -> {ok, State}. +%% This function is to catch everything that calls through the cracks... +update_session_table(SessionDB, Transform) -> + ets:safe_fixtable(SessionDB, true), + update_session_table(SessionDB, ets:first(SessionDB), Transform), + ets:safe_fixtable(SessionDB, false). + +update_session_table(_SessionDB, '$end_of_table', _Transform) -> + ok; +update_session_table(SessionDB, Key, Transform) -> + case ets:lookup(SessionDB, Key) of + [OldSession] -> + case Transform(OldSession) of + {ok, NewSession} -> + ets:insert(SessionDB, NewSession); + ignore -> + ok + end; + _ -> + ok + end, + update_session_table(SessionDB, ets:next(SessionDB, Key), Transform). + %%-------------------------------------------------------------------- %% Internal functions @@ -690,6 +773,7 @@ select_session(Method, HostPort, Scheme, SessionType, scheme = Scheme, queue_length = '$2', type = SessionType, + available = true, _ = '_'}, %% {'_', {HostPort, '$1'}, false, Scheme, '_', '$2', SessionTyp}, Candidates = ets:match(SessionDb, Pattern), @@ -727,7 +811,7 @@ pipeline_or_keep_alive(Request, HandlerPid, State) -> ets:insert(State#state.handler_db, {Request#request.id, HandlerPid, Request#request.from}); - _ -> %timeout pipelining failed + _ -> % timeout pipelining failed start_handler(Request, State) end. |