%%--------------------------------------------------------------------
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2001-2015. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
%%
%%----------------------------------------------------------------------
%% File : oe_CosEventComm_PullerS_impl.erl
%% Description :
%%
%%----------------------------------------------------------------------
-module(oe_CosEventComm_PullerS_impl).
%%----------------------------------------------------------------------
%% Include files
%%----------------------------------------------------------------------
-include_lib("orber/include/corba.hrl").
-include("CosEventChannelAdmin.hrl").
-include("CosEventComm.hrl").
-include("cosEventApp.hrl").
%%----------------------------------------------------------------------
%% External exports
%%----------------------------------------------------------------------
-export([init/1,
terminate/2,
code_change/3,
handle_info/2]).
%% Exports from "CosEventChannelAdmin::ProxyPullSupplier"
-export([connect_pull_consumer/4]).
%% Exports from "CosEventComm::PullSupplier"
-export([pull/3,
try_pull/3,
disconnect_pull_supplier/3]).
%%----------------------------------------------------------------------
%% Internal exports
%%----------------------------------------------------------------------
%% Exports from "oe_CosEventComm::Event
-export([send/3, send_sync/4]).
%%----------------------------------------------------------------------
%% Records
%%----------------------------------------------------------------------
-record(state, {admin_pid, client, db, respond_to, typecheck, maxevents}).
%%----------------------------------------------------------------------
%% Macros
%%----------------------------------------------------------------------
%%======================================================================
%% External functions
%%======================================================================
%%---------------------------------------------------------------------%
%% Function : init/1
%% Returns : {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%----------------------------------------------------------------------
init([AdminPid, TypeCheck, MaxEvents]) ->
process_flag(trap_exit, true),
{ok, #state{admin_pid = AdminPid,
db = ets:new(oe_ets, [set, private, ordered_set]),
typecheck = TypeCheck, maxevents = MaxEvents}}.
%%---------------------------------------------------------------------%
%% function : handle_info
%% Arguments:
%% Returns : {noreply, State} |
%% {stop, Reason, State}
%% Effect : Functions demanded by the gen_server module.
%% The CosEvent specification states:
%% "A nil object reference may be passed to the connect_pull_consumer operation;
%% if so a channel cannot invoke a disconnect_pull_consumer operation on the
%% consumer; the consumer may be disconnected from the channel without being
%% informed."
%% If we would invoke the disconnect_pull_consumer operation
%% at the same time as the client tries to pull an event it
%% would cause a dead-lock. We can solve this by spawning a process
%% but as is the client will discover that the object no longer exists
%% the next time it tries to pull an event.
%%----------------------------------------------------------------------
handle_info({'EXIT', Pid, Reason}, #state{admin_pid = Pid} = State) ->
orber:dbg("[~p] oe_CosEventComm_PullerS_impl:handle_info(~p);~n"
"My Admin terminated and so will I.",
[?LINE, Reason], ?DEBUG_LEVEL),
{stop, Reason, State};
handle_info(_Info, State) ->
?DBG("Unknown Info ~p~n", [_Info]),
{noreply, State}.
%%---------------------------------------------------------------------%
%% Function : terminate/2
%% Returns : any (ignored by gen_server)
%% Description: Shutdown the server
%%----------------------------------------------------------------------
terminate(_Reason, #state{client = undefined, respond_to = undefined, db = DB}) ->
?DBG("Terminating ~p; no client connected and no pending pull's.~n", [_Reason]),
ets:delete(DB),
ok;
terminate(_Reason, #state{client = undefined, respond_to = ReplyTo, db = DB}) ->
?DBG("Terminating ~p; no client connected but a pending pull.~n", [_Reason]),
corba:reply(ReplyTo, {'EXCEPTION', #'CosEventComm_Disconnected'{}}),
ets:delete(DB),
ok;
terminate(_Reason, #state{client = Client, respond_to = undefined, db = DB}) ->
?DBG("Terminating ~p; no pending pull~n", [_Reason]),
cosEventApp:disconnect('CosEventComm_PullConsumer',
disconnect_pull_consumer, Client),
ets:delete(DB),
ok;
terminate(_Reason, #state{client = Client, respond_to = ReplyTo, db = DB}) ->
?DBG("Terminating ~p; pending pull~n", [_Reason]),
corba:reply(ReplyTo, {'EXCEPTION', #'CosEventComm_Disconnected'{}}),
cosEventApp:disconnect('CosEventComm_PullConsumer',
disconnect_pull_consumer, Client),
ets:delete(DB),
ok.
%%---------------------------------------------------------------------%
%% Function : code_change/3
%% Returns : {ok, NewState}
%% Description: Convert process state when code is changed
%%----------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%---------------------------------------------------------------------%
%% Function : connect_pull_consumer
%% Arguments :
%% Returns :
%% Description:
%%----------------------------------------------------------------------
connect_pull_consumer(_OE_This, _OE_From, #state{client = undefined,
typecheck = TypeCheck} = State,
NewClient) ->
case corba_object:is_nil(NewClient) of
true ->
?DBG("A NIL client supplied.~n", []),
{reply, ok, State};
false ->
cosEventApp:type_check(NewClient, 'CosEventComm_PullConsumer', TypeCheck),
?DBG("Connected to client.~n", []),
{reply, ok, State#state{client = NewClient}}
end;
connect_pull_consumer(_, _, _, _) ->
corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}).
%%---------------------------------------------------------------------%
%% Function : pull
%% Arguments :
%% Returns :
%% Description:
%%----------------------------------------------------------------------
pull(_OE_This, OE_From, State) ->
case get_event(State#state.db) of
false ->
?DBG("pull invoked but no event stored; put the client on hold.~n", []),
{noreply, State#state{respond_to = OE_From}};
Event ->
?DBG("pull invoked and returned: ~p~n", [Event]),
{reply, Event, State}
end.
%%---------------------------------------------------------------------%
%% Function : try_pull
%% Arguments :
%% Returns :
%% Description:
%%----------------------------------------------------------------------
try_pull(_OE_This, _OE_From, State) ->
case get_event(State#state.db) of
false ->
?DBG("try_pull invoked but no event stored.~n", []),
{reply, {any:create(orber_tc:long(), 0), false}, State};
Event ->
?DBG("try_pull invoked and returned: ~p~n", [Event]),
{reply, {Event, true}, State}
end.
%%---------------------------------------------------------------------%
%% Function : disconnect_pull_supplier
%% Arguments :
%% Returns :
%% Description:
%%----------------------------------------------------------------------
disconnect_pull_supplier(_OE_This, _OE_From, State) ->
?DBG("Disconnect invoked ~p ~n", [State]),
{stop, normal, ok, State#state{client = undefined}}.
%%======================================================================
%% Internal functions
%%======================================================================
%%---------------------------------------------------------------------%
%% Function : send
%% Arguments :
%% Returns :
%% Description:
%%----------------------------------------------------------------------
send(_OE_This, #state{respond_to = undefined} = State, Any) ->
?DBG("Received event ~p and stored it.~n", [Any]),
store_event(State#state.db, State#state.maxevents, Any),
{noreply, State};
send(_OE_This, State, Any) ->
?DBG("Received event ~p and sent it to pending client.~n", [Any]),
corba:reply(State#state.respond_to, Any),
{noreply, State#state{respond_to = undefined}}.
%%---------------------------------------------------------------------%
%% Function : send_sync
%% Arguments :
%% Returns :
%% Description:
%%----------------------------------------------------------------------
send_sync(_OE_This, _OE_From, #state{respond_to = undefined} = State, Any) ->
?DBG("Received event ~p and stored it (sync).~n", [Any]),
store_event(State#state.db, State#state.maxevents, Any),
{reply, ok, State};
send_sync(_OE_This, _OE_From, State, Any) ->
?DBG("Received event ~p and sent it to pending client (sync).~n", [Any]),
corba:reply(State#state.respond_to, Any),
{reply, ok, State#state{respond_to = undefined}}.
%%---------------------------------------------------------------------%
%% Function : store_event
%% Arguments : DB - ets reference
%% Event - CORBA::Any
%% Returns : true
%% Description: Insert the event in FIFO order.
%%----------------------------------------------------------------------
store_event(DB, Max, Event) ->
case ets:info(DB, size) of
CurrentSize when CurrentSize < Max ->
ets:insert(DB, {{erlang:system_time(), erlang:unique_integer([positive])},
Event});
_ ->
orber:dbg("[~p] oe_CosEventComm_PullerS:store_event(~p); DB full drop event.",
[?LINE, Event], ?DEBUG_LEVEL),
true
end.
%%---------------------------------------------------------------------%
%% Function : get_event
%% Arguments : DB - ets reference
%% Event - CORBA::Any
%% Returns : false | Event (CORBA::Any)
%% Description: Lookup event in FIFO order; return false if no event exists.
%%----------------------------------------------------------------------
get_event(DB) ->
case ets:first(DB) of
'$end_of_table' ->
false;
Key ->
[{_, Event}] = ets:lookup(DB, Key),
ets:delete(DB, Key),
Event
end.
%%======================================================================
%% END OF MODULE
%%======================================================================