1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
|
/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 2000-2009. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
* compliance with the License. You should have received a copy of the
* Erlang Public License along with this software. If not, it can be
* retrieved online at http://www.erlang.org/.
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* %CopyrightEnd%
*/
package com.ericsson.otp.erlang;
/**
* <p>
* Provides a simple mechanism for exchanging messages with Erlang processes or
* other instances of this class.
* </p>
*
* <p>
* Each mailbox is associated with a unique {@link OtpErlangPid pid} that
* contains information necessary for delivery of messages. When sending
* messages to named processes or mailboxes, the sender pid is made available to
* the recipient of the message. When sending messages to other mailboxes, the
* recipient can only respond if the sender includes the pid as part of the
* message contents. The sender can determine his own pid by calling
* {@link #self self()}.
* </p>
*
* <p>
* Mailboxes can be named, either at creation or later. Messages can be sent to
* named mailboxes and named Erlang processes without knowing the
* {@link OtpErlangPid pid} that identifies the mailbox. This is necessary in
* order to set up initial communication between parts of an application. Each
* mailbox can have at most one name.
* </p>
*
* <p>
* Since this class was intended for communication with Erlang, all of the send
* methods take {@link OtpErlangObject OtpErlangObject} arguments. However this
* class can also be used to transmit arbitrary Java objects (as long as they
* implement one of java.io.Serializable or java.io.Externalizable) by
* encapsulating the object in a {@link OtpErlangBinary OtpErlangBinary}.
* </p>
*
* <p>
* Messages to remote nodes are externalized for transmission, and as a result
* the recipient receives a <b>copy</b> of the original Java object. To ensure
* consistent behaviour when messages are sent between local mailboxes, such
* messages are cloned before delivery.
* </p>
*
* <p>
* Additionally, mailboxes can be linked in much the same way as Erlang
* processes. If a link is active when a mailbox is {@link #close closed}, any
* linked Erlang processes or OtpMboxes will be sent an exit signal. As well,
* exit signals will be (eventually) sent if a mailbox goes out of scope and its
* {@link #finalize finalize()} method is called. However due to the nature of
* finalization (i.e. Java makes no guarantees about when {@link #finalize
* finalize()} will be called) it is recommended that you always explicitly
* close mailboxes if you are using links instead of relying on finalization to
* notify other parties in a timely manner.
* </p>
*
* <p>
* When retrieving messages from a mailbox that has received an exit signal, an
* {@link OtpErlangExit OtpErlangExit} exception will be raised. Note that the
* exception is queued in the mailbox along with other messages, and will not be
* raised until it reaches the head of the queue and is about to be retrieved.
* </p>
*
*/
public class OtpMbox {
    /** The node this mailbox belongs to; deliveries and connections go through it. */
    OtpNode home;

    /** The pid that identifies this mailbox. */
    OtpErlangPid self;

    /** Messages and exit signals delivered to this mailbox but not yet retrieved. */
    GenericQueue queue;

    /** The registered name of this mailbox, or null when it is anonymous. */
    String name;

    /** Bookkeeping for the links between this mailbox and other pids. */
    Links links;

    // package constructor: called by OtpNode:createMbox(name)
    // to create a named mbox
    OtpMbox(final OtpNode home, final OtpErlangPid self, final String name) {
        this.self = self;
        this.home = home;
        this.name = name;
        queue = new GenericQueue();
        links = new Links(10);
    }

    // package constructor: called by OtpNode:createMbox()
    // to create an anonymous mbox (name is null)
    OtpMbox(final OtpNode home, final OtpErlangPid self) {
        this(home, self, null);
    }

    /**
     * <p>
     * Get the identifying {@link OtpErlangPid pid} associated with this
     * mailbox.
     * </p>
     *
     * <p>
     * The {@link OtpErlangPid pid} associated with this mailbox uniquely
     * identifies the mailbox and can be used to address the mailbox. You can
     * send the {@link OtpErlangPid pid} to a remote communicating part so that
     * it knows where to send its response.
     * </p>
     *
     * @return the self pid for this mailbox.
     */
    public OtpErlangPid self() {
        return self;
    }

    /**
     * <p>
     * Register or remove a name for this mailbox. Registering a name for a
     * mailbox enables others to send messages without knowing the
     * {@link OtpErlangPid pid} of the mailbox. A mailbox can have at most one
     * name; if the mailbox already had a name, calling this method will
     * supersede that name.
     * </p>
     *
     * <p>
     * The actual registration is performed by the owning {@link OtpNode node}.
     * </p>
     *
     * @param name
     *            the name to register for the mailbox. Specify null to
     *            unregister the existing name from this mailbox.
     *
     * @return true if the name was available, or false otherwise.
     */
    public synchronized boolean registerName(final String name) {
        return home.registerName(name, this);
    }

