1 package org.xvsm.remote.freepastry;
2
3 import java.io.IOException;
4 import java.io.Serializable;
5 import java.net.InetAddress;
6 import java.net.URI;
7 import java.nio.ByteBuffer;
8 import java.util.Hashtable;
9 import java.util.WeakHashMap;
10
11 import org.xvsm.configuration.ConfigurationManager;
12 import org.xvsm.coordinators.FifoCoordinator;
13 import org.xvsm.coordinators.KeyCoordinator;
14 import org.xvsm.core.Capi;
15 import org.xvsm.core.ContainerRef;
16 import org.xvsm.core.Entry;
17 import org.xvsm.interfaces.ICapi;
18 import org.xvsm.interfaces.container.IContainer;
19 import org.xvsm.internal.exceptions.XCoreException;
20 import org.xvsm.lookup.freepastry.FPPastContent;
21 import org.xvsm.lookup.freepastry.FreePastryLookup;
22 import org.xvsm.remote.TransportHandler;
23 import org.xvsm.selectors.FifoSelector;
24
25 import rice.Continuation;
26 import rice.Continuation.ListenerContinuation;
27 import rice.Continuation.NamedContinuation;
28 import rice.Continuation.StandardContinuation;
29 import rice.environment.Environment;
30 import rice.environment.logging.Logger;
31 import rice.p2p.commonapi.CancellableTask;
32 import rice.p2p.commonapi.Endpoint;
33 import rice.p2p.commonapi.Id;
34 import rice.p2p.commonapi.IdRange;
35 import rice.p2p.commonapi.IdSet;
36 import rice.p2p.commonapi.Message;
37 import rice.p2p.commonapi.Node;
38 import rice.p2p.commonapi.NodeHandle;
39 import rice.p2p.commonapi.NodeHandleSet;
40 import rice.p2p.commonapi.RouteMessage;
41 import rice.p2p.commonapi.appsocket.AppSocket;
42 import rice.p2p.commonapi.appsocket.AppSocketReceiver;
43 import rice.p2p.past.PastContent;
44 import rice.p2p.past.PastContentHandle;
45 import rice.p2p.past.PastException;
46 import rice.p2p.past.PastImpl;
47 import rice.p2p.past.PastPolicy;
48 import rice.p2p.past.messaging.CacheMessage;
49 import rice.p2p.past.messaging.ContinuationMessage;
50 import rice.p2p.past.messaging.FetchHandleMessage;
51 import rice.p2p.past.messaging.FetchMessage;
52 import rice.p2p.past.messaging.InsertMessage;
53 import rice.p2p.past.messaging.LookupHandlesMessage;
54 import rice.p2p.past.messaging.LookupMessage;
55 import rice.p2p.past.messaging.MessageLostMessage;
56 import rice.p2p.past.messaging.PastMessage;
57 import rice.p2p.past.rawserialization.PastContentDeserializer;
58 import rice.p2p.past.rawserialization.PastContentHandleDeserializer;
59 import rice.p2p.past.rawserialization.SocketStrategy;
60 import rice.p2p.replication.Replication;
61 import rice.p2p.replication.manager.ReplicationManager;
62 import rice.p2p.replication.manager.ReplicationManagerImpl;
63 import rice.p2p.util.MathUtils;
64 import rice.p2p.util.rawserialization.SimpleOutputBuffer;
65 import rice.persistence.Cache;
66 import rice.persistence.StorageManager;
67
/**
 * Overrides the {@code PastImpl} class of the FreePastry API and adds the
 * functionality to replicate containers together with DHT entries.
 *
 * @author Hannu-Daniel Goiss
 */
