View Javadoc

1   /***
2    * 
3    */
4   package org.xvsm.internal;
5   
6   import java.net.URI;
7   import java.util.ArrayList;
8   import java.util.List;
9   import java.util.Properties;
10  import java.util.concurrent.locks.Lock;
11  import java.util.concurrent.locks.ReadWriteLock;
12  import java.util.concurrent.locks.ReentrantReadWriteLock;
13  
14  import org.apache.log4j.Logger;
15  import org.xvsm.core.ContainerRef;
16  import org.xvsm.core.Entry;
17  import org.xvsm.core.VoidEntry;
18  import org.xvsm.core.aspect.IAspect;
19  import org.xvsm.core.aspect.IPoint;
20  import org.xvsm.interfaces.ICapi;
21  import org.xvsm.interfaces.ICoordinator;
22  import org.xvsm.interfaces.container.IContainer;
23  import org.xvsm.interfaces.container.ITransactionLayer;
24  import org.xvsm.internal.exceptions.AspectNotOkException;
25  import org.xvsm.internal.exceptions.AspectRescheduleException;
26  import org.xvsm.internal.exceptions.ContainerFullException;
27  import org.xvsm.internal.exceptions.CountNotMetException;
28  import org.xvsm.internal.exceptions.FatalException;
29  import org.xvsm.internal.exceptions.InvalidContainerException;
30  import org.xvsm.internal.exceptions.InvalidTransactionException;
31  import org.xvsm.internal.exceptions.TimeoutExpiredException;
32  import org.xvsm.internal.exceptions.TransactionLockException;
33  import org.xvsm.internal.tasks.OperationTask;
34  import org.xvsm.internal.tasks.dao.IOperationTaskDAO;
35  import org.xvsm.internal.tasks.dao.ITransactionDAO;
36  import org.xvsm.internal.tasks.dao.inmemory.OperationTaskDAO;
37  import org.xvsm.internal.tasks.dao.inmemory.TransactionDAO;
38  import org.xvsm.selectors.Selector;
39  import org.xvsm.transactions.Transaction;
40  
41  /***
42   * @author Christian Schreiber, Michael Proestler
43   * 
44   */
45  public class BlockingLayer implements IContainer {
46  
47  	/***
48  	 * the logger.
49  	 */
50  	private static Logger logger = Logger.getLogger(BlockingLayer.class);
51  
52  	/***
53  	 * The transaction layer for this container.
54  	 */
55  	private ITransactionLayer c;
56  
57  	/***
58  	 * The lock.
59  	 */
60  	private ReadWriteLock rwLock = new ReentrantReadWriteLock();
61  
62  	/***
63  	 * Contains all tasks which block because another transaction uses the
64  	 * container.
65  	 */
66  	private ITransactionDAO transactionDAO = new TransactionDAO();
67  
68  	/***
69  	 * All blocking operations.
70  	 * 
71  	 */
72  	// TODO make separate DAOs for read/write/....?
73  	private IOperationTaskDAO blockingOperations = new OperationTaskDAO();
74  
75  	/***
76  	 * Creates a new BlockingLayer.
77  	 * 
78  	 * @param size
79  	 *            the size of the new container.
80  	 */
81  	public BlockingLayer(int size) {
82  		c = new TransactionLayer(size);
83  	}
84  
85  	/***
86  	 * {@inheritDoc}.
87  	 */
88  	public Object execute(OperationTask task) throws ContainerFullException,
89  			CountNotMetException, TransactionLockException,
90  			AspectNotOkException, AspectRescheduleException {
91  		if (logger.isDebugEnabled()) {
92  			logger.debug("execute() " + task + " -- " + this.getCref());
93  		}
94  		Lock lock = null;
95  		Object result = null;
96  		// this.transactionDAO.writeLock().lock();
97  		try {
98  			switch (task.getType()) {
99  			case READ:
100 				lock = rwLock.readLock();
101 				lock.lock();
102 				result = c.read(task.getTx(), task.getSelectors(), task
103 						.getRetrycount(), task.getAspectContext());
104 				break;
105 			case WRITE:
106 				lock = rwLock.writeLock();
107 				lock.lock();
108 				c.write(task.getEntries(), task.getTx(), task.getRetrycount(),
109 						task.getAspectContext());
110 				result = new VoidEntry();
111 				break;
112 			case TAKE:
113 				lock = rwLock.writeLock();
114 				lock.lock();
115 				result = c.take(false, task.getTx(), task.getSelectors(), task
116 						.getRetrycount(), task.getAspectContext());
117 				break;
118 			case SHIFT:
119 
120 				lock = rwLock.writeLock();
121 				lock.lock();
122 				c.shift(task.getEntries(), task.getTx(), task
123 						.getAspectContext());
124 				result = new VoidEntry();
125 				break;
126 			case DESTROY:
127 
128 				lock = rwLock.writeLock();
129 				lock.lock();
130 				List<Entry> deleted = c.take(true, task.getTx(), task
131 						.getSelectors(), task.getRetrycount(), task
132 						.getAspectContext());
133 				task.setDeleted(deleted);
134 				result = new VoidEntry();
135 				break;
136 			default:
137 				break;
138 			}
139 			// increase retrycount.
140 			task.setRetrycount(task.getRetrycount() + 1);
141 
142 			this.wakeUpBlocking();
143 			if (logger.isDebugEnabled()) {
144 				logger.debug("execute() executed task result: " + result
145 						+ " -- " + this.getCref());
146 			}
147 			return result;
148 		} catch (ContainerFullException e) {
149 			if (logger.isDebugEnabled()) {
150 				logger.debug("execute() NoPlaceLeftException was thrown."
151 						+ " Adding task to blocking operations -- "
152 						+ this.getCref());
153 			}
154 			this.blockOperationTask(task, e);
155 			throw e;
156 		} catch (CountNotMetException e) {
157 			if (logger.isDebugEnabled()) {
158 				logger.debug("execute() CountNotMetException was thrown."
159 						+ " Adding task to blocking operations -- "
160 						+ this.getCref());
161 			}
162 			this.blockOperationTask(task, e);
163 			throw e;
164 		} catch (TransactionLockException e) {
165 			if (!this.transactionDAO.add(task, e.getReason())) {
166 				// A transactions commited in the time
167 				EventProcessingPool.getInstance().execute(task);
168 			}
169 			throw e;
170 		} catch (InvalidTransactionException e) {
171 			return e;
172 		} catch (AspectRescheduleException e) {
173 			// increase retrycount.
174 			task.setRetrycount(task.getRetrycount() + 1);
175 			throw e;
176 		} catch (Exception e) {
177 			return new FatalException(e);
178 		} finally {
179 			// release the lock.
180 			if (lock != null) {
181 				lock.unlock();
182 				lock = null;
183 			}
184 		}
185 	}
186 
187 	/***
188 	 * Adds the Task to the blocking Tasks if the timeout is INFINITE or > 0.
189 	 * Otherwise the result of the task is set to e.
190 	 * 
191 	 * @param task
192 	 *            the task to block.
193 	 * @param e
194 	 *            the exception that will be set as result, if the timeout
195 	 *            expired.
196 	 */
197 	private void blockOperationTask(OperationTask task, Exception e) {
198 		// Block the task if the timeout is > 0 or INFINITE. Otherwise
199 		// return the exception.
200 		if (task.getTimeout() == ICapi.INFINITE_TIMEOUT
201 				|| task.getTimeout() > 0) {
202 			blockingOperations.add(task);
203 			if (logger.isDebugEnabled()) {
204 				logger.debug("blockOperationTask() added task");
205 			}
206 		} else {
207 			if (logger.isDebugEnabled()) {
208 				logger.debug("blockOperationTask() timeout is 0 => "
209 						+ "set the exception " + e + " as result");
210 			}
211 			task.setResult(e);
212 		}
213 	}
214 
215 	/***
216 	 * 
217 	 * {@inheritDoc}.
218 	 */
219 	public ContainerRef getCref() {
220 		return c.getCref();
221 	}
222 
223 	/***
224 	 * 
225 	 * {@inheritDoc}.
226 	 */
227 	public void setCref(ContainerRef cref) {
228 		c.setCref(cref);
229 	}
230 
231 	/***
232 	 * Wakes up all tasks which are blocking. All tasks in this.transactionDAO
233 	 * and this.blockOperations are waked up and added to the thread pool.
234 	 */
235 	private void wakeUpBlocking() {
236 		for (OperationTask t : this.transactionDAO.take()) {
237 			EventProcessingPool.getInstance().execute(t);
238 			if (logger.isDebugEnabled()) {
239 				logger.debug("wakeUpBlocking() waked up: " + t);
240 			}
241 		}
242 		// wake up all blocking operations.
243 		for (OperationTask t : this.blockingOperations.takeAll()) {
244 			EventProcessingPool.getInstance().execute(t);
245 			if (logger.isDebugEnabled()) {
246 				logger.debug("wakeUpBlocking() waked up: " + t);
247 			}
248 		}
249 	}
250 
251 	/***
252 	 * 
253 	 * {@inheritDoc}.
254 	 */
255 	public void rollback(Transaction txn) throws InvalidTransactionException,
256 			TransactionLockException, AspectRescheduleException,
257 			AspectNotOkException {
258 		this.transactionDAO.writeLock().lock();
259 		try {
260 			c.rollback(txn);
261 			this.wakeUpBlocking();
262 		} finally {
263 			this.transactionDAO.writeLock().unlock();
264 		}
265 	}
266 
267 	/***
268 	 * 
269 	 * {@inheritDoc}.
270 	 */
271 	public void commit(Transaction txn) throws TransactionLockException,
272 			InvalidTransactionException, AspectRescheduleException,
273 			AspectNotOkException {
274 
275 		this.transactionDAO.writeLock().lock();
276 		this.transactionDAO.commit(txn);
277 		try {
278 			c.commit(txn);
279 			this.wakeUpBlocking();
280 		} finally {
281 			this.transactionDAO.writeLock().unlock();
282 		}
283 	}
284 
285 	/***
286 	 * 
287 	 * {@inheritDoc}.
288 	 */
289 	public void updateTimeouts() {
290 		if (logger.isDebugEnabled()) {
291 			logger.debug("updateTimeouts() in Container: " + this.getCref());
292 		}
293 		this.rwLock.writeLock().lock();
294 		try {
295 			for (OperationTask task : this.blockingOperations.getAll()) {
296 				task.updateTimeout();
297 				if (task.timeoutExpired()) {
298 					// remove the expired task.
299 					this.blockingOperations.remove(task);
300 					task.setResult(new TimeoutExpiredException());
301 				}
302 			}
303 
304 			for (OperationTask task : this.transactionDAO.getAll()) {
305 				task.updateTimeout();
306 				if (task.timeoutExpired()) {
307 					// remove the expired task.
308 					this.transactionDAO.remove(task);
309 					task.setResult(new TimeoutExpiredException());
310 				}
311 			}
312 		} finally {
313 			if (this.rwLock != null) {
314 				this.rwLock.writeLock().unlock();
315 			}
316 		}
317 		if (logger.isDebugEnabled()) {
318 			logger.debug("updateTimeouts() updated timeouts.");
319 		}
320 	}
321 
322 	/***
323 	 * 
324 	 * {@inheritDoc}.
325 	 */
326 	public String addAspects(List<IPoint> points, IAspect aspect,
327 			Properties aspectContext) {
328 		return this.c.addAspects(points, aspect, aspectContext);
329 
330 	}
331 
332 	/***
333 	 * 
334 	 * {@inheritDoc}.
335 	 */
336 	public void removeAspect(IPoint p, URI uri, Properties aspectContext) {
337 		this.c.removeAspect(p, uri, aspectContext);
338 	}
339 
340 	/***
341 	 * 
342 	 * {@inheritDoc}.
343 	 */
344 	public void addCoordinator(Class<? extends Selector> s, ICoordinator coord) {
345 		this.c.addCoordinator(s, coord);
346 
347 	}
348 
349 	/***
350 	 * {@inheritDoc}
351 	 */
352 	// @Override
353 	public List<ICoordinator> getCoordinators() {
354 		return this.c.getCoordinators();
355 	}
356 
357 	/***
358 	 * {@inheritDoc}
359 	 */
360 	public void destroy() {
361 		for (OperationTask o : this.transactionDAO.take()) {
362 			o.setResult(new InvalidContainerException(
363 					"Container has been removed."));
364 		}
365 		// OPTMIZE if no copy is create a concurrent modification exception is
366 		// thrown
367 		for (OperationTask o : new ArrayList<OperationTask>(
368 				this.blockingOperations.getAll())) {
369 			o.setResult(new InvalidContainerException(
370 					"Container has been removed."));
371 		}
372 	}
373 
374 	/***
375 	 * {@inheritDoc}
376 	 */
377 	public int currentSize() {
378 		return c.currentSize();
379 	}
380 }