    /**
     * Get the registered name of this mailbox.
     *
     * @return the registered name of this mailbox, or null if the mailbox had
     *         no registered name.
     */
    public String getName() {
        return name;
    }

    /**
     * Block until a message arrives for this mailbox.
     *
     * @return an {@link OtpErlangObject OtpErlangObject} representing the body
     *         of the next message waiting in this mailbox.
     *
     * @exception OtpErlangDecodeException
     *                if the message can not be decoded.
     *
     * @exception OtpErlangExit
     *                if a linked {@link OtpErlangPid pid} has exited or has
     *                sent an exit signal to this mailbox.
     */
    public OtpErlangObject receive() throws OtpErlangExit,
            OtpErlangDecodeException {
        return receiveMsg().getMsg();
    }

    /**
     * Wait for a message to arrive for this mailbox.
     *
     * @param timeout
     *            the time, in milliseconds, to wait for a message before
     *            returning null.
     *
     * @return an {@link OtpErlangObject OtpErlangObject} representing the body
     *         of the next message waiting in this mailbox, or null if no
     *         message became available in time.
     *
     * @exception OtpErlangDecodeException
     *                if the message can not be decoded.
     *
     * @exception OtpErlangExit
     *                if a linked {@link OtpErlangPid pid} has exited or has
     *                sent an exit signal to this mailbox.
     */
    public OtpErlangObject receive(final long timeout) throws OtpErlangExit,
            OtpErlangDecodeException {
        final OtpMsg m;
        try {
            m = receiveMsg(timeout);
        } catch (final InterruptedException ignored) {
            // receiveMsg(long) documents InterruptedException as its
            // "timed out" signal; this method reports a timeout as null
            // instead of propagating the exception.
            return null;
        }
        return m == null ? null : m.getMsg();
    }

    /**
     * Block until a message arrives for this mailbox.
     *
     * @return an {@link OtpInputStream OtpInputStream} holding the
     *         still-encoded body of the next message waiting in this mailbox.
     *
     * @exception OtpErlangExit
     *                if a linked {@link OtpErlangPid pid} has exited or has
     *                sent an exit signal to this mailbox.
     *
     */
    public OtpInputStream receiveBuf() throws OtpErlangExit {
        return receiveMsg().getMsgBuf();
    }

    /**
     * Wait for a message to arrive for this mailbox.
     *
     * @param timeout
     *            the time, in milliseconds, to wait for a message.
     *
     * @return an {@link OtpInputStream OtpInputStream} holding the
     *         still-encoded body of the next message waiting in this mailbox,
     *         or null if {@link #receiveMsg(long)} returned no message.
     *
     * @exception OtpErlangExit
     *                if a linked {@link OtpErlangPid pid} has exited or has
     *                sent an exit signal to this mailbox.
     *
     * @exception InterruptedException
     *                if the method times out before a message becomes
     *                available.
     */
    public OtpInputStream receiveBuf(final long timeout)
            throws InterruptedException, OtpErlangExit {
        final OtpMsg m = receiveMsg(timeout);
        return m == null ? null : m.getMsgBuf();
    }

    /**
     * Block until a message arrives for this mailbox.
     *
     * @return an {@link OtpMsg OtpMsg} containing the header information as
     *         well as the body of the next message waiting in this mailbox.
     *
     * @exception OtpErlangExit
     *                if a linked {@link OtpErlangPid pid} has exited or has
     *                sent an exit signal to this mailbox.
     *
     */
    public OtpMsg receiveMsg() throws OtpErlangExit {
        return raiseIfExit((OtpMsg) queue.get());
    }

    /**
     * Wait for a message to arrive for this mailbox.
     *
     * @param timeout
     *            the time, in milliseconds, to wait for a message.
     *
     * @return an {@link OtpMsg OtpMsg} containing the header information as
     *         well as the body of the next message waiting in this mailbox,
     *         or null if the underlying queue handed back no message.
     *
     * @exception OtpErlangExit
     *                if a linked {@link OtpErlangPid pid} has exited or has
     *                sent an exit signal to this mailbox.
     *
     * @exception InterruptedException
     *                if the method times out before a message becomes
     *                available.
     */
    public OtpMsg receiveMsg(final long timeout) throws InterruptedException,
            OtpErlangExit {
        final OtpMsg m = (OtpMsg) queue.get(timeout);
        if (m == null) {
            return null;
        }
        return raiseIfExit(m);
    }

