1 package org.xvsm.internal;
2
3 import java.net.URI;
4 import java.util.ArrayList;
5 import java.util.List;
6 import java.util.Properties;
7 import java.util.concurrent.ConcurrentHashMap;
8 import java.util.concurrent.ScheduledFuture;
9
10 import org.apache.log4j.Logger;
11 import org.xvsm.coordinators.KeyCoordinator;
12 import org.xvsm.coordinators.LindaCoordinator;
13 import org.xvsm.core.AtomicEntry;
14 import org.xvsm.core.Capi;
15 import org.xvsm.core.ContainerRef;
16 import org.xvsm.core.Entry;
17 import org.xvsm.core.Tuple;
18 import org.xvsm.core.aspect.IPoint;
19 import org.xvsm.core.aspect.LocalAspect;
20 import org.xvsm.core.aspect.LocalIPoint;
21 import org.xvsm.interfaces.ICapi;
22 import org.xvsm.interfaces.IContainerManager;
23 import org.xvsm.interfaces.ICoordinator;
24 import org.xvsm.interfaces.container.IContainer;
25 import org.xvsm.internal.exceptions.AspectNotOkException;
26 import org.xvsm.internal.exceptions.AspectRescheduleException;
27 import org.xvsm.internal.exceptions.AspectSkipException;
28 import org.xvsm.internal.exceptions.ContainerNameOccupiedException;
29 import org.xvsm.internal.exceptions.CountNotMetException;
30 import org.xvsm.internal.exceptions.FatalException;
31 import org.xvsm.internal.exceptions.InvalidContainerException;
32 import org.xvsm.internal.exceptions.XCoreException;
33 import org.xvsm.selectors.KeySelector;
34 import org.xvsm.selectors.LindaSelector;
35 import org.xvsm.selectors.RandomSelector;
36 import org.xvsm.selectors.Selector;
37 import org.xvsm.transactions.Transaction;
38
39 /***
40 *
41 * @author Christian Schreiber, Michael Proestler
42 *
43 */
44 public final class ContainerManager implements IContainerManager {
45
46 /***
47 * Updates the timeout of blocking events in a container.
48 *
49 * @author Christian Schreiber, Michael Proestler
50 *
51 */
52 private class TimeoutHandler implements Runnable {
53
54 /***
55 * The container.
56 */
57 private IContainer c;
58
59 /***
60 * Creates a new UpdateTimeoutThread.
61 *
62 * @param c
63 * the container to update.
64 */
65 public TimeoutHandler(IContainer c) {
66 this.c = c;
67 }
68
69 /***
70 *
71 * {@inheritDoc}.
72 */
73
74 public void run() {
75 Thread.currentThread().setName(this.getClass().getName());
76 c.updateTimeouts();
77 }
78
79 /***
80 * @return the ccontainer
81 */
82 public IContainer getC() {
83 return c;
84 }
85
86 /***
87 *
88 * {@inheritDoc}.
89 */
90 @Override
91 public int hashCode() {
92 final int prime = 31;
93 int result = 1;
94 result = prime * result + ((c == null) ? 0 : c.hashCode());
95 return result;
96 }
97
98 /***
99 *
100 * {@inheritDoc}.
101 */
102 @Override
103 public boolean equals(Object obj) {
104 if (this == obj) {
105 return true;
106 }
107 if (obj == null) {
108 return false;
109 }
110 if (getClass() != obj.getClass()) {
111 return false;
112 }
113 final TimeoutHandler other = (TimeoutHandler) obj;
114 if (c == null) {
115 if (other.c != null) {
116 return false;
117 }
118 } else if (!c.equals(other.c)) {
119 return false;
120 }
121 return true;
122 }
123
124 }
125
126 /***
127 * Contains all timeout handler.
128 */
129 private ConcurrentHashMap<ContainerRef, ScheduledFuture<?>> timeoutHandler;
130
131 /***
132 * The logger of this class.
133 */
134 private static Logger logger = Logger.getLogger(ContainerManager.class);
135
136 /***
137 * An Instance of the ContainerManager. Singleton-Pattern.
138 */
139 private static ContainerManager mngr;
140
141 /***
142 * The Container which has all ContainerRefs.
143 */
144 private IContainer containerContainer;
145
146 /***
147 * The container which is used for management of named container.
148 */
149 private IContainer lookupContainer;
150
151 /***
152 * The Capi to access the containerContainer.
153 */
154 private ICapi capi;
155
156 /***
157 * Private default constructor.
158 */
159 private ContainerManager() {
160
161 ContainerRef cccref = new ContainerRef();
162 cccref.setSite(null);
163 this.containerContainer = new BlockingLayer(IContainer.INFINITE_SIZE);
164 this.containerContainer.setCref(cccref);
165 ICoordinator keyCoord = new KeyCoordinator(new KeyCoordinator.KeyType(
166 "system", Object.class));
167 ICoordinator lindaCoord = new LindaCoordinator();
168 this.containerContainer.addCoordinator(keyCoord.getDefaultSelector(),
169 keyCoord);
170 this.containerContainer.addCoordinator(lindaCoord.getDefaultSelector(),
171 lindaCoord);
172
173
174 ContainerRef lccref = new ContainerRef();
175 lccref.setSite(null);
176 this.lookupContainer = new BlockingLayer(IContainer.INFINITE_SIZE);
177 this.lookupContainer.setCref(lccref);
178 ICoordinator lckeyCoord = new KeyCoordinator(
179 new KeyCoordinator.KeyType("lookup", String.class));
180 ICoordinator lclindaCoord = new LindaCoordinator();
181 this.lookupContainer.addCoordinator(lckeyCoord.getDefaultSelector(),
182 lckeyCoord);
183 this.lookupContainer.addCoordinator(lindaCoord.getDefaultSelector(),
184 lclindaCoord);
185
186
187
188 this.lookupContainer.addAspects(java.util.Arrays.asList(new IPoint[] {
189 LocalIPoint.PostWrite, LocalIPoint.PostDestroy }),
190 new LocalAspect() {
191
192 private ContainerRef metaCref;
193 private KeySelector<?> ks = null;
194
195 private void prepare(ContainerRef cref, Transaction tx,
196 List<Entry> entries, Properties contextProperties) {
197 try {
198 if (entries.size() == 1
199 && entries.get(0) instanceof AtomicEntry<?>) {
200 AtomicEntry<?> e = (AtomicEntry<?>) entries
201 .get(0);
202 ks = null;
203 for (Selector s : e.getSelectors()) {
204 if (s instanceof KeySelector
205 && ((KeySelector<?>) s)
206 .getKeyName().equals(
207 "lookup")) {
208 ks = (KeySelector<?>) s;
209 }
210 }
211
212
213 String containerUri = ((URI) ((AtomicEntry<?>) entries
214 .get(0)).getValue()).toASCIIString();
215 metaCref = new ContainerRef(new URI(
216 containerUri + "/meta"));
217
218
219 metaCref.setSite(cref.getSite());
220 }
221 } catch (Exception e) {
222 throw new FatalException(e);
223 }
224 }
225
226 public void postWrite(ContainerRef cref, Transaction tx,
227 List<Entry> entries, Properties contextProperties)
228 throws AspectNotOkException,
229 AspectRescheduleException, AspectSkipException {
230 try {
231 this.prepare(cref, tx, entries, contextProperties);
232 capi.shift(metaCref, tx, new AtomicEntry<String>(
233 (String) ks.getKeyValue(), String.class,
234 new KeySelector<String>("system", "name")));
235 } catch (Exception e) {
236 throw new FatalException(e);
237 }
238 }
239
240 public void postDestroy(ContainerRef cref, Transaction tx,
241 List<Selector> selectors, List<Entry> deleted,
242 Properties contextProperties)
243 throws AspectNotOkException,
244 AspectRescheduleException, AspectSkipException {
245 try {
246 this.prepare(cref, tx, deleted, contextProperties);
247 capi.destroy(metaCref, 0, tx,
248 new KeySelector<String>("system", "name"));
249 } catch (Exception e) {
250
251
252
253 }
254 }
255 }, null);
256 TimeoutSchedulerPool.schedule(new TimeoutHandler(lookupContainer));
257
258 try {
259 this.capi = new Capi();
260 } catch (XCoreException e) {
261 throw new FatalException(e);
262 }
263 this.timeoutHandler = new ConcurrentHashMap<ContainerRef, ScheduledFuture<?>>();
264 }
265
266 /***
267 * Singleton-Pattern. Returns an Instance of the ContainerManager.
268 *
269 * @return an Instance of ContainerManager.
270 */
271 public static synchronized ContainerManager getInstance() {
272 if (mngr == null) {
273 mngr = new ContainerManager();
274 }
275 return mngr;
276 }
277
278 public static synchronized boolean hasInstance() {
279 return (mngr != null);
280 }
281
282 /***
283 * Removes the instance.
284 *
285 */
286 public static synchronized void removeInstance() {
287 mngr = null;
288 }
289
290 /***
291 * {@inheritDoc}
292 */
293
294 public synchronized ContainerRef createContainer(Transaction tx, int size)
295 throws ContainerNameOccupiedException {
296
297 if (logger.isDebugEnabled()) {
298 logger.debug("createContainer(" + tx + ", " + size + ")");
299 }
300
301 if (tx != null && tx.isImplicit()) {
302 tx = null;
303 }
304
305 ContainerRef cref = new ContainerRef();
306 IContainer c = new BlockingLayer(size);
307 cref.setContainer(c);
308 c.setCref(cref);
309
310 ContainerRef metaCref = new ContainerRef();
311 IContainer metaC = new BlockingLayer(IContainer.INFINITE_SIZE);
312 metaC.addCoordinator(KeySelector.class, new KeyCoordinator(
313 new KeyCoordinator.KeyType("system", Object.class)));
314 metaC.setCref(metaCref);
315
316 Tuple tuple = new Tuple();
317
318
319 tuple.add(new AtomicEntry<ContainerRef>(cref));
320
321 tuple.add(new AtomicEntry<IContainer>(c));
322
323 tuple.add(new AtomicEntry<ContainerRef>(metaCref));
324
325 Tuple metaTuple = new Tuple();
326 metaTuple.add(new AtomicEntry<ContainerRef>(metaCref));
327 metaTuple.add(new AtomicEntry<IContainer>(metaC));
328
329 metaTuple.add(new AtomicEntry<ContainerRef>(metaCref));
330
331 try {
332 if (logger.isDebugEnabled()) {
333 logger.debug("createContainer() write new container "
334 + "into the containterContainer");
335 }
336 capi.write(this.containerContainer.getCref(), 0, tx, tuple,
337 metaTuple);
338 } catch (XCoreException e) {
339 throw new FatalException(e);
340 }
341
342 TimeoutHandler th = new TimeoutHandler(c);
343 this.timeoutHandler.put(cref, TimeoutSchedulerPool.schedule(th));
344
345 if (logger.isDebugEnabled()) {
346 logger.debug("createContainer() container created: " + cref);
347 }
348
349
350
351 LocalAspect aspect = new LocalAspect() {
352 public void preRead(ContainerRef cref, Transaction tx,
353 List<Selector> selectors, int retrycount,
354 java.util.Properties contextProperties)
355 throws AspectNotOkException, AspectRescheduleException,
356 AspectSkipException {
357 for (Selector s : selectors) {
358 if (s instanceof KeySelector) {
359 KeySelector<?> ks = (KeySelector<?>) s;
360 if (ks.getKeyName().equals("system")
361 && ks.getKeyValue().equals("current_size")) {
362 throw new AspectSkipException();
363 }
364 }
365 }
366 }
367
368 public void postRead(ContainerRef cref, Transaction tx,
369 List<Entry> entries, List<Selector> selectors,
370 java.util.Properties contextProperties)
371 throws AspectNotOkException, AspectRescheduleException,
372 AspectSkipException {
373 for (Selector s : selectors) {
374 if (s instanceof KeySelector) {
375 KeySelector<?> ks = (KeySelector<?>) s;
376 if (ks.getKeyName().equals("system")
377 && ks.getKeyValue().equals("current_size")) {
378 ContainerManager cm = ContainerManager
379 .getInstance();
380 IContainer c = cm.getContainer(cref);
381 Integer currentSize = c.currentSize();
382 entries.add(new AtomicEntry<Integer>(currentSize));
383 }
384 }
385 }
386 }
387
388 public Properties getProperties() {
389 return new Properties();
390 }
391
392 public void setProperties(Properties props) {
393
394 }
395 };
396 metaC.addAspects(java.util.Arrays.asList(new IPoint[] {
397 LocalIPoint.PreRead, LocalIPoint.PostRead }), aspect, null);
398
399 return cref;
400 }
401
402 /***
403 * {@inheritDoc}
404 */
405
406 public synchronized ContainerRef createContainer(Transaction tx, int size,
407 ICoordinator... coordinators) throws ContainerNameOccupiedException {
408
409 if (logger.isDebugEnabled()) {
410 logger.debug("createContainer(" + tx + ", " + size + coordinators
411 + ")");
412 }
413 try {
414 IContainer c = this
415 .getContainer(tx, this.createContainer(tx, size));
416 for (ICoordinator coord : coordinators) {
417 c.addCoordinator(coord.getDefaultSelector(), coord);
418 }
419
420
421
422
423
424
425 Tuple template = new Tuple();
426 template.add(new AtomicEntry<ContainerRef>(c.getCref()));
427 template.add((Entry) null);
428 template.add((Entry) null);
429
430 Entry[] entries = this.capi.read(this.containerContainer.getCref(),
431 0, tx, new LindaSelector(template));
432 ContainerRef metaCref = (ContainerRef) ((AtomicEntry<?>) ((Tuple) entries[0])
433 .getEntryAt(2)).getValue();
434
435 Tuple cTuple = new Tuple();
436 for (ICoordinator co : coordinators) {
437 cTuple.add(new AtomicEntry<String>(co.getClass()
438 .getSimpleName()));
439 }
440 cTuple.addSelectors(new KeySelector<String>("system",
441 "coordinators"));
442 capi.write(metaCref, 0, tx, cTuple);
443 capi.write(metaCref, 0, tx, new AtomicEntry<Integer>(size,
444 Integer.class,
445 new KeySelector<String>("system", "max_size")));
446 if (logger.isDebugEnabled()) {
447 logger.debug("createContainer() container created: "
448 + c.getCref());
449 }
450
451 return c.getCref();
452 } catch (XCoreException e) {
453 throw new FatalException(e);
454 }
455
456 }
457
458 /***
459 * {@inheritDoc}
460 */
461 public void deleteContainer(Transaction tx, ContainerRef cref) {
462 if (logger.isDebugEnabled()) {
463 logger.debug("deleteContainer(" + tx + "," + cref + ")");
464 }
465
466 if (cref.equals(this.containerContainer.getCref())) {
467 logger.warn("Sombody tries to "
468 + "remote the containercontainer ... abort");
469 return;
470 }
471
472 if (tx != null && tx.isImplicit()) {
473 tx = null;
474 }
475
476 Tuple template = new Tuple();
477 template.add(new AtomicEntry<ContainerRef>(cref));
478 template.add((Entry) null);
479 template.add((Entry) null);
480
481 Selector selector = new LindaSelector(template);
482 try {
483
484 Entry[] entries = capi.take(this.containerContainer.getCref(), 0,
485 tx, selector);
486 if (entries != null && entries.length > 0 && entries[0] != null) {
487 ContainerRef metaCref = (ContainerRef) ((AtomicEntry<?>) ((Tuple) entries[0])
488 .getEntryAt(2)).getValue();
489
490
491 IContainer cc = (IContainer) ((AtomicEntry<?>) ((Tuple) entries[0])
492 .getEntryAt(1)).getValue();
493 cc.destroy();
494
495 if (!metaCref.equals(cref)) {
496
497 this.timeoutHandler.get(cref).cancel(true);
498 this.timeoutHandler.remove(cref);
499
500 this.deleteContainer(tx, metaCref);
501 cref.setContainer(null);
502 if (logger.isDebugEnabled()) {
503 logger.debug("deleteContainer() removed the"
504 + " timeout scheduler for the container.");
505 }
506 }
507 }
508
509 capi.destroy(this.lookupContainer.getCref(), 0, tx,
510 new LindaSelector(new AtomicEntry<String>(cref.asURI()
511 .toASCIIString())));
512 } catch (CountNotMetException e) {
513 if (logger.isDebugEnabled()) {
514 logger.debug("deleteContainer() container (" + cref
515 + ") can not be found. (" + e + ")");
516 }
517 } catch (XCoreException e) {
518 throw new FatalException(e);
519 }
520 if (logger.isDebugEnabled()) {
521 logger.debug("deleteContainer() container (" + cref + ") deleted");
522 }
523 }
524
525 /***
526 * {@inheritDoc}
527 */
528
529 public IContainer getContainer(Transaction tx, ContainerRef cref)
530 throws InvalidContainerException {
531 if (logger.isDebugEnabled()) {
532 logger.debug("getContainer(" + tx + ", " + cref + ")");
533 }
534
535
536 String path = cref.asURI().getPath();
537 if (path.endsWith("/meta")) {
538 IContainer cc = this.getContainer(tx, this.getMetaContainer(tx,
539 new ContainerRef(path.substring("/containers/".length(),
540 path.lastIndexOf("/meta")))));
541 return cc;
542 }
543
544 if (path.endsWith("/lookup")
545 || cref.getId().equals(this.lookupContainer.getCref().getId())) {
546 return this.lookupContainer;
547 }
548
549
550 if (cref.getContainer() != null) {
551 if (logger.isDebugEnabled()) {
552 logger.debug("getContainer() the container is cached."
553 + "Do not have to read from the container.");
554 }
555 return cref.getContainer();
556 }
557
558 if (tx != null && tx.isImplicit()) {
559 tx = null;
560 }
561
562 if (cref.getId().equals(this.containerContainer.getCref().getId())) {
563 if (logger.isDebugEnabled()) {
564 logger.debug("getCotainer() asked for \"ContainerContainer\": "
565 + this.containerContainer);
566 }
567 return this.containerContainer;
568 }
569 Tuple template = new Tuple();
570 template.add(new AtomicEntry<ContainerRef>(cref));
571 template.add((Entry) null);
572 template.add((Entry) null);
573
574 Selector selector = new LindaSelector(1, template);
575 try {
576
577 Entry[] entries;
578 try {
579
580
581 boolean underRollback = false;
582 if (tx != null) {
583 underRollback = tx.isUnderRollback();
584 tx.setUnderRollback(false);
585 }
586 entries = capi.read(this.containerContainer.getCref(), 0, tx,
587 selector);
588 if (tx != null) {
589 tx.setUnderRollback(underRollback);
590 }
591 } catch (CountNotMetException e) {
592 return null;
593 }
594
595 if (logger.isDebugEnabled()) {
596 logger.debug("getContainer() "
597 + (IContainer) ((AtomicEntry<?>) ((Tuple) entries[0])
598 .getEntryAt(1)).getValue());
599 }
600
601 return (IContainer) ((AtomicEntry<?>) ((Tuple) entries[0])
602 .getEntryAt(1)).getValue();
603 } catch (XCoreException e) {
604 throw new FatalException(e);
605 }
606 }
607
608 /***
609 *
610 * {@inheritDoc}.
611 */
612 public ContainerRef getContainer(Transaction tx, String name)
613 throws InvalidContainerException {
614 if (logger.isDebugEnabled()) {
615 logger.debug("getContainer(" + tx + ", " + name + ")");
616 }
617
618 if (tx != null && tx.isImplicit()) {
619 tx = null;
620 }
621
622 Selector selector = new KeySelector<String>("names", name);
623 Entry[] entries;
624 try {
625 entries = capi.read(this.containerContainer.getCref(), 0, tx,
626 selector);
627 if (entries == null || entries.length < 1 || entries[0] == null) {
628 throw new XCoreException();
629 }
630 } catch (XCoreException e) {
631 throw new InvalidContainerException(
632 "There is no container with name: " + name);
633 }
634
635 if (logger.isDebugEnabled()) {
636 logger.debug("getContainer() returns: "
637 + (ContainerRef) ((AtomicEntry<?>) ((Tuple) entries[0])
638 .getEntryAt(0)).getValue());
639 }
640 return (ContainerRef) ((AtomicEntry<?>) ((Tuple) entries[0])
641 .getEntryAt(0)).getValue();
642
643 }
644
645 /***
646 *
647 * {@inheritDoc}.
648 */
649 public ContainerRef getMetaContainer(Transaction tx, ContainerRef cref)
650 throws InvalidContainerException {
651 if (logger.isDebugEnabled()) {
652 logger.debug("getMetaContainer(" + tx + ", " + cref + ")");
653 }
654
655 if (tx != null && tx.isImplicit()) {
656 tx = null;
657 }
658
659 Tuple template = new Tuple();
660 template.add(new AtomicEntry<ContainerRef>(cref));
661 template.add((Entry) null);
662 template.add((Entry) null);
663
664 Entry[] entries = null;
665 try {
666 entries = this.capi.read(this.containerContainer.getCref(), 0, tx,
667 new LindaSelector(template));
668 } catch (XCoreException e) {
669 if (e instanceof CountNotMetException) {
670 throw new InvalidContainerException(
671 "The meta container does not exist");
672 }
673 throw new FatalException(e);
674 }
675 if (entries != null && entries.length > 0 && entries[0] != null) {
676 if (logger.isDebugEnabled()) {
677 logger.debug("getMetaContainer() "
678 + (ContainerRef) ((AtomicEntry<?>) ((Tuple) entries[0])
679 .getEntryAt(0)).getValue());
680 }
681
682 return (ContainerRef) ((AtomicEntry<?>) ((Tuple) entries[0])
683 .getEntryAt(2)).getValue();
684 }
685
686 return null;
687 }
688
689 /***
690 *
691 * {@inheritDoc}.
692 */
693 @SuppressWarnings("unchecked")
694 public List<ContainerRef> getAllContainer(Transaction tx) {
695 if (logger.isDebugEnabled()) {
696 logger.debug("getAllContainer(" + tx + ")");
697 }
698
699 if (tx != null && tx.isImplicit()) {
700 tx = null;
701 }
702
703 List<ContainerRef> crefs = new ArrayList<ContainerRef>();
704 try {
705 Entry[] entries = this.capi.read(this.containerContainer.getCref(),
706 0, tx, new RandomSelector(Selector.CNT_ALL));
707
708
709 for (Entry e : entries) {
710 Tuple t = (Tuple) e;
711
712 if (!(((AtomicEntry<ContainerRef>) t.getEntryAt(0)).getValue()
713 .equals(((AtomicEntry<ContainerRef>) t.getEntryAt(2))
714 .getValue()))) {
715
716
717 crefs.add((ContainerRef) ((AtomicEntry) ((Tuple) e)
718 .getEntryAt(0)).getValue());
719 }
720 }
721 } catch (XCoreException e) {
722
723 }
724
725 if (logger.isDebugEnabled()) {
726 logger.debug("getAllContainer() returns (" + crefs.size()
727 + ") crefs: " + crefs);
728 }
729 return crefs;
730 }
731
732 /***
733 * Get the Container which has all ContainerRefs.
734 *
735 * @return the Container which has the overview over all existing
736 * ContainerRefs.
737 */
738 public IContainer getContainerContainer() {
739 return this.containerContainer;
740 }
741
742 /***
743 * Returns the container to which the cref of the meta container belongs.
744 *
745 * @param metaCref
746 * @return
747 */
748 private IContainer getContainer(ContainerRef metaCref) {
749 if (metaCref.equals(this.containerContainer.getCref())) {
750 return this.containerContainer;
751 }
752
753 Tuple template = new Tuple();
754 template.add((Entry) null);
755 template.add((Entry) null);
756 template.add(new AtomicEntry<ContainerRef>(metaCref));
757
758 Selector selector = new LindaSelector(2, template);
759 try {
760
761 Entry[] entries;
762 try {
763 entries = capi.read(this.containerContainer.getCref(), 0, null,
764 selector);
765 } catch (CountNotMetException e) {
766 return null;
767 }
768 for (Entry e : entries) {
769 Tuple t = (Tuple) e;
770 if (t.getEntryAt(0).equals(t.getEntryAt(2))) {
771
772
773 continue;
774 } else {
775 return (IContainer) ((AtomicEntry<?>) t.getEntryAt(1))
776 .getValue();
777 }
778 }
779 return (IContainer) ((AtomicEntry<?>) ((Tuple) entries[0])
780 .getEntryAt(1)).getValue();
781 } catch (XCoreException e) {
782 throw new FatalException(e);
783 }
784 }
785 }