diff options
Diffstat (limited to 'lib/inets/src/http_client/httpc_handler.erl')
-rw-r--r-- | lib/inets/src/http_client/httpc_handler.erl | 672 |
1 files changed, 240 insertions, 432 deletions
diff --git a/lib/inets/src/http_client/httpc_handler.erl b/lib/inets/src/http_client/httpc_handler.erl index bb500dbb46..c99200777b 100644 --- a/lib/inets/src/http_client/httpc_handler.erl +++ b/lib/inets/src/http_client/httpc_handler.erl @@ -32,7 +32,6 @@ %% Internal Application API -export([ start_link/4, - %% connect_and_send/2, send/2, cancel/2, stream_next/1, @@ -165,14 +164,12 @@ info(Pid) -> %%-------------------------------------------------------------------- %% Request should not be streamed stream(BodyPart, #request{stream = none} = Request, _) -> - ?hcrt("stream - none", []), {false, BodyPart, Request}; %% Stream to caller stream(BodyPart, #request{stream = Self} = Request, Code) when ?IS_STREAMED(Code) andalso ((Self =:= self) orelse (Self =:= {self, once})) -> - ?hcrt("stream - self", [{stream, Self}, {code, Code}]), httpc_response:send(Request#request.from, {Request#request.id, stream, BodyPart}), {true, <<>>, Request}; @@ -182,10 +179,8 @@ stream(BodyPart, #request{stream = Self} = Request, Code) %% We keep this for backward compatibillity... stream(BodyPart, #request{stream = Filename} = Request, Code) when ?IS_STREAMED(Code) andalso is_list(Filename) -> - ?hcrt("stream - filename", [{stream, Filename}, {code, Code}]), case file:open(Filename, [write, raw, append, delayed_write]) of {ok, Fd} -> - ?hcrt("stream - file open ok", [{fd, Fd}]), stream(BodyPart, Request#request{stream = Fd}, 200); {error, Reason} -> exit({stream_to_file_failed, Reason}) @@ -194,7 +189,6 @@ stream(BodyPart, #request{stream = Filename} = Request, Code) %% Stream to file stream(BodyPart, #request{stream = Fd} = Request, Code) when ?IS_STREAMED(Code) -> - ?hcrt("stream to file", [{stream, Fd}, {code, Code}]), case file:write(Fd, BodyPart) of ok -> {true, <<>>, Request}; @@ -203,7 +197,6 @@ stream(BodyPart, #request{stream = Fd} = Request, Code) end; stream(BodyPart, Request,_) -> % only 200 and 206 responses can be streamed - ?hcrt("stream - ignore", [{request, Request}]), {false, BodyPart, Request}. @@ -257,22 +250,148 @@ init([Parent, Request, Options, ProfileName]) -> %% {stop, Reason, State} (terminate/2 is called) %% Description: Handling call messages %%-------------------------------------------------------------------- -handle_call(#request{address = Addr} = Request, _, +handle_call(Request, From, State) -> + try do_handle_call(Request, From, State) of + Result -> + Result + catch + _:Reason -> + {stop, {shutdown, Reason} , State} + end. + + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} (terminate/2 is called) +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast(Msg, State) -> + try do_handle_cast(Msg, State) of + Result -> + Result + catch + _:Reason -> + {stop, {shutdown, Reason} , State} + end. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} (terminate/2 is called) +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info(Info, State) -> + try do_handle_info(Info, State) of + Result -> + Result + catch + _:Reason -> + {stop, {shutdown, Reason} , State} + end. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> _ (ignored by gen_server) +%% Description: Shutdown the httpc_handler +%%-------------------------------------------------------------------- + +%% Init error there is no socket to be closed. +terminate(normal, + #state{request = Request, + session = {send_failed, _} = Reason} = State) -> + maybe_send_answer(Request, + httpc_response:error(Request, Reason), + State), + ok; + +terminate(normal, + #state{request = Request, + session = {connect_failed, _} = Reason} = State) -> + maybe_send_answer(Request, + httpc_response:error(Request, Reason), + State), + ok; + +terminate(normal, #state{session = undefined}) -> + ok; + +%% Init error sending, no session information has been setup but +%% there is a socket that needs closing. +terminate(normal, + #state{session = #session{id = undefined} = Session}) -> + close_socket(Session); + +%% Socket closed remotely +terminate(normal, + #state{session = #session{socket = {remote_close, Socket}, + socket_type = SocketType, + id = Id}, + profile_name = ProfileName, + request = Request, + timers = Timers, + pipeline = Pipeline, + keep_alive = KeepAlive} = State) -> + %% Clobber session + (catch httpc_manager:delete_session(Id, ProfileName)), + + maybe_retry_queue(Pipeline, State), + maybe_retry_queue(KeepAlive, State), + + %% Cancel timers + cancel_timers(Timers), + + %% Maybe deliver answers to requests + deliver_answer(Request), + + %% And, just in case, close our side (**really** overkill) + http_transport:close(SocketType, Socket); + +terminate(_Reason, #state{session = #session{id = Id, + socket = Socket, + socket_type = SocketType}, + request = undefined, + profile_name = ProfileName, + timers = Timers, + pipeline = Pipeline, + keep_alive = KeepAlive} = State) -> + + %% Clobber session + (catch httpc_manager:delete_session(Id, ProfileName)), + + maybe_retry_queue(Pipeline, State), + maybe_retry_queue(KeepAlive, State), + + cancel_timer(Timers#timers.queue_timer, timeout_queue), + http_transport:close(SocketType, Socket); + +terminate(_Reason, #state{request = undefined}) -> + ok; + +terminate(Reason, #state{request = Request} = State) -> + NewState = maybe_send_answer(Request, + httpc_response:error(Request, Reason), + State), + terminate(Reason, NewState#state{request = undefined}). + +%%-------------------------------------------------------------------- +%% Func: code_change(_OldVsn, State, Extra) -> {ok, NewState} +%% Purpose: Convert process state when code is changed +%%-------------------------------------------------------------------- + +code_change(_, State, _) -> + {ok, State}. + +%%%-------------------------------------------------------------------- +%%% Internal functions +%%%-------------------------------------------------------------------- +do_handle_call(#request{address = Addr} = Request, _, #state{status = Status, session = #session{type = pipeline} = Session, timers = Timers, options = #options{proxy = Proxy} = _Options, profile_name = ProfileName} = State0) when Status =/= undefined -> - - ?hcrv("new request on a pipeline session", - [{request, Request}, - {profile, ProfileName}, - {status, Status}, - {timers, Timers}]), - Address = handle_proxy(Addr, Proxy), - case httpc_request:send(Address, Session, Request) of ok -> @@ -287,9 +406,8 @@ handle_call(#request{address = Addr} = Request, _, case State0#state.request of #request{} = OldRequest -> %% Old request not yet finished - ?hcrd("old request still not finished", []), %% Make sure to use the new value of timers in state - NewTimers = State1#state.timers, + NewTimers = State1#state.timers, NewPipeline = queue:in(Request, State1#state.pipeline), NewSession = Session#session{queue_length = @@ -297,7 +415,6 @@ handle_call(#request{address = Addr} = Request, _, queue:len(NewPipeline) + 1, client_close = ClientClose}, insert_session(NewSession, ProfileName), - ?hcrd("session updated", []), {reply, ok, State1#state{ request = OldRequest, pipeline = NewPipeline, @@ -306,7 +423,6 @@ handle_call(#request{address = Addr} = Request, _, undefined -> %% Note: tcp-message receiving has already been %% activated by handle_pipeline/2. - ?hcrd("no current request", []), cancel_timer(Timers#timers.queue_timer, timeout_queue), NewSession = @@ -314,18 +430,16 @@ handle_call(#request{address = Addr} = Request, _, client_close = ClientClose}, httpc_manager:insert_session(NewSession, ProfileName), NewTimers = Timers#timers{queue_timer = undefined}, - ?hcrd("session created", []), State = init_wait_for_response_state(Request, State1#state{session = NewSession, timers = NewTimers}), {reply, ok, State} end; {error, Reason} -> - ?hcri("failed sending request", [{reason, Reason}]), NewPipeline = queue:in(Request, State0#state.pipeline), - {stop, shutdown, {pipeline_failed, Reason}, State0#state{pipeline = NewPipeline}} + {stop, {shutdown, {pipeline_failed, Reason}}, State0#state{pipeline = NewPipeline}} end; -handle_call(#request{address = Addr} = Request, _, +do_handle_call(#request{address = Addr} = Request, _, #state{status = Status, session = #session{type = keep_alive} = Session, timers = Timers, @@ -333,17 +447,11 @@ handle_call(#request{address = Addr} = Request, _, profile_name = ProfileName} = State0) when Status =/= undefined -> - ?hcrv("new request on a keep-alive session", - [{request, Request}, - {profile, ProfileName}, - {status, Status}]), - ClientClose = httpc_request:is_client_closing(Request#request.headers), case State0#state.request of #request{} -> %% Old request not yet finished %% Make sure to use the new value of timers in state - ?hcrd("old request still not finished", []), NewKeepAlive = queue:in(Request, State0#state.keep_alive), NewSession = Session#session{queue_length = @@ -351,13 +459,11 @@ handle_call(#request{address = Addr} = Request, _, queue:len(NewKeepAlive) + 1, client_close = ClientClose}, insert_session(NewSession, ProfileName), - ?hcrd("session updated", []), {reply, ok, State0#state{keep_alive = NewKeepAlive, session = NewSession}}; undefined -> %% Note: tcp-message receiving has already been %% activated by handle_pipeline/2. - ?hcrd("no current request", []), cancel_timer(Timers#timers.queue_timer, timeout_queue), NewTimers = Timers#timers{queue_timer = undefined}, @@ -365,8 +471,6 @@ handle_call(#request{address = Addr} = Request, _, Address = handle_proxy(Addr, Proxy), case httpc_request:send(Address, Session, Request) of ok -> - ?hcrd("request sent", []), - %% Activate the request time out for the new request State2 = activate_request_timeout(State1#state{request = Request}), @@ -377,22 +481,13 @@ handle_call(#request{address = Addr} = Request, _, State = init_wait_for_response_state(Request, State2#state{session = NewSession}), {reply, ok, State}; {error, Reason} -> - ?hcri("failed sending request", [{reason, Reason}]), - {stop, shutdown, {keepalive_failed, Reason}, State1} + {stop, {shutdown, {keepalive_failed, Reason}}, State1} end end; - -handle_call(info, _, State) -> +do_handle_call(info, _, State) -> Info = handler_info(State), {reply, Info, State}. -%%-------------------------------------------------------------------- -%% Function: handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} (terminate/2 is called) -%% Description: Handling cast messages -%%-------------------------------------------------------------------- - %% When the request in process has been canceled the handler process is %% stopped and the pipelined requests will be reissued or remaining %% requests will be sent on a new connection. This is is @@ -405,145 +500,102 @@ handle_call(info, _, State) -> %% handle_keep_alive_queue/2 on the other hand will just skip the %% request as if it was never issued as in this case the request will %% not have been sent. -handle_cast({cancel, RequestId}, +do_handle_cast({cancel, RequestId}, #state{request = #request{id = RequestId} = Request, - profile_name = ProfileName, canceled = Canceled} = State) -> - ?hcrv("cancel current request", [{request_id, RequestId}, - {profile, ProfileName}, - {canceled, Canceled}]), {stop, normal, State#state{canceled = [RequestId | Canceled], request = Request#request{from = answer_sent}}}; -handle_cast({cancel, RequestId}, - #state{profile_name = ProfileName, - request = #request{id = CurrId}, - canceled = Canceled} = State) -> - ?hcrv("cancel", [{request_id, RequestId}, - {curr_req_id, CurrId}, - {profile, ProfileName}, - {canceled, Canceled}]), +do_handle_cast({cancel, RequestId}, + #state{request = #request{}, + canceled = Canceled} = State) -> {noreply, State#state{canceled = [RequestId | Canceled]}}; -handle_cast({cancel, RequestId}, - #state{profile_name = ProfileName, - request = undefined, - canceled = Canceled} = State) -> - ?hcrv("cancel", [{request_id, RequestId}, - {curr_req_id, undefined}, - {profile, ProfileName}, - {canceled, Canceled}]), +do_handle_cast({cancel, _}, + #state{request = undefined} = State) -> {noreply, State}; - -handle_cast(stream_next, #state{session = Session} = State) -> +do_handle_cast(stream_next, #state{session = Session} = State) -> activate_once(Session), %% Inactivate the #state.once here because we don't want %% next_body_chunk/1 to activate the socket twice. {noreply, State#state{once = inactive}}. - -%%-------------------------------------------------------------------- -%% Function: handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} (terminate/2 is called) -%% Description: Handling all non call/cast messages -%%-------------------------------------------------------------------- -handle_info({Proto, _Socket, Data}, +do_handle_info({Proto, _Socket, Data}, #state{mfa = {Module, Function, Args}, - request = #request{method = Method, - stream = Stream} = Request, + request = #request{method = Method} = Request, session = Session, status_line = StatusLine} = State) when (Proto =:= tcp) orelse (Proto =:= ssl) orelse (Proto =:= httpc_handler) -> - ?hcri("received data", [{proto, Proto}, - {module, Module}, - {function, Function}, - {method, Method}, - {stream, Stream}, - {session, Session}, - {status_line, StatusLine}]), - - FinalResult = - try Module:Function([Data | Args]) of - {ok, Result} -> - ?hcrd("data processed - ok", []), - handle_http_msg(Result, State); - {_, whole_body, _} when Method =:= head -> - ?hcrd("data processed - whole body", []), - handle_response(State#state{body = <<>>}); - {Module, whole_body, [Body, Length]} -> - ?hcrd("data processed - whole body", [{length, Length}]), - {_, Code, _} = StatusLine, - {Streamed, NewBody, NewRequest} = stream(Body, Request, Code), - %% When we stream we will not keep the already - %% streamed data, that would be a waste of memory. - NewLength = - case Streamed of - false -> - Length; - true -> - Length - size(Body) - end, - - NewState = next_body_chunk(State, Code), - NewMFA = {Module, whole_body, [NewBody, NewLength]}, - {noreply, NewState#state{mfa = NewMFA, - request = NewRequest}}; - {Module, decode_size, + try Module:Function([Data | Args]) of + {ok, Result} -> + handle_http_msg(Result, State); + {_, whole_body, _} when Method =:= head -> + handle_response(State#state{body = <<>>}); + {Module, whole_body, [Body, Length]} -> + {_, Code, _} = StatusLine, + {Streamed, NewBody, NewRequest} = stream(Body, Request, Code), + %% When we stream we will not keep the already + %% streamed data, that would be a waste of memory. + NewLength = + case Streamed of + false -> + Length; + true -> + Length - size(Body) + end, + + NewState = next_body_chunk(State, Code), + NewMFA = {Module, whole_body, [NewBody, NewLength]}, + {noreply, NewState#state{mfa = NewMFA, + request = NewRequest}}; + {Module, decode_size, [TotalChunk, HexList, AccHeaderSize, {MaxBodySize, BodySoFar, AccLength, MaxHeaderSize}]} - when BodySoFar =/= <<>> -> - ?hcrd("data processed - decode_size", []), - %% The response body is chunk-encoded. Steal decoded - %% chunks as much as possible to stream. - {_, Code, _} = StatusLine, - {_, NewBody, NewRequest} = stream(BodySoFar, Request, Code), - NewState = next_body_chunk(State, Code), - NewMFA = {Module, decode_size, - [TotalChunk, HexList, AccHeaderSize, + when BodySoFar =/= <<>> -> + %% The response body is chunk-encoded. Steal decoded + %% chunks as much as possible to stream. + {_, Code, _} = StatusLine, + {_, NewBody, NewRequest} = stream(BodySoFar, Request, Code), + NewState = next_body_chunk(State, Code), + NewMFA = {Module, decode_size, + [TotalChunk, HexList, AccHeaderSize, {MaxBodySize, NewBody, AccLength, MaxHeaderSize}]}, + {noreply, NewState#state{mfa = NewMFA, + request = NewRequest}}; + {Module, decode_data, + [ChunkSize, TotalChunk, + {MaxBodySize, BodySoFar, AccLength, MaxHeaderSize}]} + when TotalChunk =/= <<>> orelse BodySoFar =/= <<>> -> + %% The response body is chunk-encoded. Steal decoded + %% chunks as much as possible to stream. + ChunkSizeToSteal = min(ChunkSize, byte_size(TotalChunk)), + <<StolenChunk:ChunkSizeToSteal/binary, NewTotalChunk/binary>> = TotalChunk, + StolenBody = <<BodySoFar/binary, StolenChunk/binary>>, + NewChunkSize = ChunkSize - ChunkSizeToSteal, + {_, Code, _} = StatusLine, + + {_, NewBody, NewRequest} = stream(StolenBody, Request, Code), + NewState = next_body_chunk(State, Code), + NewMFA = {Module, decode_data, + [NewChunkSize, NewTotalChunk, + {MaxBodySize, NewBody, AccLength, MaxHeaderSize}]}, {noreply, NewState#state{mfa = NewMFA, request = NewRequest}}; - {Module, decode_data, - [ChunkSize, TotalChunk, - {MaxBodySize, BodySoFar, AccLength, MaxHeaderSize}]} - when TotalChunk =/= <<>> orelse BodySoFar =/= <<>> -> - ?hcrd("data processed - decode_data", []), - %% The response body is chunk-encoded. Steal decoded - %% chunks as much as possible to stream. - ChunkSizeToSteal = min(ChunkSize, byte_size(TotalChunk)), - <<StolenChunk:ChunkSizeToSteal/binary, NewTotalChunk/binary>> = TotalChunk, - StolenBody = <<BodySoFar/binary, StolenChunk/binary>>, - NewChunkSize = ChunkSize - ChunkSizeToSteal, - {_, Code, _} = StatusLine, - - {_, NewBody, NewRequest} = stream(StolenBody, Request, Code), - NewState = next_body_chunk(State, Code), - NewMFA = {Module, decode_data, - [NewChunkSize, NewTotalChunk, - {MaxBodySize, NewBody, AccLength, MaxHeaderSize}]}, - {noreply, NewState#state{mfa = NewMFA, - request = NewRequest}}; - NewMFA -> - ?hcrd("data processed - new mfa", []), - activate_once(Session), - {noreply, State#state{mfa = NewMFA}} - catch - _:_Reason -> - ?hcrd("data processing exit", [{exit, _Reason}]), - ClientReason = {could_not_parse_as_http, Data}, - ClientErrMsg = httpc_response:error(Request, ClientReason), - NewState = answer_request(Request, ClientErrMsg, State), - {stop, normal, NewState} - end, - ?hcri("data processed", [{final_result, FinalResult}]), - FinalResult; - + NewMFA -> + activate_once(Session), + {noreply, State#state{mfa = NewMFA}} + catch + _:Reason -> + ClientReason = {could_not_parse_as_http, Data}, + ClientErrMsg = httpc_response:error(Request, ClientReason), + NewState = answer_request(Request, ClientErrMsg, State), + {stop, {shutdown, Reason}, NewState} + end; -handle_info({Proto, Socket, Data}, +do_handle_info({Proto, Socket, Data}, #state{mfa = MFA, request = Request, session = Session, @@ -568,200 +620,107 @@ handle_info({Proto, Socket, Data}, {noreply, State}; - %% The Server may close the connection to indicate that the %% whole body is now sent instead of sending an length %% indicator. -handle_info({tcp_closed, _}, State = #state{mfa = {_, whole_body, Args}}) -> +do_handle_info({tcp_closed, _}, State = #state{mfa = {_, whole_body, Args}}) -> handle_response(State#state{body = hd(Args)}); -handle_info({ssl_closed, _}, State = #state{mfa = {_, whole_body, Args}}) -> +do_handle_info({ssl_closed, _}, State = #state{mfa = {_, whole_body, Args}}) -> handle_response(State#state{body = hd(Args)}); %%% Server closes idle pipeline -handle_info({tcp_closed, _}, State = #state{request = undefined}) -> +do_handle_info({tcp_closed, _}, State = #state{request = undefined}) -> {stop, normal, State}; -handle_info({ssl_closed, _}, State = #state{request = undefined}) -> +do_handle_info({ssl_closed, _}, State = #state{request = undefined}) -> {stop, normal, State}; %%% Error cases -handle_info({tcp_closed, _}, #state{session = Session0} = State) -> +do_handle_info({tcp_closed, _}, #state{session = Session0} = State) -> Socket = Session0#session.socket, Session = Session0#session{socket = {remote_close, Socket}}, %% {stop, session_remotly_closed, State}; {stop, normal, State#state{session = Session}}; -handle_info({ssl_closed, _}, #state{session = Session0} = State) -> +do_handle_info({ssl_closed, _}, #state{session = Session0} = State) -> Socket = Session0#session.socket, Session = Session0#session{socket = {remote_close, Socket}}, %% {stop, session_remotly_closed, State}; {stop, normal, State#state{session = Session}}; -handle_info({tcp_error, _, _} = Reason, State) -> +do_handle_info({tcp_error, _, _} = Reason, State) -> {stop, Reason, State}; -handle_info({ssl_error, _, _} = Reason, State) -> +do_handle_info({ssl_error, _, _} = Reason, State) -> {stop, Reason, State}; %% Timeouts %% Internally, to a request handling process, a request timeout is %% seen as a canceled request. -handle_info({timeout, RequestId}, +do_handle_info({timeout, RequestId}, #state{request = #request{id = RequestId} = Request, canceled = Canceled, profile_name = ProfileName} = State) -> - ?hcri("timeout of current request", [{id, RequestId}]), httpc_response:send(Request#request.from, httpc_response:error(Request, timeout)), httpc_manager:request_done(RequestId, ProfileName), - ?hcrv("response (timeout) sent - now terminate", []), {stop, normal, State#state{request = Request#request{from = answer_sent}, canceled = [RequestId | Canceled]}}; -handle_info({timeout, RequestId}, +do_handle_info({timeout, RequestId}, #state{canceled = Canceled, profile_name = ProfileName} = State) -> - ?hcri("timeout", [{id, RequestId}]), Filter = fun(#request{id = Id, from = From} = Request) when Id =:= RequestId -> - ?hcrv("found request", [{id, Id}, {from, From}]), %% Notify the owner httpc_response:send(From, httpc_response:error(Request, timeout)), httpc_manager:request_done(RequestId, ProfileName), - ?hcrv("response (timeout) sent", []), [Request#request{from = answer_sent}]; (_) -> true end, case State#state.status of pipeline -> - ?hcrd("pipeline", []), Pipeline = queue:filter(Filter, State#state.pipeline), {noreply, State#state{canceled = [RequestId | Canceled], pipeline = Pipeline}}; keep_alive -> - ?hcrd("keep_alive", []), KeepAlive = queue:filter(Filter, State#state.keep_alive), {noreply, State#state{canceled = [RequestId | Canceled], keep_alive = KeepAlive}} end; -handle_info(timeout_queue, State = #state{request = undefined}) -> +do_handle_info(timeout_queue, State = #state{request = undefined}) -> {stop, normal, State}; %% Timing was such as the queue_timeout was not canceled! -handle_info(timeout_queue, #state{timers = Timers} = State) -> +do_handle_info(timeout_queue, #state{timers = Timers} = State) -> {noreply, State#state{timers = Timers#timers{queue_timer = undefined}}}; %% Setting up the connection to the server somehow failed. -handle_info({init_error, Tag, ClientErrMsg}, +do_handle_info({init_error, Reason, ClientErrMsg}, State = #state{request = Request}) -> - ?hcrv("init error", [{tag, Tag}, {client_error, ClientErrMsg}]), NewState = answer_request(Request, ClientErrMsg, State), - {stop, normal, NewState}; - + {stop, {shutdown, Reason}, NewState}; %%% httpc_manager process dies. -handle_info({'EXIT', _, _}, State = #state{request = undefined}) -> +do_handle_info({'EXIT', _, _}, State = #state{request = undefined}) -> {stop, normal, State}; %%Try to finish the current request anyway, %% there is a fairly high probability that it can be done successfully. %% Then close the connection, hopefully a new manager is started that %% can retry requests in the pipeline. -handle_info({'EXIT', _, _}, State) -> +do_handle_info({'EXIT', _, _}, State) -> {noreply, State#state{status = close}}. -%%-------------------------------------------------------------------- -%% Function: terminate(Reason, State) -> _ (ignored by gen_server) -%% Description: Shutdown the httpc_handler -%%-------------------------------------------------------------------- - -%% Init error there is no socket to be closed. -terminate(normal, - #state{request = Request, - session = {send_failed, AReason} = Reason} = State) -> - ?hcrd("terminate", [{send_reason, AReason}, {request, Request}]), - maybe_send_answer(Request, - httpc_response:error(Request, Reason), - State), - ok; - -terminate(normal, - #state{request = Request, - session = {connect_failed, AReason} = Reason} = State) -> - ?hcrd("terminate", [{connect_reason, AReason}, {request, Request}]), - maybe_send_answer(Request, - httpc_response:error(Request, Reason), - State), - ok; - -terminate(normal, #state{session = undefined}) -> - ok; - -%% Init error sending, no session information has been setup but -%% there is a socket that needs closing. -terminate(normal, - #state{session = #session{id = undefined} = Session}) -> - close_socket(Session); - -%% Socket closed remotely -terminate(normal, - #state{session = #session{socket = {remote_close, Socket}, - socket_type = SocketType, - id = Id}, - profile_name = ProfileName, - request = Request, - timers = Timers, - pipeline = Pipeline, - keep_alive = KeepAlive} = State) -> - ?hcrt("terminate(normal) - remote close", - [{id, Id}, {profile, ProfileName}]), - - %% Clobber session - (catch httpc_manager:delete_session(Id, ProfileName)), - - maybe_retry_queue(Pipeline, State), - maybe_retry_queue(KeepAlive, State), - - %% Cancel timers - cancel_timers(Timers), - - %% Maybe deliver answers to requests - deliver_answer(Request), - - %% And, just in case, close our side (**really** overkill) - http_transport:close(SocketType, Socket); - -terminate(Reason, #state{session = #session{id = Id, - socket = Socket, - socket_type = SocketType}, - request = undefined, - profile_name = ProfileName, - timers = Timers, - pipeline = Pipeline, - keep_alive = KeepAlive} = State) -> - ?hcrt("terminate", - [{id, Id}, {profile, ProfileName}, {reason, Reason}]), - - %% Clobber session - (catch httpc_manager:delete_session(Id, ProfileName)), - - maybe_retry_queue(Pipeline, State), - maybe_retry_queue(KeepAlive, State), - - cancel_timer(Timers#timers.queue_timer, timeout_queue), - http_transport:close(SocketType, Socket); +call(Msg, Pid) -> + call(Msg, Pid, infinity). -terminate(Reason, #state{request = undefined}) -> - ?hcrt("terminate", [{reason, Reason}]), - ok; +call(Msg, Pid, Timeout) -> + gen_server:call(Pid, Msg, Timeout). -terminate(Reason, #state{request = Request} = State) -> - ?hcrd("terminate", [{reason, Reason}, {request, Request}]), - NewState = maybe_send_answer(Request, - httpc_response:error(Request, Reason), - State), - terminate(Reason, NewState#state{request = undefined}). +cast(Msg, Pid) -> + gen_server:cast(Pid, Msg). maybe_retry_queue(Q, State) -> case queue:is_empty(Q) of @@ -776,45 +735,13 @@ maybe_send_answer(#request{from = answer_sent}, _Reason, State) -> maybe_send_answer(Request, Answer, State) -> answer_request(Request, Answer, State). -deliver_answer(#request{id = Id, from = From} = Request) +deliver_answer(#request{from = From} = Request) when is_pid(From) -> Response = httpc_response:error(Request, socket_closed_remotely), - ?hcrd("deliver answer", [{id, Id}, {from, From}, {response, Response}]), httpc_response:send(From, Response); -deliver_answer(Request) -> - ?hcrd("skip deliver answer", [{request, Request}]), +deliver_answer(_Request) -> ok. - -%%-------------------------------------------------------------------- -%% Func: code_change(_OldVsn, State, Extra) -> {ok, NewState} -%% Purpose: Convert process state when code is changed -%%-------------------------------------------------------------------- - -code_change(_, State, _) -> - {ok, State}. - - -%% new_http_options({http_options, TimeOut, AutoRedirect, SslOpts, -%% Auth, Relaxed}) -> -%% {http_options, "HTTP/1.1", TimeOut, AutoRedirect, SslOpts, -%% Auth, Relaxed}. - -%% old_http_options({http_options, _, TimeOut, AutoRedirect, -%% SslOpts, Auth, Relaxed}) -> -%% {http_options, TimeOut, AutoRedirect, SslOpts, Auth, Relaxed}. - -%% new_queue(Queue, Fun) -> -%% List = queue:to_list(Queue), -%% NewList = -%% lists:map(fun(Request) -> -%% Settings = -%% Fun(Request#request.settings), -%% Request#request{settings = Settings} -%% end, List), -%% queue:from_list(NewList). - - %%%-------------------------------------------------------------------- %%% Internal functions %%%-------------------------------------------------------------------- @@ -872,26 +799,21 @@ connect(SocketType, ToAddress, connect_and_send_first_request(Address, Request, #state{options = Options} = State) -> SocketType = socket_type(Request), ConnTimeout = (Request#request.settings)#http_options.connect_timeout, - ?hcri("connect", - [{address, Address}, {request, Request}, {options, Options}]), case connect(SocketType, Address, Options, ConnTimeout) of {ok, Socket} -> ClientClose = - httpc_request:is_client_closing( - Request#request.headers), + httpc_request:is_client_closing( + Request#request.headers), SessionType = httpc_manager:session_type(Options), SocketType = socket_type(Request), Session = #session{id = {Request#request.address, self()}, scheme = Request#request.scheme, socket = Socket, - socket_type = SocketType, - client_close = ClientClose, - type = SessionType}, - ?hcri("connected - now send first request", [{socket, Socket}]), - + socket_type = SocketType, + client_close = ClientClose, + type = SessionType}, case httpc_request:send(Address, Session, Request) of ok -> - ?hcri("first request sent", []), TmpState = State#state{request = Request, session = Session, mfa = init_mfa(Request, State), @@ -949,12 +871,6 @@ handler_info(#state{request = Request, options = _Options, timers = _Timers} = _State) -> - ?hcrt("handler info", [{request, Request}, - {session, Session}, - {pipeline, Pipeline}, - {keep_alive, KeepAlive}, - {status, Status}]), - %% Info about the current request RequestInfo = case Request of @@ -965,8 +881,6 @@ handler_info(#state{request = Request, [{id, Id}, {started, ReqStarted}] end, - ?hcrt("handler info", [{request_info, RequestInfo}]), - %% Info about the current session/socket SessionType = Session#session.type, QueueLen = case SessionType of @@ -979,22 +893,12 @@ handler_info(#state{request = Request, Socket = Session#session.socket, SocketType = Session#session.socket_type, - ?hcrt("handler info", [{session_type, SessionType}, - {queue_length, QueueLen}, - {scheme, Scheme}, - {socket, Socket}]), - SocketOpts = http_transport:getopts(SocketType, Socket), SocketStats = http_transport:getstat(SocketType, Socket), Remote = http_transport:peername(SocketType, Socket), Local = http_transport:sockname(SocketType, Socket), - ?hcrt("handler info", [{remote, Remote}, - {local, Local}, - {socket_opts, SocketOpts}, - {socket_stats, SocketStats}]), - SocketInfo = [{remote, Remote}, {local, Local}, {socket_opts, SocketOpts}, @@ -1014,7 +918,6 @@ handler_info(#state{request = Request, handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body}, State = #state{request = Request}) -> - ?hcrt("handle_http_msg", [{headers, Headers}]), case Headers#http_response_h.'content-type' of "multipart/byteranges" ++ _Param -> exit({not_yet_implemented, multypart_byteranges}); @@ -1028,15 +931,12 @@ handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body}, end; handle_http_msg({ChunkedHeaders, Body}, #state{status_line = {_, Code, _}, headers = Headers} = State) -> - ?hcrt("handle_http_msg", - [{chunked_headers, ChunkedHeaders}, {headers, Headers}]), NewHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders), {_, NewBody, NewRequest} = stream(Body, State#state.request, Code), handle_response(State#state{headers = NewHeaders, body = NewBody, request = NewRequest}); handle_http_msg(Body, #state{status_line = {_,Code, _}} = State) -> - ?hcrt("handle_http_msg", [{code, Code}]), {_, NewBody, NewRequest} = stream(Body, State#state.request, Code), handle_response(State#state{body = NewBody, request = NewRequest}). @@ -1051,41 +951,28 @@ handle_http_body(_, #state{status = {ssl_tunnel, Request}, {stop, normal, NewState}; handle_http_body(<<>>, #state{status_line = {_,304, _}} = State) -> - ?hcrt("handle_http_body - 304", []), handle_response(State#state{body = <<>>}); handle_http_body(<<>>, #state{status_line = {_,204, _}} = State) -> - ?hcrt("handle_http_body - 204", []), handle_response(State#state{body = <<>>}); handle_http_body(<<>>, #state{request = #request{method = head}} = State) -> - ?hcrt("handle_http_body - head", []), handle_response(State#state{body = <<>>}); handle_http_body(Body, #state{headers = Headers, max_body_size = MaxBodySize, status_line = {_,Code, _}, request = Request} = State) -> - ?hcrt("handle_http_body", - [{max_body_size, MaxBodySize}, {headers, Headers}, {code, Code}]), TransferEnc = Headers#http_response_h.'transfer-encoding', case case_insensitive_header(TransferEnc) of "chunked" -> - ?hcrt("handle_http_body - chunked", []), try http_chunk:decode(Body, State#state.max_body_size, State#state.max_header_size) of {Module, Function, Args} -> - ?hcrt("handle_http_body - new mfa", - [{module, Module}, - {function, Function}, - {args, Args}]), NewState = next_body_chunk(State, Code), {noreply, NewState#state{mfa = {Module, Function, Args}}}; {ok, {ChunkedHeaders, NewBody}} -> - ?hcrt("handle_http_body - new body", - [{chunked_headers, ChunkedHeaders}, - {new_body, NewBody}]), NewHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders), case Body of @@ -1107,7 +994,6 @@ handle_http_body(Body, #state{headers = Headers, {stop, normal, NewState} end; Enc when Enc =:= "identity"; Enc =:= undefined -> - ?hcrt("handle_http_body - identity", []), Length = list_to_integer(Headers#http_response_h.'content-length'), case ((Length =< MaxBodySize) orelse (MaxBodySize =:= nolimit)) of @@ -1131,7 +1017,6 @@ handle_http_body(Body, #state{headers = Headers, {stop, normal, NewState} end; Encoding when is_list(Encoding) -> - ?hcrt("handle_http_body - other", [{encoding, Encoding}]), NewState = answer_request(Request, httpc_response:error(Request, unknown_encoding), @@ -1152,18 +1037,10 @@ handle_response(#state{request = Request, options = Options, profile_name = ProfileName} = State) when Status =/= new -> - - ?hcrd("handle response", [{profile, ProfileName}, - {status, Status}, - {request, Request}, - {session, Session}, - {status_line, StatusLine}]), - handle_cookies(Headers, Request, Options, ProfileName), case httpc_response:result({StatusLine, Headers, Body}, Request) of %% 100-continue continue -> - ?hcrd("handle response - continue", []), %% Send request body {_, RequestBody} = Request#request.content, send_raw(Session, RequestBody), @@ -1180,7 +1057,6 @@ handle_response(#state{request = Request, %% Ignore unexpected 100-continue response and receive the %% actual response that the server will send right away. {ignore, Data} -> - ?hcrd("handle response - ignore", [{data, Data}]), Relaxed = (Request#request.settings)#http_options.relaxed, MFA = {httpc_response, parse, [State#state.max_header_size, Relaxed]}, @@ -1194,23 +1070,17 @@ handle_response(#state{request = Request, %% obsolete and the manager will create a new request %% with the same id as the current. {redirect, NewRequest, Data} -> - ?hcrt("handle response - redirect", - [{new_request, NewRequest}, {data, Data}]), ok = httpc_manager:redirect_request(NewRequest, ProfileName), handle_queue(State#state{request = undefined}, Data); {retry, TimeNewRequest, Data} -> - ?hcrt("handle response - retry", - [{time_new_request, TimeNewRequest}, {data, Data}]), ok = httpc_manager:retry_request(TimeNewRequest, ProfileName), handle_queue(State#state{request = undefined}, Data); {ok, Msg, Data} -> - ?hcrd("handle response - ok", []), stream_remaining_body(Body, Request, StatusLine), end_stream(StatusLine, Request), NewState = maybe_send_answer(Request, Msg, State), handle_queue(NewState, Data); {stop, Msg} -> - ?hcrd("handle response - stop", [{msg, Msg}]), end_stream(StatusLine, Request), NewState = maybe_send_answer(Request, Msg, State), {stop, normal, NewState} @@ -1245,28 +1115,19 @@ handle_pipeline(#state{status = pipeline, profile_name = ProfileName, options = #options{pipeline_timeout = TimeOut}} = State, Data) -> - - ?hcrd("handle pipeline", [{profile, ProfileName}, - {session, Session}, - {timeout, TimeOut}]), - case queue:out(State#state.pipeline) of {empty, _} -> - ?hcrd("pipeline queue empty", []), handle_empty_queue(Session, ProfileName, TimeOut, State); {{value, NextRequest}, Pipeline} -> - ?hcrd("pipeline queue non-empty", []), case lists:member(NextRequest#request.id, State#state.canceled) of true -> - ?hcrv("next request had been cancelled", []), %% See comment for handle_cast({cancel, RequestId}) {stop, normal, State#state{request = NextRequest#request{from = answer_sent}, pipeline = Pipeline}}; false -> - ?hcrv("next request", [{request, NextRequest}]), NewSession = Session#session{queue_length = %% Queue + current @@ -1283,25 +1144,16 @@ handle_keep_alive_queue(#state{status = keep_alive, options = #options{keep_alive_timeout = TimeOut, proxy = Proxy}} = State, Data) -> - - ?hcrd("handle keep_alive", [{profile, ProfileName}, - {session, Session}, - {timeout, TimeOut}]), - case queue:out(State#state.keep_alive) of {empty, _} -> - ?hcrd("keep_alive queue empty", []), handle_empty_queue(Session, ProfileName, TimeOut, State); {{value, NextRequest}, KeepAlive} -> - ?hcrd("keep_alive queue non-empty", []), case lists:member(NextRequest#request.id, State#state.canceled) of true -> - ?hcrv("next request has already been canceled", []), handle_keep_alive_queue( State#state{keep_alive = KeepAlive}, Data); false -> - ?hcrv("next request", [{request, NextRequest}]), #request{address = Addr} = NextRequest, Address = handle_proxy(Addr, Proxy), case httpc_request:send(Address, Session, NextRequest) of @@ -1314,7 +1166,6 @@ handle_keep_alive_queue(#state{status = keep_alive, end end end. - handle_empty_queue(Session, ProfileName, TimeOut, State) -> %% The server may choose too terminate an idle pipline| keep_alive session %% in this case we want to receive the close message @@ -1350,7 +1201,6 @@ init_wait_for_response_state(Request, State) -> status_line = undefined, headers = undefined, body = undefined}. - gather_data(<<>>, Session, State) -> activate_once(Session), {noreply, State}; @@ -1381,10 +1231,6 @@ activate_request_timeout( State; _ -> ReqId = Request#request.id, - ?hcrt("activate request timer", - [{request_id, ReqId}, - {time_consumed, t() - Request#request.started}, - {timeout, Timeout}]), Msg = {timeout, ReqId}, Ref = erlang:send_after(Timeout, self(), Msg), Request2 = Request#request{timer = Ref}, @@ -1427,10 +1273,6 @@ try_to_enable_pipeline_or_keep_alive( status_line = {Version, _, _}, headers = Headers, profile_name = ProfileName} = State) -> - ?hcrd("try to enable pipeline or keep-alive", - [{version, Version}, - {headers, Headers}, - {session, Session}]), case is_keep_alive_enabled_server(Version, Headers) andalso is_keep_alive_connection(Headers, Session) of true -> @@ -1455,7 +1297,6 @@ answer_request(#request{id = RequestId, from = From} = Request, Msg, #state{session = Session, timers = Timers, profile_name = ProfileName} = State) -> - ?hcrt("answer request", [{request, Request}, {msg, Msg}]), httpc_response:send(From, Msg), RequestTimers = Timers#timers.request_timers, TimerRef = @@ -1602,42 +1443,32 @@ socket_type(#request{scheme = http}) -> ip_comm; socket_type(#request{scheme = https, settings = Settings}) -> Settings#http_options.ssl. -%% socket_type(http) -> -%% ip_comm; -%% socket_type(https) -> -%% {ssl1, []}. %% Dummy value ok for ex setopts that does not use this value start_stream({_Version, _Code, _ReasonPhrase}, _Headers, #request{stream = none} = Request) -> - ?hcrt("start stream - none", []), {ok, Request}; start_stream({_Version, Code, _ReasonPhrase}, Headers, #request{stream = self} = Request) when ?IS_STREAMED(Code) -> - ?hcrt("start stream - self", [{code, Code}]), Msg = httpc_response:stream_start(Headers, Request, ignore), httpc_response:send(Request#request.from, Msg), {ok, Request}; start_stream({_Version, Code, _ReasonPhrase}, Headers, #request{stream = {self, once}} = Request) when ?IS_STREAMED(Code) -> - ?hcrt("start stream - self:once", [{code, Code}]), Msg = httpc_response:stream_start(Headers, Request, self()), httpc_response:send(Request#request.from, Msg), {ok, Request}; start_stream({_Version, Code, _ReasonPhrase}, _Headers, #request{stream = Filename} = Request) when ?IS_STREAMED(Code) andalso is_list(Filename) -> - ?hcrt("start stream", [{code, Code}, {filename, Filename}]), case file:open(Filename, [write, raw, append, delayed_write]) of {ok, Fd} -> - ?hcri("start stream - file open ok", [{fd, Fd}]), {ok, Request#request{stream = Fd}}; {error, Reason} -> exit({stream_to_file_failed, Reason}) end; start_stream(_StatusLine, _Headers, Request) -> - ?hcrt("start stream - no op", []), {ok, Request}. stream_remaining_body(<<>>, _, _) -> @@ -1648,16 +1479,12 @@ stream_remaining_body(Body, Request, {_, Code, _}) -> %% Note the end stream message is handled by httpc_response and will %% be sent by answer_request end_stream(_, #request{stream = none}) -> - ?hcrt("end stream - none", []), ok; end_stream(_, #request{stream = self}) -> - ?hcrt("end stream - self", []), ok; end_stream(_, #request{stream = {self, once}}) -> - ?hcrt("end stream - self:once", []), ok; end_stream({_,200,_}, #request{stream = Fd}) -> - ?hcrt("end stream - 200", [{stream, Fd}]), case file:close(Fd) of ok -> ok; @@ -1665,15 +1492,13 @@ end_stream({_,200,_}, #request{stream = Fd}) -> file:close(Fd) end; end_stream({_,206,_}, #request{stream = Fd}) -> - ?hcrt("end stream - 206", [{stream, Fd}]), case file:close(Fd) of ok -> ok; {error, enospc} -> % Could be due to delayed_write file:close(Fd) end; -end_stream(SL, R) -> - ?hcrt("end stream", [{status_line, SL}, {request, R}]), +end_stream(_, _) -> ok. @@ -1702,11 +1527,8 @@ handle_verbose(trace) -> handle_verbose(_) -> ok. - - send_raw(#session{socket = Socket, socket_type = SocketType}, {ProcessBody, Acc}) when is_function(ProcessBody, 1) -> - ?hcrt("send raw", [{acc, Acc}]), send_raw(SocketType, Socket, ProcessBody, Acc); send_raw(#session{socket = Socket, socket_type = SocketType}, Body) -> http_transport:send(SocketType, Socket, Body). @@ -1717,7 +1539,6 @@ send_raw(SocketType, Socket, ProcessBody, Acc) -> ok; {ok, Data, NewAcc} -> DataBin = iolist_to_binary(Data), - ?hcrd("send", [{data, DataBin}]), case http_transport:send(SocketType, Socket, DataBin) of ok -> send_raw(SocketType, Socket, ProcessBody, NewAcc); @@ -1883,16 +1704,3 @@ update_session(ProfileName, #session{id = SessionId} = Session, Pos, Value) -> end. -%% --------------------------------------------------------------------- - -call(Msg, Pid) -> - call(Msg, Pid, infinity). - -call(Msg, Pid, Timeout) -> - gen_server:call(Pid, Msg, Timeout). - -cast(Msg, Pid) -> - gen_server:cast(Pid, Msg). - -t() -> - http_util:timestamp(). |