74 public class MyPastImpl extends PastImpl {
75 private Hashtable outstanding;
76
77 private Hashtable timers;
78
79 private String deleteContentString;
80
/**
 * Creates the instance through the four-argument {@code PastImpl}
 * constructor and sets up the empty pending-request tables of this class.
 *
 * @param node     forwarded unchanged to {@code PastImpl}
 * @param manager  forwarded unchanged to {@code PastImpl}
 * @param replicas forwarded unchanged to {@code PastImpl}
 * @param instance forwarded unchanged to {@code PastImpl}
 */
public MyPastImpl(Node node, StorageManager manager, int replicas,
        String instance) {
    super(node, manager, replicas, instance);

    timers = new Hashtable();
    outstanding = new Hashtable();
}
89
/**
 * Creates the instance through the {@code PastImpl} constructor that also
 * takes a policy, then sets up the empty pending-request tables.
 *
 * @param node     forwarded unchanged to {@code PastImpl}
 * @param manager  forwarded unchanged to {@code PastImpl}
 * @param replicas forwarded unchanged to {@code PastImpl}
 * @param instance forwarded unchanged to {@code PastImpl}
 * @param policy   forwarded unchanged to {@code PastImpl}
 */
public MyPastImpl(Node node, StorageManager manager, int replicas,
        String instance, PastPolicy policy) {
    super(node, manager, replicas, instance, policy);

    timers = new Hashtable();
    outstanding = new Hashtable();
}
98
/**
 * Creates the instance through the {@code PastImpl} constructor that takes
 * a backup cache and a trash storage, then sets up the empty
 * pending-request tables.
 *
 * @param node     forwarded unchanged to {@code PastImpl}
 * @param manager  forwarded unchanged to {@code PastImpl}
 * @param backup   forwarded unchanged to {@code PastImpl}
 * @param replicas forwarded unchanged to {@code PastImpl}
 * @param instance forwarded unchanged to {@code PastImpl}
 * @param policy   forwarded unchanged to {@code PastImpl}
 * @param trash    forwarded unchanged to {@code PastImpl}
 */
public MyPastImpl(Node node, StorageManager manager, Cache backup,
        int replicas, String instance, PastPolicy policy,
        StorageManager trash) {
    super(node, manager, backup, replicas, instance, policy, trash);

    timers = new Hashtable();
    outstanding = new Hashtable();
}
107
/**
 * Creates the instance through the {@code PastImpl} constructor that takes
 * the {@code useOwnSocket} flag, then sets up the empty pending-request
 * tables.
 *
 * @param node         forwarded unchanged to {@code PastImpl}
 * @param manager      forwarded unchanged to {@code PastImpl}
 * @param backup       forwarded unchanged to {@code PastImpl}
 * @param replicas     forwarded unchanged to {@code PastImpl}
 * @param instance     forwarded unchanged to {@code PastImpl}
 * @param policy       forwarded unchanged to {@code PastImpl}
 * @param trash        forwarded unchanged to {@code PastImpl}
 * @param useOwnSocket forwarded unchanged to {@code PastImpl}
 */
public MyPastImpl(Node node, StorageManager manager, Cache backup,
        int replicas, String instance, PastPolicy policy,
        StorageManager trash, boolean useOwnSocket) {
    super(node, manager, backup, replicas, instance, policy, trash,
            useOwnSocket);

    timers = new Hashtable();
    outstanding = new Hashtable();
}
117
/**
 * Creates the instance through the {@code PastImpl} constructor that takes
 * a {@code SocketStrategy}, then sets up the empty pending-request tables.
 *
 * @param node     forwarded unchanged to {@code PastImpl}
 * @param manager  forwarded unchanged to {@code PastImpl}
 * @param backup   forwarded unchanged to {@code PastImpl}
 * @param replicas forwarded unchanged to {@code PastImpl}
 * @param instance forwarded unchanged to {@code PastImpl}
 * @param policy   forwarded unchanged to {@code PastImpl}
 * @param trash    forwarded unchanged to {@code PastImpl}
 * @param strategy forwarded unchanged to {@code PastImpl}
 */
public MyPastImpl(Node node, StorageManager manager, Cache backup,
        int replicas, String instance, PastPolicy policy,
        StorageManager trash, SocketStrategy strategy) {
    super(node, manager, backup, replicas, instance, policy, trash,
            strategy);

    timers = new Hashtable();
    outstanding = new Hashtable();
}
127
128 public String toString() {
129
130 if (endpoint == null)
131 return super.toString();
132 return "MyPastImpl[" + endpoint.getInstance() + "]";
133 }
134
135 public Environment getEnvironment() {
136
137 return environment;
138 }
139
140 protected ReplicationManager buildReplicationManager(Node node,
141 String instance) {
142 return new ReplicationManagerImpl(node, this, replicationFactor,
143 instance);
144 }
145
146 public Continuation[] getOutstandingMessages() {
147 return (Continuation[]) outstanding.values().toArray(
148 new Continuation[0]);
149 }
150
151 public Endpoint getEndpoint() {
152 return endpoint;
153 }
154
155 protected Continuation getResponseContinuation(final PastMessage msg) {
156 if (logger.level <= Logger.FINER)
157 logger.log("Getting the Continuation to respond to the message "
158 + msg);
159 final ContinuationMessage cmsg = (ContinuationMessage) msg;
160
161 return new Continuation() {
162 public void receiveResult(Object o) {
163 cmsg.receiveResult(o);
164 endpoint.route(null, cmsg, msg.getSource());
165 }
166
167 public void receiveException(Exception e) {
168 cmsg.receiveException(e);
169 endpoint.route(null, cmsg, msg.getSource());
170 }
171 };
172 }
173
174 protected Continuation getFetchResponseContinuation(final PastMessage msg) {
175 final ContinuationMessage cmsg = (ContinuationMessage) msg;
176
177 return new Continuation() {
178 public void receiveResult(Object o) {
179 cmsg.receiveResult(o);
180 PastContent content = (PastContent) o;
181 if (socketStrategy.sendAlongSocket(SocketStrategy.TYPE_FETCH,
182 content)) {
183 sendViaSocket(msg.getSource(), cmsg, null);
184 } else {
185 endpoint.route(null, cmsg, msg.getSource());
186 }
187 }
188
189 public void receiveException(Exception e) {
190 cmsg.receiveException(e);
191 endpoint.route(null, cmsg, msg.getSource());
192 }
193 };
194 }
195
196 WeakHashMap pendingSocketTransactions = new WeakHashMap();
197
198 private void sendViaSocket(final NodeHandle handle, final PastMessage m,
199 final Continuation c) {
200 if (c != null) {
201 CancellableTask timer = endpoint.scheduleMessage(
202 new MessageLostMessage(m.getUID(), getLocalNodeHandle(),
203 null, m, handle), MESSAGE_TIMEOUT);
204 insertPending(m.getUID(), timer, c);
205 }
206
207 SimpleOutputBuffer sob = new SimpleOutputBuffer();
208 try {
209 sob.writeInt(0);
210 sob.writeShort(m.getType());
211 m.serialize(sob);
212 } catch (IOException ioe) {
213 if (c != null)
214 c.receiveException(ioe);
215 }
216
217 int size = sob.getWritten() - 4;
218 if (logger.level <= Logger.FINER)
219 logger.log("Sending size of " + size + " to " + handle
220 + " to send " + m);
221 byte[] bytes = sob.getBytes();
222 MathUtils.intToByteArray(size, bytes, 0);
223
224 final ByteBuffer[] bb = new ByteBuffer[1];
225 bb[0] = ByteBuffer.wrap(bytes, 0, sob.getWritten());
226
227 if (logger.level <= Logger.FINE)
228 logger.log("Opening socket to " + handle + " to send " + m);
229 endpoint.connect(handle, new AppSocketReceiver() {
230
231 public void receiveSocket(AppSocket socket) {
232 if (logger.level <= Logger.FINER)
233 logger.log("Opened socket to " + handle + ":" + socket
234 + " to send " + m);
235 socket.register(false, true, 10000, this);
236 }
237
238 public void receiveSelectResult(AppSocket socket, boolean canRead,
239 boolean canWrite) {
240 if (logger.level <= Logger.FINEST)
241 logger.log("Writing to " + handle + ":" + socket
242 + " to send " + m);
243
244 try {
245 socket.write(bb, 0, 1);
246 } catch (IOException ioe) {
247 if (c != null)
248 c.receiveException(ioe);
249 else if (logger.level <= Logger.WARNING)
250 logger.logException("Error sending " + m, ioe);
251 return;
252 }
253 if (bb[0].remaining() > 0) {
254 socket.register(false, true, 10000, this);
255 } else {
256 socket.close();
257 }
258 }
259
260 public void receiveException(AppSocket socket, Exception e) {
261 if (c != null)
262 c.receiveException(e);
263 }
264 }, 10000);
265 }
266
267 protected void sendRequest(Id id, PastMessage message, Continuation command) {
268 sendRequest(id, message, null, command);
269 }
270
271 protected void sendRequest(NodeHandle handle, PastMessage message,
272 Continuation command) {
273 sendRequest(null, message, handle, command);
274 }
275
276 protected void sendRequest(Id id, PastMessage message, NodeHandle hint,
277 Continuation command) {
278 if (logger.level <= Logger.FINER)
279 logger.log("Sending request message " + message + " {"
280 + message.getUID() + "} to id " + id + " via " + hint);
281 CancellableTask timer = endpoint.scheduleMessage(
282 new MessageLostMessage(message.getUID(), getLocalNodeHandle(),
283 id, message, hint), MESSAGE_TIMEOUT);
284 insertPending(message.getUID(), timer, command);
285 endpoint.route(id, message, hint);
286 }
287
288 private void insertPending(int uid, CancellableTask timer,
289 Continuation command) {
290 if (logger.level <= Logger.FINER)
291 logger.log("Loading continuation " + uid + " into pending table");
292 timers.put(new Integer(uid), timer);
293 outstanding.put(new Integer(uid), command);
294 }
295
296 private Continuation removePending(int uid) {
297 if (logger.level <= Logger.FINER)
298 logger.log("Removing and returning continuation " + uid
299 + " from pending table");
300 CancellableTask timer = (CancellableTask) timers
301 .remove(new Integer(uid));
302
303 if (timer != null)
304 timer.cancel();
305
306 return (Continuation) outstanding.remove(new Integer(uid));
307 }
308
309 private void handleResponse(PastMessage message) {
310 if (logger.level <= Logger.FINE)
311 logger.log("handling reponse message " + message
312 + " from the request");
313
314 Continuation command = removePending(message.getUID());
315
316 if (command != null) {
317 message.returnResponse(command, environment, instance);
318 }
319 }
320
321 protected void getHandles(Id id, final int max, Continuation command) {
322 NodeHandleSet set = endpoint.replicaSet(id, max);
323
324 if (set.size() == max) {
325 command.receiveResult(set);
326 } else {
327 sendRequest(id, new LookupHandlesMessage(getUID(), id, max,
328 getLocalNodeHandle(), id),
329 new StandardContinuation(command) {
330 public void receiveResult(Object o) {
331 NodeHandleSet replicas = (NodeHandleSet) o;
332
333 if (Math.min(max, endpoint.replicaSet(
334 endpoint.getLocalNodeHandle().getId(),
335 replicationFactor + 1).size()) > replicas
336 .size())
337 parent
338 .receiveException(new PastException(
339 "Only received "
340 + replicas.size()
341 + " replicas - cannot insert as we know about more nodes."));
342 else
343 parent.receiveResult(replicas);
344 }
345 });
346 }
347 }
348
349 private void cache(final PastContent content) {
350 cache(content, new ListenerContinuation("Caching of " + content,
351 environment));
352 }
353
354 public void cache(final PastContent content, final Continuation command) {
355 if (logger.level <= Logger.FINER)
356 logger.log("Inserting PastContent object " + content
357 + " into cache");
358
359 deleteContentString = ((FPPastContent) content).getContent();
360
361 if ((content != null) && (!content.isMutable())) {
362 storage.cache(content.getId(), null, content, command);
363 } else {
364 command.receiveResult(new Boolean(true));
365 }
366 }
367
368 protected void doInsert(final Id id, final MessageBuilder builder,
369 Continuation command, final boolean useSocket) {
370 getHandles(id, replicationFactor + 1,
371 new StandardContinuation(command) {
372 public void receiveResult(Object o) {
373 NodeHandleSet replicas = (NodeHandleSet) o;
374 if (logger.level <= Logger.FINER)
375 logger.log("Received replicas " + replicas
376 + " for id " + id);
377
378 MultiContinuation multi = new MultiContinuation(parent,
379 replicas.size()) {
380 public boolean isDone() throws Exception {
381 int numSuccess = 0;
382 for (int i = 0; i < haveResult.length; i++)
383 if ((haveResult[i])
384 && (Boolean.TRUE.equals(result[i])))
385 numSuccess++;
386
387 if (numSuccess >= (SUCCESSFUL_INSERT_THRESHOLD * haveResult.length))
388 return true;
389
390 if (super.isDone()) {
391 for (int i = 0; i < result.length; i++)
392 if (result[i] instanceof Exception)
393 if (logger.level <= Logger.WARNING)
394 logger.logException("result["
395 + i + "]:",
396 (Exception) result[i]);
397
398 throw new PastException("Had only "
399 + numSuccess
400 + " successful inserts out of "
401 + result.length + " - aborting.");
402 }
403 return false;
404 }
405
406 public Object getResult() {
407 Boolean[] b = new Boolean[result.length];
408 for (int i = 0; i < b.length; i++)
409 b[i] = new Boolean((result[i] == null)
410 || Boolean.TRUE.equals(result[i]));
411
412 return b;
413 }
414 };
415
416 for (int i = 0; i < replicas.size(); i++) {
417 NodeHandle handle = replicas.getHandle(i);
418 PastMessage m = builder.buildMessage();
419 Continuation c = new NamedContinuation(
420 "InsertMessage to " + replicas.getHandle(i)
421 + " for " + id, multi
422 .getSubContinuation(i));
423 if (useSocket) {
424 sendViaSocket(handle, m, c);
425 } else {
426 sendRequest(handle, m, c);
427 }
428 }
429 }
430 });
431 }
432
433 public void insert(final PastContent obj, final Continuation command) {
434 if (logger.level <= Logger.FINER)
435 logger.log("Inserting the object " + obj + " with the id "
436 + obj.getId());
437
438 if (logger.level <= Logger.FINEST)
439 logger.log(" Inserting data of class " + obj.getClass().getName()
440 + " under " + obj.getId().toStringFull());
441
442 doInsert(obj.getId(), new MessageBuilder() {
443 public PastMessage buildMessage() {
444 return new InsertMessage(getUID(), obj, getLocalNodeHandle(),
445 obj.getId());
446 }
447 }, new StandardContinuation(command) {
448 public void receiveResult(final Object array) {
449 cache(obj, new SimpleContinuation() {
450 public void receiveResult(Object o) {
451 parent.receiveResult(array);
452 }
453 });
454 }
455 }, socketStrategy.sendAlongSocket(SocketStrategy.TYPE_INSERT, obj));
456 }
457
458 public void lookup(final Id id, final Continuation command) {
459 lookup(id, true, command);
460 }
461
462 public void lookup(final Id id, final boolean cache,
463 final Continuation command) {
464 if (logger.level <= Logger.FINER)
465 logger.log(" Performing lookup on " + id.toStringFull());
466
467 storage.getObject(id, new StandardContinuation(command) {
468 public void receiveResult(Object o) {
469 if (o != null) {
470 o = changeResult(o);
471 command.receiveResult(o);
472 } else {
473 sendRequest(id, new LookupMessage(getUID(), id,
474 getLocalNodeHandle(), id), new NamedContinuation(
475 "LookupMessage for " + id, this) {
476 public void receiveResult(final Object o) {
477 if (o != null) {
478 if (cache) {
479 cache((PastContent) o,
480 new SimpleContinuation() {
481 public void receiveResult(
482 Object object) {
483 command.receiveResult(o);
484 }
485 });
486 } else {
487 command.receiveResult(o);
488 }
489 } else {
490 lookupHandles(id, replicationFactor + 1,
491 new Continuation() {
492 public void receiveResult(Object o) {
493 PastContentHandle[] handles = (PastContentHandle[]) o;
494
495 for (int i = 0; i < handles.length; i++) {
496 if (handles[i] != null) {
497 fetch(
498 handles[i],
499 new StandardContinuation(
500 parent) {
501 public void receiveResult(
502 final Object o) {
503 if (cache) {
504 cache(
505 (PastContent) o,
506 new SimpleContinuation() {
507 public void receiveResult(
508 Object object) {
509 command
510 .receiveResult(o);
511 }
512 });
513 } else {
514 command
515 .receiveResult(o);
516 }
517 }
518 });
519
520 return;
521 }
522 }
523
524 command.receiveResult(null);
525 }
526
527 public void receiveException(
528 Exception e) {
529 command.receiveException(e);
530 }
531 });
532 }
533 }
534
535 public void receiveException(Exception e) {
536 receiveResult(null);
537 }
538 });
539 }
540 }
541 });
542 }
543
544 public void lookupHandles(final Id id, int max, final Continuation command) {
545 if (logger.level <= Logger.FINE)
546 logger.log("Retrieving handles of up to " + max
547 + " replicas of the object stored in Past with id " + id);
548
549 if (logger.level <= Logger.FINER)
550 logger.log("Fetching up to " + max + " handles of "
551 + id.toStringFull());
552
553 getHandles(id, max, new StandardContinuation(command) {
554 public void receiveResult(Object o) {
555 NodeHandleSet replicas = (NodeHandleSet) o;
556 if (logger.level <= Logger.FINER)
557 logger.log("Receiving replicas " + replicas
558 + " for lookup Id " + id);
559
560 MultiContinuation multi = new MultiContinuation(parent,
561 replicas.size()) {
562 public Object getResult() {
563 PastContentHandle[] p = new PastContentHandle[result.length];
564
565 for (int i = 0; i < result.length; i++)
566 if (result[i] instanceof PastContentHandle)
567 p[i] = (PastContentHandle) result[i];
568
569 return p;
570 }
571 };
572
573 for (int i = 0; i < replicas.size(); i++)
574 lookupHandle(id, replicas.getHandle(i), multi
575 .getSubContinuation(i));
576 }
577 });
578 }
579
580 public void lookupHandle(Id id, NodeHandle handle, Continuation command) {
581 if (logger.level <= Logger.FINE)
582 logger.log("Retrieving handle for id " + id + " from node "
583 + handle);
584
585 sendRequest(handle, new FetchHandleMessage(getUID(), id,
586 getLocalNodeHandle(), handle.getId()), new NamedContinuation(
587 "FetchHandleMessage to " + handle + " for " + id, command));
588 }
589
590 public void fetch(PastContentHandle handle, Continuation command) {
591 if (logger.level <= Logger.FINE)
592 logger.log("Retrieving object associated with content handle "
593 + handle);
594
595 if (logger.level <= Logger.FINER)
596 logger.log("Fetching object under id "
597 + handle.getId().toStringFull() + " on "
598 + handle.getNodeHandle());
599
600 NodeHandle han = handle.getNodeHandle();
601 sendRequest(han, new FetchMessage(getUID(), handle,
602 getLocalNodeHandle(), han.getId()), new NamedContinuation(
603 "FetchMessage to " + handle.getNodeHandle() + " for "
604 + handle.getId(), command));
605 }
606
607 public NodeHandle getLocalNodeHandle() {
608 return endpoint.getLocalNodeHandle();
609 }
610
611 public int getReplicationFactor() {
612 return replicationFactor;
613 }
614
615 public boolean forward(final RouteMessage message) {
616 Message internal;
617 try {
618 internal = message.getMessage(endpoint.getDeserializer());
619 } catch (IOException ioe) {
620 throw new RuntimeException(ioe);
621 }
622
623 if (internal instanceof LookupMessage) {
624
625 final LookupMessage lmsg = (LookupMessage) internal;
626 Id id = lmsg.getId();
627
628 if (!lmsg.isResponse()) {
629 if (logger.level <= Logger.FINER)
630 logger.log("Lookup message " + lmsg
631 + " is a request; look in the cache");
632 if (storage.exists(id)) {
633 if (logger.level <= Logger.FINE)
634 logger.log("Request for " + id
635 + " satisfied locally - responding");
636 deliver(endpoint.getId(), lmsg);
637
638 return false;
639 }
640 }
641 } else if (internal instanceof LookupHandlesMessage) {
642 LookupHandlesMessage lmsg = (LookupHandlesMessage) internal;
643
644 if (!lmsg.isResponse()) {
645 if (endpoint.replicaSet(lmsg.getId(), lmsg.getMax()).size() == lmsg
646 .getMax()) {
647 if (logger.level <= Logger.FINE)
648 logger.log("Hijacking lookup handles request for "
649 + lmsg.getId());
650
651 deliver(endpoint.getId(), lmsg);
652
653 return false;
654 }
655 }
656 }
657 return true;
658 }
659
660 public void deliver(Id id, Message message) {
661 final PastMessage msg = (PastMessage) message;
662
663 if (msg.isResponse()) {
664 handleResponse((PastMessage) message);
665 } else {
666 if (logger.level <= Logger.INFO)
667 logger.log("Received message " + message + " with destination "
668 + id);
669
670 if (msg instanceof InsertMessage) {
671 final InsertMessage imsg = (InsertMessage) msg;
672
673 if (policy.allowInsert(imsg.getContent())) {
674 inserts++;
675 final Id msgid = imsg.getContent().getId();
676
677 lockManager.lock(msgid, new StandardContinuation(
678 getResponseContinuation(msg)) {
679
680 public void receiveResult(Object result) {
681 storage.getObject(msgid, new StandardContinuation(
682 parent) {
683 public void receiveResult(Object o) {
684 try {
685 final PastContent content = imsg
686 .getContent().checkInsert(
687 msgid, (PastContent) o);
688 storage
689 .store(
690 msgid,
691 null,
692 content,
693 new StandardContinuation(
694 parent) {
695 public void receiveResult(
696 Object result) {
697 getResponseContinuation(
698 msg)
699 .receiveResult(
700 result);
701 lockManager
702 .unlock(msgid);
703
704 try {
705 createReplica(
706 content,
707 false,
708 new URI(
709 "tcpjava://"
710 + InetAddress
711 .getLocalHost()
712 .getHostAddress()
713 + ":"
714 + (FreePastryConnection
715 .getInstance()
716 .getLocalport() + 1000)));
717 } catch (Exception e) {
718 e
719 .printStackTrace();
720 }
721
722 }
723 });
724 } catch (PastException e) {
725 parent.receiveException(e);
726 }
727 }
728 });
729 }
730 });
731 } else {
732 getResponseContinuation(msg).receiveResult(
733 new Boolean(false));
734 }
735 } else if (msg instanceof LookupMessage) {
736 final LookupMessage lmsg = (LookupMessage) msg;
737 lookups++;
738
739 storage.getObject(lmsg.getId(), new StandardContinuation(
740 getResponseContinuation(lmsg)) {
741 public void receiveResult(Object o) {
742 if (logger.level <= Logger.FINE)
743 logger.log("Received object " + o + " for id "
744 + lmsg.getId());
745
746 o = changeResult(o);
747
748 parent.receiveResult(o);
749
750 }
751 });
752 } else if (msg instanceof LookupHandlesMessage) {
753 LookupHandlesMessage lmsg = (LookupHandlesMessage) msg;
754 NodeHandleSet set = endpoint.replicaSet(lmsg.getId(), lmsg
755 .getMax());
756 if (logger.level <= Logger.FINER)
757 logger.log("Returning replica set " + set
758 + " for lookup handles of id " + lmsg.getId()
759 + " max " + lmsg.getMax() + " at "
760 + endpoint.getId());
761 getResponseContinuation(msg).receiveResult(set);
762 } else if (msg instanceof FetchMessage) {
763 FetchMessage fmsg = (FetchMessage) msg;
764 lookups++;
765
766 Continuation c;
767 c = getFetchResponseContinuation(msg);
768
769 storage.getObject(fmsg.getHandle().getId(), c);
770 } else if (msg instanceof FetchHandleMessage) {
771 final FetchHandleMessage fmsg = (FetchHandleMessage) msg;
772 fetchHandles++;
773
774 storage.getObject(fmsg.getId(), new StandardContinuation(
775 getResponseContinuation(msg)) {
776 public void receiveResult(Object o) {
777 PastContent content = (PastContent) o;
778
779 if (content != null) {
780 if (logger.level <= Logger.FINE)
781 logger
782 .log("Retrieved data for fetch handles of id "
783 + fmsg.getId());
784 parent.receiveResult(content
785 .getHandle(MyPastImpl.this));
786 } else {
787 parent.receiveResult(null);
788 }
789 }
790 });
791 } else if (msg instanceof CacheMessage) {
792 cache(((CacheMessage) msg).getContent());
793 } else {
794 if (logger.level <= Logger.SEVERE)
795 logger.log("ERROR - Received message " + msg
796 + "of unknown type.");
797 }
798 }
799 }
800
801 public void update(NodeHandle handle, boolean joined) {
802 }
803
804 public static ContainerRef createReplica(Object o, boolean checkFirst) {
805 return createReplica((((FPPastContent) o).getContent()).toString(),
806 checkFirst, null);
807 }
808
809 public static ContainerRef createReplica(String contentString,
810 boolean checkFirst) {
811 return createReplica(contentString, checkFirst, null);
812 }
813
814 public static ContainerRef createReplica(Object o, boolean checkFirst,
815 URI homeSite) {
816 return createReplica((((FPPastContent) o).getContent()).toString(),
817 checkFirst, homeSite);
818 }
819
820 public static ContainerRef createReplica(String contentString,
821 boolean checkFirst, URI homeSite) {
822 ContainerRef cref = null;
823
824
825 ICapi capi;
826 try {
827
828 capi = new Capi();
829 } catch (XCoreException xce) {
830 throw new RuntimeException("this should not happen");
831 }
832
833 if (checkFirst = true) {
834
835
836
837 try {
838 cref = capi.lookupContainer(null, homeSite, contentString);
839
840 return cref;
841 } catch (Exception e) {
842 try {
843 URI contentURI = new URI(contentString);
844 if (contentURI.getPort() > 0) {
845 contentString = FreePastryLookup
846 .getContainerName(new ContainerRef(contentURI));
847 }
848
849 } catch (Exception e1) {
850 }
851
852 }
853 }
854
855 try {
856 cref = capi.createContainer(null, homeSite, contentString,
857 IContainer.INFINITE_SIZE, new FifoCoordinator(),
858 new KeyCoordinator());
859
860 boolean gemacht = false;
861
862 FreePastryConnection fp = FreePastryConnection.getInstance();
863 NodeHandleSet handles = fp.getEndpoint().replicaSet(
864 fp.getPastryIdFactory().buildId(contentString),
865 fp.getNumberOfContainers());
866
867
868 for (int i = 0; i < handles.size() && gemacht == false; i++) {
869 String handleString = handles.getHandle(i).toString();
870 String ip = handleString.substring(handleString
871 .lastIndexOf("/") + 1, handleString.lastIndexOf(":"));
872 int port = Integer.parseInt(handleString.substring(handleString
873 .lastIndexOf(":") + 1,
874 handleString.lastIndexOf(":") + 5));
875
876 int compPort;
877
878 if (homeSite == null)
879 compPort = TransportHandler.getInstance()
880 .getListener(
881 ConfigurationManager.getInstance()
882 .getStringSetting(
883 "DefaultAnswerToProtocol"))
884 .getUri().getPort();
885
886 else
887 compPort = homeSite.getPort();
888
889 port = port + 1000;
890
891 if (compPort != port) {
892
893 URI site = new URI("tcpjava://" + ip + ":" + port);
894
895 try {
896 ContainerRef nCref = capi.lookupContainer(null, site,
897 contentString);
898
899 Entry[] entries = capi.read(nCref, 0, null,
900 new FifoSelector(FifoSelector.CNT_ALL));
901
902 for (Entry e : entries) {
903 capi.write(cref, 0L, null, e);
904 }
905
906 gemacht = true;
907 } catch (Exception ex) {
908 }
909
910 }
911 }
912
913
914
915
916
917 } catch (Exception e) {
918 e.printStackTrace();
919 }
920 return cref;
921
922 }
923
924 public static ContainerRef createReplica(String contentString,
925 URI homeSite, URI originalSite) {
926 ContainerRef cref = null;
927
928
929 ICapi capi;
930 try {
931
932 capi = new Capi();
933 } catch (XCoreException xce) {
934 throw new RuntimeException("this should not happen");
935 }
936
937 try {
938 cref = capi.lookupContainer(null, homeSite, contentString);
939
940 return cref;
941 } catch (Exception e) {
942 try {
943 URI contentURI = new URI(contentString);
944 if (contentURI.getPort() > 0) {
945 contentString = FreePastryLookup
946 .getContainerName(new ContainerRef(contentURI));
947 }
948
949 } catch (Exception e1) {
950 }
951
952 }
953
954 try {
955 cref = capi.createContainer(null, homeSite, contentString,
956 IContainer.INFINITE_SIZE, new FifoCoordinator(),
957 new KeyCoordinator());
958
959 boolean gemacht = false;
960
961 try {
962 ContainerRef nCref = capi.lookupContainer(null, originalSite,
963 contentString);
964
965 Entry[] entries = capi.read(nCref, 0, null, new FifoSelector(
966 FifoSelector.CNT_ALL));
967
968 for (Entry e : entries) {
969 capi.write(cref, 0L, null, e);
970 }
971
972 gemacht = true;
973 } catch (Exception ex) {
974 }
975
976
977
978
979
980
981 } catch (Exception e) {
982 e.printStackTrace();
983 }
984 return cref;
985
986 }
987
988 private Object changeResult(Object o) {
989
990 if (o == null || ((FPPastContent) o).getContent() == null)
991 return o;
992
993 try {
994 ContainerRef newCref = null;
995
996 ICapi capi;
997 try {
998
999 capi = new Capi();
1000 } catch (XCoreException xce) {
1001 throw new RuntimeException("this should not happen");
1002 }
1003
1004 newCref = createReplica(o, true, new URI(
1005 "tcpjava://"
1006 + InetAddress.getLocalHost().getHostAddress()
1007 + ":"
1008 + (FreePastryConnection.getInstance()
1009 .getLocalport() + 1000)));
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021 ((FPPastContent) o).setContent(newCref.asURI().toString());
1022
1023 } catch (Exception e) {
1024 e.printStackTrace();
1025 }
1026
1027 return o;
1028 }
1029
1030 public void fetch(final Id id, NodeHandle hint, Continuation command) {
1031 if (logger.level <= Logger.FINER)
1032 logger
1033 .log("Sending out replication fetch request for the id "
1034 + id);
1035
1036 policy.fetch(id, hint, backup, this, new StandardContinuation(command) {
1037 public void receiveResult(Object o) {
1038 if (o == null) {
1039 if (logger.level <= Logger.WARNING)
1040 logger.log("Could not fetch id " + id
1041 + " - policy returned null in namespace "
1042 + instance);
1043 parent.receiveResult(new Boolean(false));
1044 } else {
1045 if (logger.level <= Logger.FINEST)
1046 logger.log("inserting replica of id " + id);
1047
1048 createReplica(o, true);
1049
1050 if (!(o instanceof PastContent))
1051 if (logger.level <= Logger.WARNING)
1052 logger.log("ERROR! Not PastContent "
1053 + o.getClass().getName() + " " + o);
1054 storage.getStorage().store(((PastContent) o).getId(), null,
1055 (PastContent) o, parent);
1056 }
1057 }
1058 });
1059 }
1060
1061 public void remove(final Id id, Continuation command) {
1062
1063
1064
1065
1066
1067
1068
1069 if (backup != null) {
1070 storage.getObject(id, new StandardContinuation(command) {
1071 public void receiveResult(Object o) {
1072
1073 try {
1074 ICapi capi;
1075 try {
1076
1077 capi = new Capi();
1078 } catch (XCoreException xce) {
1079 throw new RuntimeException("this should not happen");
1080 }
1081
1082 capi.destroyContainer(null, capi.lookupContainer(null,
1083 null, (((FPPastContent) o).getContent())
1084 .toString()));
1085 } catch (Exception e) {
1086 throw new RuntimeException(e);
1087 }
1088
1089 backup.cache(id, storage.getMetadata(id), (Serializable) o,
1090 new StandardContinuation(parent) {
1091 public void receiveResult(Object o) {
1092 storage.unstore(id, parent);
1093 }
1094 });
1095 }
1096 });
1097 } else {
1098
1099 storage.getObject(id, new StandardContinuation(command) {
1100 public void receiveResult(Object o) {
1101
1102 try {
1103
1104 ICapi capi;
1105 try {
1106
1107 capi = new Capi();
1108 } catch (XCoreException xce) {
1109 throw new RuntimeException("this should not happen");
1110 }
1111 capi.destroyContainer(null, capi.lookupContainer(null,
1112 null, (((FPPastContent) o).getContent())
1113 .toString()));
1114 } catch (Exception e) {
1115 e.printStackTrace();
1116 }
1117 }
1118 });
1119
1120 storage.unstore(id, command);
1121 }
1122 }
1123
1124 public IdSet scan(IdRange range) {
1125 return storage.getStorage().scan(range);
1126 }
1127
1128 public IdSet scan() {
1129 return storage.getStorage().scan();
1130 }
1131
1132 public boolean exists(Id id) {
1133 return storage.getStorage().exists(id);
1134 }
1135
1136 public void existsInOverlay(Id id, Continuation command) {
1137 lookupHandles(id, replicationFactor + 1, new StandardContinuation(
1138 command) {
1139 public void receiveResult(Object result) {
1140 Object results[] = (Object[]) result;
1141 for (int i = 0; i < results.length; i++) {
1142 if (results[i] instanceof PastContentHandle) {
1143 parent.receiveResult(Boolean.TRUE);
1144 return;
1145 }
1146 }
1147 parent.receiveResult(Boolean.FALSE);
1148 }
1149 });
1150 }
1151
1152 public void reInsert(Id id, Continuation command) {
1153 storage.getObject(id, new StandardContinuation(command) {
1154 public void receiveResult(final Object o) {
1155 insert((PastContent) o, new StandardContinuation(parent) {
1156 public void receiveResult(Object result) {
1157 Boolean results[] = (Boolean[]) result;
1158 for (int i = 0; i < results.length; i++) {
1159 if (results[i].booleanValue()) {
1160 parent.receiveResult(Boolean.TRUE);
1161 return;
1162 }
1163 }
1164 parent.receiveResult(Boolean.FALSE);
1165 }
1166 });
1167 }
1168 });
1169 }
1170
1171 public Replication getReplication() {
1172 return replicaManager.getReplication();
1173 }
1174
1175 public StorageManager getStorageManager() {
1176 return storage;
1177 }
1178
1179 public interface MessageBuilder {
1180 public PastMessage buildMessage();
1181 }
1182
1183 public String getInstance() {
1184 return instance;
1185 }
1186
1187 public void setContentDeserializer(PastContentDeserializer deserializer) {
1188 contentDeserializer = deserializer;
1189 }
1190
1191 public void setContentHandleDeserializer(
1192 PastContentHandleDeserializer deserializer) {
1193 contentHandleDeserializer = deserializer;
1194 }
1195 }