    /*
     * Turns a queued exit/exit2 signal into an OtpErlangExit; any other
     * message is handed back unchanged. If the exit reason cannot be decoded
     * the exception carries the reason "unknown".
     */
    private static OtpMsg raiseIfExit(final OtpMsg m) throws OtpErlangExit {
        final int tag = m.type();
        if (tag != OtpMsg.exitTag && tag != OtpMsg.exit2Tag) {
            return m;
        }

        final OtpErlangObject reason;
        try {
            reason = m.getMsg();
        } catch (final OtpErlangDecodeException e) {
            throw new OtpErlangExit("unknown", m.getSenderPid());
        }
        throw new OtpErlangExit(reason, m.getSenderPid());
    }

    /**
     * Send a message to a remote {@link OtpErlangPid pid}, representing either
     * another {@link OtpMbox mailbox} or an Erlang process.
     *
     * <p>
     * Sending is best effort: if the recipient's node cannot be reached, or
     * any error occurs while sending, the message is silently dropped.
     * </p>
     *
     * @param to
     *            the {@link OtpErlangPid pid} identifying the intended
     *            recipient of the message.
     *
     * @param msg
     *            the body of the message to send.
     *
     */
    public void send(final OtpErlangPid to, final OtpErlangObject msg) {
        try {
            final String node = to.node();
            if (node.equals(home.node())) {
                // local recipient: hand over a clone so sender and receiver
                // never share the same object
                home.deliver(new OtpMsg(to, (OtpErlangObject) msg.clone()));
                return;
            }

            final OtpCookedConnection conn = home.getConnection(node);
            if (conn != null) {
                conn.send(self, to, msg);
            }
        } catch (final Exception ignored) {
            // deliberate: send never reports failures to the caller
        }
    }

    /**
     * Send a message to a named mailbox created from the same node as this
     * mailbox. The message is cloned before it is delivered.
     *
     * @param name
     *            the registered name of recipient mailbox.
     *
     * @param msg
     *            the body of the message to send.
     *
     */
    public void send(final String name, final OtpErlangObject msg) {
        home.deliver(new OtpMsg(self, name, (OtpErlangObject) msg.clone()));
    }

    /**
     * Send a message to a named mailbox created from another node.
     *
     * <p>
     * If <code>node</code> is the name of this mailbox's own node, either in
     * full or as the part before the '@', the message is delivered locally.
     * Sending is best effort: if the node cannot be reached, or any error
     * occurs while sending, the message is silently dropped.
     * </p>
     *
     * @param name
     *            the registered name of recipient mailbox.
     *
     * @param node
     *            the name of the remote node where the recipient mailbox is
     *            registered.
     *
     * @param msg
     *            the body of the message to send.
     *
     */
    public void send(final String name, final String node,
            final OtpErlangObject msg) {
        try {
            final String currentNode = home.node();
            final int at = currentNode.indexOf('@');

            // "alive" alone (no host part) also addresses the local node;
            // the at >= 0 guard keeps substring() from being called with -1
            final boolean local = node.equals(currentNode)
                    || node.indexOf('@') < 0 && at >= 0
                    && node.equals(currentNode.substring(0, at));

            if (local) {
                send(name, msg);
                return;
            }

            // other node
            final OtpCookedConnection conn = home.getConnection(node);
            if (conn != null) {
                conn.send(self, name, msg);
            }
        } catch (final Exception ignored) {
            // deliberate: send never reports failures to the caller
        }
    }

    /**
     * Close this mailbox with the given reason.
     *
     * <p>
     * After this operation, the mailbox will no longer be able to receive
     * messages. Any delivered but as yet unretrieved messages can still be
     * retrieved however.
     * </p>
     *
     * <p>
     * If there are links from this mailbox to other {@link OtpErlangPid pids},
     * they will be broken when this method is called and exit signals will be
     * sent.
     * </p>
     *
     * @param reason
     *            an Erlang term describing the reason for the exit.
     */
    public void exit(final OtpErlangObject reason) {
        home.closeMbox(this, reason);
    }

    /**
     * <p>
     * Equivalent to <code>exit(new OtpErlangAtom(reason))</code>.
     * </p>
     *
     * @param reason
     *            the text of the atom used as exit reason.
     *
     * @see #exit(OtpErlangObject)
     */
    public void exit(final String reason) {
        exit(new OtpErlangAtom(reason));
    }

