diff options
author | Micael Karlberg <[email protected]> | 2010-02-12 19:18:03 +0000 |
---|---|---|
committer | Erlang/OTP <[email protected]> | 2010-02-13 07:29:33 +0100 |
commit | f0fc9b72f5d2c4866bb47914c7d4a3a0d63e3de7 (patch) | |
tree | edbe0b391e04705b44a3ab46c40a85d3c086e253 /lib/megaco/src/engine/megaco_messenger.erl | |
parent | 201ccdd08bd2a591fe1e348a9a1df3d94a3606e4 (diff) | |
download | otp-f0fc9b72f5d2c4866bb47914c7d4a3a0d63e3de7.tar.gz otp-f0fc9b72f5d2c4866bb47914c7d4a3a0d63e3de7.tar.bz2 otp-f0fc9b72f5d2c4866bb47914c7d4a3a0d63e3de7.zip |
OTP-8317, OTP-8323, OTP-8328, OTP-8362 & OTP-8403.
Diffstat (limited to 'lib/megaco/src/engine/megaco_messenger.erl')
-rw-r--r-- | lib/megaco/src/engine/megaco_messenger.erl | 303 |
1 files changed, 226 insertions, 77 deletions
diff --git a/lib/megaco/src/engine/megaco_messenger.erl b/lib/megaco/src/engine/megaco_messenger.erl index a9e4fd67b2..5756e8e896 100644 --- a/lib/megaco/src/engine/megaco_messenger.erl +++ b/lib/megaco/src/engine/megaco_messenger.erl @@ -1,19 +1,19 @@ %% %% %CopyrightBegin% -%% -%% Copyright Ericsson AB 1999-2009. All Rights Reserved. -%% +%% +%% Copyright Ericsson AB 1999-2010. All Rights Reserved. +%% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. -%% +%% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. -%% +%% %% %CopyrightEnd% %% @@ -99,20 +99,22 @@ %% N.B. Update cancel/1 with '_' when a new field is added -record(reply, { - trans_id, - local_mid, - state = prepare, % prepare | eval_request | waiting_for_ack | aborted - pending_timer_ref, - handler = undefined, % pid of the proc executing the callback func - timer_ref, - version, - %% bytes: Sent reply data: not acknowledged - bytes, % binary() | [{integer(), binary(), timer_ref()}] - ack_action, % discard_ack | {handle_ack, Data} - send_handle, - %% segments: Not sent reply data (segments) - segments = [] % [{integer(), binary()}] - }). + trans_id, + local_mid, + state = prepare, % prepare | eval_request | waiting_for_ack | aborted + pending_timer_ref, + handler = undefined, % pid of the proc executing the callback func + timer_ref, + version, + %% bytes: Sent reply data: not acknowledged + bytes, % binary() | [{integer(), binary(), timer_ref()}] + ack_action, % discard_ack | {handle_ack, Data} + send_handle, + %% segments: Not sent reply data (segments) + segments = [], % [{integer(), binary()}] + user_mod, + user_args + }). -record(trans_id, { @@ -1203,7 +1205,7 @@ prepare_request(ConnData, T, Rest, AckList, ReqList, Extra) -> LocalMid = ConnHandle#megaco_conn_handle.local_mid, TransId = to_remote_trans_id(ConnData), ?rt2("prepare request", [LocalMid, TransId]), - case megaco_monitor:lookup_reply(TransId) of + case lookup_reply(ConnData, TransId) of [] -> ?rt3("brand new request"), @@ -1222,18 +1224,22 @@ prepare_request(ConnData, T, Rest, AckList, ReqList, Extra) -> #conn_data{send_handle = SendHandle, pending_timer = InitTimer, - protocol_version = Version} = ConnData, + protocol_version = Version, + user_mod = UserMod, + user_args = UserArgs} = ConnData, {WaitFor, CurrTimer} = megaco_timer:init(InitTimer), M = ?MODULE, F = pending_timeout, - A = [ConnHandle, TransId, CurrTimer], + A = [ConnHandle, TransId, CurrTimer], PendingRef = megaco_monitor:apply_after(M, F, A, WaitFor), Rep = #reply{send_handle = SendHandle, trans_id = TransId, local_mid = LocalMid, pending_timer_ref = PendingRef, handler = self(), - version = Version}, + version = Version, + user_mod = UserMod, + user_args = UserArgs}, case megaco_monitor:insert_reply_new(Rep) of true -> prepare_normal_trans(ConnData, Rest, AckList, @@ -1250,9 +1256,14 @@ prepare_request(ConnData, T, Rest, AckList, ReqList, Extra) -> Extra) end; - [#reply{state = State, + %% We can ignore the Converted value here as we *know* + %% conn-data to be correct (not faked), so even if + %% the record was converted, it will now have correct + %% values for user_mod and user_args. + {_Converted, + #reply{state = State, handler = Pid, - pending_timer_ref = Ref} = Rep] + pending_timer_ref = Ref} = Rep} when (State =:= prepare) orelse (State =:= eval_request) -> ?rt2("request resend", [State, Pid, Ref]), @@ -1364,9 +1375,14 @@ prepare_request(ConnData, T, Rest, AckList, ReqList, Extra) -> end; - [#reply{state = waiting_for_ack, + %% We can ignore the Converted value here as we *know* + %% conn-data to be correct (not faked), so even if + %% the record was converted, it will now have correct + %% values for user_mod and user_args. + {_Converted, + #reply{state = waiting_for_ack, bytes = Bin, - version = Version} = Rep] -> + version = Version} = Rep} -> ?rt3("request resend when waiting for ack"), %% We have already sent a reply, but the receiver @@ -1384,7 +1400,13 @@ prepare_request(ConnData, T, Rest, AckList, ReqList, Extra) -> end, prepare_normal_trans(ConnData2, Rest, AckList, ReqList, Extra); - [#reply{state = aborted} = Rep] -> + + %% We can ignore the Converted value here as we *know* + %% conn-data to be correct (not faked), so even if + %% the record was converted, it will now have correct + %% values for user_mod and user_args. + {_Converted, + #reply{state = aborted} = Rep} -> ?rt3("request resend when already in aborted state"), %% OTP-4956: @@ -1424,15 +1446,15 @@ prepare_ack(ConnData, [], Rest, AckList, ReqList, Extra) -> do_prepare_ack(ConnData, T, AckList) -> TransId = to_remote_trans_id(ConnData), - case megaco_monitor:lookup_reply(TransId) of + case lookup_reply(ConnData, TransId) of [] -> %% The reply has already been garbage collected. Ignore. ?report_trace(ConnData, "discard ack (no receiver)", [T]), AckList; - [Rep] when Rep#reply.state =:= waiting_for_ack -> + {_Converted, Rep} when Rep#reply.state =:= waiting_for_ack -> %% Don't care about Msg and Rep version diff [{ConnData, Rep, T} | AckList]; - [_Rep] -> + {_Converted, _Rep} -> %% Protocol violation from the sender of this ack ?report_important(ConnData, "<ERROR> discard trans", [T, {error, "got ack before reply was sent"}]), @@ -1657,8 +1679,8 @@ handle_request(ConnData, TransId, T, Extra) -> %% Furthermore, the reply timer has not been started, %% so do the cleanup now ?rt1(ConnData, "pending limit already passed", [TransId]), - case megaco_monitor:lookup_reply(TransId) of - [Rep] -> + case lookup_reply(ConnData, TransId) of + {_Converted, Rep} -> cancel_reply(ConnData, Rep, aborted); _ -> ok @@ -1671,8 +1693,8 @@ do_handle_request(_, ignore, _ConnData, _TransId) -> ignore; do_handle_request(_, ignore_trans_request, ConnData, TransId) -> ?rt1(ConnData, "ignore trans request: don't reply", [TransId]), - case megaco_monitor:lookup_reply(TransId) of - [#reply{} = Rep] -> + case lookup_reply(ConnData, TransId) of + {_Converted, #reply{} = Rep} -> cancel_reply(ConnData, Rep, ignore); _ -> ignore @@ -1689,8 +1711,8 @@ do_handle_request({pending, RequestData}, _SendReply, _ConnData, _) -> do_handle_request(AckAction, {ok, Bin}, ConnData, TransId) when is_binary(Bin) -> ?rt1(ConnData, "handle request - ok", [AckAction, TransId]), - case megaco_monitor:lookup_reply(TransId) of - [#reply{pending_timer_ref = PendingRef} = Rep] -> + case lookup_reply(ConnData, TransId) of + {_Converted, #reply{pending_timer_ref = PendingRef} = Rep} -> #conn_data{reply_timer = InitTimer, conn_handle = ConnHandle} = ConnData, @@ -1729,8 +1751,8 @@ do_handle_request(AckAction, {ok, {Sent, NotSent}}, ConnData, TransId) ?rt1(ConnData, "handle request - ok [segmented reply]", [AckAction, TransId]), - case megaco_monitor:lookup_reply(TransId) of - [#reply{pending_timer_ref = PendingRef} = Rep] -> + case lookup_reply(ConnData, TransId) of + {_Converted, #reply{pending_timer_ref = PendingRef} = Rep} -> %% d("do_handle_request -> found reply record:" %% "~n Rep: ~p", [Rep]), @@ -1772,8 +1794,8 @@ do_handle_request(AckAction, {ok, {Sent, NotSent}}, ConnData, TransId) end; do_handle_request(_, {error, aborted}, ConnData, TransId) -> ?report_trace(ConnData, "aborted during our absence", [TransId]), - case megaco_monitor:lookup_reply(TransId) of - [Rep] -> + case lookup_reply(ConnData, TransId) of + {_Converted, Rep} -> cancel_reply(ConnData, Rep, aborted); _ -> ok @@ -1781,8 +1803,8 @@ do_handle_request(_, {error, aborted}, ConnData, TransId) -> ignore; do_handle_request(AckAction, {error, Reason}, ConnData, TransId) -> ?report_trace(ConnData, "error", [TransId, Reason]), - case megaco_monitor:lookup_reply(TransId) of - [Rep] -> + case lookup_reply(ConnData, TransId) of + {_Converted, Rep} -> Rep2 = Rep#reply{state = waiting_for_ack, ack_action = AckAction}, cancel_reply(ConnData, Rep2, Reason); @@ -2819,9 +2841,10 @@ handle_segment_reply(CD, {segment_no, SN}, {segmentation_complete, SC}]), TransId2 = to_remote_trans_id(CD#conn_data{serial = TransId}), - case megaco_monitor:lookup_reply(TransId2) of - [#reply{bytes = Sent, - segments = []} = Rep] when is_list(Sent) -> + case lookup_reply(CD, TransId2) of + {_Converted, + #reply{bytes = Sent, + segments = []} = Rep} when is_list(Sent) -> ?rt2("no unsent segments", [Sent]), handle_segment_reply_callback(CD, TransId, SN, SC, Extra), case lists:keysearch(SN, 1, Sent) of @@ -2845,8 +2868,9 @@ handle_segment_reply(CD, ok end; - [#reply{bytes = Sent, - segments = NotSent}] when is_list(Sent) andalso + {_Converted, + #reply{bytes = Sent, + segments = NotSent}} when is_list(Sent) andalso is_list(NotSent) -> ?rt2("unsent segments", [Sent, NotSent]), handle_segment_reply_callback(CD, TransId, SN, SC, Extra), @@ -2883,7 +2907,8 @@ handle_segment_reply(CD, ok end; - [#reply{state = State}] -> + {_Converted, + #reply{state = State}} -> %% We received a segment reply for a segmented reply we have %% not yet sent? This is either some sort of race condition %% or the "the other side" is really confused. @@ -3012,7 +3037,7 @@ handle_ack_callback(ConnData, AckStatus, {handle_ack, AckData}, T, Extra) -> _ -> [ConnHandle, Version, AckStatus, AckData, Extra | UserArgs] end, - Res = (catch apply(UserMod, handle_trans_ack, Args)), + Res = (catch handle_callback(ConnData, UserMod, handle_trans_ack, Args)), ?report_debug(ConnData, "return: trans ack", [T, AckData, {return, Res}]), case Res of ok -> @@ -3024,6 +3049,14 @@ handle_ack_callback(ConnData, AckStatus, {handle_ack, AckData}, T, Extra) -> Res. +handle_callback(ConnData, undefined = _UserMod, Func, Args) -> + ?report_important(ConnData, "callback: unknown callback module", + [{func, Func}, {args, Args}]), + ok; +handle_callback(_ConnData, UserMod, Func, Args) -> + (catch apply(UserMod, Func, Args)). + + handle_message_error(ConnData, _Error, _Extra) when ConnData#conn_data.monitor_ref == undefined_monitor_ref -> %% May occur if another process already has setup a @@ -4370,11 +4403,15 @@ receive_reply_remote(ConnData, UserReply, Extra) -> return_unexpected_trans_reply(ConnData, TransId, UserReply, Extra) end. -cancel_reply(ConnData, #reply{state = waiting_for_ack} = Rep, Reason) -> +cancel_reply(ConnData, #reply{state = waiting_for_ack, + user_mod = UserMod, + user_args = UserArgs} = Rep, Reason) -> ?report_trace(ignore, "cancel reply [waiting_for_ack]", [Rep]), megaco_monitor:cancel_apply_after(Rep#reply.pending_timer_ref), Serial = (Rep#reply.trans_id)#trans_id.serial, - ConnData2 = ConnData#conn_data{serial = Serial}, + ConnData2 = ConnData#conn_data{serial = Serial, + user_mod = UserMod, + user_args = UserArgs}, T = #'TransactionAck'{firstAck = Serial}, Extra = ?default_user_callback_extra, handle_ack(ConnData2, {error, Reason}, Rep, T, Extra); @@ -4558,16 +4595,30 @@ reply_timeout(ConnHandle, TransId, {_, timeout}) -> reply_timeout(ConnHandle, TransId, Timer) -> ?report_trace(ConnHandle, "reply timeout", [Timer, TransId]), - case megaco_monitor:lookup_reply(TransId) of + case lookup_reply(undefined, TransId) of [] -> reply_not_found_ignore; - [#reply{state = waiting_for_ack, - ack_action = {handle_ack, _}} = Rep] -> + {Converted, + #reply{state = waiting_for_ack, + ack_action = {handle_ack, _}} = Rep} -> case megaco_config:lookup_local_conn(ConnHandle) of [CD] when (CD#conn_data.cancel =:= true) -> cancel_in_progress_ignore; - [CD] -> + [CD] when (Converted =:= true) -> + incNumTimerRecovery(ConnHandle), + %% When we did the reply record lookup, we had no + %% conn_data record, and the reply record was + %% converted. This means that the reply record + %% has no valid info about user_mod or user_args. + %% Therefor, the user_mod and user_args of the + %% conn_data record is better then nothing. + #conn_data{user_mod = UserMod, + user_args = UserArgs} = CD, + Rep2 = Rep#reply{user_mod = UserMod, + user_args = UserArgs}, + do_reply_timeout(ConnHandle, TransId, CD, Timer, Rep2); + [CD] when (Converted =:= false) -> incNumTimerRecovery(ConnHandle), do_reply_timeout(ConnHandle, TransId, CD, Timer, Rep); [] -> @@ -4576,10 +4627,25 @@ reply_timeout(ConnHandle, TransId, Timer) -> do_reply_timeout(ConnHandle, TransId, CD, Timer, Rep) end; - [#reply{state = waiting_for_ack, - bytes = Sent} = Rep] when is_list(Sent) -> + {Converted, + #reply{state = waiting_for_ack, + bytes = Sent} = Rep} when is_list(Sent) -> case megaco_config:lookup_local_conn(ConnHandle) of - [ConnData] -> + [ConnData] when (Converted =:= true) -> + incNumTimerRecovery(ConnHandle), + %% When we did the reply record lookup, we had no + %% conn_data record, and the reply record was + %% converted. This means that the reply record + %% has no valid info about user_mod or user_args. + %% Therefor, the user_mod and user_args of the + %% conn_data record is better then nothing. + #conn_data{user_mod = UserMod, + user_args = UserArgs} = ConnData, + Rep2 = Rep#reply{user_mod = UserMod, + user_args = UserArgs}, + do_reply_timeout(ConnHandle, TransId, ConnData, + Timer, Rep2); + [ConnData] when (Converted =:= false) -> incNumTimerRecovery(ConnHandle), do_reply_timeout(ConnHandle, TransId, ConnData, Timer, Rep); @@ -4590,10 +4656,12 @@ reply_timeout(ConnHandle, TransId, Timer) -> Timer, Rep) end; - [#reply{state = waiting_for_ack} = Rep] -> + {_Converted, + #reply{state = waiting_for_ack} = Rep} -> do_reply_timeout(ConnHandle, TransId, Timer, Rep); - [#reply{state = aborted} = Rep] -> + {_Converted, + #reply{state = aborted} = Rep} -> do_reply_timeout(ConnHandle, TransId, Timer, Rep); _ -> @@ -4686,22 +4754,41 @@ handle_reply_timer_timeout(ConnHandle, TransId) -> ?report_trace(ConnHandle, "handle reply timeout", [timeout, TransId]), incNumTimerRecovery(ConnHandle), %% OTP-4378 - case megaco_monitor:lookup_reply(TransId) of - [#reply{state = waiting_for_ack} = Rep] -> + case lookup_reply(undefined, TransId) of + {Converted, + #reply{state = waiting_for_ack} = Rep} -> Serial = (Rep#reply.trans_id)#trans_id.serial, - ConnData = + {Rep2, ConnData} = case megaco_config:lookup_local_conn(ConnHandle) of - [ConnData0] -> - ConnData0; - [] -> - fake_conn_data(ConnHandle) + [ConnData0] when (Converted =:= false) -> + #reply{user_mod = UserMod, + user_args = UserArgs} = Rep, + {Rep, + ConnData0#conn_data{user_mod = UserMod, + user_args = UserArgs}}; + [ConnData0] when (Converted =:= true) -> + {Rep#reply{user_mod = ConnData0#conn_data.user_mod, + user_args = ConnData0#conn_data.user_args}, + ConnData0}; + [] when (Converted =:= false) -> + ConnData0 = fake_conn_data(ConnHandle), + #reply{user_mod = UserMod, + user_args = UserArgs} = Rep, + {Rep, + ConnData0#conn_data{user_mod = UserMod, + user_args = UserArgs}}; + [] when (Converted =:= true) -> + %% We have no valid info about user_mod and user_args + {Rep, fake_conn_data(ConnHandle)} end, ConnData2 = ConnData#conn_data{serial = Serial}, T = #'TransactionAck'{firstAck = Serial}, Extra = ?default_user_callback_extra, - handle_ack(ConnData2, {error, timeout}, Rep, T, Extra); - [#reply{pending_timer_ref = Ref, % aborted? - bytes = SegSent}] -> % may be a binary + handle_ack(ConnData2, {error, timeout}, Rep2, T, Extra); + + {_Converted, + #reply{pending_timer_ref = Ref, % aborted? + bytes = SegSent}} -> % may be a binary megaco_monitor:cancel_apply_after(Ref), cancel_segment_timers(SegSent), megaco_monitor:delete_reply(TransId), @@ -4803,9 +4890,10 @@ pending_timeout(ConnHandle, TransId, Timer) -> handle_pending_timeout(CD, TransId, Timer) -> ?report_trace(CD, "handle pending timeout", []), - case megaco_monitor:lookup_reply(TransId) of - [#reply{state = State, - handler = Pid} = Rep] when (State =:= prepare) orelse + case lookup_reply(CD, TransId) of + {_Converted, + #reply{state = State, + handler = Pid} = Rep} when (State =:= prepare) orelse (State =:= eval_request) -> #conn_data{sent_pending_limit = Limit, @@ -4885,12 +4973,14 @@ handle_pending_timeout(CD, TransId, Timer) -> [] -> reply_not_found; % Trace ?? - [#reply{state = waiting_for_ack}] -> + {_Converted, + #reply{state = waiting_for_ack}} -> %% The reply has already been sent %% No need for any pending trans reply reply_has_been_sent; - [#reply{state = aborted} = Rep] -> + {_Converted, + #reply{state = aborted} = Rep} -> %% glitch, but cleanup just the same cancel_reply(CD, Rep, aborted), reply_aborted_state @@ -5096,7 +5186,65 @@ make_transaction_reply(_, TransId, IAR, TransRes) -> %%----------------------------------------------------------------- +%% This function is used as a wrapper for reply-record lookups. +%% The intention is that during upgrade, this function +%% can perform on-the-fly conversions of reply-records. +lookup_reply(CD, TransId) -> + case megaco_monitor:lookup_reply(TransId) of + [#reply{} = Rep] -> + {false, Rep}; + + %% Old (pre-3.13.1) version of the record => Convert to new version + [{reply, TransId, + LocalMid, State, PendingTmrRef, Handler, TimerRef, + Version, Bytes, AckAction, SendHandle, Segments}] + when is_record(CD, conn_data) -> + #conn_data{user_mod = UserMod, + user_args = UserArgs} = CD, + Rep = #reply{trans_id = TransId, + local_mid = LocalMid, + state = State, + pending_timer_ref = PendingTmrRef, + handler = Handler, + timer_ref = TimerRef, + version = Version, + bytes = Bytes, + ack_action = AckAction, + send_handle = SendHandle, + segments = Segments, + user_mod = UserMod, + user_args = UserArgs}, + {true, Rep}; + + %% Old (pre-3.13.1) version of the record => Convert to new version + [{reply, TransId, + LocalMid, State, PendingTmrRef, Handler, TimerRef, + Version, Bytes, AckAction, SendHandle, Segments}] -> + %% ConnData is not known here, so ignore for now + Rep = #reply{trans_id = TransId, + local_mid = LocalMid, + state = State, + pending_timer_ref = PendingTmrRef, + handler = Handler, + timer_ref = TimerRef, + version = Version, + bytes = Bytes, + ack_action = AckAction, + send_handle = SendHandle, + segments = Segments}, + {true, Rep}; + + Else -> + Else + end. + + +%%----------------------------------------------------------------- + %%----------------------------------------------------------------- +%% info_msg(F, A) -> +%% ?megaco_info(F, A). + warning_msg(F, A) -> ?megaco_warning(F, A). @@ -5131,7 +5279,7 @@ error_msg(F, A) -> %% Time in milli seconds t() -> - {A,B,C} = erlang:now(), + {A,B,C} = os:timestamp(), A*1000000000+B*1000+(C div 1000). @@ -5156,3 +5304,4 @@ incNum(Cnt) -> Old end. + |