aboutsummaryrefslogtreecommitdiffstats
path: root/lib/megaco/src/engine/megaco_messenger.erl
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2010-02-12 19:18:03 +0000
committerErlang/OTP <[email protected]>2010-02-13 07:29:33 +0100
commitf0fc9b72f5d2c4866bb47914c7d4a3a0d63e3de7 (patch)
treeedbe0b391e04705b44a3ab46c40a85d3c086e253 /lib/megaco/src/engine/megaco_messenger.erl
parent201ccdd08bd2a591fe1e348a9a1df3d94a3606e4 (diff)
downloadotp-f0fc9b72f5d2c4866bb47914c7d4a3a0d63e3de7.tar.gz
otp-f0fc9b72f5d2c4866bb47914c7d4a3a0d63e3de7.tar.bz2
otp-f0fc9b72f5d2c4866bb47914c7d4a3a0d63e3de7.zip
OTP-8317, OTP-8323, OTP-8328, OTP-8362 & OTP-8403.
Diffstat (limited to 'lib/megaco/src/engine/megaco_messenger.erl')
-rw-r--r--lib/megaco/src/engine/megaco_messenger.erl303
1 files changed, 226 insertions, 77 deletions
diff --git a/lib/megaco/src/engine/megaco_messenger.erl b/lib/megaco/src/engine/megaco_messenger.erl
index a9e4fd67b2..5756e8e896 100644
--- a/lib/megaco/src/engine/megaco_messenger.erl
+++ b/lib/megaco/src/engine/megaco_messenger.erl
@@ -1,19 +1,19 @@
%%
%% %CopyrightBegin%
-%%
-%% Copyright Ericsson AB 1999-2009. All Rights Reserved.
-%%
+%%
+%% Copyright Ericsson AB 1999-2010. All Rights Reserved.
+%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
-%%
+%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
-%%
+%%
%% %CopyrightEnd%
%%
@@ -99,20 +99,22 @@
%% N.B. Update cancel/1 with '_' when a new field is added
-record(reply,
{
- trans_id,
- local_mid,
- state = prepare, % prepare | eval_request | waiting_for_ack | aborted
- pending_timer_ref,
- handler = undefined, % pid of the proc executing the callback func
- timer_ref,
- version,
- %% bytes: Sent reply data: not acknowledged
- bytes, % binary() | [{integer(), binary(), timer_ref()}]
- ack_action, % discard_ack | {handle_ack, Data}
- send_handle,
- %% segments: Not sent reply data (segments)
- segments = [] % [{integer(), binary()}]
- }).
+ trans_id,
+ local_mid,
+ state = prepare, % prepare | eval_request | waiting_for_ack | aborted
+ pending_timer_ref,
+ handler = undefined, % pid of the proc executing the callback func
+ timer_ref,
+ version,
+ %% bytes: Sent reply data: not acknowledged
+ bytes, % binary() | [{integer(), binary(), timer_ref()}]
+ ack_action, % discard_ack | {handle_ack, Data}
+ send_handle,
+ %% segments: Not sent reply data (segments)
+ segments = [], % [{integer(), binary()}]
+ user_mod,
+ user_args
+ }).
-record(trans_id,
{
@@ -1203,7 +1205,7 @@ prepare_request(ConnData, T, Rest, AckList, ReqList, Extra) ->
LocalMid = ConnHandle#megaco_conn_handle.local_mid,
TransId = to_remote_trans_id(ConnData),
?rt2("prepare request", [LocalMid, TransId]),
- case megaco_monitor:lookup_reply(TransId) of
+ case lookup_reply(ConnData, TransId) of
[] ->
?rt3("brand new request"),
@@ -1222,18 +1224,22 @@ prepare_request(ConnData, T, Rest, AckList, ReqList, Extra) ->
#conn_data{send_handle = SendHandle,
pending_timer = InitTimer,
- protocol_version = Version} = ConnData,
+ protocol_version = Version,
+ user_mod = UserMod,
+ user_args = UserArgs} = ConnData,
{WaitFor, CurrTimer} = megaco_timer:init(InitTimer),
M = ?MODULE,
F = pending_timeout,
- A = [ConnHandle, TransId, CurrTimer],
+ A = [ConnHandle, TransId, CurrTimer],
PendingRef = megaco_monitor:apply_after(M, F, A, WaitFor),
Rep = #reply{send_handle = SendHandle,
trans_id = TransId,
local_mid = LocalMid,
pending_timer_ref = PendingRef,
handler = self(),
- version = Version},
+ version = Version,
+ user_mod = UserMod,
+ user_args = UserArgs},
case megaco_monitor:insert_reply_new(Rep) of
true ->
prepare_normal_trans(ConnData, Rest, AckList,
@@ -1250,9 +1256,14 @@ prepare_request(ConnData, T, Rest, AckList, ReqList, Extra) ->
Extra)
end;
- [#reply{state = State,
+ %% We can ignore the Converted value here as we *know*
+ %% conn-data to be correct (not faked), so even if
+ %% the record was converted, it will now have correct
+ %% values for user_mod and user_args.
+ {_Converted,
+ #reply{state = State,
handler = Pid,
- pending_timer_ref = Ref} = Rep]
+ pending_timer_ref = Ref} = Rep}
when (State =:= prepare) orelse (State =:= eval_request) ->
?rt2("request resend", [State, Pid, Ref]),
@@ -1364,9 +1375,14 @@ prepare_request(ConnData, T, Rest, AckList, ReqList, Extra) ->
end;
- [#reply{state = waiting_for_ack,
+ %% We can ignore the Converted value here as we *know*
+ %% conn-data to be correct (not faked), so even if
+ %% the record was converted, it will now have correct
+ %% values for user_mod and user_args.
+ {_Converted,
+ #reply{state = waiting_for_ack,
bytes = Bin,
- version = Version} = Rep] ->
+ version = Version} = Rep} ->
?rt3("request resend when waiting for ack"),
%% We have already sent a reply, but the receiver
@@ -1384,7 +1400,13 @@ prepare_request(ConnData, T, Rest, AckList, ReqList, Extra) ->
end,
prepare_normal_trans(ConnData2, Rest, AckList, ReqList, Extra);
- [#reply{state = aborted} = Rep] ->
+
+ %% We can ignore the Converted value here as we *know*
+ %% conn-data to be correct (not faked), so even if
+ %% the record was converted, it will now have correct
+ %% values for user_mod and user_args.
+ {_Converted,
+ #reply{state = aborted} = Rep} ->
?rt3("request resend when already in aborted state"),
%% OTP-4956:
@@ -1424,15 +1446,15 @@ prepare_ack(ConnData, [], Rest, AckList, ReqList, Extra) ->
do_prepare_ack(ConnData, T, AckList) ->
TransId = to_remote_trans_id(ConnData),
- case megaco_monitor:lookup_reply(TransId) of
+ case lookup_reply(ConnData, TransId) of
[] ->
%% The reply has already been garbage collected. Ignore.
?report_trace(ConnData, "discard ack (no receiver)", [T]),
AckList;
- [Rep] when Rep#reply.state =:= waiting_for_ack ->
+ {_Converted, Rep} when Rep#reply.state =:= waiting_for_ack ->
%% Don't care about Msg and Rep version diff
[{ConnData, Rep, T} | AckList];
- [_Rep] ->
+ {_Converted, _Rep} ->
%% Protocol violation from the sender of this ack
?report_important(ConnData, "<ERROR> discard trans",
[T, {error, "got ack before reply was sent"}]),
@@ -1657,8 +1679,8 @@ handle_request(ConnData, TransId, T, Extra) ->
%% Furthermore, the reply timer has not been started,
%% so do the cleanup now
?rt1(ConnData, "pending limit already passed", [TransId]),
- case megaco_monitor:lookup_reply(TransId) of
- [Rep] ->
+ case lookup_reply(ConnData, TransId) of
+ {_Converted, Rep} ->
cancel_reply(ConnData, Rep, aborted);
_ ->
ok
@@ -1671,8 +1693,8 @@ do_handle_request(_, ignore, _ConnData, _TransId) ->
ignore;
do_handle_request(_, ignore_trans_request, ConnData, TransId) ->
?rt1(ConnData, "ignore trans request: don't reply", [TransId]),
- case megaco_monitor:lookup_reply(TransId) of
- [#reply{} = Rep] ->
+ case lookup_reply(ConnData, TransId) of
+ {_Converted, #reply{} = Rep} ->
cancel_reply(ConnData, Rep, ignore);
_ ->
ignore
@@ -1689,8 +1711,8 @@ do_handle_request({pending, RequestData}, _SendReply, _ConnData, _) ->
do_handle_request(AckAction, {ok, Bin}, ConnData, TransId)
when is_binary(Bin) ->
?rt1(ConnData, "handle request - ok", [AckAction, TransId]),
- case megaco_monitor:lookup_reply(TransId) of
- [#reply{pending_timer_ref = PendingRef} = Rep] ->
+ case lookup_reply(ConnData, TransId) of
+ {_Converted, #reply{pending_timer_ref = PendingRef} = Rep} ->
#conn_data{reply_timer = InitTimer,
conn_handle = ConnHandle} = ConnData,
@@ -1729,8 +1751,8 @@ do_handle_request(AckAction, {ok, {Sent, NotSent}}, ConnData, TransId)
?rt1(ConnData, "handle request - ok [segmented reply]",
[AckAction, TransId]),
- case megaco_monitor:lookup_reply(TransId) of
- [#reply{pending_timer_ref = PendingRef} = Rep] ->
+ case lookup_reply(ConnData, TransId) of
+ {_Converted, #reply{pending_timer_ref = PendingRef} = Rep} ->
%% d("do_handle_request -> found reply record:"
%% "~n Rep: ~p", [Rep]),
@@ -1772,8 +1794,8 @@ do_handle_request(AckAction, {ok, {Sent, NotSent}}, ConnData, TransId)
end;
do_handle_request(_, {error, aborted}, ConnData, TransId) ->
?report_trace(ConnData, "aborted during our absence", [TransId]),
- case megaco_monitor:lookup_reply(TransId) of
- [Rep] ->
+ case lookup_reply(ConnData, TransId) of
+ {_Converted, Rep} ->
cancel_reply(ConnData, Rep, aborted);
_ ->
ok
@@ -1781,8 +1803,8 @@ do_handle_request(_, {error, aborted}, ConnData, TransId) ->
ignore;
do_handle_request(AckAction, {error, Reason}, ConnData, TransId) ->
?report_trace(ConnData, "error", [TransId, Reason]),
- case megaco_monitor:lookup_reply(TransId) of
- [Rep] ->
+ case lookup_reply(ConnData, TransId) of
+ {_Converted, Rep} ->
Rep2 = Rep#reply{state = waiting_for_ack,
ack_action = AckAction},
cancel_reply(ConnData, Rep2, Reason);
@@ -2819,9 +2841,10 @@ handle_segment_reply(CD,
{segment_no, SN},
{segmentation_complete, SC}]),
TransId2 = to_remote_trans_id(CD#conn_data{serial = TransId}),
- case megaco_monitor:lookup_reply(TransId2) of
- [#reply{bytes = Sent,
- segments = []} = Rep] when is_list(Sent) ->
+ case lookup_reply(CD, TransId2) of
+ {_Converted,
+ #reply{bytes = Sent,
+ segments = []} = Rep} when is_list(Sent) ->
?rt2("no unsent segments", [Sent]),
handle_segment_reply_callback(CD, TransId, SN, SC, Extra),
case lists:keysearch(SN, 1, Sent) of
@@ -2845,8 +2868,9 @@ handle_segment_reply(CD,
ok
end;
- [#reply{bytes = Sent,
- segments = NotSent}] when is_list(Sent) andalso
+ {_Converted,
+ #reply{bytes = Sent,
+ segments = NotSent}} when is_list(Sent) andalso
is_list(NotSent) ->
?rt2("unsent segments", [Sent, NotSent]),
handle_segment_reply_callback(CD, TransId, SN, SC, Extra),
@@ -2883,7 +2907,8 @@ handle_segment_reply(CD,
ok
end;
- [#reply{state = State}] ->
+ {_Converted,
+ #reply{state = State}} ->
%% We received a segment reply for a segmented reply we have
%% not yet sent? This is either some sort of race condition
%% or the "the other side" is really confused.
@@ -3012,7 +3037,7 @@ handle_ack_callback(ConnData, AckStatus, {handle_ack, AckData}, T, Extra) ->
_ ->
[ConnHandle, Version, AckStatus, AckData, Extra | UserArgs]
end,
- Res = (catch apply(UserMod, handle_trans_ack, Args)),
+ Res = (catch handle_callback(ConnData, UserMod, handle_trans_ack, Args)),
?report_debug(ConnData, "return: trans ack", [T, AckData, {return, Res}]),
case Res of
ok ->
@@ -3024,6 +3049,14 @@ handle_ack_callback(ConnData, AckStatus, {handle_ack, AckData}, T, Extra) ->
Res.
+handle_callback(ConnData, undefined = _UserMod, Func, Args) ->
+ ?report_important(ConnData, "callback: unknown callback module",
+ [{func, Func}, {args, Args}]),
+ ok;
+handle_callback(_ConnData, UserMod, Func, Args) ->
+ (catch apply(UserMod, Func, Args)).
+
+
handle_message_error(ConnData, _Error, _Extra)
when ConnData#conn_data.monitor_ref == undefined_monitor_ref ->
%% May occur if another process already has setup a
@@ -4370,11 +4403,15 @@ receive_reply_remote(ConnData, UserReply, Extra) ->
return_unexpected_trans_reply(ConnData, TransId, UserReply, Extra)
end.
-cancel_reply(ConnData, #reply{state = waiting_for_ack} = Rep, Reason) ->
+cancel_reply(ConnData, #reply{state = waiting_for_ack,
+ user_mod = UserMod,
+ user_args = UserArgs} = Rep, Reason) ->
?report_trace(ignore, "cancel reply [waiting_for_ack]", [Rep]),
megaco_monitor:cancel_apply_after(Rep#reply.pending_timer_ref),
Serial = (Rep#reply.trans_id)#trans_id.serial,
- ConnData2 = ConnData#conn_data{serial = Serial},
+ ConnData2 = ConnData#conn_data{serial = Serial,
+ user_mod = UserMod,
+ user_args = UserArgs},
T = #'TransactionAck'{firstAck = Serial},
Extra = ?default_user_callback_extra,
handle_ack(ConnData2, {error, Reason}, Rep, T, Extra);
@@ -4558,16 +4595,30 @@ reply_timeout(ConnHandle, TransId, {_, timeout}) ->
reply_timeout(ConnHandle, TransId, Timer) ->
?report_trace(ConnHandle, "reply timeout", [Timer, TransId]),
- case megaco_monitor:lookup_reply(TransId) of
+ case lookup_reply(undefined, TransId) of
[] ->
reply_not_found_ignore;
- [#reply{state = waiting_for_ack,
- ack_action = {handle_ack, _}} = Rep] ->
+ {Converted,
+ #reply{state = waiting_for_ack,
+ ack_action = {handle_ack, _}} = Rep} ->
case megaco_config:lookup_local_conn(ConnHandle) of
[CD] when (CD#conn_data.cancel =:= true) ->
cancel_in_progress_ignore;
- [CD] ->
+ [CD] when (Converted =:= true) ->
+ incNumTimerRecovery(ConnHandle),
+ %% When we did the reply record lookup, we had no
+ %% conn_data record, and the reply record was
+ %% converted. This means that the reply record
+ %% has no valid info about user_mod or user_args.
+ %% Therefor, the user_mod and user_args of the
+ %% conn_data record is better then nothing.
+ #conn_data{user_mod = UserMod,
+ user_args = UserArgs} = CD,
+ Rep2 = Rep#reply{user_mod = UserMod,
+ user_args = UserArgs},
+ do_reply_timeout(ConnHandle, TransId, CD, Timer, Rep2);
+ [CD] when (Converted =:= false) ->
incNumTimerRecovery(ConnHandle),
do_reply_timeout(ConnHandle, TransId, CD, Timer, Rep);
[] ->
@@ -4576,10 +4627,25 @@ reply_timeout(ConnHandle, TransId, Timer) ->
do_reply_timeout(ConnHandle, TransId, CD, Timer, Rep)
end;
- [#reply{state = waiting_for_ack,
- bytes = Sent} = Rep] when is_list(Sent) ->
+ {Converted,
+ #reply{state = waiting_for_ack,
+ bytes = Sent} = Rep} when is_list(Sent) ->
case megaco_config:lookup_local_conn(ConnHandle) of
- [ConnData] ->
+ [ConnData] when (Converted =:= true) ->
+ incNumTimerRecovery(ConnHandle),
+ %% When we did the reply record lookup, we had no
+ %% conn_data record, and the reply record was
+ %% converted. This means that the reply record
+ %% has no valid info about user_mod or user_args.
+ %% Therefor, the user_mod and user_args of the
+ %% conn_data record is better then nothing.
+ #conn_data{user_mod = UserMod,
+ user_args = UserArgs} = ConnData,
+ Rep2 = Rep#reply{user_mod = UserMod,
+ user_args = UserArgs},
+ do_reply_timeout(ConnHandle, TransId, ConnData,
+ Timer, Rep2);
+ [ConnData] when (Converted =:= false) ->
incNumTimerRecovery(ConnHandle),
do_reply_timeout(ConnHandle, TransId, ConnData,
Timer, Rep);
@@ -4590,10 +4656,12 @@ reply_timeout(ConnHandle, TransId, Timer) ->
Timer, Rep)
end;
- [#reply{state = waiting_for_ack} = Rep] ->
+ {_Converted,
+ #reply{state = waiting_for_ack} = Rep} ->
do_reply_timeout(ConnHandle, TransId, Timer, Rep);
- [#reply{state = aborted} = Rep] ->
+ {_Converted,
+ #reply{state = aborted} = Rep} ->
do_reply_timeout(ConnHandle, TransId, Timer, Rep);
_ ->
@@ -4686,22 +4754,41 @@ handle_reply_timer_timeout(ConnHandle, TransId) ->
?report_trace(ConnHandle, "handle reply timeout", [timeout, TransId]),
incNumTimerRecovery(ConnHandle),
%% OTP-4378
- case megaco_monitor:lookup_reply(TransId) of
- [#reply{state = waiting_for_ack} = Rep] ->
+ case lookup_reply(undefined, TransId) of
+ {Converted,
+ #reply{state = waiting_for_ack} = Rep} ->
Serial = (Rep#reply.trans_id)#trans_id.serial,
- ConnData =
+ {Rep2, ConnData} =
case megaco_config:lookup_local_conn(ConnHandle) of
- [ConnData0] ->
- ConnData0;
- [] ->
- fake_conn_data(ConnHandle)
+ [ConnData0] when (Converted =:= false) ->
+ #reply{user_mod = UserMod,
+ user_args = UserArgs} = Rep,
+ {Rep,
+ ConnData0#conn_data{user_mod = UserMod,
+ user_args = UserArgs}};
+ [ConnData0] when (Converted =:= true) ->
+ {Rep#reply{user_mod = ConnData0#conn_data.user_mod,
+ user_args = ConnData0#conn_data.user_args},
+ ConnData0};
+ [] when (Converted =:= false) ->
+ ConnData0 = fake_conn_data(ConnHandle),
+ #reply{user_mod = UserMod,
+ user_args = UserArgs} = Rep,
+ {Rep,
+ ConnData0#conn_data{user_mod = UserMod,
+ user_args = UserArgs}};
+ [] when (Converted =:= true) ->
+ %% We have no valid info about user_mod and user_args
+ {Rep, fake_conn_data(ConnHandle)}
end,
ConnData2 = ConnData#conn_data{serial = Serial},
T = #'TransactionAck'{firstAck = Serial},
Extra = ?default_user_callback_extra,
- handle_ack(ConnData2, {error, timeout}, Rep, T, Extra);
- [#reply{pending_timer_ref = Ref, % aborted?
- bytes = SegSent}] -> % may be a binary
+ handle_ack(ConnData2, {error, timeout}, Rep2, T, Extra);
+
+ {_Converted,
+ #reply{pending_timer_ref = Ref, % aborted?
+ bytes = SegSent}} -> % may be a binary
megaco_monitor:cancel_apply_after(Ref),
cancel_segment_timers(SegSent),
megaco_monitor:delete_reply(TransId),
@@ -4803,9 +4890,10 @@ pending_timeout(ConnHandle, TransId, Timer) ->
handle_pending_timeout(CD, TransId, Timer) ->
?report_trace(CD, "handle pending timeout", []),
- case megaco_monitor:lookup_reply(TransId) of
- [#reply{state = State,
- handler = Pid} = Rep] when (State =:= prepare) orelse
+ case lookup_reply(CD, TransId) of
+ {_Converted,
+ #reply{state = State,
+ handler = Pid} = Rep} when (State =:= prepare) orelse
(State =:= eval_request) ->
#conn_data{sent_pending_limit = Limit,
@@ -4885,12 +4973,14 @@ handle_pending_timeout(CD, TransId, Timer) ->
[] ->
reply_not_found; % Trace ??
- [#reply{state = waiting_for_ack}] ->
+ {_Converted,
+ #reply{state = waiting_for_ack}} ->
%% The reply has already been sent
%% No need for any pending trans reply
reply_has_been_sent;
- [#reply{state = aborted} = Rep] ->
+ {_Converted,
+ #reply{state = aborted} = Rep} ->
%% glitch, but cleanup just the same
cancel_reply(CD, Rep, aborted),
reply_aborted_state
@@ -5096,7 +5186,65 @@ make_transaction_reply(_, TransId, IAR, TransRes) ->
%%-----------------------------------------------------------------
+%% This function is used as a wrapper for reply-record lookups.
+%% The intention is that during upgrade, this function
+%% can perform on-the-fly conversions of reply-records.
+lookup_reply(CD, TransId) ->
+ case megaco_monitor:lookup_reply(TransId) of
+ [#reply{} = Rep] ->
+ {false, Rep};
+
+ %% Old (pre-3.13.1) version of the record => Convert to new version
+ [{reply, TransId,
+ LocalMid, State, PendingTmrRef, Handler, TimerRef,
+ Version, Bytes, AckAction, SendHandle, Segments}]
+ when is_record(CD, conn_data) ->
+ #conn_data{user_mod = UserMod,
+ user_args = UserArgs} = CD,
+ Rep = #reply{trans_id = TransId,
+ local_mid = LocalMid,
+ state = State,
+ pending_timer_ref = PendingTmrRef,
+ handler = Handler,
+ timer_ref = TimerRef,
+ version = Version,
+ bytes = Bytes,
+ ack_action = AckAction,
+ send_handle = SendHandle,
+ segments = Segments,
+ user_mod = UserMod,
+ user_args = UserArgs},
+ {true, Rep};
+
+ %% Old (pre-3.13.1) version of the record => Convert to new version
+ [{reply, TransId,
+ LocalMid, State, PendingTmrRef, Handler, TimerRef,
+ Version, Bytes, AckAction, SendHandle, Segments}] ->
+ %% ConnData is not known here, so ignore for now
+ Rep = #reply{trans_id = TransId,
+ local_mid = LocalMid,
+ state = State,
+ pending_timer_ref = PendingTmrRef,
+ handler = Handler,
+ timer_ref = TimerRef,
+ version = Version,
+ bytes = Bytes,
+ ack_action = AckAction,
+ send_handle = SendHandle,
+ segments = Segments},
+ {true, Rep};
+
+ Else ->
+ Else
+ end.
+
+
+%%-----------------------------------------------------------------
+
%%-----------------------------------------------------------------
+%% info_msg(F, A) ->
+%% ?megaco_info(F, A).
+
warning_msg(F, A) ->
?megaco_warning(F, A).
@@ -5131,7 +5279,7 @@ error_msg(F, A) ->
%% Time in milli seconds
t() ->
- {A,B,C} = erlang:now(),
+ {A,B,C} = os:timestamp(),
A*1000000000+B*1000+(C div 1000).
@@ -5156,3 +5304,4 @@ incNum(Cnt) ->
Old
end.
+