diff options
author | Erlang/OTP <otp@erlang.org> | 2009-11-20 14:54:40 +0000 |
---|---|---|
committer | Erlang/OTP <otp@erlang.org> | 2009-11-20 14:54:40 +0000 |
commit | 84adefa331c4159d432d22840663c38f155cd4c1 (patch) | |
tree | bff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/megaco/src/engine/megaco_trans_sender.erl | |
download | otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2 otp-84adefa331c4159d432d22840663c38f155cd4c1.zip |
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/megaco/src/engine/megaco_trans_sender.erl')
-rw-r--r-- | lib/megaco/src/engine/megaco_trans_sender.erl | 699 |
1 files changed, 699 insertions, 0 deletions
diff --git a/lib/megaco/src/engine/megaco_trans_sender.erl b/lib/megaco/src/engine/megaco_trans_sender.erl new file mode 100644 index 0000000000..710fef405a --- /dev/null +++ b/lib/megaco/src/engine/megaco_trans_sender.erl @@ -0,0 +1,699 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2003-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%%---------------------------------------------------------------------- +%% Purpose: Transaction sender process +%%---------------------------------------------------------------------- + +-module(megaco_trans_sender). + +-export([start_link/5, + stop/1, + upgrade/2, + send_req/3, + send_reqs/3, + send_ack/2, + send_ack_now/2, + send_pending/2, + send_reply/2, + timeout/2, + ack_maxcount/2, + req_maxcount/2, + req_maxsize/2]). +-export([system_continue/3, system_terminate/4, system_code_change/4]). +-export([init/6]). + + +-include_lib("megaco/include/megaco.hrl"). +-include("megaco_message_internal.hrl"). +-include_lib("megaco/src/app/megaco_internal.hrl"). + + +-record(state, + { + parent, + conn_handle, + timeout, + req_sz = 0, + req_maxsize, %% Max total size of all accumulated reqs + req_maxcount, + ack_maxcount, + reqs = [], + acks = [] + }). + + +%%%----------------------------------------------------------------- +%%% Public API +%%%----------------------------------------------------------------- +start_link(CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks) -> + ?d("start_link -> entry with" + "~n CH: ~p" + "~n To: ~p" + "~n MaxSzReqs: ~p" + "~n MaxNoReqs: ~p" + "~n MaxNoAcks: ~p", [CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks]), + Args = [self(), CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks], + proc_lib:start_link(?MODULE, init, Args). + +stop(Pid) when is_pid(Pid) -> + Pid ! stop, + ok. + +upgrade(Pid, CH) when is_pid(Pid) -> + Pid ! {upgrade, CH}, + ok. + +send_req(Pid, Tid, Req) when is_pid(Pid) andalso is_binary(Req) -> + Pid ! {send_req, Tid, Req}, + ok. + +send_reqs(Pid, Tids, Reqs) + when is_pid(Pid) andalso + is_list(Tids) andalso + is_list(Reqs) andalso + (length(Tids) =:= length(Reqs)) -> + Pid ! {send_reqs, Tids, Reqs}, + ok. + +send_ack(Pid, Serial) when is_pid(Pid) andalso is_integer(Serial) -> + Pid ! {send_ack, Serial}, + ok. + +send_ack_now(Pid, Serial) when is_pid(Pid) andalso is_integer(Serial) -> + Pid ! {send_ack_now, Serial}, + ok. + +send_pending(Pid, Serial) when is_pid(Pid) andalso is_integer(Serial) -> + Pid ! {send_pending, Serial}, + ok. + +send_reply(Pid, Reply) when is_pid(Pid) andalso is_binary(Reply) -> + Pid ! {send_reply, Reply}. + +ack_maxcount(Pid, Max) when is_pid(Pid) andalso is_integer(Max) -> + Pid ! {ack_maxcount, Max}, + ok. + +req_maxcount(Pid, Max) when is_pid(Pid) andalso is_integer(Max) -> + Pid ! {req_maxcount, Max}, + ok. + +req_maxsize(Pid, Max) when is_pid(Pid) andalso is_integer(Max) -> + Pid ! {req_maxsize, Max}, + ok. + +timeout(Pid, Timeout) when is_pid(Pid) -> + Pid ! {timeout, Timeout}, + ok. + + + +%%%----------------------------------------------------------------- +%%% Internal exports +%%%----------------------------------------------------------------- + +init(Parent, CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks) -> + ?d("init -> entry with" + "~n Parent: ~p" + "~n CH: ~p" + "~n To: ~p" + "~n MaxSzReqs: ~p" + "~n MaxNoReqs: ~p" + "~n MaxNoAcks: ~p", [Parent, CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks]), + process_flag(trap_exit, true), + proc_lib:init_ack(Parent, {ok, self()}), + S = #state{parent = Parent, + conn_handle = CH, + timeout = To, + req_maxsize = MaxSzReqs, + req_maxcount = MaxNoReqs, + ack_maxcount = MaxNoAcks}, + loop(S, To). + + +%%%----------------------------------------------------------------- +%%% Internal functions +%%%----------------------------------------------------------------- +%% idle (= empty) +loop(#state{reqs = [], acks = [], timeout = Timeout} = S, _) -> + receive + {send_ack, Serial} -> + ?d("loop(empty) -> received send_ack [~w] request", [Serial]), + loop(S#state{acks = [Serial]}, Timeout); + + {send_ack_now, Serial} -> + ?d("loop(empty) -> received send_ack_now [~w] request", [Serial]), + send_msg(S#state.conn_handle, [], [Serial]), + loop(S, Timeout); + + {send_req, Tid, Req} when size(Req) >= S#state.req_maxsize -> + ?d("loop(empty) -> received (big) send_req request ~w", [Tid]), + send_msg(S#state.conn_handle, [{Tid, Req}], []), + loop(S, Timeout); + + {send_req, Tid, Req} -> + ?d("loop(empty) -> received send_req request ~w", [Tid]), + loop(S#state{req_sz = size(Req), reqs = [{Tid,Req}]}, Timeout); + + {send_reqs, Tids, Reqs} -> + ?d("loop(empty) -> received send_reqs request: ~w", [Tids]), + {NewS, _} = handle_send_reqs(Tids, Reqs, S), + loop(NewS, Timeout); + + {send_pending, Serial} -> + ?d("loop(empty) -> received send_pending [~w] request", [Serial]), + handle_send_result( + send_pending(S#state.conn_handle, Serial, [], []) + ), + loop(S, Timeout); + + {send_reply, Reply} -> + ?d("loop(empty) -> received send_reply request", []), + #state{conn_handle = CH, req_maxsize = MaxSz} = S, + handle_send_result( send_reply(CH, Reply, MaxSz, 0, [], []) ), + loop(S, Timeout); + + {upgrade, CH} -> + ?d("loop(empty) -> received upgrade request:" + "~n CH: ~p", [CH]), + loop(S#state{conn_handle = CH}, Timeout); + + {ack_maxcount, NewMax} -> + ?d("loop(empty) -> received ack_maxcount request", []), + loop(S#state{ack_maxcount = NewMax}, Timeout); + + {req_maxcount, NewMax} -> + ?d("loop(empty) -> received req_maxcount request", []), + loop(S#state{req_maxcount = NewMax}, Timeout); + + {req_maxsize, NewMax} -> + ?d("loop(empty) -> received req_maxsize request", []), + loop(S#state{req_maxsize = NewMax}, Timeout); + + {timeout, NewTimeout} -> + ?d("loop(empty) -> received timeout request", []), + loop(S#state{timeout = NewTimeout}, NewTimeout); + + stop -> + ?d("loop(empty) -> received stop request", []), + exit(normal); + + {system, From, Msg} -> + ?d("loop(empty) -> received system message:" + "~n From: ~p" + "~n Msg: ~p", [From, Msg]), + Parent = S#state.parent, + sys:handle_system_msg(Msg, From, Parent, + ?MODULE, [], {S, Timeout}); + + {'EXIT', Parent, Reason} when S#state.parent == Parent -> + ?d("loop(empty) -> received upgrade request", []), + exit(Reason); + + M -> + warning_msg("received unexpected message (ignoring): " + "~n~p", [M]), + loop(S, Timeout) + + end; + +%% active (= some acks or reqs waiting to to be sent) +loop(#state{reqs = Reqs, acks = Acks, ack_maxcount = MaxAcks, + timeout = Timeout} = S, To) + when To >= 0 -> + Start = t(), + receive + {send_ack, Serial} when length(Acks) + 1 >= MaxAcks -> + ?d("loop(active,~w) -> " + "received [~w] send_ack [~w] request", + [To, length(Acks), Serial]), + handle_send_result( + send_msg(S#state.conn_handle, Reqs, [Serial|Acks]) + ), + loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout); + + {send_ack, Serial} -> + ?d("loop(active,~w) -> received send_ack [~w] request", + [To, Serial]), + loop(S#state{acks = [Serial|Acks]}, to(To, Start)); + + {send_ack_now, Serial} -> + ?d("loop(active,~w) -> [~w,~w] " + "received send_ack_now [~w] request", + [To, length(Reqs), length(Acks), Serial]), + handle_send_result( + send_msg(S#state.conn_handle, Reqs, [Serial|Acks]) + ), + loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout); + + %% We need to check that this is not a resend!! + %% In that case, send whatever we have in store + {send_req, Tid, Req} -> + ?d("loop(active,~w) -> received send_req request ~w", [To,Tid]), + {NewS, NewT} = + case handle_send_req(Tid, Req, S) of + {S1, true} -> + {S1, Timeout}; + {S1, false} -> + {S1, to(To, Start)} + end, + loop(NewS, NewT); + + {send_reqs, Tids, NewReqs} -> + ?d("loop(active,~w) -> received send_reqs request ~w", [To,Tids]), + {NewS, NewT} = + case handle_send_reqs(Tids, NewReqs, S) of + {S1, true} -> + {S1, Timeout}; + {S1, false} -> + {S1, to(To, Start)} + end, + loop(NewS, NewT); + + {send_pending, Serial} -> + ?d("loop(active,~w) -> received send_pending [~w] request", + [To, Serial]), + handle_send_result( + send_pending(S#state.conn_handle, Serial, Reqs, Acks) + ), + loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout); + + {send_reply, Reply} -> + ?d("loop(active,~w) -> received send_reply request", [To]), + #state{conn_handle = CH, req_maxsize = MaxSz, req_sz = ReqSz} = S, + handle_send_result( + send_reply(CH, Reply, MaxSz, ReqSz, Reqs, Acks) + ), + loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout); + + {upgrade, CH} -> + ?d("loop(active,~w) -> received upgrade request", [To]), + loop(S#state{conn_handle = CH}, to(To, Start)); + + {req_maxsize, NewMax} -> + ?d("loop(active,~w) -> received req_maxsize request", [To]), + loop(S#state{req_maxsize = NewMax}, to(To, Start)); + + {req_maxcount, NewMax} -> + ?d("loop(active,~w) -> received req_maxcount request", [To]), + loop(S#state{req_maxcount = NewMax}, to(To, Start)); + + {ack_maxcount, NewMax} -> + ?d("loop(active,~w) -> received ack_maxcount request", [To]), + loop(S#state{ack_maxcount = NewMax}, to(To, Start)); + + {timeout, NewTimeout} when NewTimeout > Timeout -> + ?d("loop(active,~w) -> received timeout request: ~w", + [To, NewTimeout]), + %% We need to recalculate To + NewTo = NewTimeout - (Timeout - to(To, Start)), + loop(S#state{timeout = NewTimeout}, NewTo); + + {timeout, NewTimeout} -> + ?d("loop(active,~w) -> received timeout request: ~w", + [To, NewTimeout]), + %% We need to recalculate To + NewTo = to(To, Start) - (Timeout - NewTimeout), + loop(S#state{timeout = NewTimeout}, NewTo); + + stop -> + ?d("loop(active,~w) -> received stop request", [To]), + handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ), + exit(normal); + + {system, From, Msg} -> + ?d("loop(active,~w) -> received system message:" + "~n From: ~p" + "~n Msg: ~p", [To, From, Msg]), + Parent = S#state.parent, + sys:handle_system_msg(Msg, From, Parent, + ?MODULE, [], {S, to(To, Start)}); + + {'EXIT', Parent, Reason} when S#state.parent == Parent -> + ?d("loop(active,~w) -> received exit request", [To]), + exit(Reason); + + M -> + warning_msg("received unexpected message (ignoring): " + "~n~p", [M]), + loop(S, to(To, Start)) + + after To -> + ?d("loop(active,~w) -> timeout - time to send", [To]), + handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ), + loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout) + end; + +loop(#state{reqs = Reqs, acks = Acks, timeout = Timeout} = S, _To) -> + ?d("loop(active) -> timeout [~w, ~w]", [length(Reqs),length(Acks)]), + handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ), + loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout). + + +%%%----------------------------------------------------------------- + +%% The request is itself larger then the max size, so first send +%% everything we have stored in one message, and then the new request +%% in another. +%% Note that it does not matter if we with this request +%% passed the maxcount limit. +%% Note that this message cannot be a re-sent, since +%% such a request would have been stored, but sent immediatly. +handle_send_req(Tid, Req, + #state{conn_handle = CH, + req_maxsize = MaxSz, reqs = Reqs, acks = Acks} = S) + when size(Req) >= MaxSz -> + ?d("handle_send_req -> request bigger then maxsize ~w", [MaxSz]), + handle_send_result( send_msg(CH, Reqs, Acks) ), + handle_send_result( send_msg(CH, [{Tid, Req}], []) ), + {S#state{req_sz = 0, reqs = [], acks = []}, true}; + +%% And handle all the other cases +handle_send_req(Tid, Req, + #state{conn_handle = CH, req_sz = ReqSz, + req_maxcount = MaxReqs, req_maxsize = MaxSz, + reqs = Reqs, acks = Acks} = S) -> + case lists:keymember(Tid, 1, Reqs) of + true -> + %% A re-send, time to send whatever we have in the store + ?d("handle_send_req -> was a re-send, so flush",[]), + handle_send_result( send_msg(CH, Reqs, Acks) ), + {S#state{req_sz = 0, reqs = [], acks = []}, true}; + + false when length(Reqs) + 1 >= MaxReqs -> + %% We finally passed the req-maxcount limit + ?d("handle_send_req -> maxcount ~w passed", [MaxReqs]), + handle_send_result( + send_msg(S#state.conn_handle, [{Tid, Req}|Reqs], Acks) + ), + {S#state{req_sz = 0, reqs = [], acks = []}, true}; + + false when size(Req) + ReqSz >= MaxSz -> + %% We finally passed the req-maxsize limit + ?d("handle_send_req -> maxsize ~w passed", [MaxSz]), + handle_send_result( + send_msg(S#state.conn_handle, [{Tid, Req}|Reqs], Acks) + ), + {S#state{req_sz = 0, reqs = [], acks = []}, true}; + + false -> + %% Still not time to send + ?d("handle_send_req -> nothing to be sent",[]), + {S#state{req_sz = ReqSz + size(Req), reqs = [{Tid, Req}|Reqs]}, + false} + end. + + +%% We passed the req-maxcount limit: Time to send, atleast some of +%% the stuff... +handle_send_reqs(Tids, Reqs0, + #state{conn_handle = CH, + req_maxsize = MaxSz, req_sz = ReqSz, + req_maxcount = MaxReqs, reqs = Reqs, acks = Acks} = S) + when length(Reqs0) + length(Reqs) >= MaxReqs -> + ?d("handle_send_reqs -> maxcount ~w: ~w, ~w", + [MaxSz,length(Reqs0),length(Reqs)]), + Reqs1 = merge_tids_and_reqs(Tids, Reqs0, []), + {NewReqs, NewReqSz} = send_reqs(CH, Reqs1, Acks, Reqs, ReqSz, MaxSz), + ?d("handle_send_reqs -> sent:" + "~n NewReqSz: ~w" + "~n length(NewReqs): ~w", [NewReqSz, length(NewReqs)]), + {S#state{req_sz = NewReqSz, reqs = NewReqs, acks = []}, true}; + +%% We did not pass the req-maxcount limit, but we could have passed the +%% req-maxsize limit, so maybe send... +handle_send_reqs(Tids, Reqs0, #state{conn_handle = CH, + req_maxsize = MaxSz, req_sz = ReqSz, + reqs = Reqs, acks = Acks} = S) -> + ?d("handle_send_reqs -> not maxcount - maybe maxsize (~w)", [MaxSz]), + Reqs1 = merge_tids_and_reqs(Tids, Reqs0, []), + + case maybe_send_reqs(CH, Reqs1, Acks, Reqs, ReqSz, MaxSz, false) of + {NewReqs, NewReqSz, true} -> + ?d("handle_send_reqs -> sent:" + "~n NewReqSz: ~w" + "~n length(NewReqs): ~w", [NewReqSz, length(NewReqs)]), + {S#state{req_sz = NewReqSz, reqs = NewReqs, acks = []}, true}; + {NewReqs, NewReqSz, false} -> + ?d("handle_send_reqs -> not sent:" + "~n NewReqSz: ~w" + "~n length(NewReqs): ~w", [NewReqSz, length(NewReqs)]), + {S#state{req_sz = NewReqSz, reqs = NewReqs}, false} + end. + +merge_tids_and_reqs([], [], Reqs) -> + Reqs; +merge_tids_and_reqs([Tid|Tids], [Req|Reqs], Acc) -> + merge_tids_and_reqs(Tids, Reqs, [{Tid,Req}|Acc]). + +%% We know that we shall send, so if maybe_send_reqs does not, +%% we send it our self... +send_reqs(CH, Reqs, Acks, Acc, AccSz, MaxSz) -> + ?d("send_reqs -> entry when" + "~n length(Reqs): ~w" + "~n Acks: ~w" + "~n length(Acc): ~w" + "~n AccSz: ~w", [length(Reqs), Acks, length(Acc), AccSz]), + case maybe_send_reqs(CH, Reqs, Acks, Acc, AccSz, MaxSz, false) of + {NewReqs, _NewReqSz, false} -> + ?d("send_reqs -> nothing sent yet" + "~n length(NewReqs): ~w", [length(NewReqs)]), + handle_send_result( send_msg(CH, NewReqs, Acks) ), + {[], 0}; + {NewReqs, NewReqSz, true} -> + ?d("send_reqs -> something sent" + "~n length(NewReqs): ~w" + "~n NewReqSz: ~w", [length(NewReqs), NewReqSz]), + {NewReqs, NewReqSz} + end. + + +maybe_send_reqs(_CH, [], _Acks, Acc, AccSz, _MaxSz, Sent) -> + ?d("maybe_send_reqs -> done when" + "~n Sent: ~w" + "~n AccSz: ~w" + "~n length(Acc): ~w", [Sent, AccSz, length(Acc)]), + {Acc, AccSz, Sent}; +maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, _AccSz, MaxSz, _Sent) + when size(Req) >= MaxSz -> + %% The request was above the maxsize limit, so first send + %% what's in store and the the big request. + ?d("maybe_send_reqs -> entry when request [~w] size (~w) > max size" + "~n Acks: ~w" + "~n length(Acc): ~w", [Tid, size(Req), Acks, length(Acc)]), + handle_send_result( send_msg(CH, Acc, Acks) ), + handle_send_result( send_msg(CH, [{Tid, Req}], []) ), + maybe_send_reqs(CH, Reqs, [], [], 0, MaxSz, true); +maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, AccSz, MaxSz, _Sent) + when AccSz + size(Req) >= MaxSz -> + %% We _did_ pass the maxsize limit with this request, so send + ?d("maybe_send_reqs -> entry when sum of requests (~w) > max size" + "~n Tid: ~w" + "~n Acks: ~w" + "~n length(Acc): ~w", [Tid, size(Req) + AccSz, Acks, length(Acc)]), + handle_send_result( send_msg(CH, [{Tid, Req}|Acc], Acks) ), + maybe_send_reqs(CH, Reqs, [], [], 0, MaxSz, true); +maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, AccSz, MaxSz, Sent) -> + ?d("maybe_send_reqs -> entry when" + "~n Tid: ~w" + "~n size(Req): ~w" + "~n Acks: ~w" + "~n length(Acc): ~w" + "~n AccSz: ~w", [Tid, size(Req), Acks, length(Acc), AccSz]), + NewAcc = [{Tid,Req}|Acc], + NewAccSz = AccSz + size(Req), + maybe_send_reqs(CH, Reqs, Acks, NewAcc, NewAccSz, MaxSz, Sent). + + +%%%----------------------------------------------------------------- + +send_pending(CH, Serial, Reqs, Acks) -> + ?d("send_pending -> entry with" + "~n Serial: ~w" + "~n length(Reqs): ~w" + "~n length(Acks): ~w", [Serial, length(Reqs), length(Acks)]), + case megaco_config:lookup_local_conn(CH) of + [CD] -> + TP = #'TransactionPending'{transactionId = Serial}, + Pend = {transactionPending, TP}, + do_send_msg(CD, Pend, lists:reverse(Reqs), Acks); + [] -> + ok + end. + + +%% We need to check the size of the reply. If the reply itself is +%% larger then the max limit, then it is sent in a separate message. +send_reply(CH, Reply, MaxSz, _ReqSz, Reqs, Acks) -> + ?d("send_reply -> entry with" + "~n length(Reqs): ~w" + "~n length(Acks): ~w", [length(Reqs), length(Acks)]), + case megaco_config:lookup_local_conn(CH) of + [CD] when size(Reply) > MaxSz -> + handle_send_result( send_msg(CD, lists:reverse(Reqs), Acks) ), + Rep = {transactionReply, Reply}, + do_send_msg(CD, Rep, [], []); + [CD] -> + Rep = {transactionReply, Reply}, + do_send_msg(CD, Rep, lists:reverse(Reqs), Acks); + [] -> + ok + end. + +do_send_msg(CD, Trans, [], []) -> + Body = {transactions, [Trans]}, + Slogan = "send trans reply/pending", + ?d("do_send_msg -> ~s", [Slogan]), + megaco_messenger_misc:send_body(CD, Slogan, Body); +do_send_msg(CD, Trans, Reqs0, []) -> + Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0], + Body = {transactions, [Trans|Reqs]}, + Slogan = "send trans reply/pending and reqs", + ?d("do_send_msg -> ~s", [Slogan]), + megaco_messenger_misc:send_body(CD, Slogan, Body); +do_send_msg(CD, Trans, [], SerialRanges) -> + Acks = make_acks(ranges(SerialRanges), []), + Body = {transactions, [Trans, {transactionResponseAck, Acks}]}, + Slogan = "send trans reply/pending and acks", + ?d("do_send_msg -> ~s", [Slogan]), + megaco_messenger_misc:send_body(CD, Slogan, Body); +do_send_msg(CD, Trans, Reqs0, SerialRanges) -> + Acks = make_acks(ranges(SerialRanges), []), + Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0], + Body = {transactions, [Trans, {transactionResponseAck, Acks}|Reqs]}, + Slogan = "send trans reply/pending, reqs and acks", + ?d("do_send_msg -> ~s", [Slogan]), + megaco_messenger_misc:send_body(CD, Slogan, Body). + + + +send_msg(_, [], []) -> + ok; +send_msg(CH, Reqs, Serials) -> + case megaco_config:lookup_local_conn(CH) of + [ConnData] -> + do_send_msg(ConnData, lists:reverse(Reqs), Serials); + [] -> + ok + end. + + +do_send_msg(CD, Reqs0, []) -> + ?d("do_send_msg -> entry with" + "~n length(Reqs0): ~p", [length(Reqs0)]), + Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0], + %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]), + Body = {transactions, Reqs}, + megaco_messenger_misc:send_body(CD, "send trans reqs", Body); +do_send_msg(CD, [], SerialRanges) -> + ?d("do_send_msg -> entry with" + "~n SerialRanges: ~p", [SerialRanges]), + Acks = make_acks(ranges(SerialRanges), []), + %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]), + Body = {transactions, [{transactionResponseAck, Acks}]}, + megaco_messenger_misc:send_body(CD, "send trans acks", Body); +do_send_msg(CD, Reqs0, SerialRanges) -> + ?d("do_send_msg -> entry with" + "~n length(Reqs0): ~p" + "~n SerialRanges: ~p", [length(Reqs0), SerialRanges]), + Acks = make_acks(ranges(SerialRanges), []), + Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0], + %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]), + Body = {transactions, [{transactionResponseAck, Acks}|Reqs]}, + megaco_messenger_misc:send_body(CD, "send trans reqs and acks", Body). + + +handle_send_result(ok) -> + ok; +handle_send_result({ok, _}) -> + ok; +handle_send_result({error, {send_message_cancelled, _Reason}}) -> + ok; +handle_send_result({error, {send_message_failed, Reason}}) -> + error_msg("Failed sending message: ~n ~p", [Reason]), + error; +handle_send_result(Error) -> + error_msg("Failed sending message: ~n ~p", [Error]), + error. + + +ranges(L) -> + lists:reverse(ranges(lists:sort(L), [], [])). + +ranges([], Range, Ranges) -> + ranges2(Range, Ranges); +ranges([S1|Sn], [S2|_] = Range, Ranges) when S1 == (S2+1) -> + ranges(Sn, [S1|Range], Ranges); +ranges([S|Sn], Range, Ranges) -> + ranges(Sn, [S], ranges2(Range, Ranges)). + +ranges2([], Ranges) -> + Ranges; +ranges2([S], Ranges) -> + [{S,S}|Ranges]; +ranges2(Range0, Ranges) -> + Range = lists:reverse(Range0), + [{hd(Range),lists:last(Range)}|Ranges]. + + +make_acks([], Acks) -> + lists:reverse(Acks); +make_acks([{S,S}|SerialRanges], Acks) -> + TRA = #'TransactionAck'{firstAck = S}, + make_acks(SerialRanges, [TRA|Acks]); +make_acks([{F,L}|SerialRanges], Acks) -> + TRA = #'TransactionAck'{firstAck = F, lastAck = L}, + make_acks(SerialRanges, [TRA|Acks]). + + + +%%%----------------------------------------------------------------- + +to(To, Start) -> + To - (t() - Start). + +%% Time in milli seconds +t() -> + {A,B,C} = erlang:now(), + A*1000000000+B*1000+(C div 1000). + +warning_msg(F, A) -> + ?megaco_warning("Transaction sender: " ++ F, A). + +error_msg(F, A) -> + ?megaco_error("Transaction sender: " ++ F, A). + + +%%%----------------------------------------------------------------- +%%% System messages handled here +%%%----------------------------------------------------------------- + +system_continue(_Parent, _Dbg, {S,To}) -> + loop(S, To). + +system_terminate(Reason, _Parent, _Dbg, {S, _}) -> + #state{conn_handle = CH, reqs = Reqs, acks = Acks} = S, + send_msg(CH, Reqs, Acks), + exit(Reason). + +system_code_change(S, _Module, _OLdVsn, _Extra) -> + ?d("system_code_change -> entry", []), + {ok, S}. |