aboutsummaryrefslogblamecommitdiffstats
path: root/lib/dialyzer/test/small_SUITE_data/src/comm_layer/comm_connection.erl
blob: 48cc50ae2138e8d547edc5e36cf53024264b3498 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16















                                                                            

                                                                      


























                                                                      


                                                                        



                                             

                                                                          





                                                                                             
                          
           
                                  
                 

                                                    








                                                                              
                                                                































                                                                                 
                                                          



























                                                                                          

                                                               






























                                                                                       

                                                                                              












                                                                                    
                                                                                         

                                                        
                                                                 

                                   
                                                                                 




                                                         
                                                                   


                                              
 



                                        
%  Copyright 2008 Konrad-Zuse-Zentrum f�r Informationstechnik Berlin
%
%   Licensed under the Apache License, Version 2.0 (the "License");
%   you may not use this file except in compliance with the License.
%   You may obtain a copy of the License at
%
%       http://www.apache.org/licenses/LICENSE-2.0
%
%   Unless required by applicable law or agreed to in writing, software
%   distributed under the License is distributed on an "AS IS" BASIS,
%   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%   See the License for the specific language governing permissions and
%   limitations under the License.
%%%-------------------------------------------------------------------
%%% File    : comm_connection.erl
%%% Author  : Thorsten Schuett <[email protected]>
%%% Description : creates and destroys connections and represents the
%%%           endpoint of a connection where messages are received and
%%            send from/to the network.
%%%
%%% Created : 18 Apr 2008 by Thorsten Schuett <[email protected]>
%%%-------------------------------------------------------------------
%% @author Thorsten Schuett <[email protected]>
%% @copyright 2008 Konrad-Zuse-Zentrum f�r Informationstechnik Berlin
%% @version $Id $
-module(comm_layer_dir.comm_connection).

-export([send/3, open_new/4, new/3, open_new_async/4]).

-import(config).
-import(gen_tcp).
-import(inet).
-import(io).
-import(io_lib).
-import(log).
-import(timer).

-include("comm_layer.hrl").

%% @doc new accepted connection. called by comm_acceptor
%% @spec new(inet:ip_address(), int(), socket()) -> pid()
new(Address, Port, Socket) ->
    spawn(fun () -> loop(Socket, Address, Port) end).

%% @doc open new connection
%% @spec open_new(inet:ip_address(), int(), inet:ip_address(), int()) ->
%%       {local_ip, inet:ip_address(), int(), pid(), inet:socket()}
%%     | fail
%%     | {connection, pid(), inet:socket()}
open_new(Address, Port, undefined, MyPort) ->
    Myself = self(),
    LocalPid = spawn(fun () ->
			     case new_connection(Address, Port, MyPort) of
				 fail ->
				     Myself ! {new_connection_failed};
				 Socket ->
				     {ok, {MyIP, _MyPort}} = inet:sockname(Socket),
				     Myself ! {new_connection_started, MyIP, MyPort, Socket},
				     loop(Socket, Address, Port)
			     end
		     end),
    receive
	{new_connection_failed} ->
	    fail;
	{new_connection_started, MyIP, MyPort, S} ->
	    {local_ip, MyIP, MyPort, LocalPid, S}
    end;
open_new(Address, Port, _MyAddress, MyPort) ->
    Owner = self(),
    LocalPid = spawn(fun () ->
			     case new_connection(Address, Port, MyPort) of
				 fail ->
				     Owner ! {new_connection_failed};
				 Socket ->
				     Owner ! {new_connection_started, Socket},
				     loop(Socket, Address, Port)
			     end
		     end),
    receive
	{new_connection_failed} ->
	    fail;
	{new_connection_started, Socket} ->
	    {connection, LocalPid, Socket}
    end.

% ===============================================================================
% @doc open a new connection asynchronously
% ===============================================================================
-spec(open_new_async/4 :: (any(), any(), any(), any()) -> pid()).
open_new_async(Address, Port, _MyAddr, MyPort) ->
    Pid = spawn(fun () ->
			case new_connection(Address, Port, MyPort) of
			    fail ->
				comm_port:unregister_connection(Address, Port),
				ok;
			    Socket ->
				loop(Socket, Address, Port)
			end
		end),
    Pid.


