1 package org.xvsm.core;
2
3 import java.lang.reflect.Constructor;
4 import java.net.URI;
5 import java.util.ArrayList;
6 import java.util.List;
7 import java.util.Properties;
8 import java.util.StringTokenizer;
9 import java.util.UUID;
10
11 import org.apache.log4j.Logger;
12 import org.xvsm.configuration.ConfigurationManager;
13 import org.xvsm.coordinators.FifoCoordinator;
14 import org.xvsm.coordinators.RandomCoordinator;
15 import org.xvsm.core.aspect.GlobalAspect;
16 import org.xvsm.core.aspect.IPoint;
17 import org.xvsm.core.aspect.LocalAspect;
18 import org.xvsm.core.aspect.LocalIPoint;
19 import org.xvsm.core.notifications.NotificationAspect;
20 import org.xvsm.core.notifications.Operation;
21 import org.xvsm.interfaces.ICapi;
22 import org.xvsm.interfaces.ICoordinator;
23 import org.xvsm.interfaces.NotificationListener;
24 import org.xvsm.internal.EventProcessingPool;
25 import org.xvsm.internal.ReplySenderPool;
26 import org.xvsm.internal.TimeoutSchedulerPool;
27 import org.xvsm.internal.exceptions.ContainerFullException;
28 import org.xvsm.internal.exceptions.ContainerNameOccupiedException;
29 import org.xvsm.internal.exceptions.CountNotMetException;
30 import org.xvsm.internal.exceptions.FatalException;
31 import org.xvsm.internal.exceptions.InvalidContainerException;
32 import org.xvsm.internal.exceptions.InvalidTransactionException;
33 import org.xvsm.internal.exceptions.XCoreException;
34 import org.xvsm.internal.tasks.AspectTask;
35 import org.xvsm.internal.tasks.AspectTaskType;
36 import org.xvsm.internal.tasks.ClearTask;
37 import org.xvsm.internal.tasks.ContainerTask;
38 import org.xvsm.internal.tasks.ContainerTaskType;
39 import org.xvsm.internal.tasks.OperationTask;
40 import org.xvsm.internal.tasks.OperationTaskType;
41 import org.xvsm.internal.tasks.ShutdownTask;
42 import org.xvsm.internal.tasks.Task;
43 import org.xvsm.internal.tasks.TransactionTask;
44 import org.xvsm.internal.tasks.TransactionTaskType;
45 import org.xvsm.remote.TransportHandler;
46 import org.xvsm.selectors.KeySelector;
47 import org.xvsm.selectors.LindaSelector;
48 import org.xvsm.selectors.RandomSelector;
49 import org.xvsm.selectors.Selector;
50 import org.xvsm.transactions.Transaction;
51
52 /***
53 *
54 * @author Christian Schreiber, Michael Proestler
55 *
56 */
57 public class Capi implements ICapi {
58
59 /***
60 * The logger for this Class.
61 */
62 private static Logger logger = Logger.getLogger(Capi.class);
63
64 /***
65 * This property will be passed within every operation to the aspects.
66 */
67 private Properties aspectContext;
68
69 /***
70 * Used to create the virtual answer to container reference.
71 */
72 private ThreadLocal<VirtualAnswerContainer> answerContainer = new ThreadLocal<VirtualAnswerContainer>();
73
74 /***
75 * Default Constructor.
76 *
77 * @throws XCoreException
78 * thrown if there problems initializing the capi.
79 */
80 public Capi() throws XCoreException {
81
82 TransportHandler.getInstance().init();
83 EventProcessingPool.getInstance();
84 TimeoutSchedulerPool.init();
85 ReplySenderPool.init();
86 }
87
88 /***
89 * {@inheritDoc}
90 */
91 public void clearSpace(URI site) throws XCoreException {
92 if (logger.isDebugEnabled()) {
93 logger.debug("clearSpace(" + site + ")");
94 }
95
96 Task task = new ClearTask();
97 task.setAspectContext(this.aspectContext);
98 this.send(site, task);
99 task.waitUntilFinished();
100 }
101
102 /***
103 * {@inheritDoc}
104 */
105 public void commitTransaction(Transaction txn) throws XCoreException {
106 if (logger.isDebugEnabled()) {
107 logger.debug("commitTransaction(" + txn + ")");
108 }
109 TransactionTask task = new TransactionTask(TransactionTaskType.COMMIT,
110 txn);
111 task.setAspectContext(this.aspectContext);
112 this.send(txn.getSite(), task);
113 task.waitUntilFinished();
114 try {
115 this.throwException(task);
116 } catch (InvalidTransactionException e) {
117 throw e;
118 } catch (Exception e) {
119 throw new FatalException(e);
120 }
121
122 }
123
124 /***
125 * {@inheritDoc}
126 */
127
128 public ContainerRef createContainer(Transaction tx, URI site, String name,
129 int size, ICoordinator... coordinators) throws XCoreException {
130 if (logger.isDebugEnabled()) {
131 logger.debug("createContainer(" + tx + ", " + site + ", " + size
132 + ", " + java.util.Arrays.toString(coordinators));
133 }
134 Task task;
135 if (coordinators != null && coordinators.length > 0
136 && coordinators[0] != null) {
137 task = new ContainerTask(tx, ContainerTaskType.CREATE, size,
138 coordinators);
139 } else {
140 task = new ContainerTask(tx, ContainerTaskType.CREATE, size,
141 new RandomCoordinator());
142 }
143 task.setAspectContext(this.aspectContext);
144 this.send(site, task);
145 this.throwException(task);
146
147 Object o = task.readResult();
148 ContainerRef result = null;
149 if (o instanceof URI) {
150
151 result = new ContainerRef((URI) o);
152 } else {
153
154 result = new ContainerRef((URI) ((AtomicEntry<?>) ((List<?>) o)
155 .get(0)).getValue());
156 }
157
158 result.setSite(site);
159 if (name != null) {
160 ContainerRef cref = new ContainerRef("lookup");
161 cref.setSite(site);
162 try {
163 this.write(cref, 0, tx, new AtomicEntry<URI>(result.asURI(),
164 URI.class, new KeySelector<String>("lookup", name)));
165 } catch (ContainerFullException e) {
166 throw new ContainerNameOccupiedException(name
167 + " already exists");
168 }
169 }
170 return result;
171 }
172
173 /***
174 * {@inheritDoc}
175 */
176 public Transaction createTransaction(URI site, long timeout)
177 throws XCoreException {
178 if (logger.isDebugEnabled()) {
179 logger.debug("createTransaction(" + site + ", " + timeout + ")");
180 }
181 TransactionTask task = new TransactionTask();
182 task.setTimeout(timeout);
183 task.setAspectContext(this.aspectContext);
184 this.send(site, task);
185
186 Object o = task.readResult();
187 Transaction tx;
188 if (o instanceof URI) {
189
190 tx = new Transaction((URI) o);
191 } else {
192
193 tx = new Transaction((URI) ((AtomicEntry<?>) ((List<?>) o).get(0))
194 .getValue());
195 }
196 tx.setSite(site);
197
198 return tx;
199 }
200
201 /***
202 * {@inheritDoc}
203 */
204 public void destroy(ContainerRef cref, long timeout, Transaction tx,
205 Selector... sel) throws XCoreException {
206 if (logger.isDebugEnabled()) {
207 logger.debug("destroy(" + cref + ", " + timeout + ", " + tx + ", "
208 + java.util.Arrays.toString(sel));
209 }
210 List<Selector> selectors = null;
211 if (sel == null) {
212 selectors = new ArrayList<Selector>();
213 } else {
214 selectors = java.util.Arrays.asList(sel);
215 }
216 OperationTask task = new OperationTask(OperationTaskType.DESTROY, null,
217 cref, tx, selectors, timeout);
218 task.setAspectContext(this.aspectContext);
219 this.send(cref.getSite(), task);
220 this.throwException(task);
221 task.waitUntilFinished();
222 }
223
224 /***
225 * {@inheritDoc}
226 */
227
228 public void destroyContainer(Transaction tx, ContainerRef cref)
229 throws XCoreException {
230 if (logger.isDebugEnabled()) {
231 logger.debug("destroyContainer(" + tx + "," + cref + ")");
232 }
233 Task task = new ContainerTask(tx, ContainerTaskType.DELETE, cref);
234 task.setAspectContext(this.aspectContext);
235 this.send(cref.getSite(), task);
236
237 try {
238 this.throwException(task);
239
240 } catch (Exception e) {
241 throw new FatalException(e);
242 }
243 try {
244
245
246 ContainerRef lookupC = new ContainerRef("lookup");
247 lookupC.setSite(cref.getSite());
248 this.destroy(lookupC, 0, null, new LindaSelector(
249 new AtomicEntry<URI>(cref.asURI())));
250 } catch (CountNotMetException e) {
251
252
253 } catch (ContainerFullException e) {
254
255
256 }
257 }
258
259 /***
260 * {@inheritDoc}
261 */
262
263 public ContainerRef lookupContainer(Transaction tx, URI site,
264 String containerName) throws XCoreException {
265 if (logger.isDebugEnabled()) {
266 logger.debug("getNamedContainer(" + tx + ", " + site + ","
267 + containerName + ")");
268 }
269 Entry[] entries;
270 try {
271 ContainerRef cref = new ContainerRef("lookup");
272 cref.setSite(site);
273 entries = this.read(cref, 0, tx, new KeySelector<String>("lookup",
274 containerName));
275 } catch (XCoreException e1) {
276 throw new InvalidContainerException(containerName
277 + " does not exist");
278 }
279 if (entries.length < 1) {
280 throw new InvalidContainerException(containerName
281 + " does not exist");
282 }
283 ContainerRef cref = new ContainerRef(
284 (URI) ((AtomicEntry<?>) entries[0]).getValue());
285 cref.setSite(site);
286 return cref;
287
288 }
289
290 /***
291 * {@inheritDoc}
292 *
293 */
294 @SuppressWarnings("unchecked")
295 public Entry[] read(ContainerRef cref, long timeout, Transaction tx,
296 Selector... sel) throws XCoreException {
297 if (logger.isDebugEnabled()) {
298 logger.debug("read(" + cref + ", " + timeout + ", " + tx + ", "
299 + java.util.Arrays.toString(sel));
300 }
301 List<Selector> selectors = null;
302 if (sel == null) {
303 selectors = new ArrayList<Selector>();
304 } else {
305 selectors = java.util.Arrays.asList(sel);
306 }
307
308 OperationTask task = new OperationTask(OperationTaskType.READ, null,
309 cref, tx, selectors, timeout);
310 task.setAspectContext(this.aspectContext);
311 this.send(cref.getSite(), task);
312 this.throwException(task);
313 return ((List<Entry>) task.readResult()).toArray(new Entry[0]);
314 }
315
316 /***
317 * {@inheritDoc}
318 */
319 public void rollbackTransaction(Transaction txn) throws XCoreException {
320 if (logger.isDebugEnabled()) {
321 logger.debug("rollbackTransaction(" + txn + ")");
322 }
323 TransactionTask task = new TransactionTask(
324 TransactionTaskType.ROLLBACK, txn);
325 task.setAspectContext(this.aspectContext);
326 this.send(txn.getSite(), task);
327 try {
328 this.throwException(task);
329 } catch (InvalidTransactionException e) {
330 throw e;
331 } catch (Exception e) {
332 throw new FatalException(e);
333 }
334 }
335
336 /***
337 * {@inheritDoc}
338 */
339 public void shift(ContainerRef cref, Transaction tx, Entry... entries)
340 throws XCoreException {
341 if (logger.isDebugEnabled()) {
342 logger.debug("shift(" + cref + ", "
343 + java.util.Arrays.toString(entries) + ")");
344 }
345 OperationTask task = new OperationTask(OperationTaskType.SHIFT,
346 java.util.Arrays.asList(entries), cref, tx, null, 0);
347 task.setAspectContext(this.aspectContext);
348 this.send(cref.getSite(), task);
349 this.throwException(task);
350 }
351
352 /***
353 * {@inheritDoc}
354 */
355 public void shutdown(URI site, boolean clearSpace) throws XCoreException {
356 if (logger.isDebugEnabled()) {
357 logger.debug("shutdown(" + site + "," + clearSpace + ")");
358 }
359 if (clearSpace) {
360 this.clearSpace(site);
361 }
362 Task task = new ShutdownTask();
363 task.setAspectContext(this.aspectContext);
364 this.send(site, task);
365 task.waitUntilFinished();
366 }
367
368 /***
369 * {@inheritDoc}
370 */
371 @SuppressWarnings("unchecked")
372 public Entry[] take(ContainerRef cref, long timeout, Transaction tx,
373 Selector... sel) throws XCoreException {
374 if (logger.isDebugEnabled()) {
375 logger.debug("take(" + cref + ", " + timeout + ", " + tx + ", "
376 + java.util.Arrays.toString(sel));
377 }
378 List<Selector> selectors = null;
379 if (sel == null) {
380 selectors = new ArrayList<Selector>();
381 } else {
382 selectors = java.util.Arrays.asList(sel);
383 }
384 OperationTask task = new OperationTask(OperationTaskType.TAKE, null,
385 cref, tx, selectors, timeout);
386 task.setAspectContext(this.aspectContext);
387 this.send(cref.getSite(), task);
388 this.throwException(task);
389 return ((List<Entry>) task.readResult()).toArray(new Entry[0]);
390
391 }
392
393 /***
394 * {@inheritDoc}
395 */
396 public void write(ContainerRef cref, long timeout, Transaction tx,
397 Entry... entries) throws XCoreException {
398 if (logger.isDebugEnabled()) {
399 logger.debug("write(" + cref + ", " + timeout + ", " + tx + ", "
400 + java.util.Arrays.toString(entries));
401 }
402 OperationTask task = new OperationTask(OperationTaskType.WRITE,
403 java.util.Arrays.asList(entries), cref, tx, null, timeout);
404 task.setAspectContext(this.aspectContext);
405 this.send(cref.getSite(), task);
406 this.throwException(task);
407
408 }
409
410 /***
411 * Method blocks till the result of the task is available and checks if the
412 * result is an exception. If it is an exception it will be thrown.<br>
413 * If the answer is a Throwable which does not inherit from
414 * {@link XCoreException} a {@link XCoreException} is thrown.
415 *
416 * @param task
417 * the task to check.
418 * @throws XCoreException
419 * thrown if the answer of the task is an exception.
420 */
421 @SuppressWarnings("unchecked")
422 private void throwException(Task task) throws XCoreException {
423 Object xx = task.readResult();
424
425 if (xx instanceof Throwable) {
426 Throwable t = (Throwable) xx;
427 if (t instanceof XCoreException) {
428 throw (XCoreException) t;
429 }
430 throw new FatalException(t);
431 }
432 }
433
434 /***
435 * {@inheritDoc}
436 */
437 public URI addAspect(ContainerRef cref, List<LocalIPoint> p,
438 LocalAspect aspect) throws XCoreException {
439 if (logger.isDebugEnabled()) {
440 logger.debug("addAspect(" + cref + ", " + p + ", " + aspect + ")");
441 }
442 List<IPoint> points = new ArrayList<IPoint>();
443 for (IPoint pp : p) {
444 points.add(pp);
445 }
446 AspectTask task = new AspectTask(AspectTaskType.ADD, cref, aspect,
447 points);
448 task.setAspectContext(this.aspectContext);
449 this.send(cref.getSite(), task);
450 task.waitUntilFinished();
451 URI result;
452
453 if (task.getResult() instanceof URI) {
454 result = (URI) task.getResult();
455 } else {
456 result = (URI) ((AtomicEntry<?>) ((List<?>) task.getResult())
457 .get(0)).getValue();
458 }
459
460 return result;
461
462 }
463
464 /***
465 * {@inheritDoc}
466 */
467 @Deprecated
468 public void removeAspect(ContainerRef cref, List<LocalIPoint> p,
469 LocalAspect aspect) throws XCoreException {
470 if (logger.isDebugEnabled()) {
471 logger.debug("addAspect(" + cref + ", " + p + ", " + aspect + ")");
472 }
473
474 List<IPoint> points = new ArrayList<IPoint>();
475 for (IPoint pp : p) {
476 points.add(pp);
477 }
478 AspectTask task = new AspectTask(AspectTaskType.DELETE, cref, aspect,
479 points);
480 task.setAspectContext(this.aspectContext);
481 this.send(cref.getSite(), task);
482 task.waitUntilFinished();
483 }
484
485 /***
486 *
487 * {@inheritDoc}.
488 */
489 @Deprecated
490 public void removeAspect(URI site, List<IPoint> p, GlobalAspect aspect)
491 throws XCoreException {
492 if (logger.isDebugEnabled()) {
493 logger.debug("removeAspect(" + site + ", " + p + ", " + aspect
494 + ")");
495 }
496 AspectTask task = new AspectTask(AspectTaskType.DELETE, aspect, p);
497 task.setAspectContext(this.aspectContext);
498 this.send(site, task);
499 task.waitUntilFinished();
500 }
501
502 /***
503 * {@inheritDoc}
504 */
505 public void removeAspect(ContainerRef cref, List<LocalIPoint> p, URI auri)
506 throws XCoreException {
507 if (logger.isDebugEnabled()) {
508 logger.debug("addAspect(" + cref + ", " + p + ", " + auri + ")");
509 }
510
511 List<IPoint> points = new ArrayList<IPoint>();
512 for (IPoint pp : p) {
513 points.add(pp);
514 }
515 AspectTask task = new AspectTask(AspectTaskType.DELETE, cref, auri,
516 points);
517 task.setAspectContext(this.aspectContext);
518 this.send(cref.getSite(), task);
519 task.waitUntilFinished();
520 }
521
522 /***
523 *
524 * {@inheritDoc}.
525 */
526 public void removeAspect(URI site, List<IPoint> p, URI auri)
527 throws XCoreException {
528 if (logger.isDebugEnabled()) {
529 logger.debug("removeAspect(" + site + ", " + p + ", " + auri + ")");
530 }
531 AspectTask task = new AspectTask(AspectTaskType.DELETE, auri, p);
532 task.setAspectContext(this.aspectContext);
533 this.send(site, task);
534 task.waitUntilFinished();
535 }
536
537 /***
538 *
539 * {@inheritDoc}.
540 */
541 public URI addAspect(URI site, List<IPoint> p, GlobalAspect aspect)
542 throws XCoreException {
543 if (logger.isDebugEnabled()) {
544 logger.debug("addAspect(" + site + ", " + p + ", " + aspect + ")");
545 }
546 AspectTask task = new AspectTask(AspectTaskType.ADD, aspect, p);
547 task.setAspectContext(this.aspectContext);
548 this.send(site, task);
549 task.waitUntilFinished();
550 URI result;
551 if (task.getResult() instanceof URI) {
552 result = (URI) task.getResult();
553 } else {
554 result = (URI) ((AtomicEntry<?>) ((List<?>) task.getResult())
555 .get(0)).getValue();
556 }
557 return result;
558 }
559
560 public URI createNotification(ContainerRef cref,
561 final NotificationListener listener, Operation... operations) {
562 List<LocalIPoint> ipoints = new ArrayList<LocalIPoint>();
563 for (Operation o : operations) {
564 switch (o) {
565 case Write:
566 ipoints.add(LocalIPoint.PostWrite);
567 break;
568 case Read:
569 ipoints.add(LocalIPoint.PostRead);
570 break;
571 case Take:
572 ipoints.add(LocalIPoint.PostTake);
573 break;
574 case Destroy:
575 ipoints.add(LocalIPoint.PostDestroy);
576 break;
577 case Shift:
578 ipoints.add(LocalIPoint.PostShift);
579 break;
580 }
581 }
582 try {
583 final ContainerRef ncref = this.createContainer(null, cref
584 .getSite(), null, -1, new FifoCoordinator());
585 new Thread() {
586 public void run() {
587 try {
588 while (true) {
589 Entry[] entries = take(ncref, -1, null,
590 new RandomSelector(1));
591 Tuple t = (Tuple) entries[0];
592 Operation o = Operation
593 .valueOf((String) (((AtomicEntry<?>) t
594 .getEntryAt(0)).getValue()));
595 listener.handleNotification(o, ((Tuple) t
596 .getEntryAt(1)).toArray());
597 }
598 } catch (InvalidContainerException e) {
599
600 } catch (XCoreException e) {
601 throw new FatalException(e);
602 }
603 }
604 }.start();
605 return this.addAspect(cref, ipoints, new NotificationAspect(cref,
606 ncref));
607 } catch (XCoreException e) {
608 throw new FatalException(e);
609 }
610 }
611
612 /***
613 * Sends the task t to the url. If the URI is <code>null</code> it will be
614 * written in the local core.
615 *
616 * @param url
617 * the URI of the remote core or <code>null</code> for the local
618 * core.
619 * @param t
620 * the task which has to be send.
621 */
622 private void send(URI url, Task t) {
623
624
625 boolean local = false;
626
627
628 ConfigurationManager cm = ConfigurationManager.getInstance();
629 if (url != null) {
630 String p = cm.getStringSetting("RemoteProtocols");
631 StringTokenizer st = new StringTokenizer(p, "|");
632 TransportHandler tm = TransportHandler.getInstance();
633 while (st.hasMoreTokens()) {
634 String s = st.nextToken();
635 String sproto = tm.getListener(s).getUri().toASCIIString();
636 local = url.toString().equalsIgnoreCase(sproto);
637 if (local) {
638 break;
639 }
640 }
641 } else {
642
643 local = true;
644 }
645
646 if (local) {
647 EventProcessingPool.getInstance().execute(t);
648 } else {
649 try {
650 URI answerToUri = null;
651 if (this.answerContainer.get() == null) {
652 answerToUri = this.getAnswerContainerURI();
653 this.answerContainer.set(VirtualAnswerContainer
654 .getOrCreateInstance(answerToUri));
655 } else {
656 answerToUri = this.answerContainer.get().getContainerURI();
657 }
658
659 t.setAnswerToContainer(answerToUri);
660 TransportHandler.getInstance().send(url, t);
661
662 Entry[] result = this.answerContainer.get().wait(answerToUri);
663
664
665
666
667 t.setAnswerToContainer(null);
668 if (result[0].getEntryType().equals(Entry.EntryTypes.VOID)) {
669
670
671 t.setResult(new ArrayList<Entry>());
672 } else if (result[0].getEntryType().equals(
673 Entry.EntryTypes.EXCEPTION)) {
674
675
676 ExceptionEntry exceptionEntry = (ExceptionEntry) result[0];
677 Class<?> ec;
678 try {
679 ec = Class.forName("org.xvsm.internal.exceptions."
680 + exceptionEntry.getName());
681 } catch (ClassNotFoundException e) {
682 ec = FatalException.class;
683 }
684 Constructor<?> constructor = ec
685 .getConstructor(String.class);
686 Throwable ex = (Throwable) constructor
687 .newInstance(exceptionEntry.getDesription());
688 t.setResult(ex);
689 } else {
690 t.setResult(java.util.Arrays.asList(result));
691 }
692 } catch (Exception e) {
693 throw new FatalException(e);
694 }
695 }
696 }
697
698 /***
699 * @return the aspectContext
700 */
701 public Properties getAspectContext() {
702 return aspectContext;
703 }
704
705 /***
706 * @param aspectContext
707 * the aspectContext to set
708 */
709 public void setAspectContext(Properties aspectContext) {
710 this.aspectContext = aspectContext;
711 }
712
713 /***
714 * Get the URI of the answertoContainer used by this thread.
715 *
716 * @return the uri of the virtual answer to container
717 */
718 public URI getAnswerContainerURI() {
719 String uri = TransportHandler.getInstance().getListener(
720 ConfigurationManager.getInstance().getStringSetting(
721 "DefaultAnswerToProtocol")).getUri().toString();
722
723 if (uri.charAt(uri.length() - 1) != '/') {
724
725 uri += '/';
726 }
727 try {
728 return new URI(uri + "containers/" + UUID.randomUUID().toString());
729 } catch (Exception e) {
730 throw new FatalException(e);
731 }
732 }
733 }