    /**
     * <p>
     * Send an exit signal to a remote {@link OtpErlangPid pid}. This method
     * does not cause any links to be broken, except indirectly if the remote
     * {@link OtpErlangPid pid} exits as a result of this exit signal.
     * </p>
     *
     * @param to
     *            the {@link OtpErlangPid pid} to which the exit signal
     *            should be sent.
     *
     * @param reason
     *            an Erlang term indicating the reason for the exit.
     */
    // it's called exit, but it sends exit2
    public void exit(final OtpErlangPid to, final OtpErlangObject reason) {
        exit(2, to, reason);
    }

    /**
     * <p>
     * Equivalent to <code>exit(to, new
     * OtpErlangAtom(reason))</code>.
     * </p>
     *
     * @param to
     *            the {@link OtpErlangPid pid} to which the exit signal
     *            should be sent.
     *
     * @param reason
     *            the text of the atom used as exit reason.
     *
     * @see #exit(OtpErlangPid, OtpErlangObject)
     */
    public void exit(final OtpErlangPid to, final String reason) {
        exit(to, new OtpErlangAtom(reason));
    }

    // this function used internally when "process" dies
    // since Erlang discerns between exit and exit/2.
    //
    // arity 1 sends an exit signal (the recipient drops its link to us),
    // arity 2 sends an exit2 signal (links stay intact). Failures are
    // swallowed: exit signals are best effort, like send().
    private void exit(final int arity, final OtpErlangPid to,
            final OtpErlangObject reason) {
        try {
            final String node = to.node();
            if (node.equals(home.node())) {
                // Honour the arity for local recipients as well: deliver()
                // removes the link on exitTag only, so an exit/2 has to be
                // delivered as exit2Tag to leave links untouched.
                final int tag = arity == 2 ? OtpMsg.exit2Tag : OtpMsg.exitTag;
                home.deliver(new OtpMsg(tag, self, to, reason));
                return;
            }

            final OtpCookedConnection conn = home.getConnection(node);
            if (conn == null) {
                return;
            }
            switch (arity) {
            case 1:
                conn.exit(self, to, reason);
                break;
            case 2:
                conn.exit2(self, to, reason);
                break;
            }
        } catch (final Exception ignored) {
            // deliberate: exit signals never report failures to the caller
        }
    }

    /**
     * <p>
     * Link to a remote mailbox or Erlang process. Links are idempotent, calling
     * this method multiple times will not result in more than one link being
     * created.
     * </p>
     *
     * <p>
     * If the remote process subsequently exits or the mailbox is closed, a
     * subsequent attempt to retrieve a message through this mailbox will cause
     * an {@link OtpErlangExit OtpErlangExit} exception to be raised. Similarly,
     * if the sending mailbox is closed, the linked mailbox or process will
     * receive an exit signal.
     * </p>
     *
     * <p>
     * If the remote process cannot be reached in order to set the link, the
     * exception is raised immediately.
     * </p>
     *
     * @param to
     *            the {@link OtpErlangPid pid} representing the object to
     *            link to.
     *
     * @exception OtpErlangExit
     *                if the {@link OtpErlangPid pid} referred to does not
     *                exist or could not be reached.
     *
     */
    public void link(final OtpErlangPid to) throws OtpErlangExit {
        try {
            final String node = to.node();
            if (node.equals(home.node())) {
                final boolean delivered = home.deliver(new OtpMsg(
                        OtpMsg.linkTag, self, to));
                if (!delivered) {
                    throw new OtpErlangExit("noproc", to);
                }
            } else {
                final OtpCookedConnection conn = home.getConnection(node);
                if (conn == null) {
                    throw new OtpErlangExit("noproc", to);
                }
                conn.link(self, to);
            }
        } catch (final OtpErlangExit e) {
            throw e;
        } catch (final Exception ignored) {
            // any other failure is not reported; the link is still recorded
            // locally below
        }

        links.addLink(self, to);
    }

    /**
     * <p>
     * Remove a link to a remote mailbox or Erlang process. This method removes
     * a link created with {@link #link link()}. Links are idempotent; calling
     * this method once will remove all links between this mailbox and the
     * remote {@link OtpErlangPid pid}.
     * </p>
     *
     * <p>
     * The link is dropped locally first; notifying the other side is best
     * effort and failures are not reported.
     * </p>
     *
     * @param to
     *            the {@link OtpErlangPid pid} representing the object to
     *            unlink from.
     *
     */
    public void unlink(final OtpErlangPid to) {
        links.removeLink(self, to);

        try {
            final String node = to.node();
            if (node.equals(home.node())) {
                home.deliver(new OtpMsg(OtpMsg.unlinkTag, self, to));
                return;
            }

            final OtpCookedConnection conn = home.getConnection(node);
            if (conn != null) {
                conn.unlink(self, to);
            }
        } catch (final Exception ignored) {
            // deliberate: unlink never reports failures to the caller
        }
    }

