diff options
Diffstat (limited to 'lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java')
-rw-r--r-- | lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java | 46 |
1 files changed, 25 insertions, 21 deletions
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java index 1b0fe3e2e6..ab8fa06c1b 100644 --- a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java +++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java @@ -20,7 +20,7 @@ package com.ericsson.otp.erlang; import java.io.IOException; -import java.net.Socket; +import java.io.OutputStream; import java.util.Random; /** @@ -84,7 +84,7 @@ public abstract class AbstractConnection extends Thread { private volatile boolean done = false; protected boolean connected = false; // connection status - protected Socket socket; // communication channel + protected OtpTransport socket; // communication channel protected OtpPeer peer; // who are we connected to protected OtpLocalNode localNode; // this nodes id String name; // local name of this connection @@ -126,7 +126,7 @@ public abstract class AbstractConnection extends Thread { * Accept an incoming connection from a remote node. Used by * {@link OtpSelf#accept() OtpSelf.accept()} to create a connection based on * data received when handshaking with the peer node, when the remote node - * is the connection intitiator. + * is the connection initiator. * * @exception java.io.IOException * if it was not possible to connect to the peer. @@ -134,20 +134,17 @@ public abstract class AbstractConnection extends Thread { * @exception OtpAuthException * if handshake resulted in an authentication error */ - protected AbstractConnection(final OtpLocalNode self, final Socket s) + protected AbstractConnection(final OtpLocalNode self, final OtpTransport s) throws IOException, OtpAuthException { localNode = self; - peer = new OtpPeer(); + peer = new OtpPeer(self.transportFactory); socket = s; - socket.setTcpNoDelay(true); - traceLevel = defaultLevel; setDaemon(true); if (traceLevel >= handshakeThreshold) { - System.out.println("<- ACCEPT FROM " + s.getInetAddress() + ":" - + s.getPort()); + System.out.println("<- ACCEPT FROM " + s); } // get his info @@ -189,6 +186,8 @@ public abstract class AbstractConnection extends Thread { // now get a connection between the two... port = OtpEpmd.lookupPort(peer); + if (port == 0) + throw new IOException("No remote node found - cannot connect"); // now find highest common dist value if (peer.proto != self.proto || self.distHigh < peer.distLow @@ -523,7 +522,9 @@ public abstract class AbstractConnection extends Thread { // received tick? send tock! if (len == 0) { synchronized (this) { - socket.getOutputStream().write(tock); + OutputStream out = socket.getOutputStream(); + out.write(tock); + out.flush(); } } @@ -837,8 +838,11 @@ public abstract class AbstractConnection extends Thread { } } - header.writeTo(socket.getOutputStream()); - payload.writeTo(socket.getOutputStream()); + // group flush op in favour of possible ssh-tunneled stream + OutputStream out = socket.getOutputStream(); + header.writeTo(out); + payload.writeTo(out); + out.flush(); } catch (final IOException e) { close(); throw e; @@ -859,7 +863,7 @@ public abstract class AbstractConnection extends Thread { + e); } } - header.writeTo(socket.getOutputStream()); + header.writeToAndFlush(socket.getOutputStream()); } catch (final IOException e) { close(); throw e; @@ -913,7 +917,8 @@ public abstract class AbstractConnection extends Thread { } /* this method now throws exception if we don't get full read */ - protected int readSock(final Socket s, final byte[] b) throws IOException { + protected int readSock(final OtpTransport s, final byte[] b) + throws IOException { int got = 0; final int len = b.length; int i; @@ -980,8 +985,7 @@ public abstract class AbstractConnection extends Thread { protected void doConnect(final int port) throws IOException, OtpAuthException { try { - socket = new Socket(peer.host(), port); - socket.setTcpNoDelay(true); + socket = peer.createTransport(peer.host(), port); if (traceLevel >= handshakeThreshold) { System.out.println("-> MD5 CONNECT TO " + peer.host() + ":" @@ -1077,7 +1081,7 @@ public abstract class AbstractConnection extends Thread { obuf.write4BE(aflags); obuf.write(str.getBytes()); - obuf.writeTo(socket.getOutputStream()); + obuf.writeToAndFlush(socket.getOutputStream()); if (traceLevel >= handshakeThreshold) { System.out.println("-> " + "HANDSHAKE sendName" + " flags=" @@ -1098,7 +1102,7 @@ public abstract class AbstractConnection extends Thread { obuf.write4BE(challenge); obuf.write(str.getBytes()); - obuf.writeTo(socket.getOutputStream()); + obuf.writeToAndFlush(socket.getOutputStream()); if (traceLevel >= handshakeThreshold) { System.out.println("-> " + "HANDSHAKE sendChallenge" + " flags=" @@ -1232,7 +1236,7 @@ public abstract class AbstractConnection extends Thread { obuf.write1(ChallengeReply); obuf.write4BE(challenge); obuf.write(digest); - obuf.writeTo(socket.getOutputStream()); + obuf.writeToAndFlush(socket.getOutputStream()); if (traceLevel >= handshakeThreshold) { System.out.println("-> " + "HANDSHAKE sendChallengeReply" @@ -1294,7 +1298,7 @@ public abstract class AbstractConnection extends Thread { obuf.write1(ChallengeAck); obuf.write(digest); - obuf.writeTo(socket.getOutputStream()); + obuf.writeToAndFlush(socket.getOutputStream()); if (traceLevel >= handshakeThreshold) { System.out.println("-> " + "HANDSHAKE sendChallengeAck" @@ -1341,7 +1345,7 @@ public abstract class AbstractConnection extends Thread { obuf.write1(ChallengeStatus); obuf.write(status.getBytes()); - obuf.writeTo(socket.getOutputStream()); + obuf.writeToAndFlush(socket.getOutputStream()); if (traceLevel >= handshakeThreshold) { System.out.println("-> " + "HANDSHAKE sendStatus" + " status=" |