aboutsummaryrefslogtreecommitdiffstats
path: root/lib/megaco/src/engine/megaco_trans_sender.erl
diff options
context:
space:
mode:
authorErlang/OTP <otp@erlang.org>2009-11-20 14:54:40 +0000
committerErlang/OTP <otp@erlang.org>2009-11-20 14:54:40 +0000
commit84adefa331c4159d432d22840663c38f155cd4c1 (patch)
treebff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/megaco/src/engine/megaco_trans_sender.erl
downloadotp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz
otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2
otp-84adefa331c4159d432d22840663c38f155cd4c1.zip
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/megaco/src/engine/megaco_trans_sender.erl')
-rw-r--r--lib/megaco/src/engine/megaco_trans_sender.erl699
1 files changed, 699 insertions, 0 deletions
diff --git a/lib/megaco/src/engine/megaco_trans_sender.erl b/lib/megaco/src/engine/megaco_trans_sender.erl
new file mode 100644
index 0000000000..710fef405a
--- /dev/null
+++ b/lib/megaco/src/engine/megaco_trans_sender.erl
@@ -0,0 +1,699 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2003-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%%----------------------------------------------------------------------
+%% Purpose: Transaction sender process
+%%----------------------------------------------------------------------
+
+-module(megaco_trans_sender).
+
+-export([start_link/5,
+ stop/1,
+ upgrade/2,
+ send_req/3,
+ send_reqs/3,
+ send_ack/2,
+ send_ack_now/2,
+ send_pending/2,
+ send_reply/2,
+ timeout/2,
+ ack_maxcount/2,
+ req_maxcount/2,
+ req_maxsize/2]).
+-export([system_continue/3, system_terminate/4, system_code_change/4]).
+-export([init/6]).
+
+
+-include_lib("megaco/include/megaco.hrl").
+-include("megaco_message_internal.hrl").
+-include_lib("megaco/src/app/megaco_internal.hrl").
+
+
+-record(state,
+ {
+ parent,
+ conn_handle,
+ timeout,
+ req_sz = 0,
+ req_maxsize, %% Max total size of all accumulated reqs
+ req_maxcount,
+ ack_maxcount,
+ reqs = [],
+ acks = []
+ }).
+
+
+%%%-----------------------------------------------------------------
+%%% Public API
+%%%-----------------------------------------------------------------
+start_link(CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks) ->
+ ?d("start_link -> entry with"
+ "~n CH: ~p"
+ "~n To: ~p"
+ "~n MaxSzReqs: ~p"
+ "~n MaxNoReqs: ~p"
+ "~n MaxNoAcks: ~p", [CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks]),
+ Args = [self(), CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks],
+ proc_lib:start_link(?MODULE, init, Args).
+
+stop(Pid) when is_pid(Pid) ->
+ Pid ! stop,
+ ok.
+
+upgrade(Pid, CH) when is_pid(Pid) ->
+ Pid ! {upgrade, CH},
+ ok.
+
+send_req(Pid, Tid, Req) when is_pid(Pid) andalso is_binary(Req) ->
+ Pid ! {send_req, Tid, Req},
+ ok.
+
+send_reqs(Pid, Tids, Reqs)
+ when is_pid(Pid) andalso
+ is_list(Tids) andalso
+ is_list(Reqs) andalso
+ (length(Tids) =:= length(Reqs)) ->
+ Pid ! {send_reqs, Tids, Reqs},
+ ok.
+
+send_ack(Pid, Serial) when is_pid(Pid) andalso is_integer(Serial) ->
+ Pid ! {send_ack, Serial},
+ ok.
+
+send_ack_now(Pid, Serial) when is_pid(Pid) andalso is_integer(Serial) ->
+ Pid ! {send_ack_now, Serial},
+ ok.
+
+send_pending(Pid, Serial) when is_pid(Pid) andalso is_integer(Serial) ->
+ Pid ! {send_pending, Serial},
+ ok.
+
+send_reply(Pid, Reply) when is_pid(Pid) andalso is_binary(Reply) ->
+ Pid ! {send_reply, Reply}.
+
+ack_maxcount(Pid, Max) when is_pid(Pid) andalso is_integer(Max) ->
+ Pid ! {ack_maxcount, Max},
+ ok.
+
+req_maxcount(Pid, Max) when is_pid(Pid) andalso is_integer(Max) ->
+ Pid ! {req_maxcount, Max},
+ ok.
+
+req_maxsize(Pid, Max) when is_pid(Pid) andalso is_integer(Max) ->
+ Pid ! {req_maxsize, Max},
+ ok.
+
+timeout(Pid, Timeout) when is_pid(Pid) ->
+ Pid ! {timeout, Timeout},
+ ok.
+
+
+
+%%%-----------------------------------------------------------------
+%%% Internal exports
+%%%-----------------------------------------------------------------
+
+init(Parent, CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks) ->
+ ?d("init -> entry with"
+ "~n Parent: ~p"
+ "~n CH: ~p"
+ "~n To: ~p"
+ "~n MaxSzReqs: ~p"
+ "~n MaxNoReqs: ~p"
+ "~n MaxNoAcks: ~p", [Parent, CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks]),
+ process_flag(trap_exit, true),
+ proc_lib:init_ack(Parent, {ok, self()}),
+ S = #state{parent = Parent,
+ conn_handle = CH,
+ timeout = To,
+ req_maxsize = MaxSzReqs,
+ req_maxcount = MaxNoReqs,
+ ack_maxcount = MaxNoAcks},
+ loop(S, To).
+
+
+%%%-----------------------------------------------------------------
+%%% Internal functions
+%%%-----------------------------------------------------------------
+%% idle (= empty)
+loop(#state{reqs = [], acks = [], timeout = Timeout} = S, _) ->
+ receive
+ {send_ack, Serial} ->
+ ?d("loop(empty) -> received send_ack [~w] request", [Serial]),
+ loop(S#state{acks = [Serial]}, Timeout);
+
+ {send_ack_now, Serial} ->
+ ?d("loop(empty) -> received send_ack_now [~w] request", [Serial]),
+ send_msg(S#state.conn_handle, [], [Serial]),
+ loop(S, Timeout);
+
+ {send_req, Tid, Req} when size(Req) >= S#state.req_maxsize ->
+ ?d("loop(empty) -> received (big) send_req request ~w", [Tid]),
+ send_msg(S#state.conn_handle, [{Tid, Req}], []),
+ loop(S, Timeout);
+
+ {send_req, Tid, Req} ->
+ ?d("loop(empty) -> received send_req request ~w", [Tid]),
+ loop(S#state{req_sz = size(Req), reqs = [{Tid,Req}]}, Timeout);
+
+ {send_reqs, Tids, Reqs} ->
+ ?d("loop(empty) -> received send_reqs request: ~w", [Tids]),
+ {NewS, _} = handle_send_reqs(Tids, Reqs, S),
+ loop(NewS, Timeout);
+
+ {send_pending, Serial} ->
+ ?d("loop(empty) -> received send_pending [~w] request", [Serial]),
+ handle_send_result(
+ send_pending(S#state.conn_handle, Serial, [], [])
+ ),
+ loop(S, Timeout);
+
+ {send_reply, Reply} ->
+ ?d("loop(empty) -> received send_reply request", []),
+ #state{conn_handle = CH, req_maxsize = MaxSz} = S,
+ handle_send_result( send_reply(CH, Reply, MaxSz, 0, [], []) ),
+ loop(S, Timeout);
+
+ {upgrade, CH} ->
+ ?d("loop(empty) -> received upgrade request:"
+ "~n CH: ~p", [CH]),
+ loop(S#state{conn_handle = CH}, Timeout);
+
+ {ack_maxcount, NewMax} ->
+ ?d("loop(empty) -> received ack_maxcount request", []),
+ loop(S#state{ack_maxcount = NewMax}, Timeout);
+
+ {req_maxcount, NewMax} ->
+ ?d("loop(empty) -> received req_maxcount request", []),
+ loop(S#state{req_maxcount = NewMax}, Timeout);
+
+ {req_maxsize, NewMax} ->
+ ?d("loop(empty) -> received req_maxsize request", []),
+ loop(S#state{req_maxsize = NewMax}, Timeout);
+
+ {timeout, NewTimeout} ->
+ ?d("loop(empty) -> received timeout request", []),
+ loop(S#state{timeout = NewTimeout}, NewTimeout);
+
+ stop ->
+ ?d("loop(empty) -> received stop request", []),
+ exit(normal);
+
+ {system, From, Msg} ->
+ ?d("loop(empty) -> received system message:"
+ "~n From: ~p"
+ "~n Msg: ~p", [From, Msg]),
+ Parent = S#state.parent,
+ sys:handle_system_msg(Msg, From, Parent,
+ ?MODULE, [], {S, Timeout});
+
+ {'EXIT', Parent, Reason} when S#state.parent == Parent ->
+ ?d("loop(empty) -> received upgrade request", []),
+ exit(Reason);
+
+ M ->
+ warning_msg("received unexpected message (ignoring): "
+ "~n~p", [M]),
+ loop(S, Timeout)
+
+ end;
+
+%% active (= some acks or reqs waiting to to be sent)
+loop(#state{reqs = Reqs, acks = Acks, ack_maxcount = MaxAcks,
+ timeout = Timeout} = S, To)
+ when To >= 0 ->
+ Start = t(),
+ receive
+ {send_ack, Serial} when length(Acks) + 1 >= MaxAcks ->
+ ?d("loop(active,~w) -> "
+ "received [~w] send_ack [~w] request",
+ [To, length(Acks), Serial]),
+ handle_send_result(
+ send_msg(S#state.conn_handle, Reqs, [Serial|Acks])
+ ),
+ loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout);
+
+ {send_ack, Serial} ->
+ ?d("loop(active,~w) -> received send_ack [~w] request",
+ [To, Serial]),
+ loop(S#state{acks = [Serial|Acks]}, to(To, Start));
+
+ {send_ack_now, Serial} ->
+ ?d("loop(active,~w) -> [~w,~w] "
+ "received send_ack_now [~w] request",
+ [To, length(Reqs), length(Acks), Serial]),
+ handle_send_result(
+ send_msg(S#state.conn_handle, Reqs, [Serial|Acks])
+ ),
+ loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout);
+
+ %% We need to check that this is not a resend!!
+ %% In that case, send whatever we have in store
+ {send_req, Tid, Req} ->
+ ?d("loop(active,~w) -> received send_req request ~w", [To,Tid]),
+ {NewS, NewT} =
+ case handle_send_req(Tid, Req, S) of
+ {S1, true} ->
+ {S1, Timeout};
+ {S1, false} ->
+ {S1, to(To, Start)}
+ end,
+ loop(NewS, NewT);
+
+ {send_reqs, Tids, NewReqs} ->
+ ?d("loop(active,~w) -> received send_reqs request ~w", [To,Tids]),
+ {NewS, NewT} =
+ case handle_send_reqs(Tids, NewReqs, S) of
+ {S1, true} ->
+ {S1, Timeout};
+ {S1, false} ->
+ {S1, to(To, Start)}
+ end,
+ loop(NewS, NewT);
+
+ {send_pending, Serial} ->
+ ?d("loop(active,~w) -> received send_pending [~w] request",
+ [To, Serial]),
+ handle_send_result(
+ send_pending(S#state.conn_handle, Serial, Reqs, Acks)
+ ),
+ loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout);
+
+ {send_reply, Reply} ->
+ ?d("loop(active,~w) -> received send_reply request", [To]),
+ #state{conn_handle = CH, req_maxsize = MaxSz, req_sz = ReqSz} = S,
+ handle_send_result(
+ send_reply(CH, Reply, MaxSz, ReqSz, Reqs, Acks)
+ ),
+ loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout);
+
+ {upgrade, CH} ->
+ ?d("loop(active,~w) -> received upgrade request", [To]),
+ loop(S#state{conn_handle = CH}, to(To, Start));
+
+ {req_maxsize, NewMax} ->
+ ?d("loop(active,~w) -> received req_maxsize request", [To]),
+ loop(S#state{req_maxsize = NewMax}, to(To, Start));
+
+ {req_maxcount, NewMax} ->
+ ?d("loop(active,~w) -> received req_maxcount request", [To]),
+ loop(S#state{req_maxcount = NewMax}, to(To, Start));
+
+ {ack_maxcount, NewMax} ->
+ ?d("loop(active,~w) -> received ack_maxcount request", [To]),
+ loop(S#state{ack_maxcount = NewMax}, to(To, Start));
+
+ {timeout, NewTimeout} when NewTimeout > Timeout ->
+ ?d("loop(active,~w) -> received timeout request: ~w",
+ [To, NewTimeout]),
+ %% We need to recalculate To
+ NewTo = NewTimeout - (Timeout - to(To, Start)),
+ loop(S#state{timeout = NewTimeout}, NewTo);
+
+ {timeout, NewTimeout} ->
+ ?d("loop(active,~w) -> received timeout request: ~w",
+ [To, NewTimeout]),
+ %% We need to recalculate To
+ NewTo = to(To, Start) - (Timeout - NewTimeout),
+ loop(S#state{timeout = NewTimeout}, NewTo);
+
+ stop ->
+ ?d("loop(active,~w) -> received stop request", [To]),
+ handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ),
+ exit(normal);
+
+ {system, From, Msg} ->
+ ?d("loop(active,~w) -> received system message:"
+ "~n From: ~p"
+ "~n Msg: ~p", [To, From, Msg]),
+ Parent = S#state.parent,
+ sys:handle_system_msg(Msg, From, Parent,
+ ?MODULE, [], {S, to(To, Start)});
+
+ {'EXIT', Parent, Reason} when S#state.parent == Parent ->
+ ?d("loop(active,~w) -> received exit request", [To]),
+ exit(Reason);
+
+ M ->
+ warning_msg("received unexpected message (ignoring): "
+ "~n~p", [M]),
+ loop(S, to(To, Start))
+
+ after To ->
+ ?d("loop(active,~w) -> timeout - time to send", [To]),
+ handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ),
+ loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout)
+ end;
+
+loop(#state{reqs = Reqs, acks = Acks, timeout = Timeout} = S, _To) ->
+ ?d("loop(active) -> timeout [~w, ~w]", [length(Reqs),length(Acks)]),
+ handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ),
+ loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout).
+
+
+%%%-----------------------------------------------------------------
+
+%% The request is itself larger then the max size, so first send
+%% everything we have stored in one message, and then the new request
+%% in another.
+%% Note that it does not matter if we with this request
+%% passed the maxcount limit.
+%% Note that this message cannot be a re-sent, since
+%% such a request would have been stored, but sent immediatly.
+handle_send_req(Tid, Req,
+ #state{conn_handle = CH,
+ req_maxsize = MaxSz, reqs = Reqs, acks = Acks} = S)
+ when size(Req) >= MaxSz ->
+ ?d("handle_send_req -> request bigger then maxsize ~w", [MaxSz]),
+ handle_send_result( send_msg(CH, Reqs, Acks) ),
+ handle_send_result( send_msg(CH, [{Tid, Req}], []) ),
+ {S#state{req_sz = 0, reqs = [], acks = []}, true};
+
+%% And handle all the other cases
+handle_send_req(Tid, Req,
+ #state{conn_handle = CH, req_sz = ReqSz,
+ req_maxcount = MaxReqs, req_maxsize = MaxSz,
+ reqs = Reqs, acks = Acks} = S) ->
+ case lists:keymember(Tid, 1, Reqs) of
+ true ->
+ %% A re-send, time to send whatever we have in the store
+ ?d("handle_send_req -> was a re-send, so flush",[]),
+ handle_send_result( send_msg(CH, Reqs, Acks) ),
+ {S#state{req_sz = 0, reqs = [], acks = []}, true};
+
+ false when length(Reqs) + 1 >= MaxReqs ->
+ %% We finally passed the req-maxcount limit
+ ?d("handle_send_req -> maxcount ~w passed", [MaxReqs]),
+ handle_send_result(
+ send_msg(S#state.conn_handle, [{Tid, Req}|Reqs], Acks)
+ ),
+ {S#state{req_sz = 0, reqs = [], acks = []}, true};
+
+ false when size(Req) + ReqSz >= MaxSz ->
+ %% We finally passed the req-maxsize limit
+ ?d("handle_send_req -> maxsize ~w passed", [MaxSz]),
+ handle_send_result(
+ send_msg(S#state.conn_handle, [{Tid, Req}|Reqs], Acks)
+ ),
+ {S#state{req_sz = 0, reqs = [], acks = []}, true};
+
+ false ->
+ %% Still not time to send
+ ?d("handle_send_req -> nothing to be sent",[]),
+ {S#state{req_sz = ReqSz + size(Req), reqs = [{Tid, Req}|Reqs]},
+ false}
+ end.
+
+
+%% We passed the req-maxcount limit: Time to send, atleast some of
+%% the stuff...
+handle_send_reqs(Tids, Reqs0,
+ #state{conn_handle = CH,
+ req_maxsize = MaxSz, req_sz = ReqSz,
+ req_maxcount = MaxReqs, reqs = Reqs, acks = Acks} = S)
+ when length(Reqs0) + length(Reqs) >= MaxReqs ->
+ ?d("handle_send_reqs -> maxcount ~w: ~w, ~w",
+ [MaxSz,length(Reqs0),length(Reqs)]),
+ Reqs1 = merge_tids_and_reqs(Tids, Reqs0, []),
+ {NewReqs, NewReqSz} = send_reqs(CH, Reqs1, Acks, Reqs, ReqSz, MaxSz),
+ ?d("handle_send_reqs -> sent:"
+ "~n NewReqSz: ~w"
+ "~n length(NewReqs): ~w", [NewReqSz, length(NewReqs)]),
+ {S#state{req_sz = NewReqSz, reqs = NewReqs, acks = []}, true};
+
+%% We did not pass the req-maxcount limit, but we could have passed the
+%% req-maxsize limit, so maybe send...
+handle_send_reqs(Tids, Reqs0, #state{conn_handle = CH,
+ req_maxsize = MaxSz, req_sz = ReqSz,
+ reqs = Reqs, acks = Acks} = S) ->
+ ?d("handle_send_reqs -> not maxcount - maybe maxsize (~w)", [MaxSz]),
+ Reqs1 = merge_tids_and_reqs(Tids, Reqs0, []),
+
+ case maybe_send_reqs(CH, Reqs1, Acks, Reqs, ReqSz, MaxSz, false) of
+ {NewReqs, NewReqSz, true} ->
+ ?d("handle_send_reqs -> sent:"
+ "~n NewReqSz: ~w"
+ "~n length(NewReqs): ~w", [NewReqSz, length(NewReqs)]),
+ {S#state{req_sz = NewReqSz, reqs = NewReqs, acks = []}, true};
+ {NewReqs, NewReqSz, false} ->
+ ?d("handle_send_reqs -> not sent:"
+ "~n NewReqSz: ~w"
+ "~n length(NewReqs): ~w", [NewReqSz, length(NewReqs)]),
+ {S#state{req_sz = NewReqSz, reqs = NewReqs}, false}
+ end.
+
+merge_tids_and_reqs([], [], Reqs) ->
+ Reqs;
+merge_tids_and_reqs([Tid|Tids], [Req|Reqs], Acc) ->
+ merge_tids_and_reqs(Tids, Reqs, [{Tid,Req}|Acc]).
+
+%% We know that we shall send, so if maybe_send_reqs does not,
+%% we send it our self...
+send_reqs(CH, Reqs, Acks, Acc, AccSz, MaxSz) ->
+ ?d("send_reqs -> entry when"
+ "~n length(Reqs): ~w"
+ "~n Acks: ~w"
+ "~n length(Acc): ~w"
+ "~n AccSz: ~w", [length(Reqs), Acks, length(Acc), AccSz]),
+ case maybe_send_reqs(CH, Reqs, Acks, Acc, AccSz, MaxSz, false) of
+ {NewReqs, _NewReqSz, false} ->
+ ?d("send_reqs -> nothing sent yet"
+ "~n length(NewReqs): ~w", [length(NewReqs)]),
+ handle_send_result( send_msg(CH, NewReqs, Acks) ),
+ {[], 0};
+ {NewReqs, NewReqSz, true} ->
+ ?d("send_reqs -> something sent"
+ "~n length(NewReqs): ~w"
+ "~n NewReqSz: ~w", [length(NewReqs), NewReqSz]),
+ {NewReqs, NewReqSz}
+ end.
+
+
+maybe_send_reqs(_CH, [], _Acks, Acc, AccSz, _MaxSz, Sent) ->
+ ?d("maybe_send_reqs -> done when"
+ "~n Sent: ~w"
+ "~n AccSz: ~w"
+ "~n length(Acc): ~w", [Sent, AccSz, length(Acc)]),
+ {Acc, AccSz, Sent};
+maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, _AccSz, MaxSz, _Sent)
+ when size(Req) >= MaxSz ->
+ %% The request was above the maxsize limit, so first send
+ %% what's in store and the the big request.
+ ?d("maybe_send_reqs -> entry when request [~w] size (~w) > max size"
+ "~n Acks: ~w"
+ "~n length(Acc): ~w", [Tid, size(Req), Acks, length(Acc)]),
+ handle_send_result( send_msg(CH, Acc, Acks) ),
+ handle_send_result( send_msg(CH, [{Tid, Req}], []) ),
+ maybe_send_reqs(CH, Reqs, [], [], 0, MaxSz, true);
+maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, AccSz, MaxSz, _Sent)
+ when AccSz + size(Req) >= MaxSz ->
+ %% We _did_ pass the maxsize limit with this request, so send
+ ?d("maybe_send_reqs -> entry when sum of requests (~w) > max size"
+ "~n Tid: ~w"
+ "~n Acks: ~w"
+ "~n length(Acc): ~w", [Tid, size(Req) + AccSz, Acks, length(Acc)]),
+ handle_send_result( send_msg(CH, [{Tid, Req}|Acc], Acks) ),
+ maybe_send_reqs(CH, Reqs, [], [], 0, MaxSz, true);
+maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, AccSz, MaxSz, Sent) ->
+ ?d("maybe_send_reqs -> entry when"
+ "~n Tid: ~w"
+ "~n size(Req): ~w"
+ "~n Acks: ~w"
+ "~n length(Acc): ~w"
+ "~n AccSz: ~w", [Tid, size(Req), Acks, length(Acc), AccSz]),
+ NewAcc = [{Tid,Req}|Acc],
+ NewAccSz = AccSz + size(Req),
+ maybe_send_reqs(CH, Reqs, Acks, NewAcc, NewAccSz, MaxSz, Sent).
+
+
+%%%-----------------------------------------------------------------
+
+send_pending(CH, Serial, Reqs, Acks) ->
+ ?d("send_pending -> entry with"
+ "~n Serial: ~w"
+ "~n length(Reqs): ~w"
+ "~n length(Acks): ~w", [Serial, length(Reqs), length(Acks)]),
+ case megaco_config:lookup_local_conn(CH) of
+ [CD] ->
+ TP = #'TransactionPending'{transactionId = Serial},
+ Pend = {transactionPending, TP},
+ do_send_msg(CD, Pend, lists:reverse(Reqs), Acks);
+ [] ->
+ ok
+ end.
+
+
+%% We need to check the size of the reply. If the reply itself is
+%% larger then the max limit, then it is sent in a separate message.
+send_reply(CH, Reply, MaxSz, _ReqSz, Reqs, Acks) ->
+ ?d("send_reply -> entry with"
+ "~n length(Reqs): ~w"
+ "~n length(Acks): ~w", [length(Reqs), length(Acks)]),
+ case megaco_config:lookup_local_conn(CH) of
+ [CD] when size(Reply) > MaxSz ->
+ handle_send_result( send_msg(CD, lists:reverse(Reqs), Acks) ),
+ Rep = {transactionReply, Reply},
+ do_send_msg(CD, Rep, [], []);
+ [CD] ->
+ Rep = {transactionReply, Reply},
+ do_send_msg(CD, Rep, lists:reverse(Reqs), Acks);
+ [] ->
+ ok
+ end.
+
+do_send_msg(CD, Trans, [], []) ->
+ Body = {transactions, [Trans]},
+ Slogan = "send trans reply/pending",
+ ?d("do_send_msg -> ~s", [Slogan]),
+ megaco_messenger_misc:send_body(CD, Slogan, Body);
+do_send_msg(CD, Trans, Reqs0, []) ->
+ Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0],
+ Body = {transactions, [Trans|Reqs]},
+ Slogan = "send trans reply/pending and reqs",
+ ?d("do_send_msg -> ~s", [Slogan]),
+ megaco_messenger_misc:send_body(CD, Slogan, Body);
+do_send_msg(CD, Trans, [], SerialRanges) ->
+ Acks = make_acks(ranges(SerialRanges), []),
+ Body = {transactions, [Trans, {transactionResponseAck, Acks}]},
+ Slogan = "send trans reply/pending and acks",
+ ?d("do_send_msg -> ~s", [Slogan]),
+ megaco_messenger_misc:send_body(CD, Slogan, Body);
+do_send_msg(CD, Trans, Reqs0, SerialRanges) ->
+ Acks = make_acks(ranges(SerialRanges), []),
+ Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0],
+ Body = {transactions, [Trans, {transactionResponseAck, Acks}|Reqs]},
+ Slogan = "send trans reply/pending, reqs and acks",
+ ?d("do_send_msg -> ~s", [Slogan]),
+ megaco_messenger_misc:send_body(CD, Slogan, Body).
+
+
+
+send_msg(_, [], []) ->
+ ok;
+send_msg(CH, Reqs, Serials) ->
+ case megaco_config:lookup_local_conn(CH) of
+ [ConnData] ->
+ do_send_msg(ConnData, lists:reverse(Reqs), Serials);
+ [] ->
+ ok
+ end.
+
+
+do_send_msg(CD, Reqs0, []) ->
+ ?d("do_send_msg -> entry with"
+ "~n length(Reqs0): ~p", [length(Reqs0)]),
+ Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0],
+ %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]),
+ Body = {transactions, Reqs},
+ megaco_messenger_misc:send_body(CD, "send trans reqs", Body);
+do_send_msg(CD, [], SerialRanges) ->
+ ?d("do_send_msg -> entry with"
+ "~n SerialRanges: ~p", [SerialRanges]),
+ Acks = make_acks(ranges(SerialRanges), []),
+ %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]),
+ Body = {transactions, [{transactionResponseAck, Acks}]},
+ megaco_messenger_misc:send_body(CD, "send trans acks", Body);
+do_send_msg(CD, Reqs0, SerialRanges) ->
+ ?d("do_send_msg -> entry with"
+ "~n length(Reqs0): ~p"
+ "~n SerialRanges: ~p", [length(Reqs0), SerialRanges]),
+ Acks = make_acks(ranges(SerialRanges), []),
+ Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0],
+ %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]),
+ Body = {transactions, [{transactionResponseAck, Acks}|Reqs]},
+ megaco_messenger_misc:send_body(CD, "send trans reqs and acks", Body).
+
+
+handle_send_result(ok) ->
+ ok;
+handle_send_result({ok, _}) ->
+ ok;
+handle_send_result({error, {send_message_cancelled, _Reason}}) ->
+ ok;
+handle_send_result({error, {send_message_failed, Reason}}) ->
+ error_msg("Failed sending message: ~n ~p", [Reason]),
+ error;
+handle_send_result(Error) ->
+ error_msg("Failed sending message: ~n ~p", [Error]),
+ error.
+
+
+ranges(L) ->
+ lists:reverse(ranges(lists:sort(L), [], [])).
+
+ranges([], Range, Ranges) ->
+ ranges2(Range, Ranges);
+ranges([S1|Sn], [S2|_] = Range, Ranges) when S1 == (S2+1) ->
+ ranges(Sn, [S1|Range], Ranges);
+ranges([S|Sn], Range, Ranges) ->
+ ranges(Sn, [S], ranges2(Range, Ranges)).
+
+ranges2([], Ranges) ->
+ Ranges;
+ranges2([S], Ranges) ->
+ [{S,S}|Ranges];
+ranges2(Range0, Ranges) ->
+ Range = lists:reverse(Range0),
+ [{hd(Range),lists:last(Range)}|Ranges].
+
+
+make_acks([], Acks) ->
+ lists:reverse(Acks);
+make_acks([{S,S}|SerialRanges], Acks) ->
+ TRA = #'TransactionAck'{firstAck = S},
+ make_acks(SerialRanges, [TRA|Acks]);
+make_acks([{F,L}|SerialRanges], Acks) ->
+ TRA = #'TransactionAck'{firstAck = F, lastAck = L},
+ make_acks(SerialRanges, [TRA|Acks]).
+
+
+
+%%%-----------------------------------------------------------------
+
+to(To, Start) ->
+ To - (t() - Start).
+
+%% Time in milli seconds
+t() ->
+ {A,B,C} = erlang:now(),
+ A*1000000000+B*1000+(C div 1000).
+
+warning_msg(F, A) ->
+ ?megaco_warning("Transaction sender: " ++ F, A).
+
+error_msg(F, A) ->
+ ?megaco_error("Transaction sender: " ++ F, A).
+
+
+%%%-----------------------------------------------------------------
+%%% System messages handled here
+%%%-----------------------------------------------------------------
+
+system_continue(_Parent, _Dbg, {S,To}) ->
+ loop(S, To).
+
+system_terminate(Reason, _Parent, _Dbg, {S, _}) ->
+ #state{conn_handle = CH, reqs = Reqs, acks = Acks} = S,
+ send_msg(CH, Reqs, Acks),
+ exit(Reason).
+
+system_code_change(S, _Module, _OLdVsn, _Extra) ->
+ ?d("system_code_change -> entry", []),
+ {ok, S}.