%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2016-2016. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%
-module(dtls_udp_listener).
-behaviour(gen_server).
%% API
-export([start_link/4, active_once/3, accept/2, sockname/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% Listener state kept by this gen_server.
-record(state,
%% Port argument given to init/1; used to open the UDP socket and passed
%% on in the arguments of every connection process that is started.
{port,
%% The UDP socket returned by gen_udp:open/2 (field name is spelled
%% "listner" throughout this module).
listner,
%% DTLS options, forwarded unchanged to new connection processes.
dtls_options,
%% Emulated socket options, forwarded unchanged to new connection processes.
emulated_options,
%% Key-value store: Client ({IP, Port}) -> queue. A queue holds either
%% datagram messages waiting for the connection process to ask for them,
%% or the pid of a connection process waiting for the next datagram.
dtls_msq_queues = kv_new(),
%% Set of Client ({IP, Port}) terms that have a connection process.
clients = set_new(),
%% Key-value store: connection process pid -> Client.
dtls_processes = kv_new(),
%% Queue of {Accepter, From} pairs for accept calls not yet answered.
accepters = queue:new(),
%% Set to true by init/1 and to false by the first accept call, which is
%% the call that first arms the socket with {active, once}.
first
}).
%%%===================================================================
%%% API
%%%===================================================================
%% Starts the listener process, linked to the caller. The four arguments are
%% handed to init/1 as a single list, in the order given here.
-spec start_link(term(), term(), term(), term()) ->
          {ok, pid()} | ignore | {error, term()}.
start_link(Port, EmOpts, InetOptions, DTLSOptions) ->
    InitArgs = [Port, EmOpts, InetOptions, DTLSOptions],
    gen_server:start_link(?MODULE, InitArgs, []).
%% Asynchronously asks the listener to hand the next datagram from Client to
%% Pid. Fire-and-forget: returns ok immediately.
-spec active_once(term(), term(), pid()) -> ok.
active_once(Listener, Client, Pid) ->
    Request = {active_once, Client, Pid},
    gen_server:cast(Listener, Request).
%% Synchronously registers Accepter with the listener and waits, without
%% timeout, for the listener's reply to the accept request.
-spec accept(term(), term()) -> term().
accept(Listener, Accepter) ->
    Request = {accept, Accepter},
    gen_server:call(Listener, Request, infinity).
%% Synchronously asks the listener for the local address of its socket and
%% waits, without timeout, for the answer.
-spec sockname(term()) -> term().
sockname(Listener) ->
    gen_server:call(Listener, sockname, infinity).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% gen_server callback: opens the UDP socket on Port with InetOptions and
%% builds the initial state.
%%
%% Returns {ok, State} when the socket could be opened. Otherwise returns
%% {stop, Reason}, so that start_link/4 yields {error, Reason} with the
%% actual cause (for example a posix error from gen_udp:open/2, or badarg
%% for invalid options). Only the socket open is protected by the try; the
%% state construction cannot fail and is kept outside of it.
init([Port, EmOpts, InetOptions, DTLSOptions]) ->
    try gen_udp:open(Port, InetOptions) of
        {ok, Socket} ->
            {ok, #state{port = Port,
                        first = true,
                        dtls_options = DTLSOptions,
                        emulated_options = EmOpts,
                        listner = Socket}};
        {error, Reason} ->
            {stop, Reason}
    catch
        %% gen_udp:open/2 raises instead of returning for malformed
        %% options; report that reason rather than hiding it.
        _:Reason ->
            {stop, Reason}
    end.
%% gen_server callback for synchronous requests.
%%
%% {accept, Accepter}: the caller is parked in the accepters queue and gets
%% its reply later, once a datagram from a new client arrives. The very
%% first accept additionally arms the socket so datagrams start flowing.
%% sockname: answers immediately with inet:sockname/1 of the socket.
handle_call({accept, Accepter}, From, #state{first = true, listner = Socket} = State) ->
    next_datagram(Socket),
    {noreply, enqueue_accepter(Accepter, From, State#state{first = false})};
handle_call({accept, Accepter}, From, #state{} = State) ->
    {noreply, enqueue_accepter(Accepter, From, State)};
handle_call(sockname, _From, #state{listner = Socket} = State) ->
    {reply, inet:sockname(Socket), State}.

%% Appends the pending accept call to the tail of the accepters queue.
enqueue_accepter(Accepter, From, #state{accepters = Pending} = State) ->
    State#state{accepters = queue:in({Accepter, From}, Pending)}.
%% gen_server callback for asynchronous requests. The only request handled
%% is {active_once, Client, Pid}, which is delegated to handle_active_once/3.
handle_cast({active_once, Client, Pid}, State) ->
    {noreply, handle_active_once(Client, Pid, State)}.
%% gen_server callback for plain messages.
%%
%% {udp, Socket, IP, Port, Data}: a datagram from the listener's own socket
%% (the pattern only matches that socket). It is routed with
%% handle_datagram/3 and the socket is then re-armed for one more datagram.
%%
%% {'DOWN', ...}: a monitored connection process has terminated. Everything
%% recorded for its client is removed: the client entry, the pid -> client
%% mapping and the client's message queue. Dropping the queue matters
%% because setup_new_connection/5 inserts a fresh queue with kv_insert/3,
%% which raises if the key is still present; without the removal a client
%% reconnecting from the same address would crash the listener, and queues
%% of dead connections would accumulate.
handle_info({udp, Socket, IP, InPortNo, _} = Msg, #state{listner = Socket} = State0) ->
    State = handle_datagram({IP, InPortNo}, Msg, State0),
    next_datagram(Socket),
    {noreply, State};
handle_info({'DOWN', _, process, Pid, _}, #state{clients = Clients,
                                                  dtls_processes = Processes0,
                                                  dtls_msq_queues = MsgQueues0} = State) ->
    Client = kv_get(Pid, Processes0),
    Processes = kv_delete(Pid, Processes0),
    MsgQueues = kv_delete(Client, MsgQueues0),
    {noreply, State#state{clients = set_delete(Client, Clients),
                          dtls_processes = Processes,
                          dtls_msq_queues = MsgQueues}}.
%% gen_server callback run when the listener stops. No explicit cleanup is
%% performed here; the result is always ok.
terminate(_ShutdownReason, _ListenerState) ->
    ok.
%% gen_server callback for code upgrades. The state is carried over
%% untouched.
code_change(_FromVsn, ListenerState, _Extra) ->
    {ok, ListenerState}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Routes one incoming datagram message.
%% A datagram from a client that already has a connection process goes to
%% dispatch/3; a datagram from an unknown client is treated as a connection
%% attempt. Returns the updated state.
handle_datagram(Client, Msg, #state{clients = Known} = State) ->
    case set_is_member(Client, Known) of
        true ->
            dispatch(Client, Msg, State);
        false ->
            accept_client(Client, Msg, State)
    end.

%% Pairs a datagram from a new client with the oldest pending accept call,
%% if there is one, and starts a connection for it.
accept_client(Client, Msg, #state{accepters = Pending0} = State) ->
    case queue:out(Pending0) of
        {empty, _} ->
            %% Nobody is accepting: drop the packet, the client will resend.
            State;
        {{value, {UserPid, From}}, Pending} ->
            setup_new_connection(UserPid, From, Client, Msg,
                                 State#state{accepters = Pending})
    end.
%% Delivers a datagram message belonging to an already known client.
%%
%% The client's queue holds either the pid of a connection process waiting
%% for its next datagram, or datagrams that arrived before the process asked
%% for them:
%%   - a pid at the head: the message is sent to it and the pid is removed;
%%   - otherwise (buffered datagrams, or an empty queue): the message is
%%     appended to the tail so that arrival order is kept.
%%
%% The append is done on the original queue. queue:out/1 is used only to
%% look at the head; appending to the queue it returns would throw away the
%% oldest buffered datagram.
%%
%% Crashes (badmatch) if Client has no queue; callers only dispatch for
%% clients present in the clients set.
dispatch(Client, Msg, #state{dtls_msq_queues = MsgQueues} = State) ->
    {value, Queue0} = kv_lookup(Client, MsgQueues),
    Queue = case queue:out(Queue0) of
                {{value, Pid}, Rest} when is_pid(Pid) ->
                    Pid ! Msg,
                    Rest;
                _BufferedOrEmpty ->
                    queue:in(Msg, Queue0)
            end,
    State#state{dtls_msq_queues = kv_update(Client, Queue, MsgQueues)}.
%% Arms the socket so that exactly one more datagram is delivered to this
%% process as a message. Returns whatever inet:setopts/2 returns.
next_datagram(Socket) ->
    ActiveOnce = [{active, once}],
    inet:setopts(Socket, ActiveOnce).
%% Serves a request from Pid for the next datagram of Client.
%%
%% If a datagram is already buffered it is sent to Pid right away; if the
%% queue is empty Pid is recorded as waiting; if Pid is already recorded as
%% waiting nothing changes.
%%
%% The request arrives as a cast, so the sender never sees a failure. A
%% request naming a client that has no queue entry is therefore ignored
%% instead of crashing the listener, which serves every other client on the
%% same socket.
handle_active_once(Client, Pid, #state{dtls_msq_queues = MsgQueues} = State) ->
    case kv_lookup(Client, MsgQueues) of
        {value, Queue0} ->
            Queue = serve_active_once(Pid, Queue0),
            State#state{dtls_msq_queues = kv_update(Client, Queue, MsgQueues)};
        none ->
            State
    end.

%% Returns the client's queue after serving Pid's request for one datagram.
serve_active_once(Pid, Queue0) ->
    case queue:out(Queue0) of
        {{value, Pid}, _} when is_pid(Pid) ->
            %% Pid is already waiting; keep the queue as it is.
            Queue0;
        {{value, Msg}, Queue} ->
            Pid ! Msg,
            Queue;
        {empty, _} ->
            queue:in(Pid, Queue0)
    end.
%% Starts a connection process for a new client and answers the pending
%% accept call identified by From.
%%
%% On success the new process is monitored, the accepter is told
%% {ok, Pid, {Client, Socket}}, the datagram that triggered the connection is
%% forwarded to the process, and the client is registered in the state
%% (client set, empty message queue, pid -> client mapping).
%% On failure the accepter is told {error, Reason} and the state is returned
%% unchanged.
setup_new_connection(User, From, Client, Msg,
                     #state{port = Port,
                            listner = Socket,
                            dtls_options = DTLSOpts,
                            emulated_options = EmOpts,
                            clients = Clients0,
                            dtls_msq_queues = Queues0,
                            dtls_processes = Processes0} = State) ->
    Peer = {Client, Socket},
    ConnArgs = [server, "localhost", Port, {self(), Peer},
                {DTLSOpts, EmOpts, udp_listner}, User, dtls_socket:default_cb_info()],
    case dtls_connection_sup:start_child(ConnArgs) of
        {error, Reason} ->
            gen_server:reply(From, {error, Reason}),
            State;
        {ok, Pid} ->
            erlang:monitor(process, Pid),
            gen_server:reply(From, {ok, Pid, Peer}),
            Pid ! Msg,
            Clients = set_insert(Client, Clients0),
            Queues = kv_insert(Client, queue:new(), Queues0),
            Processes = kv_insert(Pid, Client, Processes0),
            State#state{clients = Clients,
                        dtls_msq_queues = Queues,
                        dtls_processes = Processes}
    end.
%% Replaces the value of a key that is already in the store
%% (gb_trees:update/3).
kv_update(K, NewValue, Tree) ->
    gb_trees:update(K, NewValue, Tree).
%% Looks a key up in the store (gb_trees:lookup/2): {value, V} when present,
%% none otherwise.
kv_lookup(K, Tree) ->
    gb_trees:lookup(K, Tree).
%% Adds a new key with its value to the store (gb_trees:insert/3).
kv_insert(K, V, Tree) ->
    gb_trees:insert(K, V, Tree).
%% Fetches the value stored under a key that is in the store
%% (gb_trees:get/2).
kv_get(K, Tree) ->
    gb_trees:get(K, Tree).
%% Removes a key that is in the store, together with its value
%% (gb_trees:delete/2).
kv_delete(K, Tree) ->
    gb_trees:delete(K, Tree).
%% Creates an empty key-value store (a gb_trees tree).
kv_new() ->
    gb_trees:empty().
%% Creates an empty set (a gb_sets set).
set_new() ->
    gb_sets:empty().
%% Adds a new element to the set (gb_sets:insert/2).
set_insert(Element, Members) ->
    gb_sets:insert(Element, Members).
%% Removes an element that is in the set (gb_sets:delete/2).
set_delete(Element, Members) ->
    gb_sets:delete(Element, Members).
%% Tells whether an element is in the set (gb_sets:is_member/2).
set_is_member(Element, Members) ->
    gb_sets:is_member(Element, Members).