1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
|
/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 2000-2009. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
* compliance with the License. You should have received a copy of the
* Erlang Public License along with this software. If not, it can be
* retrieved online at http://www.erlang.org/.
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* %CopyrightEnd%
*/
package com.ericsson.otp.erlang;
import java.io.IOException;
/**
* <p>
* Maintains a connection between a Java process and a remote Erlang, Java or C
* node. The object maintains connection state and allows data to be sent to and
* received from the peer.
* </p>
*
* <p>
* Once a connection is established between the local node and a remote node,
* the connection object can be used to send and receive messages between the
* nodes.
* </p>
*
* <p>
* The various receive methods are all blocking and will return only when a
* valid message has been received or an exception is raised.
* </p>
*
* <p>
* If an exception occurs in any of the methods in this class, the connection
* will be closed and must be reopened in order to resume communication with the
* peer.
* </p>
*
* <p>
* The message delivery methods in this class deliver directly to
* {@link OtpMbox mailboxes} in the {@link OtpNode OtpNode} class.
* </p>
*
* <p>
* It is not possible to create an instance of this class directly.
* OtpCookedConnection objects are created as needed by the underlying mailbox
* mechanism.
* </p>
*/
public class OtpCookedConnection extends AbstractConnection {
    /* The local node that owns this connection; messages and errors go to it. */
    protected OtpNode self;

    /*
     * Table of links that run through this connection. It is kept here so
     * that the local pids involved can be told when the connection goes down
     * (see breakLinks()).
     */
    protected Links links = null;

    /*
     * Builds a connection on top of an existing transport. According to the
     * original notes this is the incoming case, i.e. the remote node was the
     * connection initiator and the handshake data comes from the peer.
     *
     * The link table is created and the connection is started before the
     * constructor returns.
     *
     * @exception java.io.IOException if it was not possible to connect to the
     * peer.
     *
     * @exception OtpAuthException if handshake resulted in an authentication
     * error
     */
    // package scope
    OtpCookedConnection(final OtpNode self, final OtpTransport s)
            throws IOException, OtpAuthException {
        super(self, s);
        this.self = self;
        this.links = new Links(25);
        start();
    }

    /*
     * Initiates and opens a connection to a remote node.
     *
     * The link table is created and the connection is started before the
     * constructor returns.
     *
     * @exception java.io.IOException if it was not possible to connect to the
     * peer.
     *
     * @exception OtpAuthException if handshake resulted in an authentication
     * error.
     */
    // package scope
    OtpCookedConnection(final OtpNode self, final OtpPeer other)
            throws IOException, OtpAuthException {
        super(self, other);
        this.self = self;
        this.links = new Links(25);
        start();
    }

    /*
     * Hands an error over to the owning node, identifying this connection as
     * the origin.
     */
    @Override
    public void deliver(final Exception e) {
        self.deliverError(this, e);
    }

    /*
     * Hands a message over to the owning node for final delivery, then
     * inspects the message type so that the local link table stays current:
     *
     * - link: recorded if the node delivered the message; otherwise an exit
     *   with reason "noproc" is sent back to the sender.
     * - unlink / exit: the corresponding link is removed.
     * - anything else (including exit2): no bookkeeping.
     */
    @Override
    public void deliver(final OtpMsg msg) {
        // delivery happens first; the bookkeeping below depends on its result
        final boolean delivered = self.deliver(msg);
        final int tag = msg.type();

        if (tag == OtpMsg.linkTag) {
            if (!delivered) {
                try {
                    // no such pid - send exit to sender
                    super.sendExit(msg.getRecipientPid(), msg.getSenderPid(),
                            new OtpErlangAtom("noproc"));
                } catch (final IOException ignored) {
                    // a failure to send the exit is not reported further
                }
            } else {
                links.addLink(msg.getRecipientPid(), msg.getSenderPid());
            }
        } else if (tag == OtpMsg.unlinkTag || tag == OtpMsg.exitTag) {
            links.removeLink(msg.getRecipientPid(), msg.getSenderPid());
        }
        // exit2Tag and every other tag: nothing to track
    }

    /*
     * Sends msg from the given pid to a destination pid. The message is
     * encoded into an OtpOutputStream and passed to sendBuf.
     */
    @SuppressWarnings("resource")
    void send(final OtpErlangPid from, final OtpErlangPid dest,
            final OtpErlangObject msg) throws IOException {
        final OtpOutputStream encoded = new OtpOutputStream(msg);
        sendBuf(from, dest, encoded);
    }

    /*
     * Sends msg from the given pid to a registered name. dest is the
     * recipient's registered name; the node name is implied by the choice of
     * connection. The message is encoded into an OtpOutputStream and passed
     * to sendBuf.
     */
    @SuppressWarnings("resource")
    void send(final OtpErlangPid from, final String dest,
            final OtpErlangObject msg) throws IOException {
        final OtpOutputStream encoded = new OtpOutputStream(msg);
        sendBuf(from, dest, encoded);
    }

    /*
     * Closes the underlying connection and then breaks every link that was
     * registered on it.
     */
    @Override
    public void close() {
        super.close();
        breakLinks();
    }

    /* Closes the connection when the object is finalized. */
    @Override
    protected void finalize() {
        close();
    }

    /*
     * Exit signal on behalf of a dying/killed process. Any exception raised
     * while sending is swallowed.
     */
    void exit(final OtpErlangPid from, final OtpErlangPid to,
            final OtpErlangObject reason) {
        try {
            super.sendExit(from, to, reason);
        } catch (final Exception ignored) {
            // nothing is reported to the caller
        }
    }

    /*
     * Exit signal requested explicitly by user code, hence exit2. Any
     * exception raised while sending is swallowed.
     */
    void exit2(final OtpErlangPid from, final OtpErlangPid to,
            final OtpErlangObject reason) {
        try {
            super.sendExit2(from, to, reason);
        } catch (final Exception ignored) {
            // nothing is reported to the caller
        }
    }

    /*
     * Sends a link request and, once it has been sent, records the link in
     * the local table. An IOException is translated into an OtpErlangExit
     * with reason "noproc" for the target pid.
     */
    synchronized void link(final OtpErlangPid from, final OtpErlangPid to)
            throws OtpErlangExit {
        try {
            super.sendLink(from, to);
            links.addLink(from, to);
        } catch (final IOException sendFailure) {
            throw new OtpErlangExit("noproc", to);
        }
    }

    /*
     * Drops the link from the local table first, then sends the unlink
     * request. An IOException while sending is swallowed.
     */
    synchronized void unlink(final OtpErlangPid from, final OtpErlangPid to) {
        links.removeLink(from, to);
        try {
            super.sendUnlink(from, to);
        } catch (final IOException ignored) {
            // the table entry is already gone; nothing more is done
        }
    }

    /*
     * Called when the connection fails: empties the link table and delivers
     * an exit with reason "noconnection" to every local pid that had a link
     * through this connection.
     */
    synchronized void breakLinks() {
        if (links == null) {
            return;
        }

        final Link[] severed = links.clearLinks();
        if (severed == null) {
            return;
        }

        for (final Link broken : severed) {
            // send exit "from" remote pids to local ones
            final OtpMsg exitMsg = new OtpMsg(OtpMsg.exitTag, broken.remote(),
                    broken.local(), new OtpErlangAtom("noconnection"));
            self.deliver(exitMsg);
        }
    }
}
|