send({Address, Port, Socket}, Pid, Message) ->
    BinaryMessage = term_to_binary({deliver, Pid, Message}),
    SendTimeout    = config:read(tcp_send_timeout),
    {Time, Result} = timer:tc(gen_tcp, send, [Socket, BinaryMessage]),
    if
	Time > 1200 * SendTimeout ->
	    log:log(error,"[ CC ] send to ~p took ~p: ~p",
		    [Address, Time, inet:getopts(Socket, [keep_alive, send_timeout])]);
	true ->
	    ok
    end,
    case Result of
	ok ->
	    ?LOG_MESSAGE(erlang:element(1, Message), byte_size(BinaryMessage)),
	    ok;
	{error, closed} ->
	    comm_port:unregister_connection(Address, Port),
	    close_connection(Socket);
	{error, _Reason} ->
	    %log:log(error,"[ CC ] couldn't send to ~p:~p (~p)", [Address, Port, Reason]),
	    comm_port:unregister_connection(Address, Port),
	    close_connection(Socket)
    end.

loop(fail, Address, Port) ->
    comm_port:unregister_connection(Address, Port),
    ok;
loop(Socket, Address, Port) ->
    receive
	{send, Pid, Message} ->
	    case send({Address, Port, Socket}, Pid, Message) of
		ok -> loop(Socket, Address, Port);
		_ -> ok
	    end;
	{tcp_closed, Socket} ->
		comm_port:unregister_connection(Address, Port),
		gen_tcp:close(Socket);
	{tcp, Socket, Data} ->
	    case binary_to_term(Data) of
	        {deliver, Process, Message} ->
		    Process ! Message,
		    inet:setopts(Socket, [{active, once}]),
		    loop(Socket, Address, Port);
		{user_close} ->
		    comm_port:unregister_connection(Address, Port),
		    gen_tcp:close(Socket);
		{youare, _Address, _Port} ->
		    %% @TODO what do we get from this information?
		    inet:setopts(Socket, [{active, once}]),
		    loop(Socket, Address, Port);
		Unknown ->
		    log:log(warn,"[ CC ] unknown message ~p", [Unknown]),
		    inet:setopts(Socket, [{active, once}]),
		    loop(Socket, Address, Port)
	    end;

	{youare, _IP, _Port} ->
	    loop(Socket, Address, Port);

        Unknown ->
	    log:log(warn,"[ CC ] unknown message2 ~p", [Unknown]) ,
	    loop(Socket, Address, Port)
    end.

% ===============================================================================

-spec(new_connection(inet:ip_address(), integer(), integer()) -> inet:socket() | fail).
new_connection(Address, Port, MyPort) ->
    case gen_tcp:connect(Address, Port, [binary, {packet, 4}, {nodelay, true}, {active, once},
					 {send_timeout, config:read(tcp_send_timeout)}],
			 config:read(tcp_connect_timeout)) of
        {ok, Socket} ->
	    % send end point data
	    case inet:sockname(Socket) of
		{ok, {MyAddress, _MyPort}} ->
		    Message = term_to_binary({endpoint, MyAddress, MyPort}),
	            gen_tcp:send(Socket, Message),
		    case inet:peername(Socket) of
			{ok, {RemoteIP, RemotePort}} ->
			    YouAre = term_to_binary({youare, RemoteIP, RemotePort}),
		            gen_tcp:send(Socket, YouAre),
		            Socket;
			{error, _Reason} ->
			    %log:log(error,"[ CC ] reconnect to ~p because socket is ~p",
				%    [Address, Reason]),
			    close_connection(Socket),
			    new_connection(Address, Port, MyPort)
		    end;
		{error, _Reason} ->
		    %log:log(error,"[ CC ] reconnect to ~p because socket is ~p",
			%    [Address, Reason]),
		    close_connection(Socket),
	            new_connection(Address, Port, MyPort)
	    end;
        {error, _Reason} ->
            %log:log(error,"[ CC ] couldn't connect to ~p:~p (~p)",
		    %[Address, Port, Reason]),
	    fail
    end.

close_connection(Socket) ->
    spawn( fun () ->
		   gen_tcp:close(Socket)
	   end ).