View Javadoc

1   package org.xvsm.core.aspect;
2   
3   import static org.junit.Assert.assertFalse;
4   import static org.junit.Assert.assertNotNull;
5   import static org.junit.Assert.assertTrue;
6   
7   import java.net.URI;
8   import java.util.ArrayList;
9   import java.util.Collection;
10  import java.util.List;
11  import java.util.Properties;
12  
13  import org.junit.Before;
14  import org.junit.Test;
15  import org.junit.runner.RunWith;
16  import org.junit.runners.Parameterized;
17  import org.junit.runners.Parameterized.Parameters;
18  import org.xvsm.TestParameterConfiguration;
19  import org.xvsm.coordinators.FifoCoordinator;
20  import org.xvsm.coordinators.GenericKeyCoordinator;
21  import org.xvsm.coordinators.LindaCoordinator;
22  import org.xvsm.core.AtomicEntry;
23  import org.xvsm.core.Capi;
24  import org.xvsm.core.ContainerRef;
25  import org.xvsm.core.Entry;
26  import org.xvsm.core.Tuple;
27  import org.xvsm.core.notifications.Operation;
28  import org.xvsm.interfaces.ICapi;
29  import org.xvsm.interfaces.ICoordinator;
30  import org.xvsm.interfaces.NotificationListener;
31  import org.xvsm.interfaces.container.IContainer;
32  import org.xvsm.internal.ContainerManager;
33  import org.xvsm.internal.exceptions.AspectNotOkException;
34  import org.xvsm.internal.exceptions.AspectRescheduleException;
35  import org.xvsm.internal.exceptions.AspectSkipException;
36  import org.xvsm.internal.exceptions.FatalException;
37  import org.xvsm.internal.exceptions.InvalidContainerException;
38  import org.xvsm.internal.exceptions.XCoreException;
39  import org.xvsm.selectors.FifoSelector;
40  import org.xvsm.selectors.GenericKeySelector;
41  import org.xvsm.selectors.KeySelector;
42  import org.xvsm.selectors.LindaSelector;
43  import org.xvsm.selectors.Selector;
44  import org.xvsm.transactions.Transaction;
45  
46  @RunWith(Parameterized.class)
47  public class CustomNotificationTest implements NotificationListener {
48  
49  	/***
50  	 * The URI of the core instance used for testing.
51  	 */
52  	private static URI defaultURI;
53  
54  	/***
55  	 * The ICapi instance used by this test case.
56  	 */
57  	private ICapi capi;
58  
59  	/***
60  	 * This field is set to true when the notification fires and is used to
61  	 * check if the notification is executed as expected.
62  	 */
63  	private Boolean fired = false;
64  
65  	/***
66  	 * The aspect used for handling the notification. This implementation only
67  	 * support notifications on write.
68  	 * 
69  	 * @author Christian Schreiber
70  	 * 
71  	 */
72  	public static class WriteNotification extends LocalAspect {
73  		/***
74  		 * Auto generated serial version uid.
75  		 */
76  		private static final long serialVersionUID = -6750217343530283108L;
77  		/***
78  		 * The Capi instance used by the aspect.
79  		 */
80  		private ICapi capi;
81  
82  		/***
83  		 * The container on which the notification is registered.
84  		 */
85  		private ContainerRef cref;
86  
87  		/***
88  		 * The notification container which is used to collect the
89  		 * notifications.
90  		 */
91  		private ContainerRef ncref;
92  
93  		/***
94  		 * Default constructor.
95  		 */
96  		public WriteNotification() {
97  			try {
98  				capi = new Capi();
99  			} catch (Exception e) {
100 				throw new FatalException(e);
101 			}
102 		}
103 
104 		public WriteNotification(ContainerRef cref, ContainerRef ncref) {
105 			this();
106 			super.properties.put("cref", new AtomicEntry<URI>(cref.asURI()));
107 			super.properties.put("ncref", new AtomicEntry<URI>(ncref.asURI()));
108 
109 			this.cref = cref;
110 			this.ncref = ncref;
111 		}
112 
113 		@Override
114 		public void finalize() throws Throwable {
115 			super.finalize();
116 			// remove the notification container.
117 			if (this.ncref != null) {
118 				ContainerManager.getInstance().deleteContainer(null, ncref);
119 			}
120 		}
121 
122 		@Override
123 		public void postWrite(ContainerRef cref, Transaction tx,
124 				List<Entry> entries, Properties contextProperties)
125 				throws AspectNotOkException, AspectRescheduleException,
126 				AspectSkipException {
127 			try {
128 				for (Entry entry : entries) {
129 					// If no copy is used for writing it might happen that
130 					// transaction logs are messed up when the container is
131 					// within the same core instance.
132 					Entry e = entry.getFlatCopy();
133 					// use the same selectors as used for the "original entry".
134 					// Please note that it might cause an exception when "ncref"
135 					// does not provide the coordinators.
136 					e.setSelectors(entry.getSelectors());
137 					this.capi.write(ncref, 0, tx, e);
138 				}
139 			} catch (XCoreException e) {
140 				throw new FatalException(e);
141 			}
142 		}
143 
144 		// the following two methods are required when the aspect is
145 		// transaported using the XML protocol.
146 		@Override
147 		public void setProperties(Properties props) {
148 			this.cref = new ContainerRef((URI) ((AtomicEntry<?>) props
149 					.get("cref")).getValue());
150 			this.ncref = new ContainerRef((URI) ((AtomicEntry<?>) props
151 					.get("ncref")).getValue());
152 			this.properties = props;
153 		}
154 
155 		@Override
156 		public Properties getProperties() {
157 			return this.properties;
158 		}
159 	}
160 
161 	/***
162 	 * Constructor.
163 	 * 
164 	 * @param defaultURI
165 	 *            the URI of the space instance used for testing.
166 	 * @throws Exception
167 	 */
168 	public CustomNotificationTest(URI defaultURI) throws Exception {
169 		CustomNotificationTest.defaultURI = defaultURI;
170 		this.capi = new Capi();
171 	}
172 
173 	@Parameters
174 	public static Collection<?> regExValues() throws Exception {
175 		return TestParameterConfiguration.getURIs();
176 	}
177 
178 	@Before
179 	public void setUp() throws Exception {
180 		this.fired = false;
181 	}
182 
183 	/***
184 	 * Creates a new notification.
185 	 * 
186 	 * @param cref
187 	 *            the ContainerRef on which the notification shall be
188 	 *            registered.
189 	 * @param selectors
190 	 *            the selectors which have to match in order to fire the
191 	 *            notification.
192 	 * @return the URI of the notification aspect (required to unregister the
193 	 *         notification).
194 	 */
195 	private URI createNotification(ContainerRef cref,
196 			final NotificationListener listener, final Selector... selectors)
197 			throws Exception {
198 		// read the coordinators from the meta container and create a new
199 		// container which has the same coordinators (if possible, see below).
200 
201 		// the constructor Container(String s); assumes that s is only the id of
202 		// the container without the rest of the URI. Therefore we have to use
203 		// Contaner(URI uri) here.
204 		ContainerRef metaCref = new ContainerRef(new URI(cref.asURI()
205 				.toASCIIString()
206 				+ "/meta"));
207 		Entry[] result = capi.read(metaCref, 0, null, new KeySelector<String>(
208 				"system", "coordinators"));
209 		Tuple t = (Tuple) result[0];
210 
211 		List<ICoordinator> coordinators = new ArrayList<ICoordinator>();
212 		for (Entry e : t) {
213 			AtomicEntry<?> ae = (AtomicEntry<?>) e;
214 			String coordinatorName = (String) ae.getValue();
215 
216 			if (coordinatorName.equals("LabelCoordinator")
217 					|| coordinatorName.equals("KeyCoordinator")) {
218 				// TODO implement the functionality to the properties from
219 				// coordinators.
220 				throw new FatalException("Cannot create a notification for \""
221 						+ coordinatorName + "\" because this coordinator "
222 						+ "needs extra properties.");
223 			}
224 			Class<?> clazz = Class.forName("org.xvsm.coordinators."
225 					+ coordinatorName);
226 			ICoordinator coordinator = (ICoordinator) clazz.newInstance();
227 			coordinators.add(coordinator);
228 		}
229 		// always add a linda coordinator in order to enable template matching.
230 		coordinators.add(new LindaCoordinator());
231 
232 		// If you need a KeyCoordintaor or LabelCoordinator you can add it here
233 		// (do not forget the KeyType!).
234 
235 		// create the notification container. This container will contain all
236 		// notification events.
237 		final ContainerRef ncref = this.capi.createContainer(null, null, null,
238 				IContainer.INFINITE_SIZE, coordinators
239 						.toArray(new ICoordinator[0]));
240 		// add the notification aspect to the container.
241 		URI aspectURI = this.capi.addAspect(cref, java.util.Arrays
242 				.asList(new LocalIPoint[] { LocalIPoint.PostWrite }),
243 				new WriteNotification(cref, ncref));
244 		// create a thread which waits for new (matching) entries.
245 		Thread thread = new Thread() {
246 			public void run() {
247 				while (true) {
248 					try {
249 						Entry[] entries = capi.take(ncref,
250 								ICapi.INFINITE_TIMEOUT, null, selectors);
251 						// in this case the operation field is not used.
252 						listener.handleNotification(null, entries);
253 					} catch (Exception e) {
254 						// ignore
255 					}
256 				}
257 			}
258 		};
259 		thread.start();
260 		return aspectURI;
261 	}
262 
263 	@Test
264 	public void testNotificationWithSelector() throws Exception {
265 		ContainerRef cref = this.capi.createContainer(null, defaultURI, null,
266 				IContainer.INFINITE_SIZE, new FifoCoordinator(),
267 				new LindaCoordinator());
268 		URI notifURI = this.createNotification(cref, this, new FifoSelector());
269 		assertNotNull(notifURI);
270 
271 		// write some entry to bring the notification to fire.
272 		this.capi.write(cref, 0, null, new AtomicEntry<String>(
273 				"Hello Notification", String.class, new FifoSelector()));
274 
275 		synchronized (this.fired) {
276 			this.fired.wait(10000);
277 			assertTrue(this.fired);
278 		}
279 
280 		// clean up
281 		this.capi.destroyContainer(null, cref);
282 	}
283 
284 	@Test
285 	public void testNotificationWithLindaSelector() throws Exception {
286 		ContainerRef cref = this.capi.createContainer(null, defaultURI, null,
287 				IContainer.INFINITE_SIZE, new FifoCoordinator());
288 		URI notifURI = this.createNotification(cref, this, new LindaSelector(
289 				new Tuple(new AtomicEntry<String>(String.class))));
290 		assertNotNull(notifURI);
291 
292 		// write some entry to bring the notification to fire.
293 		this.capi.write(cref, 0, null, new Tuple(new AtomicEntry<String>(
294 				"Hello Notification")));
295 
296 		synchronized (this.fired) {
297 			this.fired.wait(10000);
298 			assertTrue(this.fired);
299 			this.fired = false;
300 		}
301 
302 		// try again with an entry that does not match the template
303 		this.capi.write(cref, 0, null, new Tuple(new AtomicEntry<String>(
304 				"I do not match the template"), new AtomicEntry<Integer>(666)));
305 		synchronized (this.fired) {
306 			this.fired.wait(10000);
307 			assertFalse(this.fired);
308 		}
309 		// clean up
310 		this.capi.destroyContainer(null, cref);
311 	}
312 
313 	@Test
314 	public void testNotificationWithGenericKey() throws Exception {
315 		// Please note that this does not work with the KeyCoordinator because
316 		// you have to provide the KeyType during initialization.
317 		ContainerRef cref = this.capi.createContainer(null, defaultURI, null,
318 				IContainer.INFINITE_SIZE, new GenericKeyCoordinator());
319 		URI notifURI = this.createNotification(cref, this,
320 				new GenericKeySelector<String>("test", "10"));
321 		assertNotNull(notifURI);
322 
323 		// write some entry to bring the notification to fire.
324 		this.capi.write(cref, 0, null, new AtomicEntry<Integer>(100,
325 				Integer.class, new GenericKeySelector<String>("test", "10")));
326 		// Please note that this may cause a problem: when somebody already
327 		// wrote an entry on this position (and did not take this entry from the
328 		// notification container) the position in the notification container is
329 		// not free an the write operation in the aspect will cause an
330 		// exception. It would be possible to use "shift" instead of "write" in
331 		// the aspect but then it is possible that the user misses a
332 		// notification.
333 
334 		synchronized (this.fired) {
335 			this.fired.wait(10000);
336 			assertTrue(this.fired);
337 			this.fired = false;
338 		}
339 
340 		// test again with key that does not match
341 		this.capi.write(cref, 0, null, new AtomicEntry<Integer>(100,
342 				Integer.class, new GenericKeySelector<String>("test", "1")));
343 
344 		synchronized (this.fired) {
345 			this.fired.wait(10000);
346 			assertFalse(this.fired);
347 			this.fired = false;
348 		}
349 
350 		// clean up
351 		this.capi.destroyContainer(null, cref);
352 	}
353 
354 	/***
355 	 * The callback method of the notification.
356 	 */
357 	public void handleNotification(Operation operation, Entry... entries) {
358 		System.out.println("Notification fired :)");
359 		System.out
360 				.println(operation + " " + java.util.Arrays.toString(entries));
361 		synchronized (this.fired) {
362 			this.fired.notify();
363 			this.fired = true;
364 		}
365 	}
366 }