1 package org.xvsm.remote.freepastry;
2
3 import java.net.URI;
4 import java.util.List;
5 import java.util.Properties;
6
7 import org.xvsm.core.AtomicEntry;
8 import org.xvsm.core.Capi;
9 import org.xvsm.core.ContainerRef;
10 import org.xvsm.core.Entry;
11 import org.xvsm.core.aspect.LocalAspect;
12 import org.xvsm.interfaces.ICapi;
13 import org.xvsm.internal.exceptions.AspectNotOkException;
14 import org.xvsm.internal.exceptions.AspectRescheduleException;
15 import org.xvsm.internal.exceptions.AspectSkipException;
16 import org.xvsm.internal.exceptions.XCoreException;
17 import org.xvsm.lookup.LookupManager;
18 import org.xvsm.selectors.KeySelector;
19 import org.xvsm.selectors.Selector;
20 import org.xvsm.transactions.Transaction;
21
22 import rice.p2p.commonapi.NodeHandleSet;
23
24 /***
25 * Aspect, which writes the values of a write operation into the replicas,
26 * before they are written into the container.
27 *
28 * @author Hannu-Daniel Goiss
29 */
30 public class MyAspect extends LocalAspect {
31 private static final long serialVersionUID = 974266238027304067L;
32
33 @Override
34 public void preDestroy(ContainerRef cref, Transaction tx,
35 List<Selector> selectors, Properties contextProperties)
36 throws AspectNotOkException, AspectRescheduleException,
37 AspectSkipException {
38 super.preDestroy(cref, tx, selectors, contextProperties);
39 }
40
41
42
43
44
45
46
47
48 @Override
49 public void preShift(ContainerRef cref, Transaction tx,
50 List<Entry> entries, Properties contextProperties)
51 throws AspectNotOkException, AspectRescheduleException,
52 AspectSkipException {
53 super.preShift(cref, tx, entries, contextProperties);
54 }
55
56
57
58
59
60
61
62
63 @Override
64 public void preTake(ContainerRef cref, Transaction tx,
65 List<Selector> selectors, Properties contextProperties)
66 throws AspectNotOkException, AspectRescheduleException,
67 AspectSkipException {
68 super.preTake(cref, tx, selectors, contextProperties);
69 }
70
71
72
73
74
75
76
77
78 @Override
79 public void preWrite(ContainerRef cref, Transaction tx,
80 List<Entry> entries, int retrycount, Properties contextProperties)
81 throws AspectNotOkException, AspectRescheduleException,
82 AspectSkipException {
83 super.preWrite(cref, tx, entries, retrycount, contextProperties);
84
85 if (properties.getProperty("stop2") != null)
86 return;
87
88 URI site = null;
89
90
91 ICapi capi;
92 try {
93
94 capi = new Capi();
95 } catch (XCoreException xce) {
96 throw new RuntimeException("this should not happen");
97 }
98 Transaction tx2 = null;
99 try {
100
101 String containerName = this.getContainerName(capi, cref);
102
103 FreePastryConnection fp = FreePastryConnection.getInstance();
104 NodeHandleSet handles = fp.getEndpoint().replicaSet(
105 fp.getPastryIdFactory().buildId(containerName),
106 fp.getNumberOfContainers());
107
108 for (int i = 0; i < handles.size(); i++) {
109 String handleString = handles.getHandle(i).toString();
110 String ip = handleString.substring(handleString
111 .lastIndexOf("/") + 1, handleString.lastIndexOf(":"));
112 int port = Integer.parseInt(handleString.substring(handleString
113 .lastIndexOf(":") + 1,
114 handleString.lastIndexOf(":") + 5));
115
116 port = port + 1000;
117 if (cref.asURI().toString().contains("" + port) == false) {
118 System.err.println("IP: " + ip);
119 System.err.println("Port: " + port);
120
121 site = new URI("tcpjava://" + ip + ":" + port);
122 }
123 }
124
125 ContainerRef nCref = capi
126 .lookupContainer(null, site, containerName);
127
128 if (site == null)
129 return;
130
131 tx2 = capi.createTransaction(site, 0);
132
133 for (Entry e : entries) {
134
135 if (properties.getProperty("stop") != null) {
136 Properties prop = new Properties();
137 prop.setProperty("stop2", "true");
138 capi.setAspectContext(prop);
139 } else {
140 Properties prop = new Properties();
141 prop.setProperty("stop", "true");
142 capi.setAspectContext(prop);
143 }
144
145 capi.write(nCref, 0L, tx2, e);
146 }
147 capi.commitTransaction(tx2);
148 } catch (Exception e) {
149 try {
150 capi.rollbackTransaction(tx2);
151 } catch (Exception e2) {
152
153 }
154
155 throw new AspectNotOkException(e);
156 }
157
158 }
159
160 private String getContainerName(ICapi capi, ContainerRef cref)
161 throws XCoreException {
162 ContainerRef metaCref = LookupManager.lookupMetaContainer(cref);
163 String containerName = (String) ((AtomicEntry) capi.read(metaCref, 0,
164 null, new KeySelector<String>("system", "name"))[0]).getValue();
165
166 return containerName;
167 }
168 }