From 84adefa331c4159d432d22840663c38f155cd4c1 Mon Sep 17 00:00:00 2001 From: Erlang/OTP Date: Fri, 20 Nov 2009 14:54:40 +0000 Subject: The R13B03 release. --- lib/megaco/test/megaco_test_mgc.erl | 1221 +++++++++++++++++++++++++++++++++++ 1 file changed, 1221 insertions(+) create mode 100644 lib/megaco/test/megaco_test_mgc.erl (limited to 'lib/megaco/test/megaco_test_mgc.erl') diff --git a/lib/megaco/test/megaco_test_mgc.erl b/lib/megaco/test/megaco_test_mgc.erl new file mode 100644 index 0000000000..05c482f1af --- /dev/null +++ b/lib/megaco/test/megaco_test_mgc.erl @@ -0,0 +1,1221 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2003-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%%---------------------------------------------------------------------- +%% Purpose: Implements an "MGC" used by the test suite +%%---------------------------------------------------------------------- +-module(megaco_test_mgc). + +-export([start/4, start/5, stop/1, + get_stats/2, reset_stats/1, + user_info/1, user_info/2, conn_info/1, conn_info/2, + update_user_info/3, update_conn_info/3, + request_ignore/1, + request_discard/1, request_discard/2, + request_pending/1, request_pending/2, request_pending_ignore/1, + request_handle/1, request_handle/2, + request_handle_pending/1, request_handle_pending/2, + request_handle_sloppy/1, request_handle_sloppy/2, + ack_info/2, abort_info/2, req_info/2, + disconnect/2, + verbosity/2]). +-export([mgc/3]). + +%% Megaco callback api +-export([ + handle_connect/3, + handle_disconnect/4, + handle_syntax_error/4, + handle_message_error/4, + handle_trans_request/4, + handle_trans_long_request/4, + handle_trans_reply/5, + handle_trans_ack/5, + handle_unexpected_trans/4, + handle_trans_request_abort/5 + ]). + +-include("megaco_test_lib.hrl"). +-include_lib("megaco/include/megaco.hrl"). +-include_lib("megaco/include/megaco_message_v1.hrl"). + +-define(A4444, ["11111111", "00000000", "00000000"]). +-define(A4445, ["11111111", "00000000", "11111111"]). +-define(A5555, ["11111111", "11111111", "00000000"]). +-define(A5556, ["11111111", "11111111", "11111111"]). + +-define(valid_actions, + [ignore, pending, pending_ignore, discard_ack, handle_ack, handle_pending_ack, handle_sloppy_ack]). + +-record(mgc, {parent = undefined, + tcp_sup = undefined, + udp_sup = undefined, + req_action = discard_ack, + req_timeout = 0, + mid = undefined, + ack_info = undefined, + abort_info = undefined, + req_info = undefined, + mg = [], + dsi_timer}). + + +%%% ------------------------------------------------------------------ + +start(Node, Mid, ET, Verbosity) -> + %% Conf = [{megaco_trace, io}], + %% Conf = [{megaco_trace, "megaco-mgc.trace"}], + Conf = [{megaco_trace, false}], + start(Node, Mid, ET, Conf, Verbosity). + +start(Node, Mid, ET, Conf, Verbosity) -> + d("start mgc[~p]: ~p", [Node, Mid]), + RI = {receive_info, mk_recv_info(ET)}, + Config = [{local_mid, Mid}, RI] ++ Conf, + Pid = spawn_link(Node, ?MODULE, mgc, [self(), Verbosity, Config]), + await_started(Pid). + +mk_recv_info(ET) -> + mk_recv_info(ET, []). + +mk_recv_info([], Acc) -> + Acc; +mk_recv_info([{Encoding, Transport}|ET], Acc) + when is_atom(Encoding) andalso is_atom(Transport) -> + {EMod, Port} = select_encoding(Encoding), + TMod = select_transport(Transport), + RI = [{encoding_module, EMod}, + {encoding_config, []}, + {transport_module, TMod}, + {port, Port}], + mk_recv_info(ET, [RI|Acc]); +mk_recv_info([{Encoding, Transport, TO}|ET], Acc) + when is_atom(Encoding) andalso is_atom(Transport) andalso is_list(TO) -> + {EMod, Port} = select_encoding(Encoding), + TMod = select_transport(Transport), + RI = [{encoding_module, EMod}, + {encoding_config, []}, + {transport_module, TMod}, + {port, Port}, + {transport_opts, TO}], + mk_recv_info(ET, [RI|Acc]); +mk_recv_info([{Encoding, EC, Transport}|ET], Acc) + when is_atom(Encoding) andalso is_list(EC) andalso is_atom(Transport) -> + {EMod, Port} = select_encoding(Encoding), + TMod = select_transport(Transport), + RI = [{encoding_module, EMod}, + {encoding_config, EC}, + {transport_module, TMod}, + {port, Port}], + mk_recv_info(ET, [RI|Acc]); +mk_recv_info([ET|_], _) -> + throw({error, {invalid_encoding_transport, ET}}). + +select_encoding(text) -> + {megaco_pretty_text_encoder, 2944}; +select_encoding(pretty_text) -> + {megaco_pretty_text_encoder, 2944}; +select_encoding(compact_text) -> + {megaco_compact_text_encoder, 2944}; +select_encoding(binary) -> + {megaco_ber_bin_encoder, 2945}; +select_encoding(erl_dist) -> + {megaco_erl_dist_encoder, 2946}; +select_encoding(Encoding) -> + throw({error, {invalid_encoding, Encoding}}). + +select_transport(tcp) -> + megaco_tcp; +select_transport(udp) -> + megaco_udp; +select_transport(Transport) -> + throw({error, {invalid_transport, Transport}}). + + +await_started(Pid) -> + receive + {started, Pid} -> + d("await_started ~p: ok", [Pid]), + {ok, Pid}; + {'EXIT', Pid, + {failed_starting_tcp_listen, {could_not_start_listener, {gen_tcp_listen, eaddrinuse}}}} -> + i("await_started ~p: address already in use", [Pid]), + ?SKIP(eaddrinuse); + {'EXIT', Pid, Reason} -> + i("await_started ~p: received exit signal: ~p", [Pid, Reason]), + exit({failed_starting, Pid, Reason}) + after 10000 -> + i("await_started ~p: timeout", [Pid]), + exit({error, timeout}) + end. + + +stop(Pid) -> + server_request(Pid, stop, stopped). + +get_stats(Pid, No) -> + server_request(Pid, {statistics, No}, {statistics_reply, No}). + +reset_stats(Pid) -> + server_request(Pid, reset_stats, reset_stats_ack). + +user_info(Pid) -> + server_request(Pid, {user_info, all}, user_info_ack). + +user_info(Pid, Tag) -> + server_request(Pid, {user_info, Tag}, user_info_ack). + +conn_info(Pid) -> + server_request(Pid, {conn_info, all}, conn_info_ack). + +conn_info(Pid, Tag) -> + server_request(Pid, {conn_info, Tag}, conn_info_ack). + +update_user_info(Pid, Tag, Val) -> + server_request(Pid, {update_user_info, Tag, Val}, update_user_info_ack). + +update_conn_info(Pid, Tag, Val) -> + server_request(Pid, {update_conn_info, Tag, Val}, update_conn_info_ack). + +disconnect(Pid, Reason) -> + server_request(Pid, {disconnect, Reason}, disconnected). + +ack_info(Pid, InfoPid) -> + Pid ! {ack_info, InfoPid, self()}. + +abort_info(Pid, InfoPid) -> + Pid ! {abort_info, InfoPid, self()}. + +req_info(Pid, InfoPid) -> + Pid ! {req_info, InfoPid, self()}. + +verbosity(Pid, V) -> + Pid ! {verbosity, V, self()}. + +request_ignore(Pid) -> + request_action(Pid, {ignore, infinity}). + +request_pending_ignore(Pid) -> + request_action(Pid, {pending_ignore, infinity}). + +request_discard(Pid) -> + request_discard(Pid,0). + +request_discard(Pid, To) -> + request_action(Pid, {discard_ack, To}). + +request_pending(Pid) -> + request_pending(Pid, 5000). + +request_pending(Pid, To) -> + request_action(Pid, {pending, To}). + +request_handle(Pid) -> + request_handle(Pid, 0). + +request_handle(Pid, To) -> + request_action(Pid, {handle_ack, To}). + +request_handle_pending(Pid) -> + request_handle_pending(Pid, 0). + +request_handle_pending(Pid, To) -> + request_action(Pid, {handle_pending_ack, To}). + +request_handle_sloppy(Pid) -> + request_handle_sloppy(Pid, 0). + +request_handle_sloppy(Pid, To) -> + request_action(Pid, {handle_sloppy_ack, To}). + +request_action(Pid, Action) -> + server_request(Pid, request_action, Action, request_action_ack). + + +server_request(Pid, Req, ReplyTag) -> + Pid ! {Req, self()}, + receive + {ReplyTag, Reply, Pid} -> + Reply; + {'EXIT', Pid, Reason} -> + exit({failed, Req, Pid, Reason}) + after 10000 -> + exit({timeout, Req, Pid}) + end. + +server_request(Pid, Req, ReqData, ReplyTag) -> + Pid ! {Req, ReqData, self()}, + receive + {ReplyTag, Reply, Pid} -> + Reply; + {'EXIT', Pid, Reason} -> + exit({failed, Req, Pid, Reason}) + after 10000 -> + exit({timeout, Req, Pid}) + end. + + +server_reply(Pid, ReplyTag, Reply) -> + Pid ! {ReplyTag, Reply, self()}. + + +%%% ------------------------------------------------------------------ + + +mgc(Parent, Verbosity, Config) -> + process_flag(trap_exit, true), + put(verbosity, Verbosity), + put(sname, "MGC"), + i("mgc -> starting"), + case (catch init(Config)) of + {error, Reason} -> + exit(Reason); + {Mid, TcpSup, UdpSup, DSITimer} -> + notify_started(Parent), + S = #mgc{parent = Parent, + tcp_sup = TcpSup, + udp_sup = UdpSup, + mid = Mid, + dsi_timer = DSITimer}, + i("mgc -> started"), + display_system_info("at start "), + loop(S) + end. + +init(Config) -> + d("init -> entry"), + random_init(), + Mid = get_conf(local_mid, Config), + RI = get_conf(receive_info, Config), + + d("init -> maybe start the display system info timer"), + DSITimer = + case get_conf(display_system_info, Config, undefined) of + Time when is_integer(Time) -> + d("init -> creating display system info timer"), + create_timer(Time, display_system_info); + _ -> + undefined + end, + Conf0 = lists:keydelete(display_system_info, 1, Config), + + d("init -> start megaco"), + application:start(megaco), + + d("init -> possibly enable megaco trace"), + case lists:keysearch(megaco_trace, 1, Config) of + {value, {megaco_trace, true}} -> + megaco:enable_trace(max, io); + {value, {megaco_trace, io}} -> + megaco:enable_trace(max, io); + {value, {megaco_trace, File}} when is_list(File) -> + megaco:enable_trace(max, File); + _ -> + ok + end, + Conf1 = lists:keydelete(megaco_trace, 1, Conf0), + + d("init -> start megaco user"), + Conf2 = lists:keydelete(local_mid, 1, Conf1), + Conf3 = lists:keydelete(receive_info, 1, Conf2), + ok = megaco:start_user(Mid, Conf3), + + d("init -> update user info (user_mod)"), + ok = megaco:update_user_info(Mid, user_mod, ?MODULE), + + d("init -> update user info (user_args)"), + ok = megaco:update_user_info(Mid, user_args, [self()]), + + d("init -> get user info (receive_handle)"), + RH = megaco:user_info(Mid,receive_handle), + d("init -> parse receive info"), + Transports = parse_receive_info(RI, RH), + + d("init -> start transports"), + {Tcp, Udp} = start_transports(Transports), + {Mid, Tcp, Udp, DSITimer}. + +loop(S) -> + d("loop -> await request"), + receive + {display_system_info, Time} -> + display_system_info(S#mgc.mid), + NewTimer = create_timer(Time, display_system_info), + loop(S#mgc{dsi_timer = NewTimer}); + + {stop, Parent} when S#mgc.parent =:= Parent -> + i("loop -> stopping", []), + display_system_info(S#mgc.mid, "at finish "), + cancel_timer(S#mgc.dsi_timer), + Mid = S#mgc.mid, + (catch close_conns(Mid)), + megaco:stop_user(Mid), + application:stop(megaco), + i("loop -> stopped", []), + server_reply(Parent, stopped, ok), + exit(normal); + + {{disconnect, Reason}, Parent} when S#mgc.parent == Parent -> + i("loop -> disconnecting", []), + Mid = S#mgc.mid, + [Conn|_] = megaco:user_info(Mid, connections), + Res = megaco:disconnect(Conn, {self(), Reason}), + server_reply(Parent, disconnected, Res), + loop(S); + + {{update_user_info, Tag, Val}, Parent} when S#mgc.parent == Parent -> + i("loop -> got update_user_info: ~w -> ~p", [Tag, Val]), + Res = (catch megaco:update_user_info(S#mgc.mid, Tag, Val)), + d("loop -> Res: ~p", [Res]), + server_reply(Parent, update_user_info_ack, Res), + loop(S); + + {{user_info, Tag}, Parent} when S#mgc.parent == Parent -> + i("loop -> got user_info request for ~w", [Tag]), + Res = (catch megaco:user_info(S#mgc.mid, Tag)), + d("loop -> Res: ~p", [Res]), + server_reply(Parent, user_info_ack, Res), + loop(S); + + {{update_conn_info, Tag, Val}, Parent} when S#mgc.parent == Parent -> + i("loop -> got update_conn_info: ~w -> ~p", [Tag, Val]), + Conns = megaco:user_info(S#mgc.mid, connections), + Fun = fun(CH) -> + (catch megaco:update_conn_info(CH, Tag, Val)) + end, + Res = lists:map(Fun, Conns), + d("loop -> Res: ~p", [Res]), + server_reply(Parent, update_conn_info_ack, Res), + loop(S); + + {{conn_info, Tag}, Parent} when S#mgc.parent == Parent -> + i("loop -> got conn_info request for ~w", [Tag]), + Conns = megaco:user_info(S#mgc.mid, connections), + Fun = fun(CH) -> + {CH, (catch megaco:conn_info(CH, Tag))} + end, + Res = lists:map(Fun, Conns), + d("loop -> Res: ~p", [Res]), + server_reply(Parent, conn_info_ack, Res), + loop(S); + + + %% + {request_action, {Action, To}, Parent} when S#mgc.parent == Parent -> + i("loop -> got new request_action: ~p:~w", [Action,To]), + {Reply, S1} = + case lists:member(Action, ?valid_actions) of + true when To >= 0; To == infinity -> + {{ok, S#mgc.req_action}, + S#mgc{req_action = Action, req_timeout = To}}; + true -> + {{error, {invalid_action_timeout, To}}, S}; + false -> + {{error, {invalid_action, Action}}, S} + end, + server_reply(Parent, request_action_ack, Reply), + loop(S1); + + + %% Reset stats + {reset_stats, Parent} when S#mgc.parent == Parent -> + i("loop -> got request to reset stats counters"), + do_reset_stats(S#mgc.mid), + server_reply(Parent, reset_stats_ack, ok), + loop(S); + + + %% Give me statistics + {{statistics, 1}, Parent} when S#mgc.parent == Parent -> + i("loop -> got request for statistics 1"), + {ok, Gen} = megaco:get_stats(), + GetTrans = + fun(CH) -> + Reason = {statistics, CH}, + Pid = megaco:conn_info(CH, control_pid), + SendMod = megaco:conn_info(CH, send_mod), + SendHandle = megaco:conn_info(CH, send_handle), + {ok, Stats} = + case SendMod of + megaco_tcp -> megaco_tcp:get_stats(SendHandle); + megaco_udp -> megaco_udp:get_stats(SendHandle); + SendMod -> exit(Pid, Reason) + end, + {SendHandle, Stats} + end, + Mid = S#mgc.mid, + Trans = + lists:map(GetTrans, megaco:user_info(Mid, connections)), + Reply = {ok, [{gen, Gen}, {trans, Trans}]}, + server_reply(Parent, {statistics_reply, 1}, Reply), + loop(S); + + + {{statistics, 2}, Parent} when S#mgc.parent == Parent -> + i("loop -> got request for statistics 2"), + {ok, Gen} = megaco:get_stats(), + #mgc{tcp_sup = TcpSup, udp_sup = UdpSup} = S, + TcpStats = get_trans_stats(TcpSup, megaco_tcp), + UdpStats = get_trans_stats(UdpSup, megaco_udp), + Reply = {ok, [{gen, Gen}, {trans, [TcpStats, UdpStats]}]}, + server_reply(Parent, {statistics_reply, 2}, Reply), + loop(S); + + + %% Megaco callback messages + {request, Request, From} -> + d("loop -> received megaco request from ~p:~n~p", + [From, Request]), + {Reply, S1} = handle_megaco_request(Request, S), + d("loop -> send request reply: ~n~p", [Reply]), + reply(From, Reply), + loop(S1); + + + {ack_info, To, Parent} when S#mgc.parent == Parent -> + i("loop -> received request to inform about received ack's ", []), + loop(S#mgc{ack_info = To}); + + + {abort_info, To, Parent} when S#mgc.parent == Parent -> + i("loop -> received request to inform about received aborts ", []), + loop(S#mgc{abort_info = To}); + + + {req_info, To, Parent} when S#mgc.parent == Parent -> + i("loop -> received request to inform about received req's ", []), + loop(S#mgc{req_info = To}); + + + {verbosity, V, Parent} when S#mgc.parent == Parent -> + i("loop -> received new verbosity: ~p", [V]), + put(verbosity,V), + loop(S); + + + {'EXIT', Pid, Reason} when S#mgc.tcp_sup =:= Pid -> + error_msg("MGC received unexpected exit " + "from TCP transport supervisor (~p):~n~p", + [Pid, Reason]), + i("loop -> [tcp] exiting", []), + display_system_info(S#mgc.mid, "at bad finish (tcp) "), + cancel_timer(S#mgc.dsi_timer), + Mid = S#mgc.mid, + (catch close_conns(Mid)), + megaco:stop_user(Mid), + application:stop(megaco), + i("loop -> stopped", []), + StopReason = {error, {tcp_terminated, Pid, Reason}}, + server_reply(S#mgc.parent, stopped, StopReason), + exit(StopReason); + + + {'EXIT', Pid, Reason} when S#mgc.udp_sup =:= Pid -> + error_msg("MGC received unexpected exit " + "from UDP transport supervisor (~p):~n~p", + [Pid, Reason]), + i("loop -> [udp] exiting", []), + display_system_info(S#mgc.mid, "at bad finish (udp) "), + cancel_timer(S#mgc.dsi_timer), + Mid = S#mgc.mid, + (catch close_conns(Mid)), + megaco:stop_user(Mid), + application:stop(megaco), + i("loop -> stopped", []), + StopReason = {error, {udp_terminated, Pid, Reason}}, + server_reply(S#mgc.parent, stopped, StopReason), + exit(StopReason); + + + Invalid -> + i("loop -> received invalid request: ~p", [Invalid]), + loop(S) + end. + + +do_reset_stats(Mid) -> + megaco:reset_stats(), + do_reset_trans_stats(megaco:user_info(Mid, connections), []). + +do_reset_trans_stats([], _Reset) -> + ok; +do_reset_trans_stats([CH|CHs], Reset) -> + SendMod = megaco:conn_info(CH, send_mod), + case lists:member(SendMod, Reset) of + true -> + do_reset_trans_stats(CHs, Reset); + false -> + SendMod:reset_stats(), + do_reset_trans_stats(CHs, [SendMod|Reset]) + end. + + +close_conns(Mid) -> + Reason = {self(), ignore}, + Disco = fun(CH) -> + (catch do_close_conn(CH, Reason)) + end, + lists:map(Disco, megaco:user_info(Mid, connections)). + +do_close_conn(CH, Reason) -> + d("close connection to ~p", [CH#megaco_conn_handle.remote_mid]), + Pid = megaco:conn_info(CH, control_pid), + SendMod = megaco:conn_info(CH, send_mod), + SendHandle = megaco:conn_info(CH, send_handle), + megaco:disconnect(CH, Reason), + case SendMod of + megaco_tcp -> megaco_tcp:close(SendHandle); + megaco_udp -> megaco_udp:close(SendHandle); + SendMod -> exit(Pid, Reason) + end. + +get_trans_stats(P, SendMod) when is_pid(P) -> + case (catch SendMod:get_stats()) of + {ok, Stats} -> + {SendMod, Stats}; + Else -> + {SendMod, Else} + end; +get_trans_stats(_P, SendMod) -> + {SendMod, undefined}. + +parse_receive_info([], _RH) -> + throw({error, no_receive_info}); +parse_receive_info(RI, RH) -> + parse_receive_info(RI, RH, []). + +parse_receive_info([], _RH, Transports) -> + d("parse_receive_info -> done when" + "~n Transports: ~p", [Transports]), + Transports; +parse_receive_info([RI|RIs], RH, Transports) -> + d("parse_receive_info -> parse receive info"), + case (catch parse_receive_info1(RI, RH)) of + {error, Reason} -> + i("failed parsing receive info: ~p~n~p", [RI, Reason]), + exit({failed_parsing_recv_info, RI, Reason}); + RH1 -> + parse_receive_info(RIs, RH, [RH1|Transports]) + end. + +parse_receive_info1(RI, RH) -> + d("parse_receive_info1 -> get encoding module"), + EM = get_encoding_module(RI), + d("parse_receive_info1 -> get encoding config"), + EC = get_encoding_config(RI, EM), + d("parse_receive_info1 -> get transport module"), + TM = get_transport_module(RI), + d("parse_receive_info1 -> get transport port"), + TP = get_transport_port(RI), + d("parse_receive_info1 -> get transport opts"), + TO = get_transport_opts(RI), + RH1 = RH#megaco_receive_handle{send_mod = TM, + encoding_mod = EM, + encoding_config = EC}, + d("parse_receive_info1 -> " + "~n Transport Opts: ~p" + "~n Port: ~p" + "~n Receive handle: ~p", [TO, TP, RH1]), + {TO, TP, RH1}. + + + +%% -------------------------------------------------------- +%% On some platforms there seem to take some time before +%% a port is released by the OS (after having been used, +%% as is often the case in the test suites). +%% So, starting the transports is done in two steps. +%% First) Start the actual transport(s) +%% Second) Create the listener (tcp) or open the +%% send/receive port (udp). +%% The second step *may* need to be repeated! +%% -------------------------------------------------------- +start_transports([]) -> + throw({error, no_transport}); +start_transports(Transports) when is_list(Transports) -> + {Tcp, Udp} = start_transports1(Transports, undefined, undefined), + ok = start_transports2(Transports, Tcp, Udp), + {Tcp, Udp}. + +start_transports1([], Tcp, Udp) -> + {Tcp, Udp}; +start_transports1([{_TO, _Port, RH}|Transports], Tcp, Udp) + when ((RH#megaco_receive_handle.send_mod =:= megaco_tcp) andalso + (not is_pid(Tcp))) -> + case megaco_tcp:start_transport() of + {ok, Sup} -> + start_transports1(Transports, Sup, Udp); + Else -> + throw({error, {failed_starting_tcp_transport, Else}}) + end; +start_transports1([{_TO, _Port, RH}|Transports], Tcp, Udp) + when ((RH#megaco_receive_handle.send_mod =:= megaco_udp) andalso + (not is_pid(Udp))) -> + case megaco_udp:start_transport() of + {ok, Sup} -> + start_transports1(Transports, Tcp, Sup); + Else -> + throw({error, {failed_starting_udp_transport, Else}}) + end; +start_transports1([_|Transports], Tcp, Udp) -> + start_transports1(Transports, Tcp, Udp). + +start_transports2([], _, _) -> + ok; +start_transports2([{TO, Port, RH}|Transports], Tcp, Udp) + when RH#megaco_receive_handle.send_mod =:= megaco_tcp -> + start_tcp(TO, RH, Port, Tcp), + start_transports2(Transports, Tcp, Udp); +start_transports2([{TO, Port, RH}|Transports], Tcp, Udp) + when RH#megaco_receive_handle.send_mod =:= megaco_udp -> + start_udp(TO, RH, Port, Udp), + start_transports2(Transports, Tcp, Udp). + +start_tcp(TO, RH, Port, Sup) -> + d("start tcp transport"), + start_tcp(TO, RH, Port, Sup, 250). + +start_tcp(TO, RH, Port, Sup, Timeout) + when is_pid(Sup) andalso is_integer(Timeout) andalso (Timeout > 0) -> + d("tcp listen on ~p", [Port]), + Opts = [{port, Port}, + {receive_handle, RH}, + {tcp_options, [{nodelay, true}]}] ++ TO, + try_start_tcp(Sup, Opts, Timeout, noError). + +try_start_tcp(Sup, Opts, Timeout, Error0) when (Timeout < 5000) -> + Sleep = random(Timeout) + 100, + d("try create tcp listen socket (~p,~p)", [Timeout, Sleep]), + case megaco_tcp:listen(Sup, Opts) of + ok -> + d("listen socket created", []), + Sup; + Error1 when Error0 =:= noError -> % Keep the first error + d("failed creating listen socket [1]: ~p", [Error1]), + sleep(Sleep), + try_start_tcp(Sup, Opts, Timeout*2, Error1); + Error2 -> + d("failed creating listen socket [2]: ~p", [Error2]), + sleep(Sleep), + try_start_tcp(Sup, Opts, Timeout*2, Error0) + end; +try_start_tcp(Sup, _Opts, _Timeout, Error) -> + megaco_tcp:stop_transport(Sup), + case Error of + {error, Reason} -> + throw({error, {failed_starting_tcp_listen, Reason}}); + _ -> + throw({error, {failed_starting_tcp_listen, Error}}) + end. + + +start_udp(TO, RH, Port, Sup) -> + d("start udp transport"), + start_udp(TO, RH, Port, Sup, 250). + +start_udp(TO, RH, Port, Sup, Timeout) -> + d("udp open ~p", [Port]), + Opts = [{port, Port}, {receive_handle, RH}] ++ TO, + try_start_udp(Sup, Opts, Timeout, noError). + +try_start_udp(Sup, Opts, Timeout, Error0) when (Timeout < 5000) -> + d("try open udp socket (~p)", [Timeout]), + case megaco_udp:open(Sup, Opts) of + {ok, _SendHandle, _ControlPid} -> + d("port opened", []), + Sup; + Error1 when Error0 =:= noError -> % Keep the first error + d("failed open port [1]: ~p", [Error1]), + sleep(Timeout), + try_start_udp(Sup, Opts, Timeout*2, Error1); + Error2 -> + d("failed open port [2]: ~p", [Error2]), + sleep(Timeout), + try_start_udp(Sup, Opts, Timeout*2, Error0) + end; +try_start_udp(Sup, _Opts, _Timeout, Error) -> + megaco_udp:stop_transport(Sup), + throw({error, {failed_starting_udp_open, Error}}). + + +%% ----------------------- +%% Handle megaco callbacks +%% + +handle_megaco_request({handle_connect, CH, _PV}, #mgc{mg = MGs} = S) -> + case lists:member(CH, MGs) of + true -> + i("MG already connected: ~n ~p", [CH]), + {error, S}; + false -> + {ok, S#mgc{mg = [CH|MGs]}} + end; + +handle_megaco_request({handle_disconnect, CH, _PV, R}, S) -> + d("handle_megaco_request(handle_disconnect) -> entry with" + "~n CH: ~p" + "~n R: ~p", [CH, R]), + CancelRes = (catch megaco:cancel(CH, R)), % Cancel the outstanding messages + d("handle_megaco_request(handle_disconnect) -> megaco cancel result: ~p", [CancelRes]), + MGs = lists:delete(CH, S#mgc.mg), + d("handle_megaco_request(handle_disconnect) -> MGs: ~p", [MGs]), + {ok, S#mgc{mg = MGs}}; + +handle_megaco_request({handle_syntax_error, _RH, _PV, _ED}, S) -> + {reply, S}; + +handle_megaco_request({handle_message_error, _CH, _PV, _ED}, S) -> + {no_reply, S}; + +handle_megaco_request({handle_trans_request, CH, PV, ARs}, + #mgc{req_info = P} = S) when is_pid(P) -> + d("handle_megaco_request(handle_trans_request,~p) -> entry", [P]), + P ! {req_received, self(), ARs}, + do_handle_trans_request(CH, PV, ARs, S); +handle_megaco_request({handle_trans_request, CH, PV, ARs}, S) -> + d("handle_megaco_request(handle_trans_request) -> entry"), + do_handle_trans_request(CH, PV, ARs, S); + +handle_megaco_request({handle_trans_long_request, CH, PV, RD}, S) -> + d("handle_megaco_request(handle_long_trans_request) -> entry"), + Reply0 = handle_act_requests(CH, PV, RD, discard_ack), + Reply = + case S of + #mgc{req_action = ignore, req_timeout = To} -> + d("handle_megaco_request(handle_long_trans_request) -> " + "~n To: ~p", [To]), + {delay_reply, To, Reply0}; + _ -> + d("handle_megaco_request(handle_long_trans_request) -> " + "~n S: ~p", [S]), + Reply0 + end, + {Reply, S}; + +handle_megaco_request({handle_trans_reply, _CH, _PV, _AR, _RD}, S) -> + {ok, S}; + +handle_megaco_request({handle_trans_ack, CH, PV, AS, AD}, + #mgc{ack_info = P} = S) when is_pid(P) -> + d("handle_megaco_request(handle_trans_ack,~p) -> entry when" + "~n CH: ~p" + "~n PV: ~p" + "~n AS: ~p" + "~n AD: ~p", [P, CH, PV, AS, AD]), + P ! {ack_received, self(), AS}, + {ok, S}; + +handle_megaco_request({handle_trans_ack, CH, PV, AS, AD}, S) -> + d("handle_megaco_request(handle_trans_ack) -> entry with" + "~n CH: ~p" + "~n PV: ~p" + "~n AS: ~p" + "~n AD: ~p", [CH, PV, AS, AD]), + {ok, S}; + +handle_megaco_request({handle_unexpected_trans, CH, PV, TR}, S) -> + d("handle_megaco_request(handle_unexpected_trans) -> entry with" + "~n CH: ~p" + "~n PV: ~p" + "~n TR: ~p", [CH, PV, TR]), + {ok, S}; + +handle_megaco_request({handle_trans_request_abort, CH, PV, TI, Handler}, S) -> + d("handle_megaco_request(handle_trans_request_abort) -> entry with" + "~n CH: ~p" + "~n PV: ~p" + "~n TI: ~p" + "~n Handler: ~p", [CH, PV, TI, Handler]), + Reply = + case S#mgc.abort_info of + P when is_pid(P) -> + P ! {abort_received, self(), TI}, + ok; + _ -> + ok + end, + {Reply, S}. + + +do_handle_trans_request(CH, PV, ARs, + #mgc{req_action = Action, req_timeout = To} = S) -> + d("do_handle_megaco_request(handle_trans_request) -> entry with" + "~n Action: ~p" + "~n To: ~p", [Action, To]), + case handle_act_requests(CH, PV, ARs, Action) of + {pending_ignore, ActReqs} -> + {{pending, ActReqs}, S#mgc{req_action = ignore}}; + Reply -> + {{delay_reply, To, Reply}, S} + end. + + +handle_act_requests(_CH, _PV, _ActReqs, ignore) -> + ignore; +handle_act_requests(_CH, _PV, ActReqs, pending) -> + {pending, ActReqs}; +handle_act_requests(_CH, _PV, ActReqs, pending_ignore) -> + {pending_ignore, ActReqs}; +handle_act_requests(CH, PV, ActReqs, handle_ack) -> + Reply = (catch do_handle_act_requests(CH, PV, ActReqs, [])), + {{handle_ack, ActReqs}, Reply}; +handle_act_requests(CH, PV, ActReqs, handle_sloppy_ack) -> + Reply = (catch do_handle_act_requests(CH, PV, ActReqs, [])), + {{handle_sloppy_ack, ActReqs}, Reply}; +handle_act_requests(CH, PV, ActReqs, _) -> + Reply = (catch do_handle_act_requests(CH, PV, ActReqs, [])), + {discard_ack, Reply}. + +do_handle_act_requests(_CH, _PV, [], ActReplies) -> + lists:reverse(ActReplies); +do_handle_act_requests(CH, PV, [ActReq|ActReqs], ActReplies) -> + ActReply = handle_act_request(CH, PV, ActReq), + do_handle_act_requests(CH, PV, ActReqs, [ActReply|ActReplies]). + +handle_act_request(CH, PV, ActReq) -> + #'ActionRequest'{contextId = CtxId, commandRequests = Cmds} = ActReq, + CmdReplies = handle_cmd_requests(CH, PV, CtxId, Cmds), + #'ActionReply'{contextId = CtxId, + commandReply = CmdReplies}. + +handle_cmd_requests(CH, PV, ?megaco_null_context_id, + [#'CommandRequest'{command={serviceChangeReq,Req}}]) -> + Rep = service_change(CH, PV, Req), + [{serviceChangeReply, Rep}]; +handle_cmd_requests(CH, PV, CtxId, Cmds) -> + do_handle_cmd_requests(CH, PV, CtxId, Cmds, []). + +do_handle_cmd_requests(_CH, _PV, _CtxId, [], CmdReplies) -> + lists:reverse(CmdReplies); +do_handle_cmd_requests(CH, PV, CtxId, [Cmd|Cmds], CmdReplies) -> + CmdReply = handle_cmd_request(CH, PV, CtxId, Cmd), + do_handle_cmd_requests(CH, PV, CtxId, Cmds, [CmdReply|CmdReplies]). + +handle_cmd_request(CH, PV, CtxId, + #'CommandRequest'{command = {Tag,Req}}) -> + case Tag of + notifyReq -> + (catch handle_notify_req(CH,PV,CtxId,Req)); + + serviceChangeReq -> + ED = cre_error_descr(?megaco_not_implemented, + "Service change only allowed " + "on null context handled"), + throw(ED); + + _ -> + Code = ?megaco_not_implemented, + ED = cre_error_descr(Code,"Unknown command requst received:" + "~n Tag: ~p~n Req: ~p",[Tag,Req]), + throw(ED) + end. + +handle_notify_req(CH, PV, CtxId, + #'NotifyRequest'{terminationID = [Tid], + observedEventsDescriptor = EvDesc}) -> + handle_event(CH, PV, CtxId, Tid, EvDesc). + +handle_event(_CH, _PV, _Cid, Tid, EvDesc) -> + d("handle_event -> received" + "~n EvDesc: ~p" + "~n Tid: ~p", [EvDesc, Tid]), + {notifyReply, cre_notifyRep(Tid)}. + + +service_change(CH, _PV, SCR) -> + SCP = SCR#'ServiceChangeRequest'.serviceChangeParms, + #'ServiceChangeParm'{serviceChangeAddress = Address, + serviceChangeProfile = Profile, + serviceChangeReason = [_Reason]} = SCP, + TermId = SCR#'ServiceChangeRequest'.terminationID, + if + TermId == [?megaco_root_termination_id] -> + MyMid = CH#megaco_conn_handle.local_mid, + Res = {serviceChangeResParms, + cre_serviceChangeResParms(MyMid, Address, Profile)}, + cre_serviceChangeReply(TermId, Res); + true -> + Res = {errorDescriptor, + cre_error_descr(?megaco_not_implemented, + "Only handled for root")}, + cre_serviceChangeReply(TermId, Res) + end. + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +cre_serviceChangeReply(TermId, Result) -> + #'ServiceChangeReply'{terminationID = TermId, + serviceChangeResult = Result}. + +cre_serviceChangeResParms(Mid, Addr, Prof) -> + #'ServiceChangeResParm'{serviceChangeMgcId = Mid, + serviceChangeAddress = Addr, + serviceChangeProfile = Prof}. + + +cre_notifyRep(Tid) -> + #'NotifyReply'{terminationID = [Tid]}. + +% cre_notifyRep(Tid,Err) -> +% #'NotifyReply'{terminationID = [Tid], errorDescriptor = Err}. + +cre_error_descr(Code,Text) -> + #'ErrorDescriptor'{errorCode = Code, errorText = Text}. + +cre_error_descr(Code,FormatString,Args) -> + Text = lists:flatten(io_lib:format(FormatString,Args)), + cre_error_descr(Code,Text). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +notify_started(Parent) -> + Parent ! {started, self()}. + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% The megaco user callback interface + +handle_connect(CH, PV, Pid) -> + case CH#megaco_conn_handle.remote_mid of + preliminary_mid -> + %% Avoids deadlock + ok; + _ -> + Reply = request(Pid, {handle_connect, CH, PV}), + Reply + end. + +handle_disconnect(_CH, _PV, + {user_disconnect, {Pid, ignore}}, + Pid) -> + ok; +handle_disconnect(CH, _PV, + {user_disconnect, {Pid, cancel}}, + Pid) -> + megaco:cancel(CH, disconnected), + ok; +handle_disconnect(CH, PV, R, Pid) -> + request(Pid, {handle_disconnect, CH, PV, R}). + +handle_syntax_error(ReceiveHandle, ProtocolVersion, ErrorDescriptor, Pid) -> + Req = {handle_syntax_error, ReceiveHandle, ProtocolVersion, + ErrorDescriptor}, + request(Pid, Req). + +handle_message_error(ConnHandle, ProtocolVersion, ErrorDescriptor, Pid) -> + Req = {handle_message_error, ConnHandle, ProtocolVersion, ErrorDescriptor}, + request(Pid, Req). + +handle_trans_request(CH, PV, AR, Pid) -> + Reply = request(Pid, {handle_trans_request, CH, PV, AR}), + Reply. + +handle_trans_long_request(ConnHandle, ProtocolVersion, ReqData, Pid) -> + Req = {handle_trans_long_request, ConnHandle, ProtocolVersion, ReqData}, + request(Pid, Req). + +handle_trans_reply(ConnHandle, ProtocolVersion, ActualReply, ReplyData, Pid) -> + Req = {handle_trans_reply, ConnHandle, ProtocolVersion, + ActualReply, ReplyData}, + request(Pid, Req). + +handle_trans_ack(ConnHandle, ProtocolVersion, AckStatus, AckData, Pid) -> + Req = {handle_trans_ack, ConnHandle, ProtocolVersion, AckStatus, AckData}, + request(Pid, Req). + +handle_unexpected_trans(ConnHandle, ProtocolVersion, Trans, Pid) -> + Req = {handle_unexpected_trans, ConnHandle, ProtocolVersion, Trans}, + request(Pid, Req). + +handle_trans_request_abort(ConnHandle, ProtocolVersion, TransId, + Handler, Pid) -> + Req = {handle_trans_request_abort, + ConnHandle, ProtocolVersion, TransId, Handler}, + request(Pid, Req). + + +request(Pid, Request) -> + Pid ! {request, Request, self()}, + receive + {reply, {delay_reply, To, Reply}, Pid} -> + megaco:report_event(ignore, self(), Pid, + "reply: delay_reply", [To, Reply]), + sleep(To), + megaco:report_event(ignore, self(), Pid, + "reply: delay done now return", []), + Reply; + {reply, {exit, To, Reason}, Pid} -> + megaco:report_event(ignore, self(), Pid, + "reply: exit", [To, Reason]), + sleep(To), + megaco:report_event(ignore, self(), Pid, + "reply: sleep done now exit", []), + exit(Reason); + {reply, Reply, Pid} -> + megaco:report_event(ignore, self(), Pid, "reply", [Reply]), + Reply + end. + + +reply(To, Reply) -> + To ! {reply, Reply, self()}. + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +sleep(X) -> + d("sleep -> ~w", [X]), + receive after X -> ok end. + + +error_msg(F,A) -> error_logger:error_msg("MGC: " ++ F ++ "~n",A). + + +get_encoding_module(RI) -> + case (catch get_conf(encoding_module, RI)) of + {error, _} -> + undefined; + Val -> + Val + end. + +get_encoding_config(RI, EM) -> + case text_codec(EM) of + true -> + case megaco:system_info(text_config) of + [Conf] when is_list(Conf) -> + Conf; + _ -> + [] + end; + + false -> + get_conf(encoding_config, RI) + end. + +text_codec(megaco_compact_text_encoder) -> + true; +text_codec(megaco_pretty_text_encoder) -> + true; +text_codec(_) -> + false. + + +get_transport_module(RI) -> + get_conf(transport_module, RI). + +get_transport_port(RI) -> + get_conf(port, RI). + +get_transport_opts(RI) -> + get_conf(transport_opts, RI, []). + + +get_conf(Key, Config) -> + case lists:keysearch(Key, 1, Config) of + {value, {Key, Val}} -> + Val; + _ -> + exit({error, {not_found, Key, Config}}) + end. + +get_conf(Key, Config, Default) -> + case lists:keysearch(Key, 1, Config) of + {value, {Key, Val}} -> + Val; + _ -> + Default + end. + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +random_init() -> + {A,B,C} = now(), + random:seed(A,B,C). + +random(N) -> + random:uniform(N). + + +display_system_info(Mid) -> + display_system_info(Mid, ""). + +display_system_info(Mid, Pre) -> + TimeStr = format_timestamp(now()), + MibStr = lists:flatten(io_lib:format("~p ", [Mid])), + megaco_test_lib:display_system_info(MibStr ++ Pre ++ TimeStr). + + +create_timer(Time, Event) -> + erlang:send_after(Time, self(), {Event, Time}). + +cancel_timer(undefined) -> + ok; +cancel_timer(Ref) -> + erlang:cancel_timer(Ref). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +i(F) -> + i(F, []). + +i(F, A) -> + print(info, get(verbosity), "", F, A). + + +d(F) -> + d(F, []). + +d(F, A) -> + print(debug, get(verbosity), "DBG: ", F, A). + + +printable(_, debug) -> true; +printable(info, info) -> true; +printable(_,_) -> false. + +print(Severity, Verbosity, P, F, A) -> + print(printable(Severity,Verbosity), P, F, A). + +print(true, P, F, A) -> + print(P, F, A); +print(_, _, _, _) -> + ok. + +print(P, F, A) -> + io:format("*** [~s] ~s ~p ~s ***" + "~n " ++ F ++ "~n~n", + [format_timestamp(now()), P, self(), get(sname) | A]). + +format_timestamp({_N1, _N2, N3} = Now) -> + {Date, Time} = calendar:now_to_datetime(Now), + {YYYY,MM,DD} = Date, + {Hour,Min,Sec} = Time, + FormatDate = + io_lib:format("~.4w:~.2.0w:~.2.0w ~.2.0w:~.2.0w:~.2.0w 4~w", + [YYYY,MM,DD,Hour,Min,Sec,round(N3/1000)]), + lists:flatten(FormatDate). + -- cgit v1.2.3