aboutsummaryrefslogtreecommitdiffstats
path: root/lib/megaco/src/engine/megaco_messenger.erl
diff options
context:
space:
mode:
authorErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
committerErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
commit84adefa331c4159d432d22840663c38f155cd4c1 (patch)
treebff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/megaco/src/engine/megaco_messenger.erl
downloadotp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz
otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2
otp-84adefa331c4159d432d22840663c38f155cd4c1.zip
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/megaco/src/engine/megaco_messenger.erl')
-rw-r--r--lib/megaco/src/engine/megaco_messenger.erl5158
1 files changed, 5158 insertions, 0 deletions
diff --git a/lib/megaco/src/engine/megaco_messenger.erl b/lib/megaco/src/engine/megaco_messenger.erl
new file mode 100644
index 0000000000..a9e4fd67b2
--- /dev/null
+++ b/lib/megaco/src/engine/megaco_messenger.erl
@@ -0,0 +1,5158 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1999-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%%----------------------------------------------------------------------
+%% Purpose: Send and process a (sequence of) Megaco/H.248 transactions
+%%----------------------------------------------------------------------
+
+-module(megaco_messenger).
+
+%% Application internal export
+-export([
+ process_received_message/4, process_received_message/5,
+ receive_message/4, receive_message/5,
+ connect/4, connect/5,
+ disconnect/2,
+ encode_actions/3,
+ call/3,
+ cast/3,
+ cancel/2,
+ request_timeout/2,
+ request_keep_alive_timeout/2,
+ pending_timeout/3,
+ reply_timeout/3,
+ segment_timeout/3,
+ %% segment_reply_timeout/4,
+
+ test_request/5,
+ test_reply/5
+ ]).
+
+%% MIB stat functions
+-export([
+ get_stats/0, get_stats/1, get_stats/2,
+ reset_stats/0, reset_stats/1
+ ]).
+
+%% Misc functions
+-export([
+ cleanup/2,
+ which_requests/1, which_replies/1
+ ]).
+
+%% Module internal export
+-export([
+ process_received_message/6,
+ handle_request/2,
+ handle_long_request/2,
+ connect_remote/3,
+ disconnect_local/2,
+ disconnect_remote/3,
+ send_request_remote/4,
+ receive_reply_remote/2, receive_reply_remote/3
+ ]).
+
+-include_lib("megaco/include/megaco.hrl").
+-include("megaco_message_internal.hrl").
+-include_lib("megaco/src/app/megaco_internal.hrl").
+
+%% N.B. Update cancel/1 with '_' when a new field is added
+-record(request,
+ {trans_id,
+ remote_mid,
+ timer_ref, % {short, Ref} | {long, Ref}
+ init_timer,
+ init_long_timer,
+ curr_timer,
+ version,
+ bytes, % {send, Data} | {no_send, Data}, Data = binary() | tuple()
+ send_handle,
+ user_mod,
+ user_args,
+ reply_action, % call | cast
+ reply_data,
+ seg_recv = [], % [integer()] (received segments)
+ init_seg_timer,
+ seg_timer_ref,
+ keep_alive_timer, % plain | integer() >= 0
+ keep_alive_ref % undefined | ref()
+ }).
+
+
+%% N.B. Update cancel/1 with '_' when a new field is added
+-record(reply,
+ {
+ trans_id,
+ local_mid,
+ state = prepare, % prepare | eval_request | waiting_for_ack | aborted
+ pending_timer_ref,
+ handler = undefined, % pid of the proc executing the callback func
+ timer_ref,
+ version,
+ %% bytes: Sent reply data: not acknowledged
+ bytes, % binary() | [{integer(), binary(), timer_ref()}]
+ ack_action, % discard_ack | {handle_ack, Data}
+ send_handle,
+ %% segments: Not sent reply data (segments)
+ segments = [] % [{integer(), binary()}]
+ }).
+
+-record(trans_id,
+ {
+ mid,
+ serial
+ }).
+
+
+-ifdef(MEGACO_TEST_CODE).
+-define(SIM(Other,Where),
+ fun(Afun,Bfun) ->
+ Kfun = {?MODULE,Bfun},
+ case (catch ets:lookup(megaco_test_data, Kfun)) of
+ [{Kfun,Cfun}] ->
+ Cfun(Afun);
+ _ ->
+ Afun
+ end
+ end(Other,Where)).
+-define(TC_AWAIT_CANCEL_EVENT(),
+ case megaco_tc_controller:lookup(block_on_cancel) of
+ {value, {Tag, Pid}} when is_pid(Pid) ->
+ Pid ! {Tag, self()},
+ receive
+ {Tag, Pid} ->
+ ok
+ end;
+ {value, {sleep, To}} when is_integer(To) andalso (To > 0) ->
+ receive after To -> ok end;
+ _ ->
+ ok
+ end).
+-define(TC_AWAIT_REPLY_EVENT(Info),
+ case megaco_tc_controller:lookup(block_on_reply) of
+ {value, {Tag, Pid}} when is_pid(Pid) ->
+ Pid ! {Tag, self(), Info},
+ receive
+ {Tag, Pid} ->
+ ok
+ end;
+ _Whatever ->
+ %% io:format("Whatever: ~p~n", [Whatever]),
+ ok
+ end).
+-else.
+-define(SIM(Other,Where),Other).
+-define(TC_AWAIT_CANCEL_EVENT(),ok).
+-define(TC_AWAIT_REPLY_EVENT(_),ok).
+-endif.
+
+
+-define(report_pending_limit_exceeded(ConnData),
+ ?report_important(ConnData, "<ERROR> pending limit exceeded", [])).
+
+-ifdef(megaco_extended_trace).
+-define(rt1(T,F,A),?report_trace(T,F,A)).
+-define(rt2(F,A), ?rt1(ignore,F,A)).
+-define(rt3(F), ?rt2(F,[])).
+-else.
+-define(rt1(T,F,A),ok).
+-define(rt2(F,A), ok).
+-define(rt3(F), ok).
+-endif.
+
+
+%%----------------------------------------------------------------------
+%% SNMP statistics handling functions
+%%----------------------------------------------------------------------
+
+%%-----------------------------------------------------------------
+%% Func: get_stats/0, get_stats/1, get_stats/2
+%% Description: Retreive statistics (counters) for TCP
+%%-----------------------------------------------------------------
+
+get_stats() ->
+ megaco_stats:get_stats(megaco_stats).
+
+get_stats(ConnHandleOrCounter) ->
+ megaco_stats:get_stats(megaco_stats, ConnHandleOrCounter).
+
+get_stats(ConnHandle, Counter) ->
+ megaco_stats:get_stats(megaco_stats, ConnHandle, Counter).
+
+
+%%-----------------------------------------------------------------
+%% Func: reset_stats/0, reaet_stats/1
+%% Description: Reset statistics (counters)
+%%-----------------------------------------------------------------
+
+reset_stats() ->
+ megaco_stats:reset_stats(megaco_stats).
+
+reset_stats(ConnHandleOrCounter) ->
+ megaco_stats:reset_stats(megaco_stats, ConnHandleOrCounter).
+
+
+
+%%----------------------------------------------------------------------
+%% cleanup utility functions
+%%----------------------------------------------------------------------
+
+cleanup(#megaco_conn_handle{local_mid = LocalMid}, Force)
+ when (Force =:= true) orelse (Force =:= false) ->
+ Pat = #reply{trans_id = '$1',
+ local_mid = LocalMid,
+ state = '$2',
+ _ = '_'},
+ do_cleanup(Pat, Force);
+cleanup(LocalMid, Force)
+ when (Force =:= true) orelse (Force =:= false) ->
+ Pat = #reply{trans_id = '$1',
+ local_mid = LocalMid,
+ state = '$2',
+ _ = '_'},
+ do_cleanup(Pat, Force).
+
+do_cleanup(Pat, Force) ->
+ Match = megaco_monitor:which_replies(Pat),
+ Reps = [{V1, V2} || [V1, V2] <- Match],
+ do_cleanup2(Reps, Force).
+
+do_cleanup2([], _) ->
+ ok;
+do_cleanup2([{TransId, aborted}|T], Force = false) ->
+ megaco_monitor:delete_reply(TransId),
+ do_cleanup2(T, Force);
+do_cleanup2([_|T], Force = false) ->
+ do_cleanup2(T, Force);
+do_cleanup2([{TransId, _State}|T], Force = true) ->
+ megaco_monitor:delete_reply(TransId),
+ do_cleanup2(T, Force).
+
+
+%%----------------------------------------------------------------------
+%% which_requests and which_replies utility functions
+%%----------------------------------------------------------------------
+
+which_requests(#megaco_conn_handle{local_mid = LocalMid,
+ remote_mid = RemoteMid}) ->
+ Pat1 = #trans_id{mid = LocalMid,
+ serial = '$1', _ = '_'},
+ Pat2 = #request{trans_id = Pat1,
+ remote_mid = RemoteMid,
+ _ = '_'},
+ Match = megaco_monitor:which_requests(Pat2),
+ [S || [S] <- Match];
+which_requests(LocalMid) ->
+ Pat1 = #trans_id{mid = LocalMid,
+ serial = '$1', _ = '_'},
+ Pat2 = #request{trans_id = Pat1,
+ remote_mid = '$2', _ = '_'},
+ Match0 = megaco_monitor:which_requests(Pat2),
+ Match1 = [{mk_ch(LocalMid, V2), V1} || [V1, V2] <- Match0],
+ which_requests1(lists:sort(Match1)).
+
+which_requests1([]) ->
+ [];
+which_requests1([{CH, S}|T]) ->
+ which_requests2(T, CH, [S], []).
+
+which_requests2([], CH, Serials, Reqs) ->
+ lists:reverse([{CH, Serials}|Reqs]);
+which_requests2([{CH, S}|T], CH, Serials, Reqs) ->
+ which_requests2(T, CH, [S|Serials], Reqs);
+which_requests2([{CH1, S}|T], CH2, Serials, Reqs) ->
+ which_requests2(T, CH1, [S], [{CH2, lists:reverse(Serials)}| Reqs]).
+
+
+which_replies(#megaco_conn_handle{local_mid = LocalMid,
+ remote_mid = RemoteMid}) ->
+ Pat1 = #trans_id{mid = RemoteMid,
+ serial = '$1', _ = '_'},
+ Pat2 = #reply{trans_id = Pat1,
+ local_mid = LocalMid,
+ state = '$2',
+ handler = '$3', _ = '_'},
+ Match = megaco_monitor:which_replies(Pat2),
+ [{V1, V2, V3} || [V1, V2, V3] <- Match];
+which_replies(LocalMid) ->
+ Pat1 = #trans_id{mid = '$1',
+ serial = '$2', _ = '_'},
+ Pat2 = #reply{trans_id = Pat1,
+ local_mid = LocalMid,
+ state = '$3',
+ handler = '$4', _ = '_'},
+ Match0 = megaco_monitor:which_replies(Pat2),
+ Match1 = [{mk_ch(LocalMid,V1),{V2,V3,V4}} || [V1, V2, V3, V4] <- Match0],
+ which_replies1(lists:sort(Match1)).
+
+which_replies1([]) ->
+ [];
+which_replies1([{CH, Data}|T]) ->
+ which_replies2(T, CH, [Data], []).
+
+which_replies2([], CH, Data, Reps) ->
+ lists:reverse([{CH, Data}|Reps]);
+which_replies2([{CH, Data}|T], CH, Datas, Reps) ->
+ which_replies2(T, CH, [Data|Datas], Reps);
+which_replies2([{CH1, Data}|T], CH2, Datas, Reps) ->
+ which_replies2(T, CH1, [Data], [{CH2, lists:reverse(Datas)}| Reps]).
+
+
+mk_ch(LM, RM) ->
+ #megaco_conn_handle{local_mid = LM, remote_mid = RM}.
+
+
+%%----------------------------------------------------------------------
+%% Register/unreister connections
+%%----------------------------------------------------------------------
+
+%% Returns {ok, ConnHandle} | {error, Reason}
+autoconnect(RH, RemoteMid, SendHandle, ControlPid, Extra)
+ when is_record(RH, megaco_receive_handle) ->
+ ?rt2("autoconnect", [RH, RemoteMid, SendHandle, ControlPid]),
+ case megaco_config:autoconnect(RH, RemoteMid, SendHandle, ControlPid) of
+ {ok, ConnData} ->
+ do_connect(ConnData, Extra);
+ {error, Reason} ->
+ {error, Reason}
+ end;
+autoconnect(BadHandle, _CH, _SendHandle, _ControlPid, _Extra) ->
+ {error, {bad_receive_handle, BadHandle}}.
+
+connect(RH, RemoteMid, SendHandle, ControlPid) ->
+ Extra = ?default_user_callback_extra,
+ connect(RH, RemoteMid, SendHandle, ControlPid, Extra).
+connect(RH, RemoteMid, SendHandle, ControlPid, Extra)
+ when is_record(RH, megaco_receive_handle) ->
+ ?rt2("connect", [RH, RemoteMid, SendHandle, ControlPid, Extra]),
+
+ %% The purpose of this is to have a temoporary process, to
+ %% which one can set up a monitor or link and get a
+ %% notification when process exits. The entire connect is
+ %% done in the temporary worker process.
+ %% When it exits, the connect is either successfully done
+ %% or it failed.
+
+ ConnectorFun =
+ fun() ->
+
+ ConnectResult =
+ case megaco_config:connect(RH, RemoteMid,
+ SendHandle, ControlPid) of
+ {ok, ConnData} ->
+ do_connect(ConnData, Extra);
+ {error, Reason} ->
+ {error, Reason}
+ end,
+ ?rt2("connector: connected", [self(), ConnectResult]),
+ exit({result, ConnectResult})
+ end,
+ Flag = process_flag(trap_exit, true),
+ Connector = erlang:spawn_link(ConnectorFun),
+ receive
+ {'EXIT', Connector, {result, ConnectResult}} ->
+ ?rt2("connect result: received expected connector exit signal",
+ [Connector, ConnectResult]),
+ process_flag(trap_exit, Flag),
+ ConnectResult;
+ {'EXIT', Connector, OtherReason} ->
+ ?rt2("connect exit: received unexpected connector exit signal",
+ [Connector, OtherReason]),
+ process_flag(trap_exit, Flag),
+ {error, OtherReason}
+ end;
+connect(BadHandle, _CH, _SendHandle, _ControlPid, _Extra) ->
+ {error, {bad_receive_handle, BadHandle}}.
+
+do_connect(CD, Extra) ->
+ CH = CD#conn_data.conn_handle,
+ Version = CD#conn_data.protocol_version,
+ UserMod = CD#conn_data.user_mod,
+ UserArgs = CD#conn_data.user_args,
+ Args =
+ case Extra of
+ ?default_user_callback_extra ->
+ [CH, Version | UserArgs];
+ _ ->
+ [CH, Version, Extra | UserArgs]
+ end,
+ ?report_trace(CD, "callback: connect", [Args]),
+ Res = (catch apply(UserMod, handle_connect, Args)),
+ ?report_debug(CD, "return: connect", [{return, Res}]),
+ case Res of
+ ok ->
+ ?SIM(ok, do_connect), % do_encode),
+ monitor_process(CH, CD#conn_data.control_pid);
+ error ->
+ megaco_config:disconnect(CH),
+ {error, {connection_refused, CD, error}};
+ {error, ED} when is_record(ED,'ErrorDescriptor') ->
+ megaco_config:disconnect(CH),
+ {error, {connection_refused, CD, ED}};
+ _Error ->
+ warning_msg("connect callback failed: ~w", [Res]),
+ megaco_config:disconnect(CH),
+ {error, {connection_refused, CD, Res}}
+ end.
+
+finish_connect(#conn_data{control_pid = ControlPid} = CD)
+ when is_pid(ControlPid) andalso (node(ControlPid) =:= node()) ->
+ ?rt1(CD, "finish local connect", [ControlPid]),
+ do_finish_connect(CD);
+finish_connect(#conn_data{conn_handle = CH,
+ control_pid = ControlPid} = CD)
+ when is_pid(ControlPid) andalso (node(ControlPid) =/= node()) ->
+ ?rt1(CD, "finish remote connect", [ControlPid]),
+ RemoteNode = node(ControlPid),
+ UserMonitorPid = whereis(megaco_monitor),
+ Args = [CH, ControlPid, UserMonitorPid],
+ case rpc:call(RemoteNode, ?MODULE, connect_remote, Args) of
+ {ok, ControlMonitorPid} ->
+ do_finish_connect(CD#conn_data{control_pid = ControlMonitorPid});
+ {error, Reason} ->
+ disconnect(CH, {connect_remote, Reason}),
+ {error, Reason};
+ {badrpc, Reason} ->
+ Reason2 = {'EXIT', Reason},
+ disconnect(CH, {connect_remote, Reason2}),
+ {error, Reason2}
+ end.
+
+do_finish_connect(#conn_data{conn_handle = CH,
+ send_handle = SendHandle,
+ control_pid = ControlPid} = CD) ->
+ M = ?MODULE,
+ F = disconnect_local,
+ A = [CH],
+ MFA = {M, F, A},
+ case megaco_config:finish_connect(CH, SendHandle, ControlPid, MFA) of
+ {ok, Ref} ->
+ {ok, CD#conn_data{monitor_ref = Ref}};
+ {error, Reason} ->
+ {error, {config_update, Reason}}
+ end.
+
+
+monitor_process(CH, ControlPid)
+ when is_pid(ControlPid) andalso (node(ControlPid) =:= node()) ->
+ M = ?MODULE,
+ F = disconnect_local,
+ A = [CH],
+ Ref = megaco_monitor:apply_at_exit(M, F, A, ControlPid),
+ case megaco_config:update_conn_info(CH, monitor_ref, Ref) of
+ ok ->
+ ?SIM({ok, CH}, monitor_process_local);
+ {error, Reason} ->
+ disconnect(CH, {config_update, Reason}),
+ {error, Reason}
+ end;
+monitor_process(CH, ControlPid)
+ when is_pid(ControlPid) andalso (node(ControlPid) =/= node()) ->
+ RemoteNode = node(ControlPid),
+ UserMonitorPid = whereis(megaco_monitor),
+ Args = [CH, ControlPid, UserMonitorPid],
+ case rpc:call(RemoteNode, ?MODULE, connect_remote, Args) of
+ {ok, ControlMonitorPid} ->
+ M = ?MODULE,
+ F = disconnect_local,
+ A = [CH],
+ Ref = megaco_monitor:apply_at_exit(M, F, A, ControlMonitorPid),
+ case megaco_config:update_conn_info(CH, monitor_ref, Ref) of
+ ok ->
+ ?SIM({ok, CH}, monitor_process_remote);
+ {error, Reason} ->
+ disconnect(CH, {config_update, Reason}),
+ {error, Reason}
+ end;
+ {error, Reason} ->
+ disconnect(CH, {connect_remote, Reason}),
+ {error, Reason};
+ {badrpc, Reason} ->
+ Reason2 = {'EXIT', Reason},
+ disconnect(CH, {connect_remote, Reason2}),
+ {error, Reason2}
+ end;
+monitor_process(CH, undefined = _ControlPid) ->
+ %% We have to do this later (setting up the monitor),
+ %% when the first message arrives. The 'connected' atom is
+ %% the indication for the first arriving message to finish
+ %% the connect.
+ %% This may be the case when an MGC performs a pre-connect
+ %% in order to speed up the handling of an (expected) connecting
+ %% MG.
+ case megaco_config:update_conn_info(CH, monitor_ref, connected) of
+ ok ->
+ ?SIM({ok, CH}, monitor_process_local);
+ {error, Reason} ->
+ disconnect(CH, {config_update, Reason}),
+ {error, Reason}
+ end.
+
+connect_remote(CH, ControlPid, UserMonitorPid)
+ when node(ControlPid) =:= node() andalso node(UserMonitorPid) =/= node() ->
+ case megaco_config:lookup_local_conn(CH) of
+ [_ConnData] ->
+ UserNode = node(UserMonitorPid),
+ M = ?MODULE,
+ F = disconnect_remote,
+ A = [CH, UserNode],
+ Ref = megaco_monitor:apply_at_exit(M, F, A, UserMonitorPid),
+ case megaco_config:connect_remote(CH, UserNode, Ref) of
+ ok ->
+ ControlMonitorPid = whereis(megaco_monitor),
+ ?SIM({ok, ControlMonitorPid}, connect_remote);
+ {error, Reason} ->
+ {error, Reason}
+ end;
+ [] ->
+ {error, {no_connection, CH}}
+ end.
+
+cancel_apply_at_exit({connecting, _ConnectorPid}) ->
+ ok;
+cancel_apply_at_exit(connected) ->
+ ok;
+cancel_apply_at_exit(ControlRef) ->
+ megaco_monitor:cancel_apply_at_exit(ControlRef).
+
+node_of_control_pid(Pid) when is_pid(Pid) ->
+ node(Pid);
+node_of_control_pid(_) ->
+ node().
+
+disconnect(ConnHandle, DiscoReason)
+ when is_record(ConnHandle, megaco_conn_handle) ->
+ case megaco_config:disconnect(ConnHandle) of
+ {ok, ConnData, RemoteConnData} ->
+ ControlRef = ConnData#conn_data.monitor_ref,
+ cancel_apply_at_exit(ControlRef),
+ handle_disconnect_callback(ConnData, DiscoReason),
+ ControlNode = node_of_control_pid(ConnData#conn_data.control_pid),
+ case ControlNode =:= node() of
+ true ->
+ %% Propagate to remote users
+ CancelFun =
+ fun(RCD) ->
+ UserRef = RCD#remote_conn_data.monitor_ref,
+ cancel_apply_at_exit(UserRef),
+ RCD#remote_conn_data.user_node
+ end,
+ Nodes = lists:map(CancelFun, RemoteConnData),
+ %% io:format("NODES: ~p~n", [Nodes]),
+ M = ?MODULE,
+ F = disconnect,
+ A = [ConnHandle, DiscoReason],
+ case rpc:multicall(Nodes, M, F, A) of
+ {Res, []} ->
+ Check = fun(ok) -> false;
+ ({error, {no_connection, _CH}}) -> false;
+ (_) -> true
+ end,
+ case lists:filter(Check, Res) of
+ [] ->
+ ok;
+ Bad ->
+ {error, {remote_disconnect_error, ConnHandle, Bad}}
+ end;
+ {_Res, Bad} ->
+ {error, {remote_disconnect_crash, ConnHandle, Bad}}
+ end;
+ false when (RemoteConnData =:= []) ->
+ %% Propagate to remote control node
+ M = ?MODULE,
+ F = disconnect_remote,
+ A = [DiscoReason, ConnHandle, node()],
+ case rpc:call(ControlNode, M, F, A) of
+ {badrpc, Reason} ->
+ {error, {'EXIT', Reason}};
+ Other ->
+ Other
+ end
+ end;
+ {error, Reason} ->
+ {error, Reason}
+ end;
+disconnect(BadHandle, Reason) ->
+ {error, {bad_conn_handle, BadHandle, Reason}}.
+
+disconnect_local(Reason, ConnHandle) ->
+ disconnect(ConnHandle, {no_controlling_process, Reason}).
+
+disconnect_remote(_Reason, ConnHandle, UserNode) ->
+ case megaco_config:disconnect_remote(ConnHandle, UserNode) of
+ [RCD] ->
+ Ref = RCD#remote_conn_data.monitor_ref,
+ cancel_apply_at_exit(Ref),
+ ok;
+ [] ->
+ {error, {no_connection, ConnHandle}}
+ end.
+
+
+%%----------------------------------------------------------------------
+%% Handle incoming message
+%%----------------------------------------------------------------------
+
+receive_message(ReceiveHandle, ControlPid, SendHandle, Bin) ->
+ Extra = ?default_user_callback_extra,
+ receive_message(ReceiveHandle, ControlPid, SendHandle, Bin, Extra).
+
+receive_message(ReceiveHandle, ControlPid, SendHandle, Bin, Extra) ->
+ Opts = [link , {min_heap_size, 5000}],
+ spawn_opt(?MODULE,
+ process_received_message,
+ [ReceiveHandle, ControlPid, SendHandle, Bin, self(), Extra], Opts),
+ ok.
+
+%% This function is called via the spawn_opt function with the link
+%% option, therefor the unlink before the exit.
+process_received_message(ReceiveHandle, ControlPid, SendHandle, Bin, Receiver,
+ Extra) ->
+ process_received_message(ReceiveHandle, ControlPid, SendHandle, Bin, Extra),
+ unlink(Receiver),
+ exit(normal).
+
+process_received_message(ReceiveHandle, ControlPid, SendHandle, Bin) ->
+ Extra = ?default_user_callback_extra,
+ process_received_message(ReceiveHandle, ControlPid, SendHandle, Bin, Extra).
+
+process_received_message(ReceiveHandle, ControlPid, SendHandle, Bin, Extra) ->
+ Flag = process_flag(trap_exit, true),
+ case prepare_message(ReceiveHandle, SendHandle, Bin, ControlPid, Extra) of
+ {ok, ConnData, MegaMsg} when is_record(MegaMsg, 'MegacoMessage') ->
+ ?rt1(ConnData, "message prepared", [MegaMsg]),
+ Mess = MegaMsg#'MegacoMessage'.mess,
+ case Mess#'Message'.messageBody of
+ {transactions, Transactions} ->
+ {AckList, ReqList} =
+ prepare_trans(ConnData, Transactions, [], [], Extra),
+ handle_acks(AckList, Extra),
+ case ReqList of
+ [] ->
+ ?rt3("no transaction requests"),
+ ignore;
+ [Req|Reqs] when (ConnData#conn_data.threaded =:= true) ->
+ ?rt3("handle requests (spawned)"),
+ lists:foreach(
+ fun(R) ->
+ spawn(?MODULE, handle_request, [R, Extra])
+ end,
+ Reqs),
+ handle_request(Req, Extra);
+ _ ->
+ ?rt3("handle requests"),
+ case handle_requests(ReqList, [], Extra) of
+ [] ->
+ ignore;
+ [LongRequest | More] ->
+ lists:foreach(
+ fun(LR) ->
+ spawn(?MODULE, handle_long_request, [LR, Extra])
+ end,
+ More),
+ handle_long_request(LongRequest, Extra)
+ end
+ end;
+ {messageError, Error} ->
+ handle_message_error(ConnData, Error, Extra)
+ end;
+ {silent_fail, ConnData, {_Code, Reason, Error}} ->
+ ?report_debug(ConnData, Reason, [no_reply, Error]),
+ ignore;
+ {verbose_fail, ConnData, {Code, Reason, Error}} ->
+ ?report_debug(ConnData, Reason, [Error]),
+ send_message_error(ConnData, Code, Reason)
+ end,
+ process_flag(trap_exit, Flag),
+ ok.
+
+prepare_message(RH, SH, Bin, Pid, Extra)
+ when is_record(RH, megaco_receive_handle) andalso is_pid(Pid) ->
+ ?report_trace(RH, "receive bytes", [{bytes, Bin}]),
+ EncodingMod = RH#megaco_receive_handle.encoding_mod,
+ EncodingConfig = RH#megaco_receive_handle.encoding_config,
+ ProtVersion = RH#megaco_receive_handle.protocol_version,
+ case (catch EncodingMod:decode_message(EncodingConfig, ProtVersion, Bin)) of
+ {ok, MegaMsg} when is_record(MegaMsg, 'MegacoMessage') ->
+ ?report_trace(RH, "receive message", [{message, MegaMsg}]),
+ Mess = MegaMsg#'MegacoMessage'.mess,
+ RemoteMid = Mess#'Message'.mId,
+ Version = Mess#'Message'.version,
+ LocalMid = RH#megaco_receive_handle.local_mid,
+ CH = #megaco_conn_handle{local_mid = LocalMid,
+ remote_mid = RemoteMid},
+ case megaco_config:lookup_local_conn(CH) of
+
+ %%
+ %% Message is not of the negotiated version
+ %%
+
+ [#conn_data{protocol_version = NegVersion,
+ strict_version = true} = ConnData]
+ when NegVersion =/= Version ->
+ %% Use already established connection,
+ %% but incorrect version
+ ?rt1(ConnData, "not negotiated version", [Version]),
+ Error = {error, {not_negotiated_version,
+ NegVersion, Version}},
+ handle_syntax_error_callback(RH, ConnData,
+ prepare_error(Error),
+ Extra);
+
+
+ [ConnData] ->
+
+ %%
+ %% Use an already established connection
+ %%
+ %% This *may* have been set up in the
+ %% "non-official" way, so we may need to
+ %% create the monitor to the control process
+ %% and store the SendHandle (which is normally
+ %% done when creating the "temporary" connection).
+ %%
+
+ ?rt1(ConnData, "use already established connection", []),
+ ConnData2 = ConnData#conn_data{send_handle = SH,
+ control_pid = Pid,
+ protocol_version = Version},
+ check_message_auth(CH, ConnData2, MegaMsg, Bin);
+
+ [] ->
+ %% Setup a temporary connection
+ ?rt3("setup a temporary connection"),
+ case autoconnect(RH, RemoteMid, SH, Pid, Extra) of
+ {ok, _} ->
+ do_prepare_message(RH, CH, SH, MegaMsg, Pid, Bin);
+ {error, {already_connected, _ConnHandle}} ->
+ do_prepare_message(RH, CH, SH, MegaMsg, Pid, Bin);
+ {error, {connection_refused, ConnData, Reason}} ->
+ Error = prepare_error({error, {connection_refused, Reason}}),
+ {verbose_fail, ConnData, Error};
+ {error, Reason} ->
+ ConnData = fake_conn_data(RH, RemoteMid, SH, Pid),
+ ConnData2 = ConnData#conn_data{protocol_version = Version},
+ Error = prepare_error({error, Reason}),
+ {verbose_fail, ConnData2, Error}
+ end
+ end;
+ Error ->
+ ?rt2("decode error", [Error]),
+ ConnData = handle_decode_error(Error,
+ RH, SH, Bin, Pid,
+ EncodingMod,
+ EncodingConfig,
+ ProtVersion),
+ handle_syntax_error_callback(RH, ConnData, prepare_error(Error), Extra)
+ end;
+prepare_message(RH, SendHandle, _Bin, ControlPid, _Extra) ->
+ ConnData = fake_conn_data(RH, SendHandle, ControlPid),
+ Error = prepare_error({'EXIT', {bad_receive_handle, RH}}),
+ {verbose_fail, ConnData, Error}.
+
+
+handle_decode_error({error, {unsupported_version, _}},
+ #megaco_receive_handle{local_mid = LocalMid} = RH, SH,
+ Bin, Pid,
+ EM, EC, V) ->
+ case (catch EM:decode_mini_message(EC, V, Bin)) of
+ {ok, #'MegacoMessage'{mess = #'Message'{version = _Ver,
+ mId = RemoteMid}}} ->
+ ?rt2("erroneous message received", [SH, RemoteMid, _Ver]),
+ CH = #megaco_conn_handle{local_mid = LocalMid,
+ remote_mid = RemoteMid},
+ incNumErrors(CH),
+ %% We cannot put the version into conn-data, that will
+ %% make the resulting error message impossible to sent
+ %% (unsupported version)
+ case megaco_config:lookup_local_conn(CH) of
+ [ConnData] ->
+ ?rt3("known to us"),
+ ConnData#conn_data{send_handle = SH};
+ [] ->
+ ?rt3("unknown to us"),
+ ConnData = fake_conn_data(RH, SH, Pid),
+ ConnData#conn_data{conn_handle = CH}
+ end;
+
+ _ ->
+ ?rt2("erroneous message received", [SH]),
+ incNumErrors(),
+ fake_conn_data(RH, SH, Pid)
+ end;
+
+handle_decode_error(_,
+ #megaco_receive_handle{local_mid = LocalMid} = RH, SH,
+ Bin, Pid,
+ EM, EC, V) ->
+ case (catch EM:decode_mini_message(EC, V, Bin)) of
+ {ok, #'MegacoMessage'{mess = #'Message'{version = Ver,
+ mId = RemoteMid}}} ->
+ ?rt2("erroneous message received", [SH, Ver, RemoteMid]),
+ CH = #megaco_conn_handle{local_mid = LocalMid,
+ remote_mid = RemoteMid},
+ incNumErrors(CH),
+ case megaco_config:lookup_local_conn(CH) of
+ [ConnData] ->
+ ?rt3("known to us"),
+ ConnData#conn_data{send_handle = SH,
+ protocol_version = Ver};
+ [] ->
+ ?rt3("unknown to us"),
+ ConnData = fake_conn_data(RH, SH, Pid),
+ ConnData#conn_data{conn_handle = CH,
+ protocol_version = Ver}
+ end;
+
+ _ ->
+ ?rt2("erroneous message received", [SH]),
+ incNumErrors(),
+ fake_conn_data(RH, SH, Pid)
+ end.
+
+
+do_prepare_message(RH, CH, SendHandle, MegaMsg, ControlPid, Bin) ->
+ case megaco_config:lookup_local_conn(CH) of
+ [ConnData] ->
+ case check_message_auth(CH, ConnData, MegaMsg, Bin) of
+ {ok, ConnData2, MegaMsg} ->
+ %% Let the connection be permanent
+ {ok, ConnData2, MegaMsg};
+ {ReplyTag, ConnData, Reason} ->
+ %% Remove the temporary connection
+ disconnect(CH, {bad_auth, Reason}),
+ {ReplyTag, ConnData, Reason}
+ end;
+ [] ->
+ Reason = no_connection,
+ disconnect(CH, Reason),
+ RemoteMid = CH#megaco_conn_handle.remote_mid,
+ ConnData = fake_conn_data(RH, RemoteMid, SendHandle, ControlPid),
+ Error = prepare_error({error, Reason}),
+ {silent_fail, ConnData, Error}
+ end.
+
+check_message_auth(_ConnHandle, ConnData, MegaMsg, Bin) ->
+ MsgAuth = MegaMsg#'MegacoMessage'.authHeader,
+ Mess = MegaMsg#'MegacoMessage'.mess,
+ Version = Mess#'Message'.version,
+ ConnData2 = ConnData#conn_data{protocol_version = Version},
+ ConnAuth = ConnData2#conn_data.auth_data,
+ ?report_trace(ConnData2, "check message auth", [{bytes, Bin}]),
+ if
+ (MsgAuth =:= asn1_NOVALUE) andalso (ConnAuth =:= asn1_NOVALUE) ->
+ ?SIM({ok, ConnData2, MegaMsg}, check_message_auth);
+ true ->
+ ED = #'ErrorDescriptor'{errorCode = ?megaco_unauthorized,
+ errorText = "Autentication is not supported"},
+ {verbose_fail, ConnData2, prepare_error({error, ED})}
+ end.
+
+handle_syntax_error_callback(ReceiveHandle, ConnData, PrepError, Extra) ->
+ {Code, Reason, Error} = PrepError,
+ ErrorDesc = #'ErrorDescriptor'{errorCode = Code, errorText = Reason},
+ Version =
+ case Error of
+ {error, {unsupported_version, UV}} ->
+ UV;
+ _ ->
+ ConnData#conn_data.protocol_version
+ end,
+ UserMod = ConnData#conn_data.user_mod,
+ UserArgs = ConnData#conn_data.user_args,
+ ?report_trace(ReceiveHandle, "callback: syntax error", [ErrorDesc, Error]),
+ Args =
+ case Extra of
+ ?default_user_callback_extra ->
+ [ReceiveHandle, Version, ErrorDesc | UserArgs];
+ _ ->
+ [ReceiveHandle, Version, ErrorDesc, Extra | UserArgs]
+ end,
+ Res = (catch apply(UserMod, handle_syntax_error, Args)),
+ ?report_debug(ReceiveHandle, "return: syntax error",
+ [{return, Res}, ErrorDesc, Error]),
+ case Res of
+ reply ->
+ {verbose_fail, ConnData, PrepError};
+ {reply,#'ErrorDescriptor'{errorCode = Code1, errorText = Reason1}} ->
+ {verbose_fail, ConnData, {Code1,Reason1,Error}};
+ no_reply ->
+ {silent_fail, ConnData, PrepError};
+ {no_reply,#'ErrorDescriptor'{errorCode=Code2,errorText=Reason2}} ->
+ {silent_fail, ConnData, {Code2,Reason2,Error}}; %%% OTP-????
+ _ ->
+ warning_msg("syntax error callback failed: ~w", [Res]),
+ {verbose_fail, ConnData, PrepError}
+ end.
+
+fake_conn_data(CH) when is_record(CH, megaco_conn_handle) ->
+ case (catch megaco_config:conn_info(CH, receive_handle)) of
+ RH when is_record(RH, megaco_receive_handle) ->
+ RemoteMid = CH#megaco_conn_handle.remote_mid,
+ ConnData =
+ fake_conn_data(RH, RemoteMid, no_send_handle, no_control_pid),
+ ConnData#conn_data{conn_handle = CH};
+ {'EXIT', _} ->
+ UserMid = CH#megaco_conn_handle.local_mid,
+ case catch megaco_config:user_info(UserMid, receive_handle) of
+ {'EXIT', _} -> % No such user
+ #conn_data{conn_handle = CH,
+ serial = undefined_serial,
+ control_pid = no_control_pid,
+ monitor_ref = undefined_monitor_ref,
+ send_mod = no_send_mod,
+ send_handle = no_send_handle,
+ encoding_mod = no_encoding_mod,
+ encoding_config = no_encoding_config,
+ reply_action = undefined,
+ sent_pending_limit = infinity,
+ recv_pending_limit = infinity};
+ RH ->
+ ConnData =
+ fake_conn_data(RH, no_send_handle, no_control_pid),
+ ConnData#conn_data{conn_handle = CH}
+ end
+ end.
+
+fake_conn_data(RH, SendHandle, ControlPid) ->
+ fake_conn_data(RH, unknown_remote_mid, SendHandle, ControlPid).
+
+fake_conn_data(RH, RemoteMid, SendHandle, ControlPid) ->
+ case catch megaco_config:init_conn_data(RH, RemoteMid, SendHandle, ControlPid) of
+ {'EXIT', _} -> % No such user
+ fake_user_data(RH, RemoteMid, SendHandle, ControlPid);
+ ConnData ->
+ ConnData
+ end.
+
+fake_user_data(RH, RemoteMid, SendHandle, ControlPid) ->
+ LocalMid = RH#megaco_receive_handle.local_mid,
+ RH2 = RH#megaco_receive_handle{local_mid = default},
+ case catch megaco_config:init_conn_data(RH2, RemoteMid, SendHandle, ControlPid) of
+ {'EXIT', _} -> % Application stopped?
+ ConnHandle = #megaco_conn_handle{local_mid = LocalMid,
+ remote_mid = RemoteMid},
+ EncodingMod = RH#megaco_receive_handle.encoding_mod,
+ EncodingConfig = RH#megaco_receive_handle.encoding_config,
+ SendMod = RH#megaco_receive_handle.send_mod,
+ #conn_data{conn_handle = ConnHandle,
+ serial = undefined_serial,
+ control_pid = ControlPid,
+ monitor_ref = undefined_monitor_ref,
+ send_mod = SendMod,
+ send_handle = SendHandle,
+ encoding_mod = EncodingMod,
+ encoding_config = EncodingConfig,
+ reply_action = undefined,
+ sent_pending_limit = infinity,
+ recv_pending_limit = infinity};
+ ConnData ->
+ ConnData
+ end.
+
+prepare_error(Error) ->
+ case Error of
+ {error, ED} when is_record(ED, 'ErrorDescriptor') ->
+ Code = ED#'ErrorDescriptor'.errorCode,
+ Reason = ED#'ErrorDescriptor'.errorText,
+ {Code, Reason, Error};
+ {error, [{reason, {bad_token, [BadToken, _Acc]}, Line}]} when is_integer(Line) ->
+ Reason =
+ lists:flatten(
+ io_lib:format("Illegal token (~p) on line ~w", [BadToken, Line])),
+ Code = ?megaco_bad_request,
+ {Code, Reason, Error};
+ {error, [{reason, {bad_token, _}, Line}]} when is_integer(Line) ->
+ Reason = lists:concat(["Illegal token on line ", Line]),
+ Code = ?megaco_bad_request,
+ {Code, Reason, Error};
+ {error, [{reason, {Line, _ParserMod, RawReasonString}} | _]} when is_integer(Line) andalso is_list(RawReasonString) ->
+ Reason =
+ case RawReasonString of
+ [[$s, $y, $n, $t, $a, $x | _], TokenString] ->
+ lists:flatten(
+ io_lib:format("Syntax error on line ~w before token ~s", [Line, TokenString]));
+ _ ->
+ lists:flatten(io_lib:format("Syntax error on line ~w", [Line]))
+ end,
+ Code = ?megaco_bad_request,
+ {Code, Reason, Error};
+ {error, [{reason, {Line, _, _}} | _]} when is_integer(Line) ->
+ Reason = lists:concat(["Syntax error on line ", Line]),
+ Code = ?megaco_bad_request,
+ {Code, Reason, Error};
+ {error, {connection_refused, ED}} when is_record(ED,'ErrorDescriptor') ->
+ Code = ED#'ErrorDescriptor'.errorCode,
+ Reason = ED#'ErrorDescriptor'.errorText,
+ {Code, Reason, Error};
+ {error, {connection_refused, _}} ->
+ Reason = "Connection refused by user",
+ Code = ?megaco_unauthorized,
+ {Code, Reason, Error};
+ {error, {unsupported_version, V}} ->
+ Reason =
+ lists:flatten(io_lib:format("Unsupported version: ~w",[V])),
+ Code = ?megaco_version_not_supported,
+ {Code, Reason, Error};
+ {error, {not_negotiated_version, NegV, MsgV}} ->
+ Reason =
+ lists:flatten(
+ io_lib:format("Not negotiated version: ~w [negotiated ~w]",
+ [MsgV, NegV])),
+ Code = ?megaco_version_not_supported,
+ {Code, Reason, Error};
+ {error, _} ->
+ Reason = "Syntax error",
+ Code = ?megaco_bad_request,
+ {Code, Reason, Error};
+ {ok, MegaMsg} when is_record(MegaMsg, 'MegacoMessage') ->
+ Reason = "MID does not match config",
+ Code = ?megaco_incorrect_identifier,
+ {Code, Reason, Error};
+ _ ->
+ Reason = "Fatal syntax error",
+ Code = ?megaco_internal_gateway_error,
+ {Code, Reason, Error}
+ end.
+
+prepare_trans(_ConnData, [], AckList, ReqList, _Extra) ->
+ ?SIM({AckList, ReqList}, prepare_trans_done);
+
+prepare_trans(ConnData, Trans, AckList, ReqList, Extra)
+ when ConnData#conn_data.monitor_ref =:= undefined_auto_monitor_ref ->
+
+ ?rt3("prepare_trans - autoconnect"),
+
+ %% <BUGBUG>
+ %% Do we need something here, if we send more then one
+ %% trans per message?
+ %% </BUGBUG>
+
+ %% May occur if another process has already setup a
+ %% temporary connection, but the handle_connect callback
+ %% function has not yet returned before the eager MG
+ %% re-sends its initial service change message.
+
+ prepare_autoconnecting_trans(ConnData, Trans, AckList, ReqList, Extra);
+
+prepare_trans(#conn_data{monitor_ref = connected} = ConnData,
+ Trans, AckList, ReqList, Extra) ->
+
+ ?rt3("prepare_trans - connected"),
+
+ %%
+ %% This will happen when the "MGC" user performs a "pre" connect,
+ %% instead of waiting for the auto-connect (which normally
+ %% happen when the MGC receives the first message from the
+ %% MG).
+ %%
+
+ %%
+ %% The monitor_ref will have this value when the pre-connect
+ %% is complete, so we finish it here and then continue with the
+ %% normal transaction prepare.
+ %%
+
+ case finish_connect(ConnData) of
+ {ok, CD} ->
+ prepare_normal_trans(CD, Trans, AckList, ReqList, Extra);
+ {error, Reason} ->
+ disconnect(ConnData#conn_data.conn_handle, Reason),
+ {[], []}
+ end;
+
+prepare_trans(#conn_data{monitor_ref = {connecting, _}} = _ConnData,
+ _Trans, _AckList, _ReqList, _Extra) ->
+
+ ?rt3("prepare_trans - connecting"),
+
+ %%
+ %% This will happen when the "MGC" user performs a "pre" connect,
+ %% instead of waiting for the auto-connect (which normally
+ %% happen when the MGC receives the first message from the
+ %% MG).
+ %%
+
+ %%
+ %% The monitor_ref will have this value when the pre-connect
+ %% is in progress. We drop (ignore) this message and hope the
+ %% other side (MG) will resend.
+ %%
+
+ %% prepare_connecting_trans(ConnData, Trans, AckList, ReqList, Extra);
+ {[], []};
+
+prepare_trans(ConnData, Trans, AckList, ReqList, Extra) ->
+
+ ?rt3("prepare_trans - normal"),
+
+ %% Handle transaction in the normal case
+
+ prepare_normal_trans(ConnData, Trans, AckList, ReqList, Extra).
+
+
+prepare_autoconnecting_trans(_ConnData, [], AckList, ReqList, _Extra) ->
+ ?SIM({AckList, ReqList}, prepare_autoconnecting_trans_done);
+
+prepare_autoconnecting_trans(ConnData, [Trans | Rest], AckList, ReqList, Extra) ->
+ ?rt1(ConnData, "[autoconnecting] prepare trans", [Trans]),
+ case Trans of
+ {transactionRequest, T} when is_record(T, 'TransactionRequest') ->
+
+ Serial = T#'TransactionRequest'.transactionId,
+ ConnData2 = ConnData#conn_data{serial = Serial},
+ ?report_trace(ConnData2, "Pending handle_connect", [T]),
+
+ %% ------------------------------------------
+ %%
+ %% Check pending limit
+ %%
+ %% ------------------------------------------
+
+ Limit = ConnData#conn_data.sent_pending_limit,
+ TransId = to_remote_trans_id(ConnData2),
+ case check_and_maybe_incr_pending_limit(Limit, sent, TransId) of
+ ok ->
+ send_pending(ConnData2);
+ error ->
+ %% Pending limit:
+ %% In this (granted, highly hypothetical case)
+ %% we would make the user very confused if we
+ %% called the abort callback function, since
+ %% the request callback function has not yet
+ %% been called. Alas, we skip this call here.
+ send_pending_limit_error(ConnData);
+ aborted ->
+ ignore
+ end,
+ prepare_autoconnecting_trans(ConnData2, Rest, AckList, ReqList,
+ Extra);
+ _ ->
+ prepare_autoconnecting_trans(ConnData, Rest, AckList, ReqList,
+ Extra)
+ end.
+
+
+%% =================================================================
+%%
+%% Note that the TransactionReply record was changed i v3 (two
+%% new fields where added), and since we don't know which version,
+%% we cannot use the record definition of TransactionReply.
+%% Instead we transform the record into our own internal format
+%% #megaco_transaction_reply{}
+%%
+%% =================================================================
+
+prepare_normal_trans(_ConnData, [], AckList, ReqList, _Extra) ->
+ ?SIM({AckList, ReqList}, prepare_normal_trans_done);
+
+prepare_normal_trans(ConnData, [Trans | Rest], AckList, ReqList, Extra) ->
+ ?rt1(ConnData, "prepare [normal] trans", [Trans]),
+ case Trans of
+ {transactionRequest, #'TransactionRequest'{transactionId = asn1_NOVALUE}} ->
+ ConnData2 = ConnData#conn_data{serial = 0},
+ Code = ?megaco_bad_request,
+ Reason = "Syntax error in message: transaction id missing",
+ send_trans_error(ConnData2, Code, Reason),
+ prepare_normal_trans(ConnData2, Rest, AckList, ReqList, Extra);
+ {transactionRequest, T} when is_record(T, 'TransactionRequest') ->
+ Serial = T#'TransactionRequest'.transactionId,
+ ConnData2 = ConnData#conn_data{serial = Serial},
+ prepare_request(ConnData2, T, Rest, AckList, ReqList, Extra);
+ {transactionPending, T} when is_record(T, 'TransactionPending') ->
+ Serial = T#'TransactionPending'.transactionId,
+ ConnData2 = ConnData#conn_data{serial = Serial},
+ handle_pending(ConnData2, T, Extra),
+ prepare_normal_trans(ConnData2, Rest, AckList, ReqList, Extra);
+ {transactionReply, T} when is_tuple(T) andalso
+ (element(1, T) == 'TransactionReply') ->
+ T2 = transform_transaction_reply_dec(T),
+ Serial = T2#megaco_transaction_reply.transactionId,
+ ConnData2 = ConnData#conn_data{serial = Serial},
+ handle_reply(ConnData2, T2, Extra),
+ prepare_normal_trans(ConnData2, Rest, AckList, ReqList, Extra);
+ {transactionResponseAck, List} when is_list(List) ->
+ prepare_ack(ConnData, List, Rest, AckList, ReqList, Extra);
+ {segmentReply, SR} when is_record(SR, 'SegmentReply') ->
+ handle_segment_reply(ConnData, SR, Extra),
+ prepare_normal_trans(ConnData, Rest, AckList, ReqList, Extra)
+
+ end.
+
+
+prepare_request(ConnData, T, Rest, AckList, ReqList, Extra) ->
+ ?rt2("prepare request", [T]),
+ ConnHandle = ConnData#conn_data.conn_handle,
+ LocalMid = ConnHandle#megaco_conn_handle.local_mid,
+ TransId = to_remote_trans_id(ConnData),
+ ?rt2("prepare request", [LocalMid, TransId]),
+ case megaco_monitor:lookup_reply(TransId) of
+ [] ->
+ ?rt3("brand new request"),
+
+ %% Brand new request
+
+ %% Check pending limit:
+ %%
+ %% We should actually check the pending limit here
+ %% but since we have to do it later in the
+ %% handle_request function (just before we call
+ %% the handle_trans_request callback function) we
+ %% can just as well wait (this is after all a very
+ %% unlikely case: see function prepare_trans when
+ %% monitor_ref == undefined_auto_monitor_ref).
+ %%
+
+ #conn_data{send_handle = SendHandle,
+ pending_timer = InitTimer,
+ protocol_version = Version} = ConnData,
+ {WaitFor, CurrTimer} = megaco_timer:init(InitTimer),
+ M = ?MODULE,
+ F = pending_timeout,
+ A = [ConnHandle, TransId, CurrTimer],
+ PendingRef = megaco_monitor:apply_after(M, F, A, WaitFor),
+ Rep = #reply{send_handle = SendHandle,
+ trans_id = TransId,
+ local_mid = LocalMid,
+ pending_timer_ref = PendingRef,
+ handler = self(),
+ version = Version},
+ case megaco_monitor:insert_reply_new(Rep) of
+ true ->
+ prepare_normal_trans(ConnData, Rest, AckList,
+ [{ConnData, TransId, T} | ReqList],
+ Extra);
+ false ->
+ %% Oups - someone got there before we did...
+ ?report_debug(ConnData,
+ "prepare request: conflicting requests",
+ [TransId]),
+ send_pending(ConnData),
+ megaco_monitor:cancel_apply_after(PendingRef),
+ prepare_normal_trans(ConnData, Rest, AckList, ReqList,
+ Extra)
+ end;
+
+ [#reply{state = State,
+ handler = Pid,
+ pending_timer_ref = Ref} = Rep]
+ when (State =:= prepare) orelse (State =:= eval_request) ->
+
+ ?rt2("request resend", [State, Pid, Ref]),
+
+ %% Pending limit:
+ %% We are still preparing/evaluating the request
+ %% Check if the pending limit has been exceeded...
+ %% If the pending limit is _not_ exceeded then
+ %% we shall send a pending (and actually restart
+ %% the pending timer, but that we cannot do).
+ %% Don't care about Msg and Rep version diff
+
+ #conn_data{sent_pending_limit = Limit} = ConnData,
+
+ case check_and_maybe_incr_pending_limit(Limit, sent, TransId) of
+ ok ->
+
+ %% ------------------------------------------
+ %%
+ %% Pending limit not exceeded
+ %%
+ %% 1) Increment number of pendings sent
+ %% (done in the check function above)
+ %% 2) Send pending message
+ %% (We should really restart the pending
+ %% timer, but we have no way of doing that).
+ %%
+ %% ------------------------------------------
+
+ send_pending(ConnData),
+ prepare_normal_trans(ConnData, Rest, AckList, ReqList,
+ Extra);
+
+
+ error ->
+
+ %% -------------------------------------------
+ %%
+ %% Pending limit exceeded
+ %%
+ %% 1) Cancel pending timer
+ %% 2) Send 506 error message to other side
+ %% 3) Inform user (depends on state)
+ %% 4) Set reply in aborted state
+ %%
+ %% -------------------------------------------
+
+ %%
+ %% State == eval_request:
+ %% This means that the request is currently beeing
+ %% evaluated by the user, and the reply timer has
+ %% not yet been started.
+ %% Either:
+ %% a) The "other side" will resend (which will
+ %% trigger a pending message send) until we pass the
+ %% pending limit
+ %% b) We will send pending messages (when the pending
+ %% timer expire) until we pass the pending limit.
+ %% In any event, we cannot delete the reply record
+ %% or the pending counter in this case. Is there
+ %% a risk we accumulate aborted reply records?
+ %%
+ %% State == prepare:
+ %% The user does not know about this request
+ %% so we can safely perform cleanup.
+ %%
+ megaco_monitor:cancel_apply_after(Ref),
+ send_pending_limit_error(ConnData),
+ if
+ State == eval_request ->
+ %%
+ %% What if the user never replies?
+ %% In that case we will have a record
+ %% (and counters) that is never cleaned up...
+ NewFields =
+ [{#reply.state, aborted},
+ {#reply.pending_timer_ref, undefined}],
+ megaco_monitor:update_reply_fields(TransId,
+ NewFields),
+ handle_request_abort_callback(ConnData,
+ TransId, Pid, Extra);
+ true ->
+ %% Since the user does not know about
+ %% this call yet, it is safe to cleanup.
+ %% Should we inform?
+ Rep2 = Rep#reply{state = aborted},
+ cancel_reply(ConnData, Rep2, aborted),
+ ok
+ end,
+ prepare_normal_trans(ConnData, Rest, AckList, ReqList,
+ Extra);
+
+
+ aborted ->
+
+ %% -------------------------------------------
+ %%
+ %% Pending limit already exceeded
+ %%
+ %% Cleanup, just to make sure:
+ %% reply record & pending counter
+ %%
+ %% -------------------------------------------
+
+ Rep2 = Rep#reply{state = aborted},
+ cancel_reply(ConnData, Rep2, aborted),
+ prepare_normal_trans(ConnData, Rest, AckList, ReqList,
+ Extra)
+
+ end;
+
+ [#reply{state = waiting_for_ack,
+ bytes = Bin,
+ version = Version} = Rep] ->
+ ?rt3("request resend when waiting for ack"),
+
+ %% We have already sent a reply, but the receiver
+ %% has obviously not got it. Resend the reply but
+ %% don't restart the reply_timer.
+ ConnData2 = ConnData#conn_data{protocol_version = Version},
+ ?report_trace(ConnData2,
+ "re-send trans reply", [T | {bytes, Bin}]),
+ case megaco_messenger_misc:send_message(ConnData2, true, Bin) of
+ {ok, _} ->
+ ok;
+ {error, Reason} ->
+ %% Pass it on to the user (via handle_ack)
+ cancel_reply(ConnData2, Rep, Reason)
+ end,
+ prepare_normal_trans(ConnData2, Rest, AckList, ReqList, Extra);
+
+ [#reply{state = aborted} = Rep] ->
+ ?rt3("request resend when already in aborted state"),
+
+ %% OTP-4956:
+ %% Already aborted so ignore.
+ %% This furthermore means that the abnoxious user at the
+ %% other end has already been informed (pending-limit
+ %% passed => error descriptor sent), but keeps sending...
+ %%
+ %% Shall we perform a cleanup?
+ cancel_reply(ConnData, Rep, aborted),
+ prepare_normal_trans(ConnData, Rest, AckList, ReqList, Extra)
+ end.
+
+prepare_ack(ConnData, [TA | T], Rest, AckList, ReqList, Extra)
+ when is_record(TA, 'TransactionAck') ->
+ First = TA#'TransactionAck'.firstAck,
+ Last = TA#'TransactionAck'.lastAck,
+ TA2 = TA#'TransactionAck'{lastAck = asn1_NOVALUE},
+ ConnData2 = ConnData#conn_data{serial = First},
+ AckList2 = do_prepare_ack(ConnData2, TA2, AckList),
+ if
+ Last =:= asn1_NOVALUE ->
+ prepare_ack(ConnData, T, Rest, AckList2, ReqList, Extra);
+ First < Last ->
+ TA3 = TA#'TransactionAck'{firstAck = First + 1},
+ prepare_ack(ConnData, [TA3 | T], Rest, AckList2, ReqList, Extra);
+ First =:= Last ->
+ prepare_ack(ConnData, T, Rest, AckList2, ReqList, Extra);
+ First > Last ->
+ %% Protocol violation from the sender of this ack
+ ?report_important(ConnData, "<ERROR> discard trans",
+ [TA, {error, "firstAck > lastAck"}]),
+ prepare_ack(ConnData, T, Rest, AckList2, ReqList, Extra)
+ end;
+prepare_ack(ConnData, [], Rest, AckList, ReqList, Extra) ->
+ prepare_normal_trans(ConnData, Rest, AckList, ReqList, Extra).
+
+do_prepare_ack(ConnData, T, AckList) ->
+ TransId = to_remote_trans_id(ConnData),
+ case megaco_monitor:lookup_reply(TransId) of
+ [] ->
+ %% The reply has already been garbage collected. Ignore.
+ ?report_trace(ConnData, "discard ack (no receiver)", [T]),
+ AckList;
+ [Rep] when Rep#reply.state =:= waiting_for_ack ->
+ %% Don't care about Msg and Rep version diff
+ [{ConnData, Rep, T} | AckList];
+ [_Rep] ->
+ %% Protocol violation from the sender of this ack
+ ?report_important(ConnData, "<ERROR> discard trans",
+ [T, {error, "got ack before reply was sent"}]),
+ AckList
+ end.
+
+
+increment_request_keep_alive_counter(#conn_data{conn_handle = CH}, TransId) ->
+ ?rt1(CH, "increment request keep alive counter", [TransId]),
+ megaco_config:incr_reply_counter(CH, TransId).
+
+create_or_maybe_increment_request_keep_alive_counter(
+ #conn_data{conn_handle = CH}, TransId) ->
+ ?rt1(CH, "create or maybe increment request keep alive counter",
+ [TransId]),
+ try
+ begin
+ megaco_config:cre_reply_counter(CH, TransId)
+ end
+ catch
+ _:_ ->
+ megaco_config:incr_reply_counter(CH, TransId)
+ end.
+
+
+check_and_maybe_create_pending_limit(infinity, _, _) ->
+ ok;
+check_and_maybe_create_pending_limit(Limit, Direction, TransId) ->
+ ?rt2("check and maybe create pending limit counter",
+ [Limit, Direction, TransId]),
+ try megaco_config:get_pending_counter(Direction, TransId) of
+ Val when Val =< Limit ->
+ %% Since we have no intention to increment here, it
+ %% is ok to be _at_ the limit
+ ok;
+ _ ->
+ aborted
+ catch
+ _:_ ->
+ %% Has not been created yet (connect).
+ megaco_config:cre_pending_counter(Direction, TransId, 0),
+ ok
+ end.
+
+%% check_and_maybe_create_pending_limit(infinity, _, _) ->
+%% ok;
+%% check_and_maybe_create_pending_limit(Limit, Direction, TransId) ->
+%% case (catch megaco_config:get_pending_counter(Direction, TransId)) of
+%% {'EXIT', _} ->
+%% %% Has not been created yet (connect).
+%% megaco_config:cre_pending_counter(Direction, TransId, 0),
+%% ok;
+%% Val when Val =< Limit ->
+%% %% Since we have no intention to increment here, it
+%% %% is ok to be _at_ the limit
+%% ok;
+%% _ ->
+%% aborted
+%% end.
+
+
+check_pending_limit(infinity, _, _) ->
+ {ok, 0};
+check_pending_limit(Limit, Direction, TransId) ->
+ ?rt2("check pending limit", [Direction, Limit, TransId]),
+ try megaco_config:get_pending_counter(Direction, TransId) of
+ Val when Val =< Limit ->
+ %% Since we have no intention to increment here, it
+ %% is ok to be _at_ the limit
+ ?rt2("check pending limit - ok", [Val]),
+ {ok, Val};
+ _Val ->
+ ?rt2("check pending limit - aborted", [_Val]),
+ aborted
+ catch
+ _:_ ->
+ %% This function is only called when we "know" the
+ %% counter to exist. So, the only reason that this
+ %% would happen is of the counter has been removed.
+ %% This only happen if the pending limit has been
+ %% reached. In any case, this is basically the same
+ %% as aborted!
+ ?rt2("check pending limit - exit", []),
+ aborted
+ end.
+
+%% check_pending_limit(infinity, _, _) ->
+%% {ok, 0};
+%% check_pending_limit(Limit, Direction, TransId) ->
+%% ?rt2("check pending limit", [Direction, Limit, TransId]),
+%% case (catch megaco_config:get_pending_counter(Direction, TransId)) of
+%% {'EXIT', _} ->
+%% %% This function is only called when we "know" the
+%% %% counter to exist. So, the only reason that this
+%% %% would happen is of the counter has been removed.
+%% %% This only happen if the pending limit has been
+%% %% reached. In any case, this is basically the same
+%% %% as aborted!
+%% ?rt2("check pending limit - exit", []),
+%% aborted;
+%% Val when Val =< Limit ->
+%% %% Since we have no intention to increment here, it
+%% %% is ok to be _at_ the limit
+%% ?rt2("check pending limit - ok", [Val]),
+%% {ok, Val};
+%% _Val ->
+%% ?rt2("check pending limit - aborted", [_Val]),
+%% aborted
+%% end.
+
+
+check_and_maybe_incr_pending_limit(infinity, _, _) ->
+ ok;
+check_and_maybe_incr_pending_limit(Limit, Direction, TransId) ->
+ %%
+ %% We need this kind of test to detect when we _pass_ the limit
+ %%
+ ?rt2("check and maybe incr pending limit", [Direction, Limit, TransId]),
+ try megaco_config:get_pending_counter(Direction, TransId) of
+ Val when Val > Limit ->
+ ?rt2("check and maybe incr - aborted", [Direction, Val, Limit]),
+ aborted; % Already passed the limit
+ Val ->
+ ?rt2("check and maybe incr - incr", [Direction, Val, Limit]),
+ megaco_config:incr_pending_counter(Direction, TransId),
+ if
+ Val < Limit ->
+ ok; % Still within the limit
+ true ->
+ ?rt2("check and maybe incr - error",
+ [Direction, Val, Limit]),
+ error % Passed the limit
+ end
+ catch
+ _:_ ->
+ %% Has not been created yet (connect).
+ megaco_config:cre_pending_counter(Direction, TransId, 1),
+ ok
+ end.
+
+
+%% check_and_maybe_incr_pending_limit(infinity, _, _) ->
+%% ok;
+%% check_and_maybe_incr_pending_limit(Limit, Direction, TransId) ->
+%% %%
+%% %% We need this kind of test to detect when we _pass_ the limit
+%% %%
+%% ?rt2("check and maybe incr pending limit", [Direction, Limit, TransId]),
+%% case (catch megaco_config:get_pending_counter(Direction, TransId)) of
+%% {'EXIT', _} ->
+%% %% Has not been created yet (connect).
+%% megaco_config:cre_pending_counter(Direction, TransId, 1),
+%% ok;
+%% Val when Val > Limit ->
+%% ?rt2("check and maybe incr - aborted", [Direction, Val, Limit]),
+%% aborted; % Already passed the limit
+%% Val ->
+%% ?rt2("check and maybe incr - incr", [Direction, Val, Limit]),
+%% megaco_config:incr_pending_counter(Direction, TransId),
+%% if
+%% Val < Limit ->
+%% ok; % Still within the limit
+%% true ->
+%% ?rt2("check and maybe incr - error",
+%% [Direction, Val, Limit]),
+%% error % Passed the limit
+%% end
+%% end.
+
+
+%% BUGBUG BUGBUG BUGBUG
+%%
+%% Do we know that the Rep is still valid? A previous transaction
+%% could have taken a lot of time.
+%%
+handle_request({ConnData, TransId, T}, Extra) ->
+ case handle_request(ConnData, TransId, T, Extra) of
+ {pending, _RequestData} ->
+ handle_long_request(ConnData, TransId, T, Extra);
+ Else ->
+ Else
+ end.
+
+handle_request(ConnData, TransId, T, Extra) ->
+ ?report_trace(ConnData, "handle request", [TransId, T]),
+
+ %% Pending limit:
+ %% Ok, before we begin, lets check that this request
+ %% has not been aborted. I.e. exceeded the pending
+ %% limit, so go check it...
+
+ #conn_data{sent_pending_limit = Limit} = ConnData,
+
+ case check_and_maybe_create_pending_limit(Limit, sent, TransId) of
+ ok ->
+ %% Ok so far, now update state
+ case megaco_monitor:update_reply_field(TransId,
+ #reply.state,
+ eval_request) of
+ true ->
+ Actions = T#'TransactionRequest'.actions,
+ {AckAction, SendReply} =
+ handle_request_callback(ConnData, TransId, Actions,
+ T, Extra),
+
+ %% Next step, while we where in the callback function,
+ %% the pending limit could have been exceeded, so check
+ %% it again...
+ do_handle_request(AckAction, SendReply,
+ ConnData, TransId);
+
+ false ->
+ %% Ugh?
+ ignore
+ end;
+
+ aborted ->
+ %% Pending limit
+ %% Already exceeded the limit
+ %% The user does not yet know about this request, so
+ %% don't bother telling that it has been aborted...
+ %% Furthermore, the reply timer has not been started,
+ %% so do the cleanup now
+ ?rt1(ConnData, "pending limit already passed", [TransId]),
+ case megaco_monitor:lookup_reply(TransId) of
+ [Rep] ->
+ cancel_reply(ConnData, Rep, aborted);
+ _ ->
+ ok
+ end,
+ ignore
+ end.
+
+do_handle_request(_, ignore, _ConnData, _TransId) ->
+ ?rt1(_ConnData, "ignore: don't reply", [_TransId]),
+ ignore;
+do_handle_request(_, ignore_trans_request, ConnData, TransId) ->
+ ?rt1(ConnData, "ignore trans request: don't reply", [TransId]),
+ case megaco_monitor:lookup_reply(TransId) of
+ [#reply{} = Rep] ->
+ cancel_reply(ConnData, Rep, ignore);
+ _ ->
+ ignore
+ end;
+do_handle_request({pending, _RequestData}, {aborted, ignore}, _, _) ->
+ ?rt2("handle request: pending - aborted - ignore => don't reply", []),
+ ignore;
+do_handle_request({pending, _RequestData}, {aborted, _SendReply}, _, _) ->
+ ?rt2("handle request: pending - aborted => don't reply", []),
+ ignore;
+do_handle_request({pending, RequestData}, _SendReply, _ConnData, _) ->
+ ?rt2("handle request: pending", [RequestData]),
+ {pending, RequestData};
+do_handle_request(AckAction, {ok, Bin}, ConnData, TransId)
+ when is_binary(Bin) ->
+ ?rt1(ConnData, "handle request - ok", [AckAction, TransId]),
+ case megaco_monitor:lookup_reply(TransId) of
+ [#reply{pending_timer_ref = PendingRef} = Rep] ->
+
+ #conn_data{reply_timer = InitTimer,
+ conn_handle = ConnHandle} = ConnData,
+
+ %% Pending limit update:
+ %% - Cancel the pending timer, if running
+ %% - Delete the pending counter
+ %%
+
+ megaco_monitor:cancel_apply_after(PendingRef),
+ megaco_config:del_pending_counter(sent, TransId),
+
+ Method = timer_method(AckAction),
+ {WaitFor, CurrTimer} = megaco_timer:init(InitTimer),
+ OptBin = opt_garb_binary(CurrTimer, Bin),
+ M = ?MODULE,
+ F = reply_timeout,
+ A = [ConnHandle, TransId, CurrTimer],
+ Ref = megaco_monitor:apply_after(Method, M, F, A,
+ WaitFor),
+ Rep2 = Rep#reply{pending_timer_ref = undefined,
+ handler = undefined,
+ bytes = OptBin,
+ state = waiting_for_ack,
+ timer_ref = Ref,
+ ack_action = AckAction},
+ megaco_monitor:insert_reply(Rep2), % Timing problem?
+ ignore;
+
+ _ ->
+ %% Been removed already?
+ ignore
+ end;
+do_handle_request(AckAction, {ok, {Sent, NotSent}}, ConnData, TransId)
+ when is_list(Sent) andalso is_list(NotSent) ->
+ ?rt1(ConnData, "handle request - ok [segmented reply]",
+ [AckAction, TransId]),
+
+ case megaco_monitor:lookup_reply(TransId) of
+ [#reply{pending_timer_ref = PendingRef} = Rep] ->
+
+ %% d("do_handle_request -> found reply record:"
+ %% "~n Rep: ~p", [Rep]),
+
+ #conn_data{reply_timer = InitTimer,
+ conn_handle = ConnHandle} = ConnData,
+
+ %% Pending limit update:
+ %% - Cancel the pending timer, if running
+ %% - Delete the pending counter
+ %%
+
+ megaco_monitor:cancel_apply_after(PendingRef),
+ megaco_config:del_pending_counter(sent, TransId),
+
+ Method = timer_method(AckAction),
+ {WaitFor, CurrTimer} = megaco_timer:init(InitTimer),
+ Garb = fun(Bin) -> opt_garb_binary(CurrTimer, Bin) end,
+ OptBins = [{SN, Garb(Bin), undefined} || {SN, Bin} <- Sent],
+
+ M = ?MODULE,
+ F = reply_timeout,
+ A = [ConnHandle, TransId, CurrTimer],
+ Ref = megaco_monitor:apply_after(Method, M, F, A, WaitFor),
+
+ Rep2 = Rep#reply{pending_timer_ref = undefined,
+ handler = undefined,
+ bytes = OptBins,
+ state = waiting_for_ack,
+ timer_ref = Ref,
+ ack_action = AckAction,
+ segments = NotSent},
+ megaco_monitor:insert_reply(Rep2), % Timing problem?
+
+ ignore;
+ _ ->
+ %% Been removed already?
+ ignore
+ end;
+do_handle_request(_, {error, aborted}, ConnData, TransId) ->
+ ?report_trace(ConnData, "aborted during our absence", [TransId]),
+ case megaco_monitor:lookup_reply(TransId) of
+ [Rep] ->
+ cancel_reply(ConnData, Rep, aborted);
+ _ ->
+ ok
+ end,
+ ignore;
+do_handle_request(AckAction, {error, Reason}, ConnData, TransId) ->
+ ?report_trace(ConnData, "error", [TransId, Reason]),
+ case megaco_monitor:lookup_reply(TransId) of
+ [Rep] ->
+ Rep2 = Rep#reply{state = waiting_for_ack,
+ ack_action = AckAction},
+ cancel_reply(ConnData, Rep2, Reason);
+ _ ->
+ ok
+ end,
+ ignore;
+do_handle_request(AckAction, SendReply, ConnData, TransId) ->
+ ?report_trace(ConnData, "unknown send trans reply result",
+ [TransId, AckAction, SendReply]),
+ ignore.
+
+
+handle_requests([{ConnData, TransId, T} | Rest], Pending, Extra) ->
+ ?rt2("handle requests", [TransId]),
+ case handle_request(ConnData, TransId, T, Extra) of
+ {pending, RequestData} ->
+ handle_requests(Rest, [{ConnData,TransId,RequestData} | Pending], Extra);
+ _ ->
+ handle_requests(Rest, Pending, Extra)
+ end;
+handle_requests([], Pending, _Extra) ->
+ ?rt2("handle requests - done", [Pending]),
+ Pending.
+
+%% opt_garb_binary(timeout, _Bin) -> garb_binary; % Need msg at restart of timer
+opt_garb_binary(_Timer, Bin) -> Bin.
+
+timer_method(discard_ack) ->
+ apply_method;
+timer_method(_) ->
+ spawn_method.
+
+
+handle_long_request({ConnData, TransId, RequestData}, Extra) ->
+
+ ?rt2("handle long request", [TransId, RequestData]),
+
+ %% Pending limit:
+ %% We need to check the pending limit, in case it was
+ %% exceeded before we got this far...
+ %% We dont need to be able to create the counter here,
+ %% since that was done in the handle_request function.
+
+ #conn_data{sent_pending_limit = Limit} = ConnData,
+
+ case check_pending_limit(Limit, sent, TransId) of
+ {ok, _} ->
+ handle_long_request(ConnData, TransId, RequestData, Extra);
+ _ ->
+ %% Already exceeded the limit
+ ignore
+ end.
+
+handle_long_request(ConnData, TransId, RequestData, Extra) ->
+ ?report_trace(ConnData, "callback: trans long request",
+ [TransId, {request_data, RequestData}]),
+
+ %% Attempt to update the handler field for this reply record
+ %% (if there is one).
+ case megaco_monitor:update_reply_field(TransId, #reply.handler, self()) of
+ true ->
+ {AckAction, Res} =
+ handle_long_request_callback(ConnData, TransId,
+ RequestData, Extra),
+ do_handle_long_request(AckAction, Res, ConnData, TransId);
+ false ->
+ %% Been removed already?
+ ignore
+ end.
+
+
+do_handle_long_request(AckAction, {ok, Bin}, ConnData, TransId) ->
+ case megaco_monitor:lookup_reply_field(TransId, #reply.trans_id) of
+ {ok, _} ->
+ Method = timer_method(AckAction),
+ InitTimer = ConnData#conn_data.reply_timer,
+ {WaitFor, CurrTimer} = megaco_timer:init(InitTimer),
+ OptBin = opt_garb_binary(CurrTimer, Bin),
+ ConnHandle = ConnData#conn_data.conn_handle,
+ M = ?MODULE,
+ F = reply_timeout,
+ A = [ConnHandle, TransId, CurrTimer],
+ Ref = megaco_monitor:apply_after(Method, M, F, A, WaitFor),
+ NewFields =
+ [{#reply.bytes, OptBin},
+ {#reply.state, waiting_for_ack},
+ {#reply.timer_ref, Ref},
+ {#reply.ack_action, AckAction}],
+ megaco_monitor:update_reply_fields(TransId, NewFields); % Timing problem?
+ _ ->
+ %% Been removed already?
+ ignore
+ end;
+do_handle_long_request(_, {error, Reason}, ConnData, TransId) ->
+ ?report_trace(ConnData, "send trans reply", [TransId, {error, Reason}]),
+ ignore.
+
+handle_request_abort_callback(ConnData, TransId, Pid) ->
+ Extra = ?default_user_callback_extra,
+ handle_request_abort_callback(ConnData, TransId, Pid, Extra).
+
+handle_request_abort_callback(ConnData, TransId, Pid, Extra) ->
+ ?report_trace(ConnData, "callback: trans request aborted", [TransId, Pid]),
+ ConnHandle = ConnData#conn_data.conn_handle,
+ Version = ConnData#conn_data.protocol_version,
+ UserMod = ConnData#conn_data.user_mod,
+ UserArgs = ConnData#conn_data.user_args,
+ Serial = TransId#trans_id.serial,
+ Args =
+ case Extra of
+ ?default_user_callback_extra ->
+ [ConnHandle, Version, Serial, Pid | UserArgs];
+ _ ->
+ [ConnHandle, Version, Serial, Pid, Extra | UserArgs]
+ end,
+ Res = (catch apply(UserMod, handle_trans_request_abort, Args)),
+ ?report_debug(ConnData, "return: trans request aborted",
+ [TransId, {return, Res}]),
+ case Res of
+ ok ->
+ ok;
+ _ ->
+ warning_msg("transaction request abort callback failed: ~w",
+ [Res]),
+ ok
+ end.
+
+handle_request_callback(ConnData, TransId, Actions, T, Extra) ->
+ ?report_trace(ConnData, "callback: trans request", [T]),
+ ConnHandle = ConnData#conn_data.conn_handle,
+ Version = ConnData#conn_data.protocol_version,
+ UserMod = ConnData#conn_data.user_mod,
+ UserArgs = ConnData#conn_data.user_args,
+ Args =
+ case Extra of
+ ?default_user_callback_extra ->
+ [ConnHandle, Version, Actions | UserArgs];
+ _ ->
+ [ConnHandle, Version, Actions, Extra | UserArgs]
+ end,
+ Res = (catch apply(UserMod, handle_trans_request, Args)),
+ ?report_debug(ConnData, "return: trans request", [T, {return, Res}]),
+ case Res of
+ ignore -> %% NOTE: Only used for testing!!
+ {discard_ack, ignore};
+
+ ignore_trans_request ->
+ {discard_ack, ignore_trans_request};
+
+ {discard_ack, Replies} when is_list(Replies) ->
+ Reply = {actionReplies, Replies},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ [], asn1_NOVALUE),
+ {discard_ack, SendReply};
+ {discard_ack, Error} when is_record(Error, 'ErrorDescriptor') ->
+ Reply = {transactionError, Error},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ [], asn1_NOVALUE),
+ {discard_ack, SendReply};
+ {discard_ack, Replies, SendOpts} when is_list(Replies) andalso
+ is_list(SendOpts) ->
+ Reply = {actionReplies, Replies},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ SendOpts, asn1_NOVALUE),
+ {discard_ack, SendReply};
+ {discard_ack, Error, SendOpts}
+ when is_record(Error, 'ErrorDescriptor') andalso
+ is_list(SendOpts) ->
+ Reply = {transactionError, Error},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ SendOpts, asn1_NOVALUE),
+ {discard_ack, SendReply};
+
+ {{handle_pending_ack, AckData}, Replies} when is_list(Replies) ->
+ Reply = {actionReplies, Replies},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ [], when_pending_sent),
+ {{handle_ack, AckData}, SendReply};
+ {{handle_pending_ack, AckData}, Error}
+ when is_record(Error, 'ErrorDescriptor') ->
+ Reply = {transactionError, Error},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ [], when_pending_sent),
+ {{handle_ack, AckData}, SendReply};
+ {{handle_pending_ack, AckData}, Replies, SendOpts}
+ when is_list(Replies) andalso
+ is_list(SendOpts) ->
+ Reply = {actionReplies, Replies},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ SendOpts, when_pending_sent),
+ {{handle_ack, AckData}, SendReply};
+ {{handle_pending_ack, AckData}, Error, SendOpts}
+ when is_record(Error, 'ErrorDescriptor') andalso
+ is_list(SendOpts) ->
+ Reply = {transactionError, Error},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ SendOpts, when_pending_sent),
+ {{handle_ack, AckData}, SendReply};
+
+ {{handle_ack, AckData}, Replies} when is_list(Replies) ->
+ Reply = {actionReplies, Replies},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ [], 'NULL'),
+ {{handle_ack, AckData}, SendReply};
+ {{handle_ack, AckData}, Error}
+ when is_record(Error, 'ErrorDescriptor') ->
+ Reply = {transactionError, Error},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ [], 'NULL'),
+ {{handle_ack, AckData}, SendReply};
+ {{handle_ack, AckData}, Replies, SendOpts}
+ when is_list(Replies) andalso
+ is_list(SendOpts) ->
+ Reply = {actionReplies, Replies},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ SendOpts, 'NULL'),
+ {{handle_ack, AckData}, SendReply};
+ {{handle_ack, AckData}, Error, SendOpts}
+ when is_record(Error, 'ErrorDescriptor') andalso
+ is_list(SendOpts) ->
+ Reply = {transactionError, Error},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ SendOpts, 'NULL'),
+ {{handle_ack, AckData}, SendReply};
+
+ {{handle_sloppy_ack, AckData}, Replies} when is_list(Replies) ->
+ Reply = {actionReplies, Replies},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ [], asn1_NOVALUE),
+ {{handle_ack, AckData}, SendReply};
+ {{handle_sloppy_ack, AckData}, Error}
+ when is_record(Error, 'ErrorDescriptor') ->
+ Reply = {transactionError, Error},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ [], asn1_NOVALUE),
+ {{handle_ack, AckData}, SendReply};
+ {{handle_sloppy_ack, AckData}, Replies, SendOpts}
+ when is_list(Replies) andalso
+ is_list(SendOpts) ->
+ Reply = {actionReplies, Replies},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ SendOpts, asn1_NOVALUE),
+ {{handle_ack, AckData}, SendReply};
+ {{handle_sloppy_ack, AckData}, Error, SendOpts}
+ when is_record(Error, 'ErrorDescriptor') andalso
+ is_list(SendOpts) ->
+ Reply = {transactionError, Error},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ SendOpts, asn1_NOVALUE),
+ {{handle_ack, AckData}, SendReply};
+
+ {pending, RequestData} ->
+ %% The user thinks that this request will take
+ %% quite a while to evaluate. Maybe respond with
+ %% a pending trans (depends on the pending limit)
+ SendReply = maybe_send_pending(ConnData, TransId),
+ {{pending, RequestData}, SendReply};
+
+ Error ->
+ ErrorText = atom_to_list(UserMod),
+ ED = #'ErrorDescriptor'{
+ errorCode = ?megaco_internal_gateway_error,
+ errorText = ErrorText},
+ ?report_important(ConnData,
+ "callback: <ERROR> trans request",
+ [ED, {error, Error}]),
+ error_msg("transaction request callback failed: ~w", [Error]),
+ Reply = {transactionError, ED},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ [], asn1_NOVALUE),
+ {discard_ack, SendReply}
+ end.
+
+handle_long_request_callback(ConnData, TransId, RequestData, Extra) ->
+ ?report_trace(ConnData, "callback: trans long request", [RequestData]),
+ ConnHandle = ConnData#conn_data.conn_handle,
+ Version = ConnData#conn_data.protocol_version,
+ UserMod = ConnData#conn_data.user_mod,
+ UserArgs = ConnData#conn_data.user_args,
+ Args =
+ case Extra of
+ ?default_user_callback_extra ->
+ [ConnHandle, Version, RequestData | UserArgs];
+ _ ->
+ [ConnHandle, Version, RequestData, Extra | UserArgs]
+ end,
+ Res = (catch apply(UserMod, handle_trans_long_request, Args)),
+ ?report_debug(ConnData, "return: trans long request",
+ [{request_data, RequestData}, {return, Res}]),
+ case Res of
+ ignore ->
+ {discard_ack, ignore};
+
+ {discard_ack, Replies} when is_list(Replies) ->
+ Reply = {actionReplies, Replies},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ [], asn1_NOVALUE),
+ {discard_ack, SendReply};
+ {discard_ack, Replies, SendOpts} when is_list(Replies) andalso
+ is_list(SendOpts) ->
+ Reply = {actionReplies, Replies},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ SendOpts, asn1_NOVALUE),
+ {discard_ack, SendReply};
+
+ {{handle_ack, AckData}, Replies} when is_list(Replies) ->
+ Reply = {actionReplies, Replies},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ [], 'NULL'),
+ {{handle_ack, AckData}, SendReply};
+ {{handle_ack, AckData}, Replies, SendOpts} when is_list(Replies)
+ andalso
+ is_list(SendOpts) ->
+ Reply = {actionReplies, Replies},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ SendOpts, 'NULL'),
+ {{handle_ack, AckData}, SendReply};
+
+ Error ->
+ ErrorText = atom_to_list(UserMod),
+ ED = #'ErrorDescriptor'{errorCode = ?megaco_internal_gateway_error,
+ errorText = ErrorText},
+ ?report_important(ConnData, "callback: <ERROR> trans long request",
+ [ED, {error, Error}]),
+ error_msg("long transaction request callback failed: ~w", [Error]),
+ Reply = {transactionError, ED},
+ SendReply = maybe_send_reply(ConnData, TransId, Reply,
+ [], asn1_NOVALUE),
+ {discard_ack, SendReply}
+ end.
+
+handle_pending(ConnData, T, Extra) ->
+ TransId = to_local_trans_id(ConnData),
+ ?rt2("handle pending", [T, TransId]),
+ case megaco_monitor:lookup_request(TransId) of
+ [Req] ->
+
+ %% ------------------------------------------
+ %%
+ %% Check received pending limit
+ %%
+ %% ------------------------------------------
+
+ Limit = ConnData#conn_data.recv_pending_limit,
+ case check_and_maybe_incr_pending_limit(Limit,
+ recv, TransId) of
+
+ ok ->
+ %% ----------------------------------------------------
+ %%
+ %% Received pending limit not exceeded
+ %%
+ %% ----------------------------------------------------
+
+ handle_recv_pending(ConnData, TransId, Req, T);
+
+ error ->
+ %% ----------------------------------------------------
+ %%
+ %% Received pending limit exceeded
+ %%
+ %% Time to give up on this transaction
+ %% 1) Delete request record
+ %% 2) Cancel timers
+ %% 3) Delete the (receive) pending counter
+ %% 4) Inform the user (handle_trans_reply)
+ %%
+ %% ----------------------------------------------------
+
+ handle_recv_pending_error(ConnData, TransId, Req, T, Extra);
+
+
+ aborted ->
+ %% ----------------------------------------------------
+ %%
+ %% Received pending limit already exceeded
+ %%
+ %% BMK BMK BMK -- can this really happen?
+ %%
+ %% The user has already been notified about this
+ %% (see error above)
+ %%
+ %% ----------------------------------------------------
+
+ ok
+
+ end;
+
+ [] ->
+ ?report_trace(ConnData, "remote pending (no receiver)", [T]),
+ return_unexpected_trans(ConnData, T, Extra)
+ end.
+
+handle_recv_pending(#conn_data{long_request_resend = LRR,
+ conn_handle = ConnHandle} = ConnData,
+ TransId,
+ #request{timer_ref = {short, Ref},
+ init_long_timer = InitTimer}, T) ->
+
+ ?rt2("handle pending - long request", [LRR, InitTimer]),
+
+ %% The request seems to take a while,
+ %% let's reset our transmission timer.
+ %% We now know the other side has got
+ %% the request and is working on it,
+ %% so there is no need to keep the binary
+ %% message for re-transmission.
+
+ %% Start using the long timer.
+ %% We can now drop the "bytes", since we will
+ %% not resend from now on.
+
+ megaco_monitor:cancel_apply_after(Ref),
+ {WaitFor, CurrTimer} = megaco_timer:init(InitTimer),
+ ConnHandle = ConnData#conn_data.conn_handle,
+ M = ?MODULE,
+ F = request_timeout,
+ A = [ConnHandle, TransId],
+ Ref2 = megaco_monitor:apply_after(M, F, A, WaitFor),
+ NewFields =
+ case LRR of
+ true ->
+ [{#request.timer_ref, {long, Ref2}},
+ {#request.curr_timer, CurrTimer}];
+ false ->
+ [{#request.bytes, {no_send, garb_binary}},
+ {#request.timer_ref, {long, Ref2}},
+ {#request.curr_timer, CurrTimer}]
+ end,
+ ?report_trace(ConnData, "trans pending (timer restarted)", [T]),
+ megaco_monitor:update_request_fields(TransId, NewFields); % Timing problem?
+
+handle_recv_pending(_ConnData, _TransId,
+ #request{timer_ref = {long, _Ref},
+ curr_timer = timeout}, _T) ->
+
+ ?rt3("handle pending - timeout"),
+
+ %% The request seems to take a while,
+ %% let's reset our transmission timer.
+ %% We now know the other side has got
+ %% the request and is working on it,
+ %% so there is no need to keep the binary
+ %% message for re-transmission.
+
+ %% This can happen if the timer is running for the last
+ %% time. I.e. next time it expires, will be the last.
+ %% Therefor we really do not need to do anything here.
+ %% The cleanup will be done in request_timeout.
+
+ ok;
+
+handle_recv_pending(#conn_data{conn_handle = ConnHandle} = ConnData, TransId,
+ #request{timer_ref = {long, Ref},
+ curr_timer = CurrTimer}, T) ->
+
+ ?rt2("handle pending - still waiting", [CurrTimer]),
+
+ %% The request seems to take a while,
+ %% let's reset our transmission timer.
+ %% We now know the other side has got
+ %% the request and is working on it,
+ %% so there is no need to keep the binary
+ %% message for re-transmission.
+
+ %% We just need to recalculate the timer, i.e.
+ %% increment the timer (one "slot" has been consumed).
+
+ megaco_monitor:cancel_apply_after(Ref),
+ {WaitFor, Timer2} = megaco_timer:restart(CurrTimer),
+ ConnHandle = ConnData#conn_data.conn_handle,
+ M = ?MODULE,
+ F = request_timeout,
+ A = [ConnHandle, TransId],
+ Ref2 = megaco_monitor:apply_after(M, F, A, WaitFor),
+ NewFields =
+ [{#request.timer_ref, {long, Ref2}},
+ {#request.curr_timer, Timer2}],
+ ?report_trace(ConnData,
+ "long trans pending"
+ " (timer restarted)", [T]),
+ %% Timing problem?
+ megaco_monitor:update_request_fields(TransId, NewFields).
+
+
+handle_recv_pending_error(ConnData, TransId, Req, T, Extra) ->
+ %% 1) Delete the request record
+ megaco_monitor:delete_request(TransId),
+
+ %% 2) Possibly cancel the timer
+ case Req#request.timer_ref of
+ {_, Ref} ->
+ megaco_monitor:cancel_apply_after(Ref);
+ _ ->
+ ok
+ end,
+
+ %% 3) Delete the (receive) pending counter
+ megaco_config:del_pending_counter(recv, TransId),
+
+ %% 4) Inform the user that his/her request reached
+ %% the receive pending limit
+ UserMod = Req#request.user_mod,
+ UserArgs = Req#request.user_args,
+ Action = Req#request.reply_action,
+ UserData = Req#request.reply_data,
+ UserReply = {error, exceeded_recv_pending_limit},
+ ConnData2 = ConnData#conn_data{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData},
+
+ ?report_trace(ConnData, "receive pending limit reached", [T]),
+ return_reply(ConnData2, TransId, UserReply, Extra).
+
+
+%%
+%% This _is_ a segmented message.
+%%
+%% Since this is not the last segment, we shall not send any ack.
+%% (even if three-way-handshake has been configured).
+%%
+handle_reply(
+ ConnData,
+ #megaco_transaction_reply{segmentNumber = SN,
+ segmentationComplete = asn1_NOVALUE} = T, Extra)
+ when is_integer(SN) ->
+ TransId = to_local_trans_id(ConnData),
+ ?rt2("handle segmented reply", [T, TransId, SN]),
+ case megaco_monitor:lookup_request(TransId) of
+
+ %% ---------------------------------------------------------
+ %% The first segment, so stop the request timer. No longer
+ %% needed when the segment(s) start to arrive.
+
+ [#request{timer_ref = {_Type, Ref},
+ seg_recv = [],
+ seg_timer_ref = undefined} = Req] ->
+
+ %% Don't care about Req and Rep version diff
+ ?report_trace(ConnData, "[segmented] trans reply - first seg",
+ [T]),
+
+ %% Stop the request timer
+ megaco_monitor:cancel_apply_after(Ref), %% OTP-4843
+
+ %% Acknowledge the segment
+ send_segment_reply(ConnData, SN),
+
+ %% First segment for this reply
+ NewFields =
+ [{#request.timer_ref, undefined},
+ {#request.seg_recv, [SN]}],
+ megaco_monitor:update_request_fields(TransId, NewFields),
+
+ %% Handle the reply
+ UserMod = Req#request.user_mod,
+ UserArgs = Req#request.user_args,
+ Action = Req#request.reply_action,
+ UserData = Req#request.reply_data,
+ UserReply =
+ case T#megaco_transaction_reply.transactionResult of
+ {transactionError, Reason} ->
+ {error, {SN, false, Reason}};
+ {actionReplies, Replies} ->
+ {ok, {SN, false, Replies}}
+ end,
+ ConnData2 = ConnData#conn_data{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData},
+ return_reply(ConnData2, TransId, UserReply, Extra);
+
+
+ %% ---------------------------------------------------------
+ %% This is not the first segment.
+ %% The segment timer has not been started, so the last
+ %% segment have been received.
+ %% We must check that this is not a re-transmission!
+
+ [#request{seg_recv = Segs,
+ seg_timer_ref = undefined} = Req] ->
+ %% Don't care about Req and Rep version diff
+ ?report_trace(ConnData, "[segmented] trans reply - no seg acc",
+ [T]),
+
+ %% Acknowledge the segment
+ send_segment_reply(ConnData, SN),
+
+ %% Updated/handle received segment
+ case lists:member(SN, Segs) of
+ true ->
+ %% This is a re-transmission, so we shall not pass
+ %% it on to the user (or update the request record).
+ ok;
+ false ->
+ %% First time for this segment
+ megaco_monitor:update_request_field(TransId,
+ #request.seg_recv,
+ [ SN | Segs ]),
+
+ %% Handle the reply
+ UserMod = Req#request.user_mod,
+ UserArgs = Req#request.user_args,
+ Action = Req#request.reply_action,
+ UserData = Req#request.reply_data,
+ UserReply =
+ case T#megaco_transaction_reply.transactionResult of
+ {transactionError, Reason} ->
+ {error, {SN, false, Reason}};
+ {actionReplies, Replies} ->
+ {ok, {SN, false, Replies}}
+ end,
+ ConnData2 = ConnData#conn_data{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData},
+ return_reply(ConnData2, TransId, UserReply, Extra)
+
+ end;
+
+
+ %% ---------------------------------------------------------
+ %% The segment timer is running!
+ %% This could be the last (out-of-order) segment!
+ %% We must check that this is not a re-transmission!
+
+ [#request{seg_recv = Segs,
+ seg_timer_ref = SegRef} = Req] ->
+ %% Don't care about Req and Rep version diff
+ ?report_trace(ConnData, "[segmented] trans reply - no seg acc",
+ [T]),
+
+ %% Acknowledge the segment
+ send_segment_reply(ConnData, SN),
+
+ %% Updated received segments
+ case lists:member(SN, Segs) of
+ true ->
+ %% This is a re-transmission
+ ok;
+ false ->
+ %% First time for this segment,
+ %% we may now have a complete set
+ Last =
+ case is_all_segments([SN | Segs]) of
+ {true, _Sorted} ->
+ megaco_monitor:cancel_apply_after(SegRef),
+ megaco_monitor:delete_request(TransId),
+ send_ack(ConnData),
+ true;
+ {false, Sorted} ->
+ megaco_monitor:update_request_field(TransId,
+ #request.seg_recv,
+ Sorted),
+ false
+ end,
+
+ %% Handle the reply
+ UserMod = Req#request.user_mod,
+ UserArgs = Req#request.user_args,
+ Action = Req#request.reply_action,
+ UserData = Req#request.reply_data,
+ UserReply =
+ case T#megaco_transaction_reply.transactionResult of
+ {transactionError, Reason} ->
+ {error, {SN, Last, Reason}};
+ {actionReplies, Replies} ->
+ {ok, {SN, Last, Replies}}
+ end,
+ ConnData2 = ConnData#conn_data{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData},
+ return_reply(ConnData2, TransId, UserReply, Extra)
+
+ end;
+
+
+ [] ->
+ ?report_trace(ConnData, "trans reply (no receiver)", [T]),
+ return_unexpected_trans(ConnData, T, Extra)
+ end;
+
+
+%%
+%% This _is_ a segmented message and it's the last segment of the
+%% message.
+%%
+handle_reply(
+ ConnData,
+ #megaco_transaction_reply{segmentNumber = SN,
+ segmentationComplete = 'NULL'} = T, Extra)
+ when is_integer(SN) ->
+ TransId = to_local_trans_id(ConnData),
+ ?rt2("handle (last) segmented reply", [T, TransId, SN]),
+ case megaco_monitor:lookup_request(TransId) of
+
+ %% ---------------------------------------------------------
+ %% The first segment, so stop the request timer. No longer
+ %% needed when the segment(s) start to arrive.
+
+ [#request{timer_ref = {_Type, Ref},
+ seg_recv = [],
+ seg_timer_ref = undefined} = Req] ->
+
+ %% Don't care about Req and Rep version diff
+ ?report_trace(ConnData, "[segmented] trans reply - "
+ "first/complete seg", [T]),
+
+ %% Stop the request timer
+ megaco_monitor:cancel_apply_after(Ref), %% OTP-4843
+
+ %% Acknowledge the ("last") segment
+ send_segment_reply_complete(ConnData, SN),
+
+ %% It is ofcourse pointless to split
+ %% a transaction into just one segment,
+ %% but just to be sure, we handle that
+ %% case also
+ Last =
+ if
+ SN > 1 ->
+ %% More then one segment
+ %% First time for this segment
+ ConnHandle = ConnData#conn_data.conn_handle,
+ InitSegTmr = Req#request.init_seg_timer,
+ {WaitFor, CurrTimer} = megaco_timer:init(InitSegTmr),
+ M = ?MODULE,
+ F = segment_timeout,
+ A = [ConnHandle, TransId, CurrTimer],
+ SegRef =
+ megaco_monitor:apply_after(M, F, A, WaitFor),
+ NewFields =
+ [{#request.timer_ref, undefined},
+ {#request.seg_recv, [SN]},
+ {#request.seg_timer_ref, SegRef}],
+ megaco_monitor:update_request_fields(TransId, NewFields),
+ false;
+ true ->
+ %% Just one segment!
+ megaco_monitor:delete_request(TransId),
+ send_ack(ConnData),
+ true
+ end,
+
+ %% Handle the reply
+ UserMod = Req#request.user_mod,
+ UserArgs = Req#request.user_args,
+ Action = Req#request.reply_action,
+ UserData = Req#request.reply_data,
+ UserReply =
+ case T#megaco_transaction_reply.transactionResult of
+ {transactionError, Reason} ->
+ {error, {SN, Last, Reason}};
+ {actionReplies, Replies} ->
+ {ok, {SN, Last, Replies}}
+ end,
+ ConnData2 = ConnData#conn_data{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData},
+ return_reply(ConnData2, TransId, UserReply, Extra);
+
+
+ [#request{seg_recv = Segs} = Req] ->
+ %% Don't care about Req and Rep version diff
+ ?report_trace(ConnData, "[segmented] trans reply - no seg acc",
+ [T]),
+
+ %% Acknowledge the ("last") segment
+ send_segment_reply_complete(ConnData, SN),
+
+ %% Updated received segments
+ %% This is _probably_ the last segment, but some of
+ %% the previous segments may have been lost, so we
+ %% may not have a complete set!
+ case lists:member(SN, Segs) of
+ true ->
+ %% This is a re-transmission
+ ok;
+ false ->
+ Last =
+ case is_all_segments([SN | Segs]) of
+ {true, _Sorted} ->
+ ?report_trace(ConnData,
+ "[segmented] trans reply - "
+ "complete set", [T]),
+ megaco_monitor:delete_request(TransId),
+ send_ack(ConnData),
+ true;
+ {false, Sorted} ->
+ ConnHandle = ConnData#conn_data.conn_handle,
+ InitSegTmr = Req#request.init_seg_timer,
+ {WaitFor, CurrTimer} =
+ megaco_timer:init(InitSegTmr),
+ M = ?MODULE,
+ F = segment_timeout,
+ A = [ConnHandle, TransId, CurrTimer],
+ SegRef =
+ megaco_monitor:apply_after(M, F, A,
+ WaitFor),
+ NewFields =
+ [{#request.seg_recv, Sorted},
+ {#request.seg_timer_ref, SegRef}],
+ megaco_monitor:update_request_fields(TransId, NewFields),
+ false
+ end,
+
+ %% Handle the reply
+ UserMod = Req#request.user_mod,
+ UserArgs = Req#request.user_args,
+ Action = Req#request.reply_action,
+ UserData = Req#request.reply_data,
+ UserReply =
+ case T#megaco_transaction_reply.transactionResult of
+ {transactionError, Reason} ->
+ {error, {SN, Last, Reason}};
+ {actionReplies, Replies} ->
+ {ok, {SN, Last, Replies}}
+ end,
+ ConnData2 = ConnData#conn_data{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData},
+ return_reply(ConnData2, TransId, UserReply, Extra)
+
+ end;
+
+ [] ->
+ ?report_trace(ConnData, "trans reply (no receiver)", [T]),
+ return_unexpected_trans(ConnData, T, Extra)
+ end;
+
+
+%%
+%% This is _not_ a segmented message,
+%% i.e. it's an ordinary transaction reply
+%%
+handle_reply(#conn_data{conn_handle = CH} = CD, T, Extra) ->
+ TransId = to_local_trans_id(CD),
+ ?rt2("handle reply", [T, TransId]),
+ case megaco_monitor:lookup_request(TransId) of
+ [Req] when (is_record(Req, request) andalso
+ (CD#conn_data.cancel =:= true)) ->
+ ?TC_AWAIT_REPLY_EVENT(true),
+ do_handle_reply_cancel(CD, Req, T);
+
+ [#request{remote_mid = RMid} = Req] when ((RMid =:= preliminary_mid) orelse
+ (RMid =:= CH#megaco_conn_handle.remote_mid)) ->
+ ?TC_AWAIT_REPLY_EVENT(false),
+ %% Just in case conn_data got update after our lookup
+ %% but before we looked up the request record, we
+ %% check the cancel field again.
+ case megaco_config:conn_info(CD, cancel) of
+ true ->
+ do_handle_reply_cancel(CD, Req, T);
+ false ->
+ do_handle_reply(CD, Req, TransId, T, Extra)
+ end;
+
+ [#request{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData,
+ remote_mid = RMid}] ->
+ ?report_trace(CD,
+ "received trans reply with invalid remote mid",
+ [T, RMid]),
+ WrongMid = CH#megaco_conn_handle.remote_mid,
+ T2 = transform_transaction_reply_enc(CD#conn_data.protocol_version,
+ T),
+ UserReply = {error, {wrong_mid, WrongMid, RMid, T2}},
+ CD2 = CD#conn_data{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData},
+ return_reply(CD2, TransId, UserReply, Extra);
+
+ [] ->
+ ?TC_AWAIT_REPLY_EVENT(undefined),
+ ?report_trace(CD, "trans reply (no receiver)", [T]),
+ return_unexpected_trans(CD, T, Extra)
+ end.
+
+do_handle_reply_cancel(CD, #request{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData}, T) ->
+ CD2 = CD#conn_data{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData},
+ return_unexpected_trans(CD2, T).
+
+%% Plain old handling of incomming replies
+do_handle_reply(CD,
+ #request{timer_ref = {_Type, Ref}, % OTP-4843
+ user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData,
+ keep_alive_timer = RKAT},
+ TransId, T, Extra)
+ when ((RKAT =:= plain) orelse (Action =:= call)) ->
+ %% Don't care about Req and Rep version diff
+ ?report_trace(CD, "trans reply", [T]),
+
+ %% This is the first reply (maybe of many)
+ megaco_monitor:delete_request(TransId),
+ megaco_monitor:cancel_apply_after(Ref), % OTP-4843
+ megaco_config:del_pending_counter(recv, TransId), % OTP-7189
+
+ %% Send acknowledgement
+ maybe_send_ack(T#megaco_transaction_reply.immAckRequired, CD),
+
+ UserReply =
+ case T#megaco_transaction_reply.transactionResult of
+ {transactionError, Reason} ->
+ {error, Reason};
+ {actionReplies, Replies} ->
+ {ok, Replies}
+ end,
+ CD2 = CD#conn_data{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData},
+ return_reply(CD2, TransId, UserReply, Extra);
+
+%% This may be the first reply (of maybe many)
+do_handle_reply(CD,
+ #request{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData,
+ keep_alive_ref = undefined} = Req,
+ TransId, T, Extra) ->
+ %% Don't care about Req and Rep version diff
+ ?report_trace(CD, "trans reply", [T]),
+
+ %% Could be the first reply, in which case we shall start the
+ %% Request Keep Alive timer...
+ %% This could happen for more than one (1) reply though, so
+ %% we need to check if the counter value actually equals one (1)!
+
+ ReplyNo =
+ create_or_maybe_increment_request_keep_alive_counter(CD, TransId),
+ if
+ (ReplyNo =:= 1) ->
+ %% This *is* the first reply!!
+ %% 1) Stop resend timer
+ {_Type, Ref} = Req#request.timer_ref, % OTP-4843
+ megaco_monitor:cancel_apply_after(Ref), % OTP-4843
+
+ %% 2) Delete pending counter
+ megaco_config:del_pending_counter(recv, TransId), % OTP-7189
+
+ %% 3) Start request keep alive timer
+ ConnHandle = CD#conn_data.conn_handle,
+ RKATimer = Req#request.keep_alive_timer,
+ {RKAWaitFor, _} = megaco_timer:init(RKATimer),
+ RKARef = megaco_monitor:apply_after(?MODULE,
+ request_keep_alive_timeout,
+ [ConnHandle, TransId],
+ RKAWaitFor),
+
+ %% 4) Maybe send acknowledgement (three-way-handshake)
+ maybe_send_ack(T#megaco_transaction_reply.immAckRequired, CD),
+
+ %% 5) And finally store the updated request record
+ Req2 = Req#request{keep_alive_ref = RKARef},
+ megaco_monitor:insert_request(Req2);
+
+ true ->
+ ok
+ end,
+
+ UserReply =
+ case T#megaco_transaction_reply.transactionResult of
+ {transactionError, Reason} ->
+ {error, ReplyNo, Reason};
+ {actionReplies, Replies} ->
+ {ok, ReplyNo, Replies}
+ end,
+ CD2 = CD#conn_data{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData},
+ return_reply(CD2, TransId, UserReply, Extra);
+
+%% This is *not* the first reply (of many)
+do_handle_reply(CD, #request{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData}, TransId, T, Extra) ->
+ %% Don't care about Req and Rep version diff
+ ?report_trace(CD, "trans reply (first reply already delivered)", [T]),
+
+ ReplyNo = increment_request_keep_alive_counter(CD, TransId),
+
+ UserReply =
+ case T#megaco_transaction_reply.transactionResult of
+ {transactionError, Reason} ->
+ {error, ReplyNo, Reason};
+ {actionReplies, Replies} ->
+ {ok, ReplyNo, Replies}
+ end,
+ CD2 = CD#conn_data{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData},
+ return_reply(CD2, TransId, UserReply, Extra).
+
+is_all_segments(Segs) ->
+ Sorted = lists:sort(Segs),
+ {is_all_segments(Sorted, 1, lists:last(Sorted)), Sorted}.
+
+is_all_segments([Last], Last, Last) ->
+ true;
+is_all_segments([_], _, _) ->
+ false;
+is_all_segments([SN|Segs], SN, Last) when (SN < Last) ->
+ is_all_segments(Segs, SN+1, Last);
+is_all_segments([SN1|_], SN2, _Last) when SN1 =/= SN2 ->
+ false.
+
+
+handle_segment_reply(CD,
+ #'SegmentReply'{transactionId = TransId,
+ segmentNumber = SN,
+ segmentationComplete = SC}, Extra) ->
+ ?rt2("handle segment reply", [{trans_id, TransId},
+ {segment_no, SN},
+ {segmentation_complete, SC}]),
+ TransId2 = to_remote_trans_id(CD#conn_data{serial = TransId}),
+ case megaco_monitor:lookup_reply(TransId2) of
+ [#reply{bytes = Sent,
+ segments = []} = Rep] when is_list(Sent) ->
+ ?rt2("no unsent segments", [Sent]),
+ handle_segment_reply_callback(CD, TransId, SN, SC, Extra),
+ case lists:keysearch(SN, 1, Sent) of
+ {value, {SN, _Bin, SegTmr}} ->
+ megaco_monitor:cancel_apply_after(SegTmr), %% BMK BMK
+ case lists:keydelete(SN, 1, Sent) of
+ [] -> %% We are done
+ Ref = Rep#reply.timer_ref,
+ megaco_monitor:cancel_apply_after(Ref),
+ megaco_monitor:update_reply_field(TransId2,
+ #reply.bytes,
+ []),
+ ok;
+ NewSent ->
+ megaco_monitor:update_reply_field(TransId2,
+ #reply.bytes,
+ NewSent),
+ ok
+ end;
+ _ ->
+ ok
+ end;
+
+ [#reply{bytes = Sent,
+ segments = NotSent}] when is_list(Sent) andalso
+ is_list(NotSent) ->
+ ?rt2("unsent segments", [Sent, NotSent]),
+ handle_segment_reply_callback(CD, TransId, SN, SC, Extra),
+ case lists:keysearch(SN, 1, Sent) of
+ {value, {SN, _Bin, SegTmr}} ->
+ megaco_monitor:cancel_apply_after(SegTmr), %% BMK BMK
+ NewSent = lists:keydelete(SN, 1, Sent),
+ [{SN2, Bin2}|NewNotSent] = NotSent,
+ case send_reply_segment(CD, "send trans reply segment",
+ SN2, Bin2) of
+ {ok, Bin3} ->
+ ?rt2("another segment sent", [Bin3]),
+ NewSent2 = [{SN2, Bin3, undefined}|NewSent],
+ NewFields =
+ [{#reply.bytes, NewSent2},
+ {#reply.segments, NewNotSent}],
+ megaco_monitor:update_reply_fields(TransId2,
+ NewFields),
+ ok;
+ Error ->
+ incNumErrors(CD#conn_data.conn_handle),
+ ?report_important(CD, "failed sending segment",
+ [{segment_no, SN2},
+ {error, Error}]),
+ error_msg("failed sending transaction reply [~w] "
+ "segment [~w]: ~w",
+ [TransId, SN2, Error]),
+ megaco_monitor:update_reply_field(TransId2,
+ #reply.bytes,
+ NewSent),
+ ok
+ end;
+ _ ->
+ ok
+ end;
+
+ [#reply{state = State}] ->
+ %% We received a segment reply for a segmented reply we have
+ %% not yet sent? This is either some sort of race condition
+ %% or the "the other side" is really confused.
+ %% Ignore the message but issue a warning just in case...
+ warning_msg("received unexpected segment reply: "
+ "~n Transaction Id: ~p"
+ "~n Segment Number: ~p"
+ "~n Segmentation Complete: ~p"
+ "~n Reply state: ~p",
+ [TransId2, SN, SC, State]),
+ ignore;
+
+ [] ->
+ ignore
+
+ end.
+
+
+%%
+%% This should be passed on to the user only if the user wish it
+%% (sri = segment reply indication)
+%%
+handle_segment_reply_callback(#conn_data{segment_reply_ind = true,
+ conn_handle = ConnHandle,
+ protocol_version = Version,
+ user_mod = UserMod,
+ user_args = UserArgs},
+ TransId, SN, SC, Extra) ->
+ Args =
+ case Extra of
+ ?default_user_callback_extra ->
+ [ConnHandle, Version, TransId, SN, SC | UserArgs];
+ _ ->
+ [ConnHandle, Version, TransId, SN, SC, Extra | UserArgs]
+ end,
+ (catch apply(UserMod, handle_segment_reply, Args));
+handle_segment_reply_callback(_CD, _TransId, _SN, _SC, _Extra) ->
+ ok.
+
+
+handle_acks([{ConnData, Rep, T} | Rest], Extra)
+ when Rep#reply.state == waiting_for_ack ->
+ handle_ack(ConnData, ok, Rep, T, Extra),
+ handle_acks(Rest, Extra);
+handle_acks([], _Extra) ->
+ ok.
+
+%% If the reply to which this is the ack was segmented,
+%% then we also need to check that we have received all
+%% the segment-replies. If not, an error callback call
+%% shall be made instead.
+handle_ack(ConnData, AckStatus,
+ #reply{trans_id = TransId,
+ bytes = Bytes,
+ timer_ref = ReplyRef,
+ pending_timer_ref = PendingRef, %% BMK Still running?
+ ack_action = AckAction}, T, Extra)
+ when is_binary(Bytes) orelse (Bytes =:= undefined) ->
+ handle_ack_cleanup(TransId, ReplyRef, PendingRef),
+ handle_ack_callback(ConnData, AckStatus, AckAction, T, Extra);
+
+handle_ack(ConnData, AckStatus,
+ #reply{trans_id = TransId,
+ bytes = [],
+ segments = [],
+ timer_ref = ReplyRef,
+ pending_timer_ref = PendingRef, %% BMK Still running?
+ ack_action = AckAction}, T, Extra) ->
+ handle_ack_cleanup(TransId, ReplyRef, PendingRef),
+ handle_ack_callback(ConnData, AckStatus, AckAction, T, Extra);
+
+handle_ack(ConnData, OrigAckStatus,
+ #reply{trans_id = TransId,
+ bytes = SegSent,
+ segments = NotSent,
+ timer_ref = ReplyRef,
+ pending_timer_ref = PendingRef, %% BMK Still running?
+ ack_action = OrigAckAction}, T, Extra)
+ when is_list(SegSent) andalso is_list(NotSent) ->
+ SN_NotAcked = [SN || {SN, _, _} <- SegSent],
+ SN_NotSent = [SN || {SN, _} <- NotSent],
+ AckStatus = {error, {segment_failure,
+ [{original_ack_status, OrigAckStatus},
+ {segments_not_acked, SN_NotAcked},
+ {segments_not_sent, SN_NotSent}]}},
+ AckAction =
+ case OrigAckAction of
+ {handle_ack, _} ->
+ OrigAckAction;
+ _ ->
+ {handle_ack, segmented_reply}
+ end,
+ cancel_segment_timers(SegSent),
+ handle_ack_cleanup(TransId, ReplyRef, PendingRef),
+ handle_ack_callback(ConnData, AckStatus, AckAction, T, Extra).
+
+handle_ack_cleanup(TransId, ReplyRef, PendingRef) ->
+ megaco_monitor:cancel_apply_after(ReplyRef),
+ megaco_monitor:cancel_apply_after(PendingRef),
+ megaco_monitor:delete_reply(TransId),
+ megaco_config:del_pending_counter(sent, TransId). %% BMK: Still existing?
+
+cancel_segment_timers(SegSent) when is_list(SegSent) ->
+ Cancel = fun({_, _, Ref}) ->
+ megaco_monitor:cancel_apply_after(Ref)
+ end,
+ lists:foreach(Cancel, SegSent);
+cancel_segment_timers(_) ->
+ ok.
+
+handle_ack_callback(_CD, ok = _AS, discard_ack = _AA, _T, _Extra) ->
+ ok;
+handle_ack_callback(ConnData, {error, Reason}, discard_ack = AckAction, T, Extra) ->
+ ?report_trace(ConnData, "handle ack (no callback)",
+ [T, AckAction, {error, Reason}, Extra]);
+handle_ack_callback(ConnData, AckStatus, {handle_ack, AckData}, T, Extra) ->
+ ?report_trace(ConnData, "callback: trans ack", [{ack_data, AckData}]),
+ ConnHandle = ConnData#conn_data.conn_handle,
+ Version = ConnData#conn_data.protocol_version,
+ UserMod = ConnData#conn_data.user_mod,
+ UserArgs = ConnData#conn_data.user_args,
+ Args =
+ case Extra of
+ ?default_user_callback_extra ->
+ [ConnHandle, Version, AckStatus, AckData | UserArgs];
+ _ ->
+ [ConnHandle, Version, AckStatus, AckData, Extra | UserArgs]
+ end,
+ Res = (catch apply(UserMod, handle_trans_ack, Args)),
+ ?report_debug(ConnData, "return: trans ack", [T, AckData, {return, Res}]),
+ case Res of
+ ok ->
+ ok;
+ _ ->
+ warning_msg("transaction ack callback failed: ~w", [Res]),
+ ok
+ end,
+ Res.
+
+
+handle_message_error(ConnData, _Error, _Extra)
+ when ConnData#conn_data.monitor_ref == undefined_monitor_ref ->
+ %% May occur if another process already has setup a
+ %% temporary connection, but the handle_connect callback
+ %% function has not yet returned before the eager MG
+ %% re-sends its initial service change message.
+ ignore;
+handle_message_error(ConnData, Error, Extra) ->
+ ?report_trace(ConnData, "callback: message error", [Error]),
+ ConnHandle = ConnData#conn_data.conn_handle,
+ Version = ConnData#conn_data.protocol_version,
+ UserMod = ConnData#conn_data.user_mod,
+ UserArgs = ConnData#conn_data.user_args,
+ Args =
+ case Extra of
+ ?default_user_callback_extra ->
+ [ConnHandle, Version, Error | UserArgs];
+ _ ->
+ [ConnHandle, Version, Error, Extra | UserArgs]
+ end,
+ Res = (catch apply(UserMod, handle_message_error, Args)),
+ ?report_debug(ConnData, "return: message error", [Error, {return, Res}]),
+ case Res of
+ ok ->
+ ok;
+ _ ->
+ warning_msg("message error callback failed: ~w", [Res]),
+ ok
+ end,
+ Res.
+
+handle_disconnect_callback(ConnData, UserReason)
+ when is_record(ConnData, conn_data) ->
+ ?report_trace(ConnData, "callback: disconnect", [{reason, UserReason}]),
+ ConnHandle = ConnData#conn_data.conn_handle,
+ Version = ConnData#conn_data.protocol_version,
+ UserMod = ConnData#conn_data.user_mod,
+ UserArgs = ConnData#conn_data.user_args,
+ Args = [ConnHandle, Version, UserReason | UserArgs],
+ Res = (catch apply(UserMod, handle_disconnect, Args)),
+ ?report_debug(ConnData, "return: disconnect", [{reason, UserReason}, {return, Res}]),
+ case Res of
+ ok ->
+ ok;
+ _ ->
+ warning_msg("disconnect callback failed: ~w", [Res]),
+ ok
+ end,
+ Res.
+
+
+%%----------------------------------------------------------------------
+%% Test "outgoing" messages
+%%----------------------------------------------------------------------
+
+%% test_request/5 -> {MegacoMessage, EncodingRes}
+%%
+%% This function is only intended for testing
+%% (e.g. answer the question: have I constructed a valid action request?)
+%%
+%% It's not exactly the same code as a call to 'call'
+%% or 'cast' but close enough.
+%%
+test_request(ConnHandle, Actions,
+ Version, EncodingMod, EncodingConfig)
+ when is_record(ConnHandle, megaco_conn_handle) and
+ is_integer(Version) andalso is_atom(EncodingMod) ->
+ %% Create a fake conn_data structure
+ ConnData = #conn_data{serial = 1,
+ protocol_version = Version,
+ conn_handle = ConnHandle,
+ auth_data = asn1_NOVALUE,
+ encoding_mod = EncodingMod,
+ encoding_config = EncodingConfig},
+
+ TRs = test_req_compose_transactions(ConnData, Actions),
+ Body = {transactions, TRs},
+ MegaMsg = megaco_messenger_misc:compose_message(ConnData, Version, Body),
+ EncodeRes = megaco_messenger_misc:encode_message(ConnData, MegaMsg),
+ {MegaMsg, EncodeRes}.
+
+
+test_req_compose_transactions(ConnData, [A|_] = ActionsList) when is_list(A) ->
+ LastSerial = ConnData#conn_data.serial,
+ test_req_compose_transactions(LastSerial, lists:reverse(ActionsList), []);
+test_req_compose_transactions(#conn_data{serial = Serial}, Actions) ->
+ TR = #'TransactionRequest'{transactionId = Serial,
+ actions = Actions},
+ [{transactionRequest, TR}].
+
+test_req_compose_transactions(_Serial, [], Acc) ->
+ lists:reverse(Acc);
+test_req_compose_transactions(Serial, [A|As], Acc) ->
+ TR = #'TransactionRequest'{transactionId = Serial,
+ actions = A},
+ test_req_compose_transactions(Serial, As, [{transactionRequest, TR}|Acc]).
+
+
+test_reply(ConnHandle, Version, EncodingMod, EncodingConfig, Error)
+ when is_record(Error, 'ErrorDescriptor') ->
+ Reply = {transactionError, Error},
+ test_reply_encode(ConnHandle, Version, EncodingMod, EncodingConfig, Reply);
+test_reply(ConnHandle, Version, EncodingMod, EncodingConfig, Replies)
+ when is_list(Replies) ->
+ Reply = {actionReplies, Replies},
+ test_reply_encode(ConnHandle, Version, EncodingMod, EncodingConfig, Reply).
+
+test_reply_encode(ConnHandle, Version, EncodingMod, EncodingConfig, Reply) ->
+ ImmAck = asn1_NOVALUE,
+ Serial = 1,
+ %% Create a fake conn_data structure
+ CD = #conn_data{serial = Serial,
+ protocol_version = Version,
+ conn_handle = ConnHandle,
+ auth_data = asn1_NOVALUE,
+ encoding_mod = EncodingMod,
+ encoding_config = EncodingConfig},
+ TR0 = #megaco_transaction_reply{transactionId = Serial,
+ immAckRequired = ImmAck,
+ transactionResult = Reply},
+ TR = megaco_messenger_misc:transform_transaction_reply(CD, TR0),
+ Body = {transactions, [{transactionReply, TR}]},
+ MegaMsg = megaco_messenger_misc:compose_message(CD, Version, Body),
+ EncodeRes = megaco_messenger_misc:encode_message(CD, MegaMsg),
+ {MegaMsg, EncodeRes}.
+
+
+%%----------------------------------------------------------------------
+%% Send (or prepare) outgoing messages
+%%----------------------------------------------------------------------
+
+%% Description:
+%% Encode a list of actions or a list of list of actions for
+%% later sending (using call or cast).
+%%
+%% encode_actions(CH, Acts, Opts) -> {ok, encoded_actions()} | {error, Reason}
+%% CH -> connection_handle()
+%% Acts -> action_reqs() | [action_reqs()]
+%% action_reqs() -> [action_req()]
+%% action_req() -> #'ActionRequest'{}
+%% Opts -> [option()]
+%% option() -> {Tab, Val}
+%% Tag -> atom()
+%% Val -> term()
+%% encoded_actions() -> binary() | [binary()]
+%% Reason -> term()
+encode_actions(CH, [A|_] = ActionsList, Opts)
+ when is_record(CH, megaco_conn_handle) andalso is_list(A) ->
+ (catch encode_multi_actions(CH, ActionsList, Opts));
+
+encode_actions(CH, [A|_] = Actions, Opts)
+ when is_record(CH, megaco_conn_handle) andalso is_tuple(A) ->
+ do_encode_actions(CH, Actions, Opts).
+
+encode_multi_actions(CH, ActionsList, Opts) ->
+ case prepare_req_send_options(CH, Opts) of
+ {ok, CD} ->
+ ActsList = [encode_multi_actions(CD, Acts) || Acts <- ActionsList],
+ {ok, ActsList};
+ Error ->
+ Error
+ end.
+
+encode_multi_actions(CD, Actions) ->
+ case megaco_messenger_misc:encode_actions(CD,
+ "encode multi actions",
+ Actions) of
+ {ok, Bin} ->
+ Bin;
+ Error ->
+ throw(Error)
+ end.
+
+do_encode_actions(CH, Actions, Opts)
+ when is_record(CH, megaco_conn_handle) ->
+ case prepare_req_send_options(CH, Opts) of
+ {ok, CD} ->
+ megaco_messenger_misc:encode_actions(CD,
+ "encode actions",
+ Actions);
+ Error ->
+ Error
+ end.
+
+prepare_req_send_options(CH, Opts) ->
+ case megaco_config:lookup_local_conn(CH) of
+ [CD] ->
+ override_req_send_options(any, CD, Opts);
+ [] ->
+ {error, {not_found, conn_data}}
+ end.
+
+
+call(ConnHandle, Actions, Options) ->
+ case lists:keymember(reply_data, 1, Options) of
+ true ->
+ {error, {bad_option, reply_data}};
+ false ->
+ Self = self(),
+ ProxyFun = fun() -> call_proxy(Self) end,
+ {Proxy, MRef} = erlang:spawn_monitor(ProxyFun),
+ Options2 = [{reply_data, Proxy} | Options],
+ call_or_cast(call, ConnHandle, Actions, Options2, MRef)
+ end.
+
+cast(ConnHandle, Actions, Options) ->
+ call_or_cast(cast, ConnHandle, Actions, Options, undefined).
+
+%% In a transaction there can be several actions, so if the
+%% First element of the Actions list is an ''ActionRequest''
+%% record this a list of ActionRequest's for one Transaction
+%% request. If on the other hand this is not the case, then
+%% the Actions list is assumed to be a list of list of
+%% ActionRequest. That is, action requests for several transactions.
+%% It could also be a binary or a list of binaries (if
+%% the actions has already been encoded).
+call_or_cast(CallOrCast, ConnHandle, [A|_] = Actions, Options, ProxyMon)
+ when is_tuple(A) ->
+ %% Just one transaction
+ case call_or_cast(CallOrCast, ConnHandle, [Actions], Options, ProxyMon) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ {error, Reason};
+ {Version, [Reply]} when is_integer(Version) ->
+ {Version, Reply};
+ {Version, Error} when is_integer(Version) ->
+ {Version, Error}
+ end;
+
+call_or_cast(CallOrCast, ConnHandle, Actions, Options, ProxyMon)
+ when is_binary(Actions) ->
+ %% Just one transaction (although the actions has already been encoded)
+ case call_or_cast(CallOrCast, ConnHandle, [Actions], Options, ProxyMon) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ {error, Reason};
+ {Version, [Reply]} when is_integer(Version) ->
+ {Version, Reply};
+ {Version, Error} when is_integer(Version) ->
+ {Version, Error}
+ end;
+
+call_or_cast(CallOrCast, ConnHandle, ActionsList, Options, ProxyMon)
+ when is_record(ConnHandle, megaco_conn_handle) ->
+ case prepare_req_send_options(CallOrCast,
+ ConnHandle, Options, ActionsList) of
+ {ok, ConnData} ->
+ ?report_trace(ConnData, "call_or_cast - options prepared", []),
+ case encode_requests(ConnData, ActionsList) of
+ {ok, TRs, BinOrBins} ->
+ ?report_trace(ConnData,
+ "call_or_cast - request encoded", []),
+ send_request(ConnData, ConnHandle,
+ TRs, CallOrCast, BinOrBins),
+ case CallOrCast of
+ call ->
+ TransIds = to_local_trans_id(ConnData, TRs),
+ wait_for_reply(ConnData, TransIds, ProxyMon);
+ cast ->
+ ok
+ end;
+ {error, Reason} ->
+ call_proxy_cleanup(ConnData, ProxyMon),
+ Version = ConnData#conn_data.protocol_version,
+ return_error(CallOrCast, Version, {error, Reason})
+ end;
+ {error, Reason} ->
+ call_proxy_cleanup(Options, ProxyMon),
+ return_error(CallOrCast, 1, {error, Reason})
+ end;
+call_or_cast(CallOrCast, ConnHandle, _Actions, Options, ProxyMon) ->
+ call_proxy_cleanup(Options, ProxyMon),
+ return_error(CallOrCast, 1, {error, {bad_megaco_conn_handle, ConnHandle}}).
+
+
+return_error(Action, Version, Error) ->
+ case Action of
+ call -> {Version, Error};
+ cast -> Error
+ end.
+
+wait_for_reply(CD, TransIds, ProxyMon) ->
+ ProxyPid = CD#conn_data.reply_data,
+ ProxyPid ! {go, self(), CD, TransIds},
+ receive
+ {reply, ProxyPid, Reply} ->
+ erlang:demonitor(ProxyMon, [flush]),
+ Reply;
+ {'DOWN', ProxyMon, process, ProxyPid, Info} ->
+ UserReply = {error, {call_proxy_crash, Info}},
+ {CD#conn_data.protocol_version, UserReply}
+ end.
+
+
+call_proxy_cleanup(#conn_data{reply_data = ProxyPid}, ProxyMon) ->
+ do_call_proxy_cleanup(ProxyPid, ProxyMon);
+call_proxy_cleanup(Options, ProxyMon) when is_list(Options) ->
+ ProxyPid =
+ case lists:keysearch(reply_data, 1, Options) of
+ {value, {reply_data, Data}} ->
+ Data;
+ _ ->
+ undefined
+ end,
+ do_call_proxy_cleanup(ProxyPid, ProxyMon);
+call_proxy_cleanup(ProxyPid, ProxyMon) ->
+ do_call_proxy_cleanup(ProxyPid, ProxyMon).
+
+do_call_proxy_cleanup(ProxyPid, ProxyMon) ->
+ maybe_demonitor(ProxyMon),
+ maybe_stop_proxy(ProxyPid),
+ ok.
+
+maybe_demonitor(undefined) ->
+ ok;
+maybe_demonitor(Mon) ->
+ (catch erlang:demonitor(Mon, [flush])),
+ ok.
+
+maybe_stop_proxy(Pid) when is_pid(Pid) ->
+ Pid ! {stop, self()},
+ ok;
+maybe_stop_proxy(_) ->
+ ok.
+
+
+call_proxy(Parent) ->
+ receive
+ {go, Parent, CD, TransIds} ->
+ call_proxy(Parent, CD, TransIds);
+ {stop, Parent} ->
+ exit(normal)
+ end.
+
+call_proxy(Parent, CD, TransIds) ->
+ Reply = proxy_wait_for_reply(CD, TransIds, []),
+ Parent ! {reply, self(), Reply},
+ call_proxy_gc(CD, CD#conn_data.call_proxy_gc_timeout).
+
+call_proxy_gc(CD, Timeout) when (Timeout > 0) ->
+ T = t(),
+ receive
+ {?MODULE, TransId, Version, Result} -> % Old format
+ CD2 = CD#conn_data{protocol_version = Version},
+ Extra = ?default_user_callback_extra,
+ return_unexpected_trans_reply(CD2, TransId, Result, Extra),
+ call_proxy_gc(CD, Timeout - (t() - T));
+
+ {?MODULE, TransId, Version, Result, Extra} ->
+ CD2 = CD#conn_data{protocol_version = Version},
+ return_unexpected_trans_reply(CD2, TransId, Result, Extra),
+ call_proxy_gc(CD, Timeout - (t() - T))
+
+ after Timeout ->
+ exit(normal)
+ end;
+call_proxy_gc(_CD, _Timeout) ->
+ exit(normal).
+
+proxy_wait_for_reply(_CD, [], Replies0) ->
+ % Make sure they come in the same order as the requests where sent
+ Replies1 = lists:keysort(2, Replies0),
+ %% Must all be the same version
+ [{Version, _, _}|_] = Replies1,
+ Replies2 = [Result || {_Version, _TransId, Result} <- Replies1],
+ {Version, Replies2};
+proxy_wait_for_reply(CD, TransIds, Replies) ->
+ receive
+ {?MODULE, TransId, Version, Reply} -> % Old format
+ {TransIds2, Replies2} =
+ wfr_handle_reply(CD,
+ TransIds, TransId,
+ Version, Replies, Reply),
+ proxy_wait_for_reply(CD, TransIds2, Replies2);
+
+ {?MODULE, TransId, Version, Reply, Extra} ->
+ {TransIds2, Replies2} =
+ wfr_handle_reply(CD,
+ TransIds, TransId,
+ Version, Replies, Reply, Extra),
+ proxy_wait_for_reply(CD, TransIds2, Replies2)
+ end.
+
+wfr_handle_reply(CD, TransIds, TransId, Version, Replies, Reply) ->
+ Extra = ?default_user_callback_extra,
+ wfr_handle_reply(CD, TransIds, TransId, Version, Replies, Reply, Extra).
+
+wfr_handle_reply(CD, TransIds, TransId, Version, Replies, Reply, Extra) ->
+ %% Is this meant for us?
+ case lists:member(TransId, TransIds) of
+ true -> % Yep
+ wfr_update(TransIds, TransId, Version, Replies, Reply, Extra);
+ false -> % Nop
+ CD2 = CD#conn_data{protocol_version = Version},
+ return_unexpected_trans_reply(CD2, TransId, Reply, Extra),
+ {TransIds, Replies}
+ end.
+
+wfr_mk_reply(Version, TransId, Result, ?default_user_callback_extra = _Extra) ->
+ {Version, TransId, Result};
+wfr_mk_reply(Version, TransId, Result0, Extra) ->
+ Result = list_to_tuple(lists:append(tuple_to_list(Result0), [Extra])),
+ {Version, TransId, Result}.
+
+%% Last segment of a reply
+%% transactionResult "=" actionReplies
+wfr_update(TransIds, TransId, Version, Results, {ok, {SegNo, Last, ARs}}, Extra)
+ when is_integer(SegNo) andalso (Last == true) ->
+ TransIds2 = lists:delete(TransId, TransIds),
+ case lists:keysearch(TransId, 2, Results) of
+
+ %% All segments ok (actionReplies)
+ {value, {V, TransId, {ok, SegReps}}} ->
+ SegReps2 = lists:keysort(1, [{SegNo, ARs}|SegReps]),
+ Rep = wfr_mk_reply(V, TransId, {ok, SegReps2}, Extra),
+ Results2 = lists:keyreplace(TransId, 2, Results, Rep),
+ {TransIds2, Results2};
+
+ %% Atleast one segment error (transactionError)
+ {value, {V, TransId, {error, {segment, OkSegs, ErrSegs}}}} ->
+ OkSegs2 = lists:keysort(1, [{SegNo, ARs}|OkSegs]),
+ ErrSegs2 = lists:keysort(1, ErrSegs),
+ Error = {error, {segment, OkSegs2, ErrSegs2}},
+ Rep = wfr_mk_reply(V, TransId, Error, Extra),
+ Results2 = lists:keyreplace(TransId, 2, Results, Rep),
+ {TransIds2, Results2};
+
+ false ->
+ %% First and only segment
+ Rep = wfr_mk_reply(Version, TransId, {ok, [{SegNo, ARs}]}, Extra),
+ {TransIds2, [Rep | Results]}
+
+ end;
+
+%% Last segment of a reply
+%% transactionResult "=" transactionError
+wfr_update(TransIds, TransId, Version, Results, {error, {SegNo, Last, ED}}, Extra)
+ when is_integer(SegNo) andalso (Last == true) ->
+ TransIds2 = lists:delete(TransId, TransIds),
+ case lists:keysearch(TransId, 2, Results) of
+
+ %% First segment with error (transactionError)
+ {value, {V, TransId, {ok, SegReps}}} ->
+ OkSegs = lists:keysort(1, [{SegNo, ED}|SegReps]),
+ ErrSegs = [{SegNo, ED}],
+ Error = {error, {segment, OkSegs, ErrSegs}},
+ Rep = wfr_mk_reply(V, TransId, Error, Extra),
+ Results2 = lists:keyreplace(TransId, 2, Results, Rep),
+ {TransIds2, Results2};
+
+ %% Another segment with error (transactionError)
+ {value, {V, TransId, {error, {segment, OkSegs, ErrSegs}}}} ->
+ OkSegs2 = lists:keysort(1, OkSegs),
+ ErrSegs2 = lists:keysort(1, [{SegNo, ED}|ErrSegs]),
+ Error = {error, {segment, OkSegs2, ErrSegs2}},
+ Rep = wfr_mk_reply(V, TransId, Error, Extra),
+ Results2 = lists:keyreplace(TransId, 2, Results, Rep),
+ {TransIds2, Results2};
+
+ false ->
+ %% First and only segment
+ OkSegs = [],
+ ErrSegs = [{SegNo, ED}],
+ Error = {error, {segment, OkSegs, ErrSegs}},
+ Rep = wfr_mk_reply(Version, TransId, Error, Extra),
+ {TransIds2, [Rep]}
+
+ end;
+
+%% One segment of a reply
+%% transactionResult "=" actionReplies
+wfr_update(TransIds, TransId, Version, Results, {ok, {SegNo, _Last, ARs}}, Extra)
+ when is_integer(SegNo) ->
+ case lists:keysearch(TransId, 2, Results) of
+
+ %% All segments ok (actionReplies)
+ {value, {V, TransId, {ok, SegReps}}} ->
+ SegReps2 = [{SegNo, ARs}|SegReps],
+ Rep = wfr_mk_reply(V, TransId, {ok, SegReps2}, Extra),
+ Results2 = lists:keyreplace(TransId, 2, Results, Rep),
+ {TransIds, Results2};
+
+ %% Atleast one segment error (transactionError)
+ {value, {V, TransId, {error, {segment, OkSegs, ErrSegs}}}} ->
+ OkSegs2 = [{SegNo, ARs}|OkSegs],
+ Error = {error, {segment, OkSegs2, ErrSegs}},
+ Rep = wfr_mk_reply(V, TransId, Error, Extra),
+ Results2 = lists:keyreplace(TransId, 2, Results, Rep),
+ {TransIds, Results2};
+
+ false ->
+ %% First and only segment
+ Rep = wfr_mk_reply(Version, TransId, {ok, [{SegNo, ARs}]}, Extra),
+ {TransIds, [Rep | Results]}
+
+ end;
+
+%% One segment of a reply
+%% transactionResult "=" transactionError
+wfr_update(TransIds, TransId, Version, Results, {error, {SegNo, _Last, ED}}, Extra)
+ when is_integer(SegNo) ->
+ case lists:keysearch(TransId, 2, Results) of
+
+ %% First segment with error (transactionError)
+ {value, {V, TransId, {ok, OkSegs}}} ->
+ ErrSegs = [{SegNo, ED}],
+ Error = {error, {segment, OkSegs, ErrSegs}},
+ Rep = wfr_mk_reply(V, TransId, Error, Extra),
+ Results2 = lists:keyreplace(TransId, 2, Results, Rep),
+ {TransIds, Results2};
+
+ %% Another segment with error (transactionError)
+ {value, {V, TransId, {error, {OkSegs, ErrSegs}}}} ->
+ ErrSegs2 = [{SegNo, ED}|ErrSegs],
+ Error = {error, {segment, OkSegs, ErrSegs2}},
+ Rep = wfr_mk_reply(V, TransId, Error, Extra),
+ Results2 = lists:keyreplace(TransId, 2, Results, Rep),
+ {TransIds, Results2};
+
+ false ->
+ %% First segment
+ OkSegs = [],
+ ErrSegs = [{SegNo, ED}],
+ Error = {error, {segment, OkSegs, ErrSegs}},
+ Rep = wfr_mk_reply(Version, TransId, Error, Extra),
+ {TransIds, [Rep]}
+
+ end;
+
+%% This means that some segments did not make it in time
+wfr_update(TransIds, TransId, Version, Results,
+ {error, {segment_timeout, Missing}}, Extra) ->
+ TransIds2 = lists:delete(TransId, TransIds),
+ case lists:keysearch(TransId, 2, Results) of
+
+ %% First segment with error (transactionError)
+ {value, {V, TransId, {ok, OkSegs}}} ->
+ Error = {error, {segment_timeout, Missing, OkSegs, []}},
+ Rep = wfr_mk_reply(V, TransId, Error, Extra),
+ Results2 = lists:keyreplace(TransId, 2, Results, Rep),
+ {TransIds2, Results2};
+
+ %% Another segment with error (transactionError)
+ {value, {V, TransId, {error, {segment, OkSegs, ErrSegs}}}} ->
+ Error = {error, {segment_timeout, Missing, OkSegs, ErrSegs}},
+ Rep = wfr_mk_reply(V, TransId, Error, Extra),
+ Results2 = lists:keyreplace(TransId, 2, Results, Rep),
+ {TransIds2, Results2};
+
+ false ->
+ %% First segment
+ Error = {error, {segment_timeout, Missing, [], []}},
+ Rep = wfr_mk_reply(Version, TransId, Error, Extra),
+ {TransIds2, [Rep]}
+
+ end;
+
+%% And all other results (presumably results without segments).
+wfr_update(TransIds, TransId, Version, Results, Result, Extra) ->
+ TransIds2 = lists:delete(TransId, TransIds),
+ Results2 = [wfr_mk_reply(Version, TransId, Result, Extra)|Results],
+ {TransIds2, Results2}.
+
+
+%% TransInfo is either [trans_id()] or a [trans_req()]
+
+%% This is the normal case where we have just one
+%% transaction to be sent (using call or cast) using
+%% the transaction sender process.
+send_request(#conn_data{control_pid = CP,
+ trans_req = true,
+ trans_sender = Pid} = CD,
+ CH, [Serial], Action, [Bin])
+ when is_pid(Pid) andalso
+ is_integer(Serial) andalso
+ (node(CP) =:= node()) ->
+
+ ?report_trace(CD,
+ "send_request - one transaction via trans-sender",
+ [Serial]),
+
+ #conn_data{request_timer = InitTimer,
+ long_request_timer = LongTimer} = CD,
+ TransId = to_local_trans_id(CH, Serial),
+ insert_request(CD, CH, TransId, Action, {Serial, Bin},
+ InitTimer, LongTimer),
+ megaco_trans_sender:send_req(Pid, Serial, Bin);
+
+%% This is the general case where we have several transactions
+%% beeing sent (using call or cast) at once using
+%% the transaction sender process.
+send_request(#conn_data{control_pid = CP,
+ trans_req = true,
+ trans_sender = Pid} = CD,
+ CH, TransInfo, Action, Bins)
+ when is_pid(Pid) andalso
+ is_list(Bins) andalso
+ (node(CP) =:= node()) ->
+
+ ?report_trace(CD,
+ "send_request - multi transactions via trans_sender",
+ [TransInfo, Pid]),
+
+ #conn_data{request_timer = InitTimer,
+ long_request_timer = LongTimer} = CD,
+ insert_requests(CD, CH, TransInfo, Action, Bins,
+ InitTimer, LongTimer),
+ megaco_trans_sender:send_reqs(Pid, TransInfo, Bins);
+
+%% This is the case when one or more transactions is
+%% beeing sent in one message immediatelly (not using
+%% the transaction sender process. E.g. the binary is
+%% this encoded message.
+send_request(#conn_data{control_pid = CP} = CD,
+ CH, TRs, Action, Bin)
+ when is_list(TRs) andalso
+ is_binary(Bin) andalso
+ (node(CP) =:= node()) ->
+
+ %% d("send_request -> entry with"
+ %% "~n TRs: ~p", [TRs]),
+
+ ?report_trace(CD, "send_request - multi transaction", [TRs]),
+
+ #conn_data{request_timer = InitTimer,
+ long_request_timer = LongTimer} = CD,
+ insert_requests(CD, CH, TRs, Action, Bin,
+ InitTimer, LongTimer),
+ case megaco_messenger_misc:send_message(CD, false, Bin) of
+ {error, Reason} ->
+ cancel_requests(CD, TRs, Reason);
+ {ok, _} ->
+ ignore
+ end;
+
+%% This is the case where we are not on the node where the
+%% transport process run.
+send_request(#conn_data{control_pid = CP} = CD,
+ CH, TransInfo, Action, Bin)
+ when node(CP) =/= node() ->
+
+ ?report_trace(CD, "send_request - remote", [TransInfo]),
+
+ InitTimer = infinity,
+ LongTimer = infinity,
+ insert_requests(CD, CH, TransInfo, Action, Bin,
+ InitTimer, LongTimer),
+ Node = node(CP),
+ Args = [node(), CD, TransInfo, Bin],
+ rpc:cast(Node, ?MODULE, send_request_remote, Args).
+
+
+insert_requests(_, _, [], _, _, _, _) ->
+ ok;
+
+insert_requests(ConnData, ConnHandle, [Serial|Serials],
+ Action, [Bin|Bins], InitTimer, LongTimer)
+ when is_integer(Serial) andalso is_binary(Bin) ->
+ TransId = to_local_trans_id(ConnHandle, Serial),
+ insert_request(ConnData, ConnHandle,
+ TransId, Action, Bin, InitTimer, LongTimer),
+
+ insert_requests(ConnData, ConnHandle, Serials, Action, Bins,
+ InitTimer, LongTimer);
+
+insert_requests(ConnData, ConnHandle,
+ [{transactionRequest, TR}|TRs],
+ Action, Bin, InitTimer, LongTimer)
+ when is_record(TR, 'TransactionRequest') andalso is_binary(Bin) ->
+ #'TransactionRequest'{transactionId = Serial} = TR,
+ TransId = to_local_trans_id(ConnHandle, Serial),
+ insert_request(ConnData, ConnHandle,
+ TransId, Action, TR, InitTimer, LongTimer),
+
+ insert_requests(ConnData, ConnHandle, TRs, Action, Bin,
+ InitTimer, LongTimer).
+
+
+insert_request(ConnData, ConnHandle, TransId,
+ Action, Data, InitTimer, LongTimer) ->
+ #megaco_conn_handle{remote_mid = RemoteMid} = ConnHandle,
+ #conn_data{protocol_version = Version,
+ user_mod = UserMod,
+ user_args = UserArgs,
+ send_handle = SendHandle,
+ reply_data = ReplyData,
+ segment_recv_timer = InitSegTimer,
+ request_keep_alive_timeout = RKATimer} = ConnData,
+ {WaitFor, CurrTimer} = megaco_timer:init(InitTimer),
+ M = ?MODULE,
+ F = request_timeout,
+ A = [ConnHandle, TransId],
+ Ref = megaco_monitor:apply_after(M, F, A, WaitFor),
+ Req = #request{trans_id = TransId,
+ remote_mid = RemoteMid,
+ timer_ref = ?SIM({short, Ref}, init_request_timer),
+ init_timer = InitTimer,
+ init_long_timer = LongTimer,
+ curr_timer = CurrTimer,
+ version = Version,
+ bytes = {send, Data},
+ send_handle = SendHandle,
+ user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = ReplyData,
+ init_seg_timer = InitSegTimer,
+ keep_alive_timer = RKATimer},
+ megaco_monitor:insert_request(Req). % Timing problem?
+
+
+send_request_remote(ReplyNode, ConnData, TransInfo, Bin) ->
+ Action = remote,
+ ConnHandle = ConnData#conn_data.conn_handle,
+ ConnData2 = ConnData#conn_data{reply_data = ReplyNode},
+ send_request(ConnData2, ConnHandle, TransInfo, Action, Bin).
+
+prepare_req_send_options(CallOrCast, ConnHandle, Options, Actions) ->
+ %% Ensures that two processes cannot get same transaction id.
+ %% Bad send options may cause spurious transaction id to be consumed.
+ Incr = number_of_transactions(Actions),
+ case megaco_config:incr_trans_id_counter(ConnHandle, Incr) of
+ {ok, ConnData} ->
+ override_req_send_options(CallOrCast, ConnData, Options);
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+number_of_transactions([Action|_]) when is_tuple(Action) ->
+ 1;
+number_of_transactions(ActionsList) ->
+ length(ActionsList).
+
+override_req_send_options(ReplyAction, ConnData, [{Key, Val} | Tail]) ->
+ case Key of
+ protocol_version ->
+ ConnData2 = ConnData#conn_data{protocol_version = Val},
+ override_req_send_options(ReplyAction, ConnData2, Tail);
+ send_handle ->
+ ConnData2 = ConnData#conn_data{send_handle = Val},
+ override_req_send_options(ReplyAction, ConnData2, Tail);
+ request_timer ->
+ case megaco_config:verify_val(Key, Val) of
+ true ->
+ ConnData2 = ConnData#conn_data{request_timer = Val},
+ override_req_send_options(ReplyAction, ConnData2, Tail);
+ false ->
+ {error, {bad_send_option, {Key, Val}}}
+ end;
+ long_request_timer ->
+ case megaco_config:verify_val(Key, Val) of
+ true ->
+ ConnData2 = ConnData#conn_data{long_request_timer = Val},
+ override_req_send_options(ReplyAction, ConnData2, Tail);
+ false ->
+ {error, {bad_send_option, {Key, Val}}}
+ end;
+ call_proxy_gc_timeout when (ReplyAction =:= call) orelse
+ (ReplyAction =:= any) ->
+ case megaco_config:verify_val(Key, Val) of
+ true ->
+ ConnData2 =
+ ConnData#conn_data{call_proxy_gc_timeout = Val},
+ override_req_send_options(ReplyAction, ConnData2, Tail);
+ false ->
+ {error, {bad_send_option, {Key, Val}}}
+ end;
+ request_keep_alive_timeout when (ReplyAction =:= cast) orelse
+ (ReplyAction =:= any) ->
+ case megaco_config:verify_val(Key, Val) of
+ true ->
+ ConnData2 =
+ ConnData#conn_data{request_keep_alive_timeout = Val},
+ override_req_send_options(ReplyAction, ConnData2, Tail);
+ false ->
+ {error, {bad_send_option, {Key, Val}}}
+ end;
+ reply_data ->
+ ConnData2 = ConnData#conn_data{reply_data = Val},
+ override_req_send_options(ReplyAction, ConnData2, Tail);
+ user_mod when is_atom(Val) ->
+ ConnData2 = ConnData#conn_data{user_mod = Val},
+ override_req_send_options(ReplyAction, ConnData2, Tail);
+ user_args when is_list(Val) ->
+ ConnData2 = ConnData#conn_data{user_args = Val},
+ override_req_send_options(ReplyAction, ConnData2, Tail);
+ trans_req when Val =:= false ->
+ %% We only allow turning the transaction-sender off, since
+ %% the opposite (turning it on) would causing to much headake...
+ %% This will allow not using the transaction sender for
+ %% occasional messages
+ ConnData2 = ConnData#conn_data{trans_req = Val,
+ trans_sender = undefined},
+ override_req_send_options(ReplyAction, ConnData2, Tail);
+ _Bad ->
+ {error, {bad_send_option, {Key, Val}}}
+ end;
+override_req_send_options(_ReplyAction, ConnData, []) ->
+ {ok, ConnData}.
+
+override_rep_send_options(ConnData, [{Key, Val} | Tail]) ->
+ case Key of
+ protocol_version ->
+ ConnData2 = ConnData#conn_data{protocol_version = Val},
+ override_rep_send_options(ConnData2, Tail);
+ send_handle ->
+ ConnData2 = ConnData#conn_data{send_handle = Val},
+ override_rep_send_options(ConnData2, Tail);
+ reply_timer ->
+ case megaco_config:verify_val(Key, Val) of
+ true ->
+ ConnData2 = ConnData#conn_data{reply_timer = Val},
+ override_rep_send_options(ConnData2, Tail);
+ false ->
+ {error, {bad_send_option, {Key, Val}}}
+ end;
+ trans_req when Val =:= false ->
+ %% We only allow turning the transaction-sender off, since
+ %% the opposite (turning it on) would causing to much headake...
+ %% This will allow not using the transaction sender for
+ %% occasional messages
+ ConnData2 = ConnData#conn_data{trans_req = Val,
+ trans_sender = undefined},
+ override_rep_send_options(ConnData2, Tail);
+ _Bad ->
+ {error, {bad_send_option, {Key, Val}}}
+ end;
+override_rep_send_options(ConnData, []) ->
+ {ok, ConnData}.
+
+
+%% ----
+%% This list is allways atleast one (list of actions) long.
+%% ----
+%% The proper number of transaction id numbers has already
+%% been "allocated", and the connection data record is
+%% updated accordingly.
+encode_requests(#conn_data{trans_req = true,
+ trans_sender = Pid,
+ serial = LastSerial} = CD, ActionsList)
+ when is_pid(Pid) ->
+ (catch encode_requests(CD, LastSerial,
+ lists:reverse(ActionsList), [], []));
+encode_requests(#conn_data{serial = LastSerial} = CD, ActionsList) ->
+ %% We shall not accumulate transactions.
+ %% This means that we shall not encode
+ %% the transactions individually (and send
+ %% them to the sender process, which
+ %% accumulate transactions for later sending),
+ %% Instead we encode the entire message directly.
+ %% => We shall return one binary, containing,
+ %% possibly, many transactions
+ encode_requests_in_msg(CD, LastSerial, lists:reverse(ActionsList)).
+
+
+%% This means that we shall compose and encode one complete
+%% megaco message, containing one or more transactions.
+encode_requests_in_msg(CD, LastSerial, ActionsList) ->
+ TRs = compose_requests_in_msg(LastSerial, ActionsList, []),
+ Body = {transactions, TRs},
+ Res = megaco_messenger_misc:encode_body(CD,
+ "encode trans request(s) msg",
+ Body),
+ case Res of
+ {ok, Bin} ->
+ {ok, TRs, Bin};
+ Error ->
+ Error
+ end.
+
+compose_requests_in_msg(_S, [], TRs) ->
+ TRs;
+compose_requests_in_msg(Serial, [A|As], Acc) ->
+ TR = #'TransactionRequest'{transactionId = Serial,
+ actions = A},
+ compose_requests_in_msg(Serial - 1, As, [{transactionRequest, TR}|Acc]).
+
+
+%% We have done the encoding in reverse order, so there
+%% is no need to reverse now.
+encode_requests(_, _, [], Serials, EncodedTRs) ->
+ {ok, Serials, EncodedTRs};
+encode_requests(CD, Serial, [Actions|ActionsList], Serials, EncodedTRs) ->
+ case do_encode_request(CD, Serial, Actions) of
+ {ok, Bin} ->
+ encode_requests(CD, Serial - 1, ActionsList,
+ [Serial|Serials], [Bin|EncodedTRs]);
+ Error ->
+ throw(Error)
+ end.
+
+
+do_encode_request(CD, Serial, Actions) ->
+ TR = #'TransactionRequest'{transactionId = Serial,
+ actions = Actions},
+ megaco_messenger_misc:encode_trans_request(CD, TR).
+
+
+imm_ack_req(Counter, when_pending_sent) when (Counter > 0) -> 'NULL';
+imm_ack_req(_Counter, when_pending_sent) -> asn1_NOVALUE;
+imm_ack_req(_Counter, ImmAck) -> ImmAck.
+
+maybe_send_reply(#conn_data{sent_pending_limit = Limit} = ConnData,
+ TransId, Result, SendOpts, ImmAck) ->
+
+ %% d("maybe_send_reply -> entry with"
+ %% "~n Limit: ~p"
+ %% "~n TransId: ~p"
+ %% "~n Result: ~p"
+ %% "~n SendOpts: ~p"
+ %% "~n ImmAck: ~p", [Limit, TransId, Result, SendOpts, ImmAck]),
+
+ %% Pending limit
+ %% Before we can send the reply we must check that we have
+ %% not passed the pending limit (and sent an error message).
+ case check_pending_limit(Limit, sent, TransId) of
+ {ok, Counter} ->
+ case override_rep_send_options(ConnData, SendOpts) of
+ {ok, ConnData2} ->
+ send_reply(ConnData2, Result,
+ imm_ack_req(Counter, ImmAck));
+ Error ->
+ Error
+ end;
+ aborted ->
+ {error, aborted}
+ end.
+
+encode_reply(CD, TR) ->
+ megaco_messenger_misc:encode_trans_reply(CD, TR).
+
+send_reply(#conn_data{serial = Serial,
+ trans_req = TransReq,
+ trans_sender = TransSnd} = CD, TransRes, ImmAck) ->
+
+ %% Encapsule the transaction result into a reply message
+
+ %% d("send_reply -> entry with"
+ %% "~n Serial: ~p"
+ %% "~n TransRes: ~p"
+ %% "~n ImmAck: ~p", [Serial, TransRes, ImmAck]),
+
+ TR = #megaco_transaction_reply{transactionId = Serial,
+ immAckRequired = ImmAck,
+ transactionResult = TransRes},
+ case encode_reply(CD, TR) of
+ {ok, Bin} when is_binary(Bin) andalso (TransReq =:= true) ->
+ ?rt2("send_reply - pass it on to the transaction sender",
+ [size(Bin)]),
+ megaco_trans_sender:send_reply(TransSnd, Bin),
+ {ok, Bin};
+
+ {ok, Bin} when is_binary(Bin) ->
+ ?rt2("send_reply - encoded", [size(Bin)]),
+ TraceLabel = "send trans reply",
+ Body = {transactions, [Bin]},
+ megaco_messenger_misc:send_body(CD, TraceLabel, Body);
+
+ {ok, Bins} when is_list(Bins) ->
+ ?rt2("send_reply - encoded (segmented)", [length(Bins)]),
+ Res = send_reply_segments(CD, Bins),
+ {ok, Res};
+
+ {error, not_implemented} ->
+ %% Oups, we cannot segment regardless the config,
+ %% so pack it all into one message and hope for
+ %% the best...
+ ?rt2("send_reply - cannot encode separate transactions", []),
+ TR2 = megaco_messenger_misc:transform_transaction_reply(CD, TR),
+ Body = {transactions, [{transactionReply, TR2}]},
+ megaco_messenger_misc:send_body(CD, "encode trans reply", Body);
+
+ {error, Reason} = Error ->
+ Code = ?megaco_internal_gateway_error,
+ Text = "encode transaction reply",
+ ED = #'ErrorDescriptor'{errorCode = Code,
+ errorText = Text},
+ Res = {transactionError, ED},
+ TR2 = #megaco_transaction_reply{transactionId = Serial,
+ transactionResult = Res},
+ TR3 = megaco_messenger_misc:transform_transaction_reply(CD, TR2),
+ TraceLabel = "<ERROR> encode trans reply body failed",
+ ?report_important(CD, TraceLabel, [TR, TR3, ED, Error]),
+ error_msg("failed encoding transaction reply body: ~s",
+ [format_encode_error_reason(Reason)]),
+ Body = {transactions, [{transactionReply, TR3}]},
+ megaco_messenger_misc:send_body(CD, TraceLabel, Body),
+ Error
+ end.
+
+send_reply_segments(CD, Bins) ->
+ TraceLabelPre = "send segmented trans reply",
+ (catch send_reply_segments(CD, TraceLabelPre, Bins)).
+
+send_reply_segments(#conn_data{segment_send = infinity} = CD, Label, Bins) ->
+ send_reply_segments(CD, Label, length(Bins), Bins);
+
+send_reply_segments(#conn_data{segment_send = K} = CD, Label, Bins)
+ when is_integer(K) andalso (K =< length(Bins)) ->
+ send_reply_segments(CD, Label, K, Bins);
+
+send_reply_segments(#conn_data{segment_send = K} = CD, Label, Bins)
+ when is_integer(K) ->
+ send_reply_segments(CD, Label, length(Bins), Bins).
+
+send_reply_segments(CD, Label, K, Bins) ->
+ send_reply_segments(CD, Label, K, Bins, []).
+
+send_reply_segments(_CD, _Label, 0, Bins, Sent) ->
+ ?rt2("send_reply_segments - done", [Sent, Bins]),
+ {Sent, Bins};
+send_reply_segments(CD, TraceLabelPre, K, [{SN, Bin}|Bins], Sent) ->
+ case send_reply_segment(CD, TraceLabelPre, SN, Bin) of
+ {ok, Bin2} ->
+ ?rt2("send_reply_segments - send", [K, SN]),
+ send_reply_segments(CD, TraceLabelPre, K-1,
+ Bins, [{SN, Bin2}|Sent]);
+ Error ->
+ throw(Error)
+ end.
+
+send_reply_segment(CD, TraceLabelPre, SN, Bin) ->
+ Label = lists:flatten(io_lib:format("~s[~w]", [TraceLabelPre, SN])),
+ Body = {transactions, [Bin]},
+ megaco_messenger_misc:send_body(CD, Label, Body).
+
+
+format_encode_error_reason(Reason) ->
+ FS =
+ case Reason of
+ {Mod, Func, [EC, Msg], {AE, CS}} when is_atom(Mod) andalso
+ is_atom(Func) andalso
+ is_list(EC) and
+ is_tuple(Msg) and
+ is_list(CS) ->
+ io_lib:format("~n Encode module: ~w"
+ "~n Func: ~w"
+ "~n Encode config: ~w"
+ "~n Message part: ~p"
+ "~n Actual error: ~p"
+ "~n Call stack: ~w",
+ [Mod, Func, EC, Msg, AE, CS]);
+
+ {Mod, Func, [EC, Msg], AE} when is_atom(Mod) andalso
+ is_atom(Func) andalso
+ is_list(EC) andalso
+ is_tuple(Msg) ->
+ io_lib:format("~n Encode module: ~w"
+ "~n Func: ~w"
+ "~n Encode config: ~w"
+ "~n Message part: ~p"
+ "~n Actual error: ~p",
+ [Mod, Func, EC, Msg, AE]);
+
+ {Mod, [EC, Msg], {AE, CS}} when is_atom(Mod) andalso
+ is_list(EC) andalso
+ is_tuple(Msg) andalso
+ is_list(CS) ->
+ io_lib:format("~n Encode module: ~w"
+ "~n Encode config: ~w"
+ "~n Message part: ~p"
+ "~n Actual error: ~p"
+ "~n Call stack: ~w",
+ [Mod, EC, Msg, AE, CS]);
+
+ {Mod, [EC, Msg], AE} when is_atom(Mod) andalso
+ is_list(EC) andalso
+ is_tuple(Msg) ->
+ io_lib:format("~n Encode module: ~w"
+ "~n Encode config: ~w"
+ "~n Message part: ~p"
+ "~n Actual error: ~p",
+ [Mod, EC, Msg, AE]);
+
+ Error ->
+ io_lib:format("~n ~w", [Error])
+ end,
+ lists:flatten(FS).
+
+
+%% Presumably the user would return immediately (with {pending, Data}) if it
+%% knows or suspects a request to take a long time to process.
+%% For this reason we assume that handling a resent request
+%% could not have caused an update of the pending limit counter.
+maybe_send_pending(#conn_data{sent_pending_limit = Limit} = ConnData,
+ TransId) ->
+ case check_and_maybe_incr_pending_limit(Limit, sent, TransId) of
+ ok ->
+ send_pending(ConnData);
+ error ->
+ SendReply = send_pending_limit_error(ConnData),
+ {aborted, SendReply};
+ aborted ->
+ {aborted, ignore}
+ end.
+
+
+send_pending(#conn_data{serial = Serial,
+ trans_req = true,
+ trans_sender = Pid}) ->
+ megaco_trans_sender:send_pending(Pid, Serial);
+send_pending(#conn_data{serial = Serial} = CD) ->
+ %% Encapsule the transaction result into a pending message
+ TP = #'TransactionPending'{transactionId = Serial},
+ Body = {transactions, [{transactionPending, TP}]},
+ megaco_messenger_misc:send_body(CD, "send trans pending", Body).
+
+
+maybe_send_ack('NULL', #conn_data{serial = Serial,
+ trans_ack = true,
+ trans_sender = Pid}) ->
+ megaco_trans_sender:send_ack_now(Pid, Serial);
+maybe_send_ack('NULL', CD) ->
+ send_ack(CD);
+maybe_send_ack(_, #conn_data{auto_ack = false}) ->
+ ignore;
+maybe_send_ack(_, #conn_data{serial = Serial,
+ trans_ack = true,
+ trans_sender = Pid})
+ when is_pid(Pid) ->
+ %% Send (later) via the transaction sender
+ megaco_trans_sender:send_ack(Pid, Serial),
+ ok;
+maybe_send_ack(_, CD) ->
+ %% Send now
+ send_ack(CD).
+
+
+send_ack(#conn_data{serial = Serial} = CD) ->
+ %% Encapsule the transaction result into a ack message
+ TRA = #'TransactionAck'{firstAck = Serial},
+ Body = {transactions, [{transactionResponseAck, [TRA]}]},
+ megaco_messenger_misc:send_body(CD, "send trans ack", Body).
+
+
+send_segment_reply(#conn_data{serial = Serial} = CD, SegNo) ->
+ SR = #'SegmentReply'{transactionId = Serial,
+ segmentNumber = SegNo},
+ Body = {transactions, [{segmentReply, SR}]},
+ megaco_messenger_misc:send_body(CD, "send segment reply", Body).
+
+send_segment_reply(#conn_data{serial = Serial} = CD, SegNo, Complete) ->
+ SR = #'SegmentReply'{transactionId = Serial,
+ segmentNumber = SegNo,
+ segmentationComplete = Complete},
+ Body = {transactions, [{segmentReply, SR}]},
+ megaco_messenger_misc:send_body(CD, "send segment reply", Body).
+
+send_segment_reply_complete(CD, SegNo) ->
+ send_segment_reply(CD, SegNo, 'NULL').
+
+
+send_pending_limit_error(ConnData) ->
+ ?report_pending_limit_exceeded(ConnData),
+ Code = ?megaco_number_of_transactionpending_exceeded,
+ Reason = "Pending limit exceeded",
+ send_trans_error(ConnData, Code, Reason).
+
+send_trans_error(ConnData, Code, Reason) ->
+ %% Encapsulate the transaction error into a reply message
+ ED = #'ErrorDescriptor'{errorCode = Code, errorText = Reason},
+ Serial = ConnData#conn_data.serial,
+ %% Version = ConnData#conn_data.protocol_version,
+ TransRes = {transactionError, ED},
+ TR = #megaco_transaction_reply{transactionId = Serial,
+ transactionResult = TransRes},
+ TR2 = megaco_messenger_misc:transform_transaction_reply(ConnData, TR),
+ Body = {transactions, [{transactionReply, TR2}]},
+ case megaco_messenger_misc:send_body(ConnData, "send trans error", Body) of
+ {error, Reason2} ->
+ ?report_important(ConnData,
+ "<ERROR> failed sending transaction error",
+ [Body, {error, Reason2}]),
+ error;
+ _ ->
+ ok
+ end.
+
+
+send_message_error(ConnData, Code, Reason) ->
+ ED = #'ErrorDescriptor'{errorCode = Code, errorText = Reason},
+ Body = {messageError, ED},
+ case megaco_messenger_misc:send_body(ConnData, "send trans error", Body) of
+ {error, Reason2} ->
+ ?report_important(ConnData,
+ "<ERROR> failed sending message error",
+ [Body, {error, Reason2}]),
+ error;
+ _ ->
+ ok
+ end.
+
+
+cancel(ConnHandle, Reason) when is_record(ConnHandle, megaco_conn_handle) ->
+ case megaco_config:lookup_local_conn(ConnHandle) of
+ [CD] ->
+ megaco_config:update_conn_info(CD, cancel, true),
+ do_cancel(ConnHandle, Reason, CD#conn_data{cancel = true}),
+ megaco_config:update_conn_info(CD, cancel, false),
+ ok;
+ [] ->
+ ConnData = fake_conn_data(ConnHandle),
+ do_cancel(ConnHandle, Reason, ConnData)
+ end.
+
+do_cancel(ConnHandle, Reason, ConnData) ->
+ ?report_trace(ConnData, "cancel", [ConnHandle, Reason]),
+ LocalMid = ConnHandle#megaco_conn_handle.local_mid,
+ RemoteMid = ConnHandle#megaco_conn_handle.remote_mid,
+ ReqTransIdPat = #trans_id{mid = LocalMid, _ = '_'},
+ ReqPat = #request{trans_id = ReqTransIdPat,
+ remote_mid = RemoteMid,
+ _ = '_'},
+ CancelReq = fun(Req) ->
+ cancel_request(ConnData, Req, Reason),
+ {_Type, Ref} = Req#request.timer_ref, %% OTP-4843
+ megaco_monitor:cancel_apply_after(Ref)
+ end,
+ Requests = megaco_monitor:match_requests(ReqPat),
+ lists:foreach(CancelReq, Requests),
+ RemoteMid = ConnHandle#megaco_conn_handle.remote_mid,
+ RepTransIdPat = #trans_id{mid = RemoteMid, _ = '_'}, % BUGBUG List here?
+ RepPat = #reply{trans_id = RepTransIdPat,
+ local_mid = LocalMid,
+ _ = '_'},
+ CancelRep = fun(Rep) ->
+ cancel_reply(ConnData, Rep, Reason)
+ end,
+ Replies = megaco_monitor:match_replies(RepPat),
+ lists:foreach(CancelRep, Replies),
+ ok.
+
+cancel_requests(_ConnData, [], _Reason) ->
+ ok;
+cancel_requests(ConnData, [{transactionRequest,TR}|TRs], Reason) ->
+ #'TransactionRequest'{transactionId = TransId0} = TR,
+ TransId = to_local_trans_id(ConnData#conn_data.conn_handle, TransId0),
+ case megaco_monitor:lookup_request(TransId) of
+ [] ->
+ ignore;
+ [Req] when is_record(Req, request) ->
+ cancel_request(ConnData, Req, Reason)
+ end,
+ cancel_requests(ConnData, TRs, Reason).
+
+cancel_request(ConnData, Req, Reason) ->
+ ?report_trace(ignore, "cancel request", [Req]),
+ ?TC_AWAIT_CANCEL_EVENT(),
+ TransId = Req#request.trans_id,
+ Version = Req#request.version,
+ UserMod = Req#request.user_mod,
+ UserArgs = Req#request.user_args,
+ Action = Req#request.reply_action,
+ UserData = Req#request.reply_data,
+ UserReply = {error, Reason},
+ ConnData2 = ConnData#conn_data{protocol_version = Version,
+ user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData},
+ cancel_request2(ConnData2, TransId, UserReply).
+
+cancel_request2(ConnData, TransId, UserReply) ->
+ megaco_monitor:delete_request(TransId),
+ megaco_config:del_pending_counter(recv, TransId), % OTP-7189
+ Serial = TransId#trans_id.serial,
+ ConnData2 = ConnData#conn_data{serial = Serial},
+ return_reply(ConnData2, TransId, UserReply).
+
+
+return_reply(ConnData, TransId, UserReply) ->
+ Extra = ?default_user_callback_extra,
+ return_reply(ConnData, TransId, UserReply, Extra).
+
+return_reply(ConnData, TransId, UserReply, Extra) ->
+ ?report_trace(ConnData, "callback: trans reply", [UserReply]),
+ Version = ConnData#conn_data.protocol_version,
+ UserData = ConnData#conn_data.reply_data,
+ case ConnData#conn_data.reply_action of
+ call when is_pid(UserData) ->
+ ?report_trace(ConnData, "callback: (call) trans reply",
+ [UserReply]),
+ Pid = UserData,
+ Pid ! {?MODULE, TransId, Version, UserReply, Extra};
+ cast ->
+ ?report_trace(ConnData, "callback: (cast) trans reply", [UserReply]),
+ UserMod = ConnData#conn_data.user_mod,
+ UserArgs = ConnData#conn_data.user_args,
+ ConnHandle = ConnData#conn_data.conn_handle,
+ Args =
+ case Extra of
+ ?default_user_callback_extra ->
+ [ConnHandle, Version, UserReply, UserData | UserArgs];
+ _ ->
+ [ConnHandle, Version, UserReply, UserData, Extra | UserArgs]
+ end,
+ Res = (catch apply(UserMod, handle_trans_reply, Args)),
+ ?report_debug(ConnData, "return: (cast) trans reply",
+ [UserReply, {return, Res}]),
+ case Res of
+ ok ->
+ ok;
+ _ ->
+ warning_msg("transaction reply callback failed: ~w",
+ [Res]),
+ ok
+ end,
+ Res;
+ remote ->
+ ?report_trace(ConnData, "callback: (remote) trans reply", [UserReply]),
+ Node = UserData,
+ Args = [ConnData, UserReply, Extra],
+ rpc:cast(Node, ?MODULE, receive_reply_remote, Args)
+ end.
+
+receive_reply_remote(ConnData, UserReply) ->
+ Extra = ?default_user_callback_extra,
+ receive_reply_remote(ConnData, UserReply, Extra).
+
+receive_reply_remote(ConnData, UserReply, Extra) ->
+ TransId = to_local_trans_id(ConnData),
+ case (catch megaco_monitor:lookup_request(TransId)) of
+ [#request{timer_ref = {_Type, Ref}} = Req] -> %% OTP-4843
+ %% Don't care about Req and Rep version diff
+ megaco_monitor:delete_request(TransId),
+ megaco_monitor:cancel_apply_after(Ref), % OTP-4843
+ megaco_config:del_pending_counter(recv, TransId), % OTP-7189
+
+ UserMod = Req#request.user_mod,
+ UserArgs = Req#request.user_args,
+ Action = Req#request.reply_action,
+ UserData = Req#request.reply_data,
+ ConnData2 = ConnData#conn_data{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData},
+ return_reply(ConnData2, TransId, UserReply, Extra);
+
+ _ ->
+ ?report_trace(ConnData, "remote reply (no receiver)",
+ [UserReply]),
+ return_unexpected_trans_reply(ConnData, TransId, UserReply, Extra)
+ end.
+
+cancel_reply(ConnData, #reply{state = waiting_for_ack} = Rep, Reason) ->
+ ?report_trace(ignore, "cancel reply [waiting_for_ack]", [Rep]),
+ megaco_monitor:cancel_apply_after(Rep#reply.pending_timer_ref),
+ Serial = (Rep#reply.trans_id)#trans_id.serial,
+ ConnData2 = ConnData#conn_data{serial = Serial},
+ T = #'TransactionAck'{firstAck = Serial},
+ Extra = ?default_user_callback_extra,
+ handle_ack(ConnData2, {error, Reason}, Rep, T, Extra);
+
+cancel_reply(_ConnData, #reply{state = aborted} = Rep, _Reason) ->
+ ?report_trace(ignore, "cancel reply [aborted]", [Rep]),
+ #reply{trans_id = TransId,
+ timer_ref = ReplyRef,
+ pending_timer_ref = PendingRef} = Rep,
+ megaco_monitor:delete_reply(TransId),
+ megaco_monitor:cancel_apply_after(ReplyRef),
+ megaco_monitor:cancel_apply_after(PendingRef), % Still running?
+ megaco_config:del_pending_counter(sent, TransId), % Still existing?
+ ok;
+
+cancel_reply(_ConnData, Rep, ignore) ->
+ ?report_trace(ignore, "cancel reply [ignore]", [Rep]),
+ #reply{trans_id = TransId,
+ timer_ref = ReplyRef,
+ pending_timer_ref = PendingRef} = Rep,
+ megaco_monitor:delete_reply(TransId),
+ megaco_monitor:cancel_apply_after(ReplyRef),
+ megaco_monitor:cancel_apply_after(PendingRef), % Still running?
+ megaco_config:del_pending_counter(sent, TransId), % Still existing?
+ ok;
+
+cancel_reply(_CD, _Rep, _Reason) ->
+ ok.
+
+
+request_keep_alive_timeout(ConnHandle, TransId) ->
+ megaco_config:del_pending_counter(ConnHandle, TransId),
+ megaco_monitor:lookup_request(TransId),
+ ok.
+
+
+request_timeout(ConnHandle, TransId) ->
+ ?rt1(ConnHandle, "request timeout", [TransId]),
+ case megaco_monitor:lookup_request(TransId) of
+ [] ->
+ request_not_found_ignore;
+ [Req] when is_record(Req, request) ->
+ case megaco_config:lookup_local_conn(ConnHandle) of
+ [CD] when (CD#conn_data.cancel =:= true) ->
+ cancel_in_progress_ignore;
+ [CD] ->
+ incNumTimerRecovery(ConnHandle),
+ do_request_timeout(ConnHandle, TransId, CD, Req);
+ [] when ConnHandle#megaco_conn_handle.remote_mid =:= preliminary_mid ->
+ %% There are two possibillities:
+ %% 1) The connection has just been upgraded from a
+ %% preliminary to a real connection. So this timeout
+ %% is just a glitch. E.g. between the removel of this
+ %% ConnHandle and the timer.
+ %% 2) The first message sent, the service-change, got no
+ %% reply (UDP without three-way-handshake).
+ %% And then the other side (MGC) sends a request,
+ %% which causes an auto-upgrade
+ request_timeout_upgraded(ConnHandle, Req);
+ [] ->
+ incNumTimerRecovery(ConnHandle),
+ ConnData = fake_conn_data(ConnHandle),
+ do_request_timeout(ConnHandle, TransId, ConnData, Req)
+ end
+ end.
+
+request_timeout_upgraded(ConnHandle, Req) ->
+ CD = fake_conn_data(ConnHandle),
+ cancel_request(CD, Req, timeout).
+
+do_request_timeout(ConnHandle, TransId, ConnData,
+ #request{curr_timer = CurrTimer} = Req) ->
+
+ ?rt1(ConnHandle, "process request timeout", [TransId, CurrTimer]),
+
+ SendHandle = Req#request.send_handle,
+ Version = Req#request.version,
+ ConnData2 = ConnData#conn_data{send_handle = SendHandle,
+ protocol_version = Version},
+ case CurrTimer of
+ timeout -> %%%%%%%
+ cancel_request(ConnData2, Req, timeout),
+ timeout1;
+
+ %% Restartable timer
+ %% (max_retries = infinity_restartable)
+ {_, timeout} ->
+ cancel_request(ConnData2, Req, timeout),
+ timeout2;
+
+ Timer ->
+ {SendOrNoSend, Data} = Req#request.bytes,
+ case SendOrNoSend of
+ send ->
+ case maybe_encode(ConnData2, Data) of
+ {ok, Bin} ->
+ ?report_trace(ConnData2, "re-send trans request",
+ [{bytes, Bin}]),
+ case maybe_send_message(ConnData2, true, Bin) of
+ ok ->
+ sent1_ignore;
+ {ok, _} ->
+ sent2_ignore;
+ {error, Reason} ->
+ ?report_important(ConnData2,
+ "<ERROR> "
+ "re-send trans "
+ "request failed",
+ [{bytes, Bin},
+ {error, Reason}])
+ end;
+
+ {error, Reason} ->
+ %% Since it was possible to encode the original
+ %% message this should really never happen...
+ ?report_important(ConnData2,
+ "<ERROR> "
+ "re-send trans request failed",
+ [{transaction,
+ Req#request.bytes},
+ {error, Reason}])
+ end;
+ no_send ->
+ not_sent_ok
+ end,
+ {WaitFor, Timer2} = megaco_timer:restart(Timer),
+ OptBin = opt_garb_binary(Timer2, Data),
+ {Type, _} = Req#request.timer_ref,
+ M = ?MODULE,
+ F = request_timeout,
+ A = [ConnHandle, TransId],
+ Ref2 = megaco_monitor:apply_after(M, F, A, WaitFor),
+ NewFields =
+ [{#request.bytes, {SendOrNoSend, OptBin}},
+ {#request.timer_ref, {Type, Ref2}},
+ {#request.curr_timer, Timer2}],
+ megaco_monitor:update_request_fields(TransId, NewFields), % Timing problem
+ {restarted, WaitFor, Timer2}
+
+ end.
+
+maybe_encode(#conn_data{trans_req = false} = CD, {_Serial, Bin})
+ when is_binary(Bin) ->
+ Body = {transactions, [{transactionRequest, Bin}]},
+ megaco_messenger_misc:encode_body(CD, "encode trans request msg", Body);
+maybe_encode(_CD, {_Serial, Bin} = D) when is_binary(Bin) ->
+ {ok, D};
+maybe_encode(#conn_data{trans_req = true,
+ trans_sender = Pid} = CD,
+ #'TransactionRequest'{transactionId = Serial} = TR)
+ when is_pid(Pid) ->
+ case megaco_messenger_misc:encode_trans_request(CD, TR) of
+ {ok, Bin} ->
+ {ok, {Serial, Bin}};
+ Error ->
+ Error
+ end;
+maybe_encode(CD, TR)
+ when is_record(TR, 'TransactionRequest') ->
+ Body = {transactions, [{transactionRequest, TR}]},
+ megaco_messenger_misc:encode_body(CD, "encode trans request msg", Body);
+maybe_encode(_CD, Trash) ->
+ {error, {invalid_bin, Trash}}.
+
+maybe_send_message(CD, Resend, Bin) when is_binary(Bin) ->
+ megaco_messenger_misc:send_message(CD, Resend, Bin);
+maybe_send_message(#conn_data{trans_sender = Pid}, _Resend, {Serial, Bin})
+ when is_pid(Pid) andalso is_integer(Serial) andalso is_binary(Bin) ->
+ megaco_trans_sender:send_req(Pid, Serial, Bin).
+
+
+reply_timeout(ConnHandle, TransId, timeout) ->
+ handle_reply_timer_timeout(ConnHandle, TransId);
+
+%% This means that infinity_restartable was used for max_retries.
+%% There is currently no reason to use this for the reply_timeout,
+%% since there is no external event to restart the timer!
+reply_timeout(ConnHandle, TransId, {_, timeout}) ->
+ handle_reply_timer_timeout(ConnHandle, TransId);
+
+reply_timeout(ConnHandle, TransId, Timer) ->
+ ?report_trace(ConnHandle, "reply timeout", [Timer, TransId]),
+
+ case megaco_monitor:lookup_reply(TransId) of
+ [] ->
+ reply_not_found_ignore;
+
+ [#reply{state = waiting_for_ack,
+ ack_action = {handle_ack, _}} = Rep] ->
+ case megaco_config:lookup_local_conn(ConnHandle) of
+ [CD] when (CD#conn_data.cancel =:= true) ->
+ cancel_in_progress_ignore;
+ [CD] ->
+ incNumTimerRecovery(ConnHandle),
+ do_reply_timeout(ConnHandle, TransId, CD, Timer, Rep);
+ [] ->
+ incNumTimerRecovery(ConnHandle),
+ CD = fake_conn_data(ConnHandle),
+ do_reply_timeout(ConnHandle, TransId, CD, Timer, Rep)
+ end;
+
+ [#reply{state = waiting_for_ack,
+ bytes = Sent} = Rep] when is_list(Sent) ->
+ case megaco_config:lookup_local_conn(ConnHandle) of
+ [ConnData] ->
+ incNumTimerRecovery(ConnHandle),
+ do_reply_timeout(ConnHandle, TransId, ConnData,
+ Timer, Rep);
+ [] ->
+ incNumTimerRecovery(ConnHandle),
+ ConnData = fake_conn_data(ConnHandle),
+ do_reply_timeout(ConnHandle, TransId, ConnData,
+ Timer, Rep)
+ end;
+
+ [#reply{state = waiting_for_ack} = Rep] ->
+ do_reply_timeout(ConnHandle, TransId, Timer, Rep);
+
+ [#reply{state = aborted} = Rep] ->
+ do_reply_timeout(ConnHandle, TransId, Timer, Rep);
+
+ _ ->
+ ignore
+
+ end.
+
+do_reply_timeout(ConnHandle, TransId, ConnData, Timer,
+ #reply{send_handle = SH,
+ version = V,
+ bytes = Bytes} = Rep) when is_binary(Bytes) ->
+
+%% d("do_reply_timeout -> entry with"
+%% "~n ConnHandle: ~p"
+%% "~n TransId: ~p"
+%% "~n Timer: ~p"
+%% "~n Rep: ~p"
+%% "~n", [ConnHandle, TransId, Timer, Rep]),
+
+ CD = ConnData#conn_data{send_handle = SH,
+ protocol_version = V},
+
+ ?rt1(CD, "re-send trans reply", [{bytes, Bytes}]),
+ case megaco_messenger_misc:send_message(CD, true, Bytes) of
+ {ok, _} ->
+ ignore;
+ {error, Reason} ->
+ ?report_important(CD, "<ERROR> re-send trans reply failed",
+ [{bytes, Bytes}, {error, Reason}])
+ end,
+ do_reply_timeout(ConnHandle, TransId, Timer, Rep);
+
+do_reply_timeout(ConnHandle, TransId, ConnData, Timer,
+ #reply{send_handle = SH,
+ version = V,
+ bytes = Sent} = Rep) when is_list(Sent) ->
+
+%% d("do_reply_timeout -> entry with"
+%% "~n ConnHandle: ~p"
+%% "~n TransId: ~p"
+%% "~n Timer: ~p"
+%% "~n Rep: ~p"
+%% "~n", [ConnHandle, TransId, Timer, Rep]),
+
+ CD = ConnData#conn_data{send_handle = SH,
+ protocol_version = V},
+
+ ReSend =
+ fun({SN, Bytes}) ->
+ ?rt1(CD, "re-send segmented trans reply",
+ [{segment_no, SN}, {bytes, Bytes}]),
+ case megaco_messenger_misc:send_message(CD, true, Bytes) of
+%% ok ->
+%% ignore;
+ {ok, _} ->
+ ignore;
+ {error, Reason} ->
+ ?report_important(CD,
+ "<ERROR> re-send segmented "
+ "trans reply failed",
+ [{segment_no, SN},
+ {bytes, Bytes},
+ {error, Reason}])
+ end
+ end,
+ lists:foreach(ReSend, Sent),
+ do_reply_timeout(ConnHandle, TransId, Timer, Rep).
+
+do_reply_timeout(ConnHandle, TransId, Timer, #reply{bytes = Bytes}) ->
+ {WaitFor, Timer2} = megaco_timer:restart(Timer),
+ OptBin = case Bytes of
+ Bin when is_binary(Bin) ->
+ opt_garb_binary(Timer2, Bin);
+ Sent when is_list(Sent) ->
+ Garb = fun(Bin) -> opt_garb_binary(Timer2, Bin) end,
+ [{SN, Garb(Bin)} || {SN, Bin} <- Sent]
+ end,
+ M = ?MODULE,
+ F = reply_timeout,
+ A = [ConnHandle, TransId, Timer2],
+ Ref2 = megaco_monitor:apply_after(M, F, A, WaitFor),
+ NewFields =
+ [{#reply.bytes, OptBin},
+ {#reply.timer_ref, Ref2}],
+ megaco_monitor:update_reply_fields(TransId, NewFields), % Timing problem?
+ {restarted, WaitFor, Timer2}.
+
+
+handle_reply_timer_timeout(ConnHandle, TransId) ->
+ ?report_trace(ConnHandle, "handle reply timeout", [timeout, TransId]),
+ incNumTimerRecovery(ConnHandle),
+ %% OTP-4378
+ case megaco_monitor:lookup_reply(TransId) of
+ [#reply{state = waiting_for_ack} = Rep] ->
+ Serial = (Rep#reply.trans_id)#trans_id.serial,
+ ConnData =
+ case megaco_config:lookup_local_conn(ConnHandle) of
+ [ConnData0] ->
+ ConnData0;
+ [] ->
+ fake_conn_data(ConnHandle)
+ end,
+ ConnData2 = ConnData#conn_data{serial = Serial},
+ T = #'TransactionAck'{firstAck = Serial},
+ Extra = ?default_user_callback_extra,
+ handle_ack(ConnData2, {error, timeout}, Rep, T, Extra);
+ [#reply{pending_timer_ref = Ref, % aborted?
+ bytes = SegSent}] -> % may be a binary
+ megaco_monitor:cancel_apply_after(Ref),
+ cancel_segment_timers(SegSent),
+ megaco_monitor:delete_reply(TransId),
+ megaco_config:del_pending_counter(sent, TransId);
+ [] ->
+ ignore_reply_removed
+ end.
+
+%% segment_reply_timeout(ConnHandle, TransId, SN, timeout) ->
+%% ?report_trace(ConnHandle, "segment reply timeout", [timeout, SN, TransId]),
+%% D = fun({_, _, SegRef}) ->
+%% megaco_monitor:cancel_apply_after(SegRef)
+%% end,
+%% incNumTimerRecovery(ConnHandle),
+%% %% OTP-4378
+%% case megaco_monitor:lookup_reply(TransId) of
+%% [#reply{state = waiting_for_ack,
+%% bytes = Sent} = Rep] ->
+%% Serial = (Rep#reply.trans_id)#trans_id.serial,
+%% ConnData =
+%% case megaco_config:lookup_local_conn(ConnHandle) of
+%% [ConnData0] ->
+%% ConnData0;
+%% [] ->
+%% fake_conn_data(ConnHandle)
+%% end,
+%% ConnData2 = ConnData#conn_data{serial = Serial},
+%% T = #'TransactionAck'{firstAck = Serial},
+%% lists:foreach(D, Sent),
+%% Extra = ?default_user_callback_extra,
+%% handle_ack(ConnData2, {error, timeout}, Rep, T, Extra);
+%% [#reply{pending_timer_ref = Ref,
+%% bytes = Sent}] -> % aborted?
+%% lists:foreach(D, Sent),
+%% megaco_monitor:cancel_apply_after(Ref),
+%% megaco_monitor:delete_reply(TransId),
+%% megaco_config:del_pending_counter(sent, TransId);
+
+%% [] ->
+%% ignore
+
+%% end.
+
+%% segment_reply_timeout(ConnHandle, TransId, SN, Timer) ->
+%% ?report_trace(ConnHandle, "reply timeout", [Timer, SN, TransId]),
+
+%% %% d("reply_timeout -> entry with"
+%% %% "~n ConnHandle: ~p"
+%% %% "~n TransId: ~p"
+%% %% "~n Timer: ~p", [ConnHandle, TransId, Timer]),
+
+%% case megaco_monitor:lookup_reply(TransId) of
+%% [] ->
+%% ignore; % Trace ??
+
+%% [#reply{state = waiting_for_ack,
+%% bytes = ack_action = {handle_ack, _}} = Rep] ->
+%% case megaco_config:lookup_local_conn(ConnHandle) of
+%% [ConnData] ->
+%% incNumTimerRecovery(ConnHandle),
+%% do_reply_timeout(ConnHandle, TransId, ConnData,
+%% Timer, Rep);
+%% [] ->
+%% incNumTimerRecovery(ConnHandle),
+%% ConnData = fake_conn_data(ConnHandle),
+%% do_reply_timeout(ConnHandle, TransId, ConnData,
+%% Timer, Rep)
+%% end;
+
+%% [#reply{state = waiting_for_ack} = Rep] ->
+%% do_reply_timeout(ConnHandle, TransId, Timer, Rep);
+
+%% [#reply{state = aborted} = Rep] ->
+%% do_reply_timeout(ConnHandle, TransId, Timer, Rep);
+
+%% _ ->
+%% ignore
+
+%% end.
+
+
+%% This clause is to catch the timers started prior to the code-upgrade
+pending_timeout(#conn_data{conn_handle = CH}, TransId, Timer) ->
+ ?report_trace(CH, "pending timeout(1)", [Timer, TransId]),
+ pending_timeout(CH, TransId, Timer);
+
+pending_timeout(ConnHandle, TransId, Timer) ->
+ ?report_trace(ConnHandle, "pending timeout(2)", [Timer, TransId]),
+ case megaco_config:lookup_local_conn(ConnHandle) of
+ [CD] when (CD#conn_data.cancel == true) ->
+ cancel_in_progress_ignore;
+ [CD] ->
+ Serial = TransId#trans_id.serial,
+ handle_pending_timeout(CD#conn_data{serial = Serial},
+ TransId, Timer);
+ [] ->
+ no_such_connection_ignore
+ end.
+
+handle_pending_timeout(CD, TransId, Timer) ->
+ ?report_trace(CD, "handle pending timeout", []),
+ case megaco_monitor:lookup_reply(TransId) of
+ [#reply{state = State,
+ handler = Pid} = Rep] when (State =:= prepare) orelse
+ (State =:= eval_request) ->
+
+ #conn_data{sent_pending_limit = Limit,
+ conn_handle = ConnHandle} = CD,
+
+ %% ------------------------------------------
+ %%
+ %% Check pending limit
+ %%
+ %% ------------------------------------------
+
+ case check_and_maybe_incr_pending_limit(Limit, sent, TransId) of
+ ok ->
+
+ %% ---------------------------------------------
+ %%
+ %% 1) Send pending message
+ %% 2) Possibly restart the pending timer
+ %%
+ %% ---------------------------------------------
+
+ send_pending(CD),
+ case Timer of
+ timeout ->
+ %% We are done
+ incNumTimerRecovery(ConnHandle),
+ timeout1;
+ {_, timeout} ->
+ %% We are done
+ incNumTimerRecovery(ConnHandle),
+ timeout2;
+ _ ->
+ {WaitFor, Timer2} = megaco_timer:restart(Timer),
+ M = ?MODULE,
+ F = pending_timeout,
+ A = [ConnHandle, TransId, Timer2],
+ PendingRef =
+ megaco_monitor:apply_after(M, F, A, WaitFor),
+ %% Timing problem?
+ megaco_monitor:update_reply_field(TransId,
+ #reply.pending_timer_ref,
+ PendingRef),
+ {restarted, WaitFor, Timer2}
+ end;
+
+
+ error ->
+
+ %% ------------------------------------------
+ %%
+ %% 1) Send 506 error message to other side
+ %% 2) Notify user
+ %% 3) Set reply data in aborted state
+ %%
+ %% -------------------------------------------
+
+ send_pending_limit_error(CD),
+ handle_request_abort_callback(CD, TransId, Pid),
+ %% Timing problem?
+ Rep2 = Rep#reply{state = aborted},
+ cancel_reply(CD, Rep2, aborted),
+ pending_limit_error;
+
+
+ aborted ->
+
+ %% ------------------------------------------
+ %%
+ %% Pending limit already passed
+ %%
+ %% -------------------------------------------
+ Rep2 = Rep#reply{state = aborted},
+ cancel_reply(CD, Rep2, aborted),
+ pending_limit_aborted
+
+ end;
+ [] ->
+ reply_not_found; % Trace ??
+
+ [#reply{state = waiting_for_ack}] ->
+ %% The reply has already been sent
+ %% No need for any pending trans reply
+ reply_has_been_sent;
+
+ [#reply{state = aborted} = Rep] ->
+ %% glitch, but cleanup just the same
+ cancel_reply(CD, Rep, aborted),
+ reply_aborted_state
+
+ end.
+
+
+segment_timeout(ConnHandle, TransId, timeout = Timer) ->
+ ?report_trace(ConnHandle, "segment timeout", [TransId, Timer]),
+ incNumTimerRecovery(ConnHandle),
+ case megaco_monitor:lookup_request(TransId) of
+ [] ->
+ timeout_not_found_ignore;
+
+ [#request{seg_recv = Segs} = Req] ->
+ ConnData =
+ case megaco_config:lookup_local_conn(ConnHandle) of
+ [ConnData0] ->
+ ConnData0;
+ [] ->
+ fake_conn_data(ConnHandle)
+ end,
+ Last = lists:last(lists:sort(Segs)),
+ All = lists:seq(1,Last),
+ case All -- Segs of
+ [] ->
+ %% The last segment has just arrived, ignore
+ ok;
+ Missing ->
+ %% Send the error message
+ Code = ?megaco_segments_not_received,
+ Reason = missing_to_str(Missing),
+ send_message_error(ConnData, Code, Reason),
+
+ %% Report to the user
+ UserMod = Req#request.user_mod,
+ UserArgs = Req#request.user_args,
+ Action = Req#request.reply_action,
+ UserData = Req#request.reply_data,
+ UserReply = {error, {segment_timeout, Missing}},
+ ConnData2 = ConnData#conn_data{user_mod = UserMod,
+ user_args = UserArgs,
+ reply_action = Action,
+ reply_data = UserData},
+ return_reply(ConnData2, TransId, UserReply)
+ end
+ end;
+
+segment_timeout(ConnHandle, TransId, Timer) ->
+ ?report_trace(ConnHandle, "segment timeout", [TransId, Timer]),
+ case megaco_monitor:lookup_request_field(TransId, #request.trans_id) of
+ {ok, _} ->
+ {WaitFor, Timer2} = megaco_timer:restart(Timer),
+ M = ?MODULE,
+ F = segment_timeout,
+ A = [ConnHandle, TransId, Timer2],
+ Ref = megaco_monitor:apply_after(M, F, A, WaitFor),
+ %% Timing problem?
+ megaco_monitor:update_request_field(TransId,
+ #request.seg_timer_ref,
+ Ref),
+ {restarted, WaitFor, Timer2};
+ _ ->
+ not_found_ignore
+ end.
+
+%% segment_reply_timeout() ->
+%% ok.
+
+missing_to_str(Missing) ->
+ lists:flatten(missing_to_str2(Missing)).
+
+missing_to_str2([X]) ->
+ [integer_to_list(X)];
+missing_to_str2([H|T]) ->
+ [integer_to_list(H) , "," | missing_to_str2(T)].
+
+return_unexpected_trans_reply(ConnData, TransId,
+ {actionReplies, _} = UserReply, Extra) ->
+ Trans = make_transaction_reply(ConnData, TransId, UserReply),
+ return_unexpected_trans(ConnData, Trans, Extra);
+return_unexpected_trans_reply(ConnData, TransId,
+ {transactionError, _} = UserReply, Extra) ->
+ Trans = make_transaction_reply(ConnData, TransId, UserReply),
+ return_unexpected_trans(ConnData, Trans, Extra);
+return_unexpected_trans_reply(CD, TransId, {error, Reason}, Extra) ->
+ ?report_important(CD, "unexpected trans reply with error",
+ [TransId, Reason, Extra]),
+ ok;
+return_unexpected_trans_reply(CD, TransId, Crap, Extra) ->
+ ?report_important(CD, "unexpected trans reply with crap",
+ [TransId, Crap, Extra]),
+ ok.
+
+return_unexpected_trans(ConnData, Trans) ->
+ Extra = ?default_user_callback_extra,
+ return_unexpected_trans(ConnData, Trans, Extra).
+
+return_unexpected_trans(ConnData, Trans0, Extra) ->
+ UserMod = ConnData#conn_data.user_mod,
+ UserArgs = ConnData#conn_data.user_args,
+ ConnHandle = ConnData#conn_data.conn_handle,
+ Version = ConnData#conn_data.protocol_version,
+ Trans = transform_transaction_reply_enc(Version, Trans0),
+ Args =
+ case Extra of
+ ?default_user_callback_extra ->
+ [ConnHandle, Version, Trans | UserArgs];
+ _ ->
+ [ConnHandle, Version, Trans, Extra | UserArgs]
+ end,
+ Res = (catch apply(UserMod, handle_unexpected_trans, Args)),
+ ?report_debug(ConnData, "return: unexpected trans",
+ [Trans, {return, Res}]),
+ case Res of
+ ok ->
+ ok;
+ _ ->
+ warning_msg("unexpected transaction callback failed: ~w", [Res]),
+ ok
+ end,
+ Res.
+
+
+%%-----------------------------------------------------------------
+
+to_remote_trans_id(#conn_data{conn_handle = CH, serial = Serial}) ->
+ Mid = CH#megaco_conn_handle.remote_mid,
+ #trans_id{mid = Mid, serial = Serial}.
+
+to_local_trans_id(#conn_data{conn_handle = CH, serial = Serial}) ->
+ Mid = CH#megaco_conn_handle.local_mid,
+ #trans_id{mid = Mid, serial = Serial}.
+
+to_local_trans_id(#conn_data{conn_handle = CH}, [S|_] = Serials)
+ when is_integer(S) ->
+ Mid = CH#megaco_conn_handle.local_mid,
+ [#trans_id{mid = Mid, serial = Serial} || Serial <- Serials];
+to_local_trans_id(#conn_data{conn_handle = CH},
+ [{transactionRequest, TR}|_] = TRs)
+ when is_record(TR, 'TransactionRequest') ->
+ Mid = CH#megaco_conn_handle.local_mid,
+ [#trans_id{mid = Mid, serial = Serial} ||
+ {transactionRequest,
+ #'TransactionRequest'{transactionId = Serial}} <- TRs];
+
+to_local_trans_id(#megaco_conn_handle{local_mid = Mid}, Serial)
+ when is_integer(Serial) ->
+ #trans_id{mid = Mid, serial = Serial};
+to_local_trans_id(#conn_data{conn_handle = CH}, Serial)
+ when is_integer(Serial) ->
+ Mid = CH#megaco_conn_handle.local_mid,
+ #trans_id{mid = Mid, serial = Serial}.
+
+
+%%-----------------------------------------------------------------
+
+transform_transaction_reply_dec({'TransactionReply',
+ TransId, IAR, TransRes}) ->
+ #megaco_transaction_reply{transactionId = TransId,
+ immAckRequired = IAR,
+ transactionResult = TransRes};
+transform_transaction_reply_dec({'TransactionReply',
+ TransId, IAR, TransRes,
+ SegNo, SegComplete}) ->
+ #megaco_transaction_reply{transactionId = TransId,
+ immAckRequired = IAR,
+ transactionResult = TransRes,
+ segmentNumber = SegNo,
+ segmentationComplete = SegComplete}.
+
+transform_transaction_reply_enc(
+ 3,
+ #megaco_transaction_reply{transactionId = TransId,
+ immAckRequired = IAR,
+ transactionResult = TransRes,
+ segmentNumber = SegNo,
+ segmentationComplete = SegComplete}) ->
+ {'TransactionReply', TransId, IAR, TransRes, SegNo, SegComplete};
+transform_transaction_reply_enc(
+ Version,
+ #megaco_transaction_reply{transactionId = TransId,
+ immAckRequired = IAR,
+ transactionResult = TransRes})
+ when (Version < 3) ->
+ {'TransactionReply', TransId, IAR, TransRes};
+transform_transaction_reply_enc(_, TR) ->
+ TR.
+
+make_transaction_reply(#conn_data{protocol_version = Version},
+ TransId, TransRes) ->
+ make_transaction_reply(Version, TransId, asn1_NOVALUE, TransRes).
+
+%% make_transaction_reply(#conn_data{protocol_version = Version},
+%% TransId, IAR, TransRes) ->
+%% make_transaction_reply(Version, TransId, IAR, TransRes);
+
+make_transaction_reply(3, TransId, IAR, TransRes) ->
+ {'TransactionReply', TransId, IAR, TransRes, asn1_NOVALUE, asn1_NOVALUE};
+make_transaction_reply(_, TransId, IAR, TransRes) ->
+ {'TransactionReply', TransId, IAR, TransRes}.
+
+
+%%-----------------------------------------------------------------
+
+%%-----------------------------------------------------------------
+warning_msg(F, A) ->
+ ?megaco_warning(F, A).
+
+error_msg(F, A) ->
+ ?megaco_error(F, A).
+
+
+%%-----------------------------------------------------------------
+
+%% d(F) ->
+%% d(F,[]).
+%%
+%% d(F,A) ->
+%% d(true,F,A).
+%% %% d(get(dbg),F,A).
+%%
+%% d(true,F,A) ->
+%% io:format("*** [~s] ~p:~p ***"
+%% "~n " ++ F ++ "~n",
+%% [format_timestamp(now()), self(),?MODULE|A]);
+%% d(_, _, _) ->
+%% ok.
+%%
+%% format_timestamp({_N1, _N2, N3} = Now) ->
+%% {Date, Time} = calendar:now_to_datetime(Now),
+%% {YYYY,MM,DD} = Date,
+%% {Hour,Min,Sec} = Time,
+%% FormatDate =
+%% io_lib:format("~.4w:~.2.0w:~.2.0w ~.2.0w:~.2.0w:~.2.0w 4~w",
+%% [YYYY,MM,DD,Hour,Min,Sec,round(N3/1000)]),
+%% lists:flatten(FormatDate).
+
+%% Time in milli seconds
+t() ->
+ {A,B,C} = erlang:now(),
+ A*1000000000+B*1000+(C div 1000).
+
+
+%%-----------------------------------------------------------------
+%% Func: incNumErrors/0, incNumErrors/1, incNumTimerRecovery/1
+%% Description: SNMP counter increment functions
+%%-----------------------------------------------------------------
+incNumErrors() ->
+ incNum(medGwyGatewayNumErrors).
+
+incNumErrors(CH) ->
+ incNum({CH, medGwyGatewayNumErrors}).
+
+incNumTimerRecovery(CH) ->
+ incNum({CH, medGwyGatewayNumTimerRecovery}).
+
+incNum(Cnt) ->
+ case (catch ets:update_counter(megaco_stats, Cnt, 1)) of
+ {'EXIT', {badarg, _Reason}} ->
+ ets:insert(megaco_stats, {Cnt, 1});
+ Old ->
+ Old
+ end.
+