View Javadoc

1   package org.xvsm.remote.freepastry;
2   
3   import java.io.IOException;
4   import java.io.Serializable;
5   import java.net.InetAddress;
6   import java.net.URI;
7   import java.nio.ByteBuffer;
8   import java.util.Hashtable;
9   import java.util.WeakHashMap;
10  
11  import org.xvsm.configuration.ConfigurationManager;
12  import org.xvsm.coordinators.FifoCoordinator;
13  import org.xvsm.coordinators.KeyCoordinator;
14  import org.xvsm.core.Capi;
15  import org.xvsm.core.ContainerRef;
16  import org.xvsm.core.Entry;
17  import org.xvsm.interfaces.ICapi;
18  import org.xvsm.interfaces.container.IContainer;
19  import org.xvsm.internal.exceptions.XCoreException;
20  import org.xvsm.lookup.freepastry.FPPastContent;
21  import org.xvsm.lookup.freepastry.FreePastryLookup;
22  import org.xvsm.remote.TransportHandler;
23  import org.xvsm.selectors.FifoSelector;
24  
25  import rice.Continuation;
26  import rice.Continuation.ListenerContinuation;
27  import rice.Continuation.NamedContinuation;
28  import rice.Continuation.StandardContinuation;
29  import rice.environment.Environment;
30  import rice.environment.logging.Logger;
31  import rice.p2p.commonapi.CancellableTask;
32  import rice.p2p.commonapi.Endpoint;
33  import rice.p2p.commonapi.Id;
34  import rice.p2p.commonapi.IdRange;
35  import rice.p2p.commonapi.IdSet;
36  import rice.p2p.commonapi.Message;
37  import rice.p2p.commonapi.Node;
38  import rice.p2p.commonapi.NodeHandle;
39  import rice.p2p.commonapi.NodeHandleSet;
40  import rice.p2p.commonapi.RouteMessage;
41  import rice.p2p.commonapi.appsocket.AppSocket;
42  import rice.p2p.commonapi.appsocket.AppSocketReceiver;
43  import rice.p2p.past.PastContent;
44  import rice.p2p.past.PastContentHandle;
45  import rice.p2p.past.PastException;
46  import rice.p2p.past.PastImpl;
47  import rice.p2p.past.PastPolicy;
48  import rice.p2p.past.messaging.CacheMessage;
49  import rice.p2p.past.messaging.ContinuationMessage;
50  import rice.p2p.past.messaging.FetchHandleMessage;
51  import rice.p2p.past.messaging.FetchMessage;
52  import rice.p2p.past.messaging.InsertMessage;
53  import rice.p2p.past.messaging.LookupHandlesMessage;
54  import rice.p2p.past.messaging.LookupMessage;
55  import rice.p2p.past.messaging.MessageLostMessage;
56  import rice.p2p.past.messaging.PastMessage;
57  import rice.p2p.past.rawserialization.PastContentDeserializer;
58  import rice.p2p.past.rawserialization.PastContentHandleDeserializer;
59  import rice.p2p.past.rawserialization.SocketStrategy;
60  import rice.p2p.replication.Replication;
61  import rice.p2p.replication.manager.ReplicationManager;
62  import rice.p2p.replication.manager.ReplicationManagerImpl;
63  import rice.p2p.util.MathUtils;
64  import rice.p2p.util.rawserialization.SimpleOutputBuffer;
65  import rice.persistence.Cache;
66  import rice.persistence.StorageManager;
67  
68  /***
69   * overrides the PastImpl class of the FreePastry API and adds the functionality
70   * to replicate containers together with DHT entries
71   * 
72   * @author Hannu-Daniel Goiss
73   */
74  public class MyPastImpl extends PastImpl {
75  	private Hashtable outstanding;
76  
77  	private Hashtable timers;
78  
79  	private String deleteContentString;
80  
81  	public MyPastImpl(Node node, StorageManager manager, int replicas,
82  			String instance) {
83  		super(node, manager, replicas, instance);
84  
85  		this.outstanding = new Hashtable();
86  		this.timers = new Hashtable();
87  
88  	}
89  
90  	public MyPastImpl(Node node, StorageManager manager, int replicas,
91  			String instance, PastPolicy policy) {
92  		super(node, manager, replicas, instance, policy);
93  
94  		this.outstanding = new Hashtable();
95  		this.timers = new Hashtable();
96  
97  	}
98  
99  	public MyPastImpl(Node node, StorageManager manager, Cache backup,
100 			int replicas, String instance, PastPolicy policy,
101 			StorageManager trash) {
102 		super(node, manager, backup, replicas, instance, policy, trash);
103 
104 		this.outstanding = new Hashtable();
105 		this.timers = new Hashtable();
106 	}
107 
108 	public MyPastImpl(Node node, StorageManager manager, Cache backup,
109 			int replicas, String instance, PastPolicy policy,
110 			StorageManager trash, boolean useOwnSocket) {
111 		super(node, manager, backup, replicas, instance, policy, trash,
112 				useOwnSocket);
113 
114 		this.outstanding = new Hashtable();
115 		this.timers = new Hashtable();
116 	}
117 
118 	public MyPastImpl(Node node, StorageManager manager, Cache backup,
119 			int replicas, String instance, PastPolicy policy,
120 			StorageManager trash, SocketStrategy strategy) {
121 		super(node, manager, backup, replicas, instance, policy, trash,
122 				strategy);
123 
124 		this.outstanding = new Hashtable();
125 		this.timers = new Hashtable();
126 	}
127 
128 	public String toString() {
129 
130 		if (endpoint == null)
131 			return super.toString();
132 		return "MyPastImpl[" + endpoint.getInstance() + "]";
133 	}
134 
135 	public Environment getEnvironment() {
136 
137 		return environment;
138 	}
139 
140 	protected ReplicationManager buildReplicationManager(Node node,
141 			String instance) {
142 		return new ReplicationManagerImpl(node, this, replicationFactor,
143 				instance);
144 	}
145 
146 	public Continuation[] getOutstandingMessages() {
147 		return (Continuation[]) outstanding.values().toArray(
148 				new Continuation[0]);
149 	}
150 
151 	public Endpoint getEndpoint() {
152 		return endpoint;
153 	}
154 
155 	protected Continuation getResponseContinuation(final PastMessage msg) {
156 		if (logger.level <= Logger.FINER)
157 			logger.log("Getting the Continuation to respond to the message "
158 					+ msg);
159 		final ContinuationMessage cmsg = (ContinuationMessage) msg;
160 
161 		return new Continuation() {
162 			public void receiveResult(Object o) {
163 				cmsg.receiveResult(o);
164 				endpoint.route(null, cmsg, msg.getSource());
165 			}
166 
167 			public void receiveException(Exception e) {
168 				cmsg.receiveException(e);
169 				endpoint.route(null, cmsg, msg.getSource());
170 			}
171 		};
172 	}
173 
174 	protected Continuation getFetchResponseContinuation(final PastMessage msg) {
175 		final ContinuationMessage cmsg = (ContinuationMessage) msg;
176 
177 		return new Continuation() {
178 			public void receiveResult(Object o) {
179 				cmsg.receiveResult(o);
180 				PastContent content = (PastContent) o;
181 				if (socketStrategy.sendAlongSocket(SocketStrategy.TYPE_FETCH,
182 						content)) {
183 					sendViaSocket(msg.getSource(), cmsg, null);
184 				} else {
185 					endpoint.route(null, cmsg, msg.getSource());
186 				}
187 			}
188 
189 			public void receiveException(Exception e) {
190 				cmsg.receiveException(e);
191 				endpoint.route(null, cmsg, msg.getSource());
192 			}
193 		};
194 	}
195 
196 	WeakHashMap pendingSocketTransactions = new WeakHashMap();
197 
198 	private void sendViaSocket(final NodeHandle handle, final PastMessage m,
199 			final Continuation c) {
200 		if (c != null) {
201 			CancellableTask timer = endpoint.scheduleMessage(
202 					new MessageLostMessage(m.getUID(), getLocalNodeHandle(),
203 							null, m, handle), MESSAGE_TIMEOUT);
204 			insertPending(m.getUID(), timer, c);
205 		}
206 
207 		SimpleOutputBuffer sob = new SimpleOutputBuffer();
208 		try {
209 			sob.writeInt(0); // place holder for size...
210 			sob.writeShort(m.getType());
211 			m.serialize(sob);
212 		} catch (IOException ioe) {
213 			if (c != null)
214 				c.receiveException(ioe);
215 		}
216 
217 		int size = sob.getWritten() - 4;
218 		if (logger.level <= Logger.FINER)
219 			logger.log("Sending size of " + size + " to " + handle
220 					+ " to send " + m);
221 		byte[] bytes = sob.getBytes();
222 		MathUtils.intToByteArray(size, bytes, 0);
223 
224 		final ByteBuffer[] bb = new ByteBuffer[1];
225 		bb[0] = ByteBuffer.wrap(bytes, 0, sob.getWritten());
226 
227 		if (logger.level <= Logger.FINE)
228 			logger.log("Opening socket to " + handle + " to send " + m);
229 		endpoint.connect(handle, new AppSocketReceiver() {
230 
231 			public void receiveSocket(AppSocket socket) {
232 				if (logger.level <= Logger.FINER)
233 					logger.log("Opened socket to " + handle + ":" + socket
234 							+ " to send " + m);
235 				socket.register(false, true, 10000, this);
236 			}
237 
238 			public void receiveSelectResult(AppSocket socket, boolean canRead,
239 					boolean canWrite) {
240 				if (logger.level <= Logger.FINEST)
241 					logger.log("Writing to " + handle + ":" + socket
242 							+ " to send " + m);
243 
244 				try {
245 					socket.write(bb, 0, 1);
246 				} catch (IOException ioe) {
247 					if (c != null)
248 						c.receiveException(ioe);
249 					else if (logger.level <= Logger.WARNING)
250 						logger.logException("Error sending " + m, ioe);
251 					return;
252 				}
253 				if (bb[0].remaining() > 0) {
254 					socket.register(false, true, 10000, this);
255 				} else {
256 					socket.close();
257 				}
258 			}
259 
260 			public void receiveException(AppSocket socket, Exception e) {
261 				if (c != null)
262 					c.receiveException(e);
263 			}
264 		}, 10000);
265 	}
266 
267 	protected void sendRequest(Id id, PastMessage message, Continuation command) {
268 		sendRequest(id, message, null, command);
269 	}
270 
271 	protected void sendRequest(NodeHandle handle, PastMessage message,
272 			Continuation command) {
273 		sendRequest(null, message, handle, command);
274 	}
275 
276 	protected void sendRequest(Id id, PastMessage message, NodeHandle hint,
277 			Continuation command) {
278 		if (logger.level <= Logger.FINER)
279 			logger.log("Sending request message " + message + " {"
280 					+ message.getUID() + "} to id " + id + " via " + hint);
281 		CancellableTask timer = endpoint.scheduleMessage(
282 				new MessageLostMessage(message.getUID(), getLocalNodeHandle(),
283 						id, message, hint), MESSAGE_TIMEOUT);
284 		insertPending(message.getUID(), timer, command);
285 		endpoint.route(id, message, hint);
286 	}
287 
288 	private void insertPending(int uid, CancellableTask timer,
289 			Continuation command) {
290 		if (logger.level <= Logger.FINER)
291 			logger.log("Loading continuation " + uid + " into pending table");
292 		timers.put(new Integer(uid), timer);
293 		outstanding.put(new Integer(uid), command);
294 	}
295 
296 	private Continuation removePending(int uid) {
297 		if (logger.level <= Logger.FINER)
298 			logger.log("Removing and returning continuation " + uid
299 					+ " from pending table");
300 		CancellableTask timer = (CancellableTask) timers
301 				.remove(new Integer(uid));
302 
303 		if (timer != null)
304 			timer.cancel();
305 
306 		return (Continuation) outstanding.remove(new Integer(uid));
307 	}
308 
309 	private void handleResponse(PastMessage message) {
310 		if (logger.level <= Logger.FINE)
311 			logger.log("handling reponse message " + message
312 					+ " from the request");
313 
314 		Continuation command = removePending(message.getUID());
315 
316 		if (command != null) {
317 			message.returnResponse(command, environment, instance);
318 		}
319 	}
320 
321 	protected void getHandles(Id id, final int max, Continuation command) {
322 		NodeHandleSet set = endpoint.replicaSet(id, max);
323 
324 		if (set.size() == max) {
325 			command.receiveResult(set);
326 		} else {
327 			sendRequest(id, new LookupHandlesMessage(getUID(), id, max,
328 					getLocalNodeHandle(), id),
329 					new StandardContinuation(command) {
330 						public void receiveResult(Object o) {
331 							NodeHandleSet replicas = (NodeHandleSet) o;
332 
333 							if (Math.min(max, endpoint.replicaSet(
334 									endpoint.getLocalNodeHandle().getId(),
335 									replicationFactor + 1).size()) > replicas
336 									.size())
337 								parent
338 										.receiveException(new PastException(
339 												"Only received "
340 														+ replicas.size()
341 														+ " replicas - cannot insert as we know about more nodes."));
342 							else
343 								parent.receiveResult(replicas);
344 						}
345 					});
346 		}
347 	}
348 
349 	private void cache(final PastContent content) {
350 		cache(content, new ListenerContinuation("Caching of " + content,
351 				environment));
352 	}
353 
354 	public void cache(final PastContent content, final Continuation command) {
355 		if (logger.level <= Logger.FINER)
356 			logger.log("Inserting PastContent object " + content
357 					+ " into cache");
358 
359 		deleteContentString = ((FPPastContent) content).getContent();
360 
361 		if ((content != null) && (!content.isMutable())) {
362 			storage.cache(content.getId(), null, content, command);
363 		} else {
364 			command.receiveResult(new Boolean(true));
365 		}
366 	}
367 
368 	protected void doInsert(final Id id, final MessageBuilder builder,
369 			Continuation command, final boolean useSocket) {
370 		getHandles(id, replicationFactor + 1,
371 				new StandardContinuation(command) {
372 					public void receiveResult(Object o) {
373 						NodeHandleSet replicas = (NodeHandleSet) o;
374 						if (logger.level <= Logger.FINER)
375 							logger.log("Received replicas " + replicas
376 									+ " for id " + id);
377 
378 						MultiContinuation multi = new MultiContinuation(parent,
379 								replicas.size()) {
380 							public boolean isDone() throws Exception {
381 								int numSuccess = 0;
382 								for (int i = 0; i < haveResult.length; i++)
383 									if ((haveResult[i])
384 											&& (Boolean.TRUE.equals(result[i])))
385 										numSuccess++;
386 
387 								if (numSuccess >= (SUCCESSFUL_INSERT_THRESHOLD * haveResult.length))
388 									return true;
389 
390 								if (super.isDone()) {
391 									for (int i = 0; i < result.length; i++)
392 										if (result[i] instanceof Exception)
393 											if (logger.level <= Logger.WARNING)
394 												logger.logException("result["
395 														+ i + "]:",
396 														(Exception) result[i]);
397 
398 									throw new PastException("Had only "
399 											+ numSuccess
400 											+ " successful inserts out of "
401 											+ result.length + " - aborting.");
402 								}
403 								return false;
404 							}
405 
406 							public Object getResult() {
407 								Boolean[] b = new Boolean[result.length];
408 								for (int i = 0; i < b.length; i++)
409 									b[i] = new Boolean((result[i] == null)
410 											|| Boolean.TRUE.equals(result[i]));
411 
412 								return b;
413 							}
414 						};
415 
416 						for (int i = 0; i < replicas.size(); i++) {
417 							NodeHandle handle = replicas.getHandle(i);
418 							PastMessage m = builder.buildMessage();
419 							Continuation c = new NamedContinuation(
420 									"InsertMessage to " + replicas.getHandle(i)
421 											+ " for " + id, multi
422 											.getSubContinuation(i));
423 							if (useSocket) {
424 								sendViaSocket(handle, m, c);
425 							} else {
426 								sendRequest(handle, m, c);
427 							}
428 						}
429 					}
430 				});
431 	}
432 
433 	public void insert(final PastContent obj, final Continuation command) {
434 		if (logger.level <= Logger.FINER)
435 			logger.log("Inserting the object " + obj + " with the id "
436 					+ obj.getId());
437 
438 		if (logger.level <= Logger.FINEST)
439 			logger.log(" Inserting data of class " + obj.getClass().getName()
440 					+ " under " + obj.getId().toStringFull());
441 
442 		doInsert(obj.getId(), new MessageBuilder() {
443 			public PastMessage buildMessage() {
444 				return new InsertMessage(getUID(), obj, getLocalNodeHandle(),
445 						obj.getId());
446 			}
447 		}, new StandardContinuation(command) {
448 			public void receiveResult(final Object array) {
449 				cache(obj, new SimpleContinuation() {
450 					public void receiveResult(Object o) {
451 						parent.receiveResult(array);
452 					}
453 				});
454 			}
455 		}, socketStrategy.sendAlongSocket(SocketStrategy.TYPE_INSERT, obj));
456 	}
457 
458 	public void lookup(final Id id, final Continuation command) {
459 		lookup(id, true, command);
460 	}
461 
462 	public void lookup(final Id id, final boolean cache,
463 			final Continuation command) {
464 		if (logger.level <= Logger.FINER)
465 			logger.log(" Performing lookup on " + id.toStringFull());
466 
467 		storage.getObject(id, new StandardContinuation(command) {
468 			public void receiveResult(Object o) {
469 				if (o != null) {
470 					o = changeResult(o);
471 					command.receiveResult(o);
472 				} else {
473 					sendRequest(id, new LookupMessage(getUID(), id,
474 							getLocalNodeHandle(), id), new NamedContinuation(
475 							"LookupMessage for " + id, this) {
476 						public void receiveResult(final Object o) {
477 							if (o != null) {
478 								if (cache) {
479 									cache((PastContent) o,
480 											new SimpleContinuation() {
481 												public void receiveResult(
482 														Object object) {
483 													command.receiveResult(o);
484 												}
485 											});
486 								} else {
487 									command.receiveResult(o);
488 								}
489 							} else {
490 								lookupHandles(id, replicationFactor + 1,
491 										new Continuation() {
492 											public void receiveResult(Object o) {
493 												PastContentHandle[] handles = (PastContentHandle[]) o;
494 
495 												for (int i = 0; i < handles.length; i++) {
496 													if (handles[i] != null) {
497 														fetch(
498 																handles[i],
499 																new StandardContinuation(
500 																		parent) {
501 																	public void receiveResult(
502 																			final Object o) {
503 																		if (cache) {
504 																			cache(
505 																					(PastContent) o,
506 																					new SimpleContinuation() {
507 																						public void receiveResult(
508 																								Object object) {
509 																							command
510 																									.receiveResult(o);
511 																						}
512 																					});
513 																		} else {
514 																			command
515 																					.receiveResult(o);
516 																		}
517 																	}
518 																});
519 
520 														return;
521 													}
522 												}
523 
524 												command.receiveResult(null);
525 											}
526 
527 											public void receiveException(
528 													Exception e) {
529 												command.receiveException(e);
530 											}
531 										});
532 							}
533 						}
534 
535 						public void receiveException(Exception e) {
536 							receiveResult(null);
537 						}
538 					});
539 				}
540 			}
541 		});
542 	}
543 
544 	public void lookupHandles(final Id id, int max, final Continuation command) {
545 		if (logger.level <= Logger.FINE)
546 			logger.log("Retrieving handles of up to " + max
547 					+ " replicas of the object stored in Past with id " + id);
548 
549 		if (logger.level <= Logger.FINER)
550 			logger.log("Fetching up to " + max + " handles of "
551 					+ id.toStringFull());
552 
553 		getHandles(id, max, new StandardContinuation(command) {
554 			public void receiveResult(Object o) {
555 				NodeHandleSet replicas = (NodeHandleSet) o;
556 				if (logger.level <= Logger.FINER)
557 					logger.log("Receiving replicas " + replicas
558 							+ " for lookup Id " + id);
559 
560 				MultiContinuation multi = new MultiContinuation(parent,
561 						replicas.size()) {
562 					public Object getResult() {
563 						PastContentHandle[] p = new PastContentHandle[result.length];
564 
565 						for (int i = 0; i < result.length; i++)
566 							if (result[i] instanceof PastContentHandle)
567 								p[i] = (PastContentHandle) result[i];
568 
569 						return p;
570 					}
571 				};
572 
573 				for (int i = 0; i < replicas.size(); i++)
574 					lookupHandle(id, replicas.getHandle(i), multi
575 							.getSubContinuation(i));
576 			}
577 		});
578 	}
579 
580 	public void lookupHandle(Id id, NodeHandle handle, Continuation command) {
581 		if (logger.level <= Logger.FINE)
582 			logger.log("Retrieving handle for id " + id + " from node "
583 					+ handle);
584 
585 		sendRequest(handle, new FetchHandleMessage(getUID(), id,
586 				getLocalNodeHandle(), handle.getId()), new NamedContinuation(
587 				"FetchHandleMessage to " + handle + " for " + id, command));
588 	}
589 
590 	public void fetch(PastContentHandle handle, Continuation command) {
591 		if (logger.level <= Logger.FINE)
592 			logger.log("Retrieving object associated with content handle "
593 					+ handle);
594 
595 		if (logger.level <= Logger.FINER)
596 			logger.log("Fetching object under id "
597 					+ handle.getId().toStringFull() + " on "
598 					+ handle.getNodeHandle());
599 
600 		NodeHandle han = handle.getNodeHandle();
601 		sendRequest(han, new FetchMessage(getUID(), handle,
602 				getLocalNodeHandle(), han.getId()), new NamedContinuation(
603 				"FetchMessage to " + handle.getNodeHandle() + " for "
604 						+ handle.getId(), command));
605 	}
606 
607 	public NodeHandle getLocalNodeHandle() {
608 		return endpoint.getLocalNodeHandle();
609 	}
610 
611 	public int getReplicationFactor() {
612 		return replicationFactor;
613 	}
614 
615 	public boolean forward(final RouteMessage message) {
616 		Message internal;
617 		try {
618 			internal = message.getMessage(endpoint.getDeserializer());
619 		} catch (IOException ioe) {
620 			throw new RuntimeException(ioe);
621 		}
622 
623 		if (internal instanceof LookupMessage) {
624 
625 			final LookupMessage lmsg = (LookupMessage) internal;
626 			Id id = lmsg.getId();
627 
628 			if (!lmsg.isResponse()) {
629 				if (logger.level <= Logger.FINER)
630 					logger.log("Lookup message " + lmsg
631 							+ " is a request; look in the cache");
632 				if (storage.exists(id)) {
633 					if (logger.level <= Logger.FINE)
634 						logger.log("Request for " + id
635 								+ " satisfied locally - responding");
636 					deliver(endpoint.getId(), lmsg);
637 
638 					return false;
639 				}
640 			}
641 		} else if (internal instanceof LookupHandlesMessage) {
642 			LookupHandlesMessage lmsg = (LookupHandlesMessage) internal;
643 
644 			if (!lmsg.isResponse()) {
645 				if (endpoint.replicaSet(lmsg.getId(), lmsg.getMax()).size() == lmsg
646 						.getMax()) {
647 					if (logger.level <= Logger.FINE)
648 						logger.log("Hijacking lookup handles request for "
649 								+ lmsg.getId());
650 
651 					deliver(endpoint.getId(), lmsg);
652 
653 					return false;
654 				}
655 			}
656 		}
657 		return true;
658 	}
659 
660 	public void deliver(Id id, Message message) {
661 		final PastMessage msg = (PastMessage) message;
662 
663 		if (msg.isResponse()) {
664 			handleResponse((PastMessage) message);
665 		} else {
666 			if (logger.level <= Logger.INFO)
667 				logger.log("Received message " + message + " with destination "
668 						+ id);
669 
670 			if (msg instanceof InsertMessage) {
671 				final InsertMessage imsg = (InsertMessage) msg;
672 
673 				if (policy.allowInsert(imsg.getContent())) {
674 					inserts++;
675 					final Id msgid = imsg.getContent().getId();
676 
677 					lockManager.lock(msgid, new StandardContinuation(
678 							getResponseContinuation(msg)) {
679 
680 						public void receiveResult(Object result) {
681 							storage.getObject(msgid, new StandardContinuation(
682 									parent) {
683 								public void receiveResult(Object o) {
684 									try {
685 										final PastContent content = imsg
686 												.getContent().checkInsert(
687 														msgid, (PastContent) o);
688 										storage
689 												.store(
690 														msgid,
691 														null,
692 														content,
693 														new StandardContinuation(
694 																parent) {
695 															public void receiveResult(
696 																	Object result) {
697 																getResponseContinuation(
698 																		msg)
699 																		.receiveResult(
700 																				result);
701 																lockManager
702 																		.unlock(msgid);
703 
704 																try {
705 																	createReplica(
706 																			content,
707 																			false,
708 																			new URI(
709 																					"tcpjava://"
710 																							+ InetAddress
711 																									.getLocalHost()
712 																									.getHostAddress()
713 																							+ ":"
714 																							+ (FreePastryConnection
715 																									.getInstance()
716 																									.getLocalport() + 1000)));
717 																} catch (Exception e) {
718 																	e
719 																			.printStackTrace();
720 																}
721 
722 															}
723 														});
724 									} catch (PastException e) {
725 										parent.receiveException(e);
726 									}
727 								}
728 							});
729 						}
730 					});
731 				} else {
732 					getResponseContinuation(msg).receiveResult(
733 							new Boolean(false));
734 				}
735 			} else if (msg instanceof LookupMessage) {
736 				final LookupMessage lmsg = (LookupMessage) msg;
737 				lookups++;
738 
739 				storage.getObject(lmsg.getId(), new StandardContinuation(
740 						getResponseContinuation(lmsg)) {
741 					public void receiveResult(Object o) {
742 						if (logger.level <= Logger.FINE)
743 							logger.log("Received object " + o + " for id "
744 									+ lmsg.getId());
745 
746 						o = changeResult(o);
747 
748 						parent.receiveResult(o);
749 
750 					}
751 				});
752 			} else if (msg instanceof LookupHandlesMessage) {
753 				LookupHandlesMessage lmsg = (LookupHandlesMessage) msg;
754 				NodeHandleSet set = endpoint.replicaSet(lmsg.getId(), lmsg
755 						.getMax());
756 				if (logger.level <= Logger.FINER)
757 					logger.log("Returning replica set " + set
758 							+ " for lookup handles of id " + lmsg.getId()
759 							+ " max " + lmsg.getMax() + " at "
760 							+ endpoint.getId());
761 				getResponseContinuation(msg).receiveResult(set);
762 			} else if (msg instanceof FetchMessage) {
763 				FetchMessage fmsg = (FetchMessage) msg;
764 				lookups++;
765 
766 				Continuation c;
767 				c = getFetchResponseContinuation(msg);
768 
769 				storage.getObject(fmsg.getHandle().getId(), c);
770 			} else if (msg instanceof FetchHandleMessage) {
771 				final FetchHandleMessage fmsg = (FetchHandleMessage) msg;
772 				fetchHandles++;
773 
774 				storage.getObject(fmsg.getId(), new StandardContinuation(
775 						getResponseContinuation(msg)) {
776 					public void receiveResult(Object o) {
777 						PastContent content = (PastContent) o;
778 
779 						if (content != null) {
780 							if (logger.level <= Logger.FINE)
781 								logger
782 										.log("Retrieved data for fetch handles of id "
783 												+ fmsg.getId());
784 							parent.receiveResult(content
785 									.getHandle(MyPastImpl.this));
786 						} else {
787 							parent.receiveResult(null);
788 						}
789 					}
790 				});
791 			} else if (msg instanceof CacheMessage) {
792 				cache(((CacheMessage) msg).getContent());
793 			} else {
794 				if (logger.level <= Logger.SEVERE)
795 					logger.log("ERROR - Received message " + msg
796 							+ "of unknown type.");
797 			}
798 		}
799 	}
800 
801 	public void update(NodeHandle handle, boolean joined) {
802 	}
803 
804 	public static ContainerRef createReplica(Object o, boolean checkFirst) {
805 		return createReplica((((FPPastContent) o).getContent()).toString(),
806 				checkFirst, null);
807 	}
808 
809 	public static ContainerRef createReplica(String contentString,
810 			boolean checkFirst) {
811 		return createReplica(contentString, checkFirst, null);
812 	}
813 
814 	public static ContainerRef createReplica(Object o, boolean checkFirst,
815 			URI homeSite) {
816 		return createReplica((((FPPastContent) o).getContent()).toString(),
817 				checkFirst, homeSite);
818 	}
819 
820 	public static ContainerRef createReplica(String contentString,
821 			boolean checkFirst, URI homeSite) {
822 		ContainerRef cref = null;
823 
824 		// ICapi capi = TransportManager.getCapi();
825 		ICapi capi;
826 		try {
827 			// TODO: ersetzen
828 			capi = new Capi();
829 		} catch (XCoreException xce) {
830 			throw new RuntimeException("this should not happen");
831 		}
832 
833 		if (checkFirst = true) {
834 
835 			// cref = capi.createContainer(null, null, contentString,
836 			// IContainer.INFINITE_SIZE, new FifoCoordinator());
837 			try {
838 				cref = capi.lookupContainer(null, homeSite, contentString);
839 
840 				return cref;
841 			} catch (Exception e) {
842 				try {
843 					URI contentURI = new URI(contentString);
844 					if (contentURI.getPort() > 0) {
845 						contentString = FreePastryLookup
846 								.getContainerName(new ContainerRef(contentURI));
847 					}
848 
849 				} catch (Exception e1) {
850 				}
851 
852 			}
853 		}
854 
855 		try {
856 			cref = capi.createContainer(null, homeSite, contentString,
857 					IContainer.INFINITE_SIZE, new FifoCoordinator(),
858 					new KeyCoordinator());
859 
860 			boolean gemacht = false;
861 
862 			FreePastryConnection fp = FreePastryConnection.getInstance();
863 			NodeHandleSet handles = fp.getEndpoint().replicaSet(
864 					fp.getPastryIdFactory().buildId(contentString),
865 					fp.getNumberOfContainers()); // 2. param number of
866 			// replicas
867 
868 			for (int i = 0; i < handles.size() && gemacht == false; i++) {
869 				String handleString = handles.getHandle(i).toString();
870 				String ip = handleString.substring(handleString
871 						.lastIndexOf("/") + 1, handleString.lastIndexOf(":"));
872 				int port = Integer.parseInt(handleString.substring(handleString
873 						.lastIndexOf(":") + 1,
874 						handleString.lastIndexOf(":") + 5));
875 
876 				int compPort;
877 
878 				if (homeSite == null)
879 					compPort = TransportHandler.getInstance()
880 							.getListener(
881 									ConfigurationManager.getInstance()
882 											.getStringSetting(
883 													"DefaultAnswerToProtocol"))
884 							.getUri().getPort();
885 				// compPort=TransportManagerInstance.getInstance().getDefaultListener().getUri().getPort();
886 				else
887 					compPort = homeSite.getPort();
888 
889 				port = port + 1000;
890 
891 				if (compPort != port) {
892 
893 					URI site = new URI("tcpjava://" + ip + ":" + port);
894 
895 					try {
896 						ContainerRef nCref = capi.lookupContainer(null, site,
897 								contentString);
898 
899 						Entry[] entries = capi.read(nCref, 0, null,
900 								new FifoSelector(FifoSelector.CNT_ALL));
901 
902 						for (Entry e : entries) {
903 							capi.write(cref, 0L, null, e);
904 						}
905 
906 						gemacht = true;
907 					} catch (Exception ex) {
908 					}
909 
910 				}
911 			}
912 			/*
913 			 * LocalAspect aspect = new MyAspect(); List<LocalIPoint> p = new
914 			 * ArrayList<LocalIPoint>(); p.add(LocalIPoint.PreWrite);
915 			 * capi.addAspect(cref, p, aspect);
916 			 */
917 		} catch (Exception e) {
918 			e.printStackTrace();
919 		}
920 		return cref;
921 
922 	}
923 
924 	public static ContainerRef createReplica(String contentString,
925 			URI homeSite, URI originalSite) {
926 		ContainerRef cref = null;
927 
928 		// ICapi capi = TransportManager.getCapi();
929 		ICapi capi;
930 		try {
931 			// TODO: ersetzen
932 			capi = new Capi();
933 		} catch (XCoreException xce) {
934 			throw new RuntimeException("this should not happen");
935 		}
936 
937 		try {
938 			cref = capi.lookupContainer(null, homeSite, contentString);
939 
940 			return cref;
941 		} catch (Exception e) {
942 			try {
943 				URI contentURI = new URI(contentString);
944 				if (contentURI.getPort() > 0) {
945 					contentString = FreePastryLookup
946 							.getContainerName(new ContainerRef(contentURI));
947 				}
948 
949 			} catch (Exception e1) {
950 			}
951 
952 		}
953 
954 		try {
955 			cref = capi.createContainer(null, homeSite, contentString,
956 					IContainer.INFINITE_SIZE, new FifoCoordinator(),
957 					new KeyCoordinator());
958 
959 			boolean gemacht = false;
960 
961 			try {
962 				ContainerRef nCref = capi.lookupContainer(null, originalSite,
963 						contentString);
964 
965 				Entry[] entries = capi.read(nCref, 0, null, new FifoSelector(
966 						FifoSelector.CNT_ALL));
967 
968 				for (Entry e : entries) {
969 					capi.write(cref, 0L, null, e);
970 				}
971 
972 				gemacht = true;
973 			} catch (Exception ex) {
974 			}
975 
976 			/*
977 			 * LocalAspect aspect = new MyAspect(); List<LocalIPoint> p = new
978 			 * ArrayList<LocalIPoint>(); p.add(LocalIPoint.PreWrite);
979 			 * capi.addAspect(cref, p, aspect);
980 			 */
981 		} catch (Exception e) {
982 			e.printStackTrace();
983 		}
984 		return cref;
985 
986 	}
987 
988 	private Object changeResult(Object o) {
989 
990 		if (o == null || ((FPPastContent) o).getContent() == null)
991 			return o;
992 
993 		try {
994 			ContainerRef newCref = null;
995 			// ICapi capi = TransportManager.getCapi();
996 			ICapi capi;
997 			try {
998 				// TODO: ersetzen
999 				capi = new Capi();
1000 			} catch (XCoreException xce) {
1001 				throw new RuntimeException("this should not happen");
1002 			}
1003 
1004 			newCref = createReplica(o, true, new URI(
1005 					"tcpjava://"
1006 							+ InetAddress.getLocalHost().getHostAddress()
1007 							+ ":"
1008 							+ (FreePastryConnection.getInstance()
1009 									.getLocalport() + 1000)));
1010 
1011 			/*
1012 			 * try { newCref = capi.lookupContainer(null,
1013 			 * TransportManagerInstance.getInstance().getDefaultListener().getUri(),
1014 			 * (((FPPastContent)o).getContent()).toString()); } catch(Exception
1015 			 * e) { createReplica(o, false); newCref =
1016 			 * capi.lookupContainer(null,
1017 			 * TransportManagerInstance.getInstance().getDefaultListener().getUri(),
1018 			 * (((FPPastContent)o).getContent()).toString()); }
1019 			 */
1020 
1021 			((FPPastContent) o).setContent(newCref.asURI().toString());
1022 
1023 		} catch (Exception e) {
1024 			e.printStackTrace();
1025 		}
1026 
1027 		return o;
1028 	}
1029 
1030 	public void fetch(final Id id, NodeHandle hint, Continuation command) {
1031 		if (logger.level <= Logger.FINER)
1032 			logger
1033 					.log("Sending out replication fetch request for the id "
1034 							+ id);
1035 
1036 		policy.fetch(id, hint, backup, this, new StandardContinuation(command) {
1037 			public void receiveResult(Object o) {
1038 				if (o == null) {
1039 					if (logger.level <= Logger.WARNING)
1040 						logger.log("Could not fetch id " + id
1041 								+ " - policy returned null in namespace "
1042 								+ instance);
1043 					parent.receiveResult(new Boolean(false));
1044 				} else {
1045 					if (logger.level <= Logger.FINEST)
1046 						logger.log("inserting replica of id " + id);
1047 
1048 					createReplica(o, true);
1049 
1050 					if (!(o instanceof PastContent))
1051 						if (logger.level <= Logger.WARNING)
1052 							logger.log("ERROR! Not PastContent "
1053 									+ o.getClass().getName() + " " + o);
1054 					storage.getStorage().store(((PastContent) o).getId(), null,
1055 							(PastContent) o, parent);
1056 				}
1057 			}
1058 		});
1059 	}
1060 
1061 	public void remove(final Id id, Continuation command) {
1062 		/*
1063 		 * try { ICapi capi = TransportManager.getCapi();
1064 		 * capi.destroyContainer(null, capi.lookupContainer(null, null,
1065 		 * this.deleteContentString)); } catch(Exception e) {
1066 		 * e.printStackTrace(); }
1067 		 */
1068 
1069 		if (backup != null) {
1070 			storage.getObject(id, new StandardContinuation(command) {
1071 				public void receiveResult(Object o) {
1072 
1073 					try {
1074 						ICapi capi;
1075 						try {
1076 							// TODO: ersetzen
1077 							capi = new Capi();
1078 						} catch (XCoreException xce) {
1079 							throw new RuntimeException("this should not happen");
1080 						}
1081 						// ICapi capi = TransportManager.getCapi();
1082 						capi.destroyContainer(null, capi.lookupContainer(null,
1083 								null, (((FPPastContent) o).getContent())
1084 										.toString()));
1085 					} catch (Exception e) {
1086 						throw new RuntimeException(e);
1087 					}
1088 
1089 					backup.cache(id, storage.getMetadata(id), (Serializable) o,
1090 							new StandardContinuation(parent) {
1091 								public void receiveResult(Object o) {
1092 									storage.unstore(id, parent);
1093 								}
1094 							});
1095 				}
1096 			});
1097 		} else {
1098 
1099 			storage.getObject(id, new StandardContinuation(command) {
1100 				public void receiveResult(Object o) {
1101 
1102 					try {
1103 						// ICapi capi = TransportManager.getCapi();
1104 						ICapi capi;
1105 						try {
1106 							// TODO: ersetzen
1107 							capi = new Capi();
1108 						} catch (XCoreException xce) {
1109 							throw new RuntimeException("this should not happen");
1110 						}
1111 						capi.destroyContainer(null, capi.lookupContainer(null,
1112 								null, (((FPPastContent) o).getContent())
1113 										.toString()));
1114 					} catch (Exception e) {
1115 						e.printStackTrace();
1116 					}
1117 				}
1118 			});
1119 
1120 			storage.unstore(id, command);
1121 		}
1122 	}
1123 
1124 	public IdSet scan(IdRange range) {
1125 		return storage.getStorage().scan(range);
1126 	}
1127 
1128 	public IdSet scan() {
1129 		return storage.getStorage().scan();
1130 	}
1131 
1132 	public boolean exists(Id id) {
1133 		return storage.getStorage().exists(id);
1134 	}
1135 
1136 	public void existsInOverlay(Id id, Continuation command) {
1137 		lookupHandles(id, replicationFactor + 1, new StandardContinuation(
1138 				command) {
1139 			public void receiveResult(Object result) {
1140 				Object results[] = (Object[]) result;
1141 				for (int i = 0; i < results.length; i++) {
1142 					if (results[i] instanceof PastContentHandle) {
1143 						parent.receiveResult(Boolean.TRUE);
1144 						return;
1145 					}
1146 				}
1147 				parent.receiveResult(Boolean.FALSE);
1148 			}
1149 		});
1150 	}
1151 
1152 	public void reInsert(Id id, Continuation command) {
1153 		storage.getObject(id, new StandardContinuation(command) {
1154 			public void receiveResult(final Object o) {
1155 				insert((PastContent) o, new StandardContinuation(parent) {
1156 					public void receiveResult(Object result) {
1157 						Boolean results[] = (Boolean[]) result;
1158 						for (int i = 0; i < results.length; i++) {
1159 							if (results[i].booleanValue()) {
1160 								parent.receiveResult(Boolean.TRUE);
1161 								return;
1162 							}
1163 						}
1164 						parent.receiveResult(Boolean.FALSE);
1165 					}
1166 				});
1167 			}
1168 		});
1169 	}
1170 
1171 	public Replication getReplication() {
1172 		return replicaManager.getReplication();
1173 	}
1174 
1175 	public StorageManager getStorageManager() {
1176 		return storage;
1177 	}
1178 
1179 	public interface MessageBuilder {
1180 		public PastMessage buildMessage();
1181 	}
1182 
1183 	public String getInstance() {
1184 		return instance;
1185 	}
1186 
1187 	public void setContentDeserializer(PastContentDeserializer deserializer) {
1188 		contentDeserializer = deserializer;
1189 	}
1190 
1191 	public void setContentHandleDeserializer(
1192 			PastContentHandleDeserializer deserializer) {
1193 		contentHandleDeserializer = deserializer;
1194 	}
1195 }