    /**
     * <p>
     * Create a connection to a remote node.
     * </p>
     *
     * <p>
     * Strictly speaking, this method is not necessary simply to set up a
     * connection, since connections are created automatically first time a
     * message is sent to a {@link OtpErlangPid pid} on the remote node.
     * </p>
     *
     * <p>
     * This method makes it possible to wait for a node to come up, however, or
     * check that a node is still alive.
     * </p>
     *
     * <p>
     * This method calls a method with the same name in {@link OtpNode#ping
     * Otpnode} but is provided here for convenience.
     * </p>
     *
     * @param node
     *            the name of the node to ping.
     *
     * @param timeout
     *            the time, in milliseconds, before reporting failure.
     *
     * @return the result reported by {@link OtpNode#ping OtpNode.ping()} for
     *         the given node and timeout.
     */
    public boolean ping(final String node, final long timeout) {
        return home.ping(node, timeout);
    }

    /**
     * <p>
     * Get a list of all known registered names on the same {@link OtpNode node}
     * as this mailbox.
     * </p>
     *
     * <p>
     * This method calls a method with the same name in {@link OtpNode#getNames
     * Otpnode} but is provided here for convenience.
     * </p>
     *
     * @return an array of Strings containing all registered names on this
     *         {@link OtpNode node}.
     */
    public String[] getNames() {
        return home.getNames();
    }

    /**
     * Determine the {@link OtpErlangPid pid} corresponding to a registered name
     * on this {@link OtpNode node}.
     *
     * <p>
     * This method calls a method with the same name in {@link OtpNode#whereis
     * Otpnode} but is provided here for convenience.
     * </p>
     *
     * @param name
     *            the registered name to look up.
     *
     * @return the {@link OtpErlangPid pid} corresponding to the registered
     *         name, or null if the name is not known on this node.
     */
    public OtpErlangPid whereis(final String name) {
        return home.whereis(name);
    }

    /**
     * Close this mailbox.
     *
     * <p>
     * After this operation, the mailbox will no longer be able to receive
     * messages. Any delivered but as yet unretrieved messages can still be
     * retrieved however.
     * </p>
     *
     * <p>
     * If there are links from this mailbox to other {@link OtpErlangPid pids},
     * they will be broken when this method is called and exit signals with
     * reason 'normal' will be sent.
     * </p>
     *
     * <p>
     * This is equivalent to {@link #exit(String) exit("normal")}.
     * </p>
     */
    public void close() {
        home.closeMbox(this);
    }

    /**
     * Closes the mailbox and then flushes its message queue when the object
     * is finalized.
     */
    @Override
    protected void finalize() {
        close();
        queue.flush();
    }

    /**
     * Determine if two mailboxes are equal.
     *
     * @return true if both Objects are mailboxes with the same identifying
     *         {@link OtpErlangPid pids}.
     */
    @Override
    public boolean equals(final Object o) {
        if (!(o instanceof OtpMbox)) {
            return false;
        }

        final OtpMbox m = (OtpMbox) o;
        return m.self.equals(self);
    }

    /**
     * Hash code matching {@link #equals(Object) equals()}: since equality is
     * decided by the identifying {@link OtpErlangPid pid} alone, the hash code
     * is that of the pid.
     *
     * @return the hash code of this mailbox's pid.
     */
    @Override
    public int hashCode() {
        return self.hashCode();
    }

    /*
     * called by OtpNode to deliver message to this mailbox.
     *
     * About exit and exit2: both cause exception to be raised upon receive().
     * However exit (not 2) causes any link to be removed as well, while exit2
     * leaves any links intact.
     *
     * link and unlink requests only update the link table and are never
     * queued; everything else ends up in the message queue.
     */
    void deliver(final OtpMsg m) {
        switch (m.type()) {
        case OtpMsg.linkTag:
            links.addLink(self, m.getSenderPid());
            break;

        case OtpMsg.unlinkTag:
            links.removeLink(self, m.getSenderPid());
            break;

        case OtpMsg.exitTag:
            links.removeLink(self, m.getSenderPid());
            queue.put(m);
            break;

        case OtpMsg.exit2Tag:
        default:
            queue.put(m);
            break;
        }
    }

    // used to break all known links to this mbox: the link table is cleared
    // and every formerly linked pid is sent an exit signal with the given
    // reason
    void breakLinks(final OtpErlangObject reason) {
        final Link[] broken = links.clearLinks();
        if (broken == null) {
            return;
        }

        for (int i = 0; i < broken.length; i++) {
            exit(1, broken[i].remote(), reason);
        }
    }
}
|