1 /***
2 *
3 */
4 package org.xvsm.internal;
5
6 import java.net.URI;
7 import java.util.ArrayList;
8 import java.util.List;
9 import java.util.Properties;
10 import java.util.concurrent.locks.Lock;
11 import java.util.concurrent.locks.ReadWriteLock;
12 import java.util.concurrent.locks.ReentrantReadWriteLock;
13
14 import org.apache.log4j.Logger;
15 import org.xvsm.core.ContainerRef;
16 import org.xvsm.core.Entry;
17 import org.xvsm.core.VoidEntry;
18 import org.xvsm.core.aspect.IAspect;
19 import org.xvsm.core.aspect.IPoint;
20 import org.xvsm.interfaces.ICapi;
21 import org.xvsm.interfaces.ICoordinator;
22 import org.xvsm.interfaces.container.IContainer;
23 import org.xvsm.interfaces.container.ITransactionLayer;
24 import org.xvsm.internal.exceptions.AspectNotOkException;
25 import org.xvsm.internal.exceptions.AspectRescheduleException;
26 import org.xvsm.internal.exceptions.ContainerFullException;
27 import org.xvsm.internal.exceptions.CountNotMetException;
28 import org.xvsm.internal.exceptions.FatalException;
29 import org.xvsm.internal.exceptions.InvalidContainerException;
30 import org.xvsm.internal.exceptions.InvalidTransactionException;
31 import org.xvsm.internal.exceptions.TimeoutExpiredException;
32 import org.xvsm.internal.exceptions.TransactionLockException;
33 import org.xvsm.internal.tasks.OperationTask;
34 import org.xvsm.internal.tasks.dao.IOperationTaskDAO;
35 import org.xvsm.internal.tasks.dao.ITransactionDAO;
36 import org.xvsm.internal.tasks.dao.inmemory.OperationTaskDAO;
37 import org.xvsm.internal.tasks.dao.inmemory.TransactionDAO;
38 import org.xvsm.selectors.Selector;
39 import org.xvsm.transactions.Transaction;
40
41 /***
42 * @author Christian Schreiber, Michael Proestler
43 *
44 */
45 public class BlockingLayer implements IContainer {
46
47 /***
48 * the logger.
49 */
50 private static Logger logger = Logger.getLogger(BlockingLayer.class);
51
52 /***
53 * The transaction layer for this container.
54 */
55 private ITransactionLayer c;
56
57 /***
58 * The lock.
59 */
60 private ReadWriteLock rwLock = new ReentrantReadWriteLock();
61
62 /***
63 * Contains all tasks which block because another transaction uses the
64 * container.
65 */
66 private ITransactionDAO transactionDAO = new TransactionDAO();
67
68 /***
69 * All blocking operations.
70 *
71 */
72
73 private IOperationTaskDAO blockingOperations = new OperationTaskDAO();
74
75 /***
76 * Creates a new BlockingLayer.
77 *
78 * @param size
79 * the size of the new container.
80 */
81 public BlockingLayer(int size) {
82 c = new TransactionLayer(size);
83 }
84
85 /***
86 * {@inheritDoc}.
87 */
88 public Object execute(OperationTask task) throws ContainerFullException,
89 CountNotMetException, TransactionLockException,
90 AspectNotOkException, AspectRescheduleException {
91 if (logger.isDebugEnabled()) {
92 logger.debug("execute() " + task + " -- " + this.getCref());
93 }
94 Lock lock = null;
95 Object result = null;
96
97 try {
98 switch (task.getType()) {
99 case READ:
100 lock = rwLock.readLock();
101 lock.lock();
102 result = c.read(task.getTx(), task.getSelectors(), task
103 .getRetrycount(), task.getAspectContext());
104 break;
105 case WRITE:
106 lock = rwLock.writeLock();
107 lock.lock();
108 c.write(task.getEntries(), task.getTx(), task.getRetrycount(),
109 task.getAspectContext());
110 result = new VoidEntry();
111 break;
112 case TAKE:
113 lock = rwLock.writeLock();
114 lock.lock();
115 result = c.take(false, task.getTx(), task.getSelectors(), task
116 .getRetrycount(), task.getAspectContext());
117 break;
118 case SHIFT:
119
120 lock = rwLock.writeLock();
121 lock.lock();
122 c.shift(task.getEntries(), task.getTx(), task
123 .getAspectContext());
124 result = new VoidEntry();
125 break;
126 case DESTROY:
127
128 lock = rwLock.writeLock();
129 lock.lock();
130 List<Entry> deleted = c.take(true, task.getTx(), task
131 .getSelectors(), task.getRetrycount(), task
132 .getAspectContext());
133 task.setDeleted(deleted);
134 result = new VoidEntry();
135 break;
136 default:
137 break;
138 }
139
140 task.setRetrycount(task.getRetrycount() + 1);
141
142 this.wakeUpBlocking();
143 if (logger.isDebugEnabled()) {
144 logger.debug("execute() executed task result: " + result
145 + " -- " + this.getCref());
146 }
147 return result;
148 } catch (ContainerFullException e) {
149 if (logger.isDebugEnabled()) {
150 logger.debug("execute() NoPlaceLeftException was thrown."
151 + " Adding task to blocking operations -- "
152 + this.getCref());
153 }
154 this.blockOperationTask(task, e);
155 throw e;
156 } catch (CountNotMetException e) {
157 if (logger.isDebugEnabled()) {
158 logger.debug("execute() CountNotMetException was thrown."
159 + " Adding task to blocking operations -- "
160 + this.getCref());
161 }
162 this.blockOperationTask(task, e);
163 throw e;
164 } catch (TransactionLockException e) {
165 if (!this.transactionDAO.add(task, e.getReason())) {
166
167 EventProcessingPool.getInstance().execute(task);
168 }
169 throw e;
170 } catch (InvalidTransactionException e) {
171 return e;
172 } catch (AspectRescheduleException e) {
173
174 task.setRetrycount(task.getRetrycount() + 1);
175 throw e;
176 } catch (Exception e) {
177 return new FatalException(e);
178 } finally {
179
180 if (lock != null) {
181 lock.unlock();
182 lock = null;
183 }
184 }
185 }
186
187 /***
188 * Adds the Task to the blocking Tasks if the timeout is INFINITE or > 0.
189 * Otherwise the result of the task is set to e.
190 *
191 * @param task
192 * the task to block.
193 * @param e
194 * the exception that will be set as result, if the timeout
195 * expired.
196 */
197 private void blockOperationTask(OperationTask task, Exception e) {
198
199
200 if (task.getTimeout() == ICapi.INFINITE_TIMEOUT
201 || task.getTimeout() > 0) {
202 blockingOperations.add(task);
203 if (logger.isDebugEnabled()) {
204 logger.debug("blockOperationTask() added task");
205 }
206 } else {
207 if (logger.isDebugEnabled()) {
208 logger.debug("blockOperationTask() timeout is 0 => "
209 + "set the exception " + e + " as result");
210 }
211 task.setResult(e);
212 }
213 }
214
215 /***
216 *
217 * {@inheritDoc}.
218 */
219 public ContainerRef getCref() {
220 return c.getCref();
221 }
222
223 /***
224 *
225 * {@inheritDoc}.
226 */
227 public void setCref(ContainerRef cref) {
228 c.setCref(cref);
229 }
230
231 /***
232 * Wakes up all tasks which are blocking. All tasks in this.transactionDAO
233 * and this.blockOperations are waked up and added to the thread pool.
234 */
235 private void wakeUpBlocking() {
236 for (OperationTask t : this.transactionDAO.take()) {
237 EventProcessingPool.getInstance().execute(t);
238 if (logger.isDebugEnabled()) {
239 logger.debug("wakeUpBlocking() waked up: " + t);
240 }
241 }
242
243 for (OperationTask t : this.blockingOperations.takeAll()) {
244 EventProcessingPool.getInstance().execute(t);
245 if (logger.isDebugEnabled()) {
246 logger.debug("wakeUpBlocking() waked up: " + t);
247 }
248 }
249 }
250
251 /***
252 *
253 * {@inheritDoc}.
254 */
255 public void rollback(Transaction txn) throws InvalidTransactionException,
256 TransactionLockException, AspectRescheduleException,
257 AspectNotOkException {
258 this.transactionDAO.writeLock().lock();
259 try {
260 c.rollback(txn);
261 this.wakeUpBlocking();
262 } finally {
263 this.transactionDAO.writeLock().unlock();
264 }
265 }
266
267 /***
268 *
269 * {@inheritDoc}.
270 */
271 public void commit(Transaction txn) throws TransactionLockException,
272 InvalidTransactionException, AspectRescheduleException,
273 AspectNotOkException {
274
275 this.transactionDAO.writeLock().lock();
276 this.transactionDAO.commit(txn);
277 try {
278 c.commit(txn);
279 this.wakeUpBlocking();
280 } finally {
281 this.transactionDAO.writeLock().unlock();
282 }
283 }
284
285 /***
286 *
287 * {@inheritDoc}.
288 */
289 public void updateTimeouts() {
290 if (logger.isDebugEnabled()) {
291 logger.debug("updateTimeouts() in Container: " + this.getCref());
292 }
293 this.rwLock.writeLock().lock();
294 try {
295 for (OperationTask task : this.blockingOperations.getAll()) {
296 task.updateTimeout();
297 if (task.timeoutExpired()) {
298
299 this.blockingOperations.remove(task);
300 task.setResult(new TimeoutExpiredException());
301 }
302 }
303
304 for (OperationTask task : this.transactionDAO.getAll()) {
305 task.updateTimeout();
306 if (task.timeoutExpired()) {
307
308 this.transactionDAO.remove(task);
309 task.setResult(new TimeoutExpiredException());
310 }
311 }
312 } finally {
313 if (this.rwLock != null) {
314 this.rwLock.writeLock().unlock();
315 }
316 }
317 if (logger.isDebugEnabled()) {
318 logger.debug("updateTimeouts() updated timeouts.");
319 }
320 }
321
322 /***
323 *
324 * {@inheritDoc}.
325 */
326 public String addAspects(List<IPoint> points, IAspect aspect,
327 Properties aspectContext) {
328 return this.c.addAspects(points, aspect, aspectContext);
329
330 }
331
332 /***
333 *
334 * {@inheritDoc}.
335 */
336 public void removeAspect(IPoint p, URI uri, Properties aspectContext) {
337 this.c.removeAspect(p, uri, aspectContext);
338 }
339
340 /***
341 *
342 * {@inheritDoc}.
343 */
344 public void addCoordinator(Class<? extends Selector> s, ICoordinator coord) {
345 this.c.addCoordinator(s, coord);
346
347 }
348
349 /***
350 * {@inheritDoc}
351 */
352
353 public List<ICoordinator> getCoordinators() {
354 return this.c.getCoordinators();
355 }
356
357 /***
358 * {@inheritDoc}
359 */
360 public void destroy() {
361 for (OperationTask o : this.transactionDAO.take()) {
362 o.setResult(new InvalidContainerException(
363 "Container has been removed."));
364 }
365
366
367 for (OperationTask o : new ArrayList<OperationTask>(
368 this.blockingOperations.getAll())) {
369 o.setResult(new InvalidContainerException(
370 "Container has been removed."));
371 }
372 }
373
374 /***
375 * {@inheritDoc}
376 */
377 public int currentSize() {
378 return c.currentSize();
379 }
380 }