View Javadoc

1   package org.xvsm.remote.freepastry;
2   
3   import java.net.URI;
4   import java.util.List;
5   import java.util.Properties;
6   
7   import org.xvsm.core.AtomicEntry;
8   import org.xvsm.core.Capi;
9   import org.xvsm.core.ContainerRef;
10  import org.xvsm.core.Entry;
11  import org.xvsm.core.aspect.LocalAspect;
12  import org.xvsm.interfaces.ICapi;
13  import org.xvsm.internal.exceptions.AspectNotOkException;
14  import org.xvsm.internal.exceptions.AspectRescheduleException;
15  import org.xvsm.internal.exceptions.AspectSkipException;
16  import org.xvsm.internal.exceptions.XCoreException;
17  import org.xvsm.lookup.LookupManager;
18  import org.xvsm.selectors.KeySelector;
19  import org.xvsm.selectors.Selector;
20  import org.xvsm.transactions.Transaction;
21  
22  import rice.p2p.commonapi.NodeHandleSet;
23  
24  /***
25   * Aspect, which writes the values of a write operation into the replicas,
26   * before they are written into the container.
27   * 
28   * @author Hannu-Daniel Goiss
29   */
30  public class MyAspect extends LocalAspect {
31  	private static final long serialVersionUID = 974266238027304067L;
32  
33  	@Override
34  	public void preDestroy(ContainerRef cref, Transaction tx,
35  			List<Selector> selectors, Properties contextProperties)
36  			throws AspectNotOkException, AspectRescheduleException,
37  			AspectSkipException {
38  		super.preDestroy(cref, tx, selectors, contextProperties);
39  	}
40  
41  	/*
42  	 * (non-Javadoc)
43  	 * 
44  	 * @see org.xvsm.core.aspect.LocalAspect#preShift(org.xvsm.core.ContainerRef,
45  	 *      org.xvsm.transactions.Transaction, java.util.List,
46  	 *      java.util.Properties)
47  	 */
48  	@Override
49  	public void preShift(ContainerRef cref, Transaction tx,
50  			List<Entry> entries, Properties contextProperties)
51  			throws AspectNotOkException, AspectRescheduleException,
52  			AspectSkipException {
53  		super.preShift(cref, tx, entries, contextProperties);
54  	}
55  
56  	/*
57  	 * (non-Javadoc)
58  	 * 
59  	 * @see org.xvsm.core.aspect.LocalAspect#preTake(org.xvsm.core.ContainerRef,
60  	 *      org.xvsm.transactions.Transaction, java.util.List,
61  	 *      java.util.Properties)
62  	 */
63  	@Override
64  	public void preTake(ContainerRef cref, Transaction tx,
65  			List<Selector> selectors, Properties contextProperties)
66  			throws AspectNotOkException, AspectRescheduleException,
67  			AspectSkipException {
68  		super.preTake(cref, tx, selectors, contextProperties);
69  	}
70  
71  	/*
72  	 * (non-Javadoc)
73  	 * 
74  	 * @see org.xvsm.core.aspect.LocalAspect#preWrite(org.xvsm.core.ContainerRef,
75  	 *      org.xvsm.transactions.Transaction, java.util.List, int,
76  	 *      java.util.Properties)
77  	 */
78  	@Override
79  	public void preWrite(ContainerRef cref, Transaction tx,
80  			List<Entry> entries, int retrycount, Properties contextProperties)
81  			throws AspectNotOkException, AspectRescheduleException,
82  			AspectSkipException {
83  		super.preWrite(cref, tx, entries, retrycount, contextProperties);
84  
85  		if (properties.getProperty("stop2") != null)
86  			return;
87  
88  		URI site = null;
89  
90  		// ICapi capi = TransportManager.getCapi();
91  		ICapi capi;
92  		try {
93  			// TODO: ersetzen
94  			capi = new Capi();
95  		} catch (XCoreException xce) {
96  			throw new RuntimeException("this should not happen");
97  		}
98  		Transaction tx2 = null;
99  		try {
100 
101 			String containerName = this.getContainerName(capi, cref);
102 
103 			FreePastryConnection fp = FreePastryConnection.getInstance();
104 			NodeHandleSet handles = fp.getEndpoint().replicaSet(
105 					fp.getPastryIdFactory().buildId(containerName),
106 					fp.getNumberOfContainers()); // 2. param number of
107 			// replicas
108 			for (int i = 0; i < handles.size(); i++) {
109 				String handleString = handles.getHandle(i).toString();
110 				String ip = handleString.substring(handleString
111 						.lastIndexOf("/") + 1, handleString.lastIndexOf(":"));
112 				int port = Integer.parseInt(handleString.substring(handleString
113 						.lastIndexOf(":") + 1,
114 						handleString.lastIndexOf(":") + 5));
115 
116 				port = port + 1000;
117 				if (cref.asURI().toString().contains("" + port) == false) {
118 					System.err.println("IP: " + ip);
119 					System.err.println("Port: " + port);
120 
121 					site = new URI("tcpjava://" + ip + ":" + port);
122 				}
123 			}
124 
125 			ContainerRef nCref = capi
126 					.lookupContainer(null, site, containerName);
127 
128 			if (site == null)
129 				return;
130 
131 			tx2 = capi.createTransaction(site, 0);
132 
133 			for (Entry e : entries) {
134 
135 				if (properties.getProperty("stop") != null) {
136 					Properties prop = new Properties();
137 					prop.setProperty("stop2", "true");
138 					capi.setAspectContext(prop);
139 				} else {
140 					Properties prop = new Properties();
141 					prop.setProperty("stop", "true");
142 					capi.setAspectContext(prop);
143 				}
144 
145 				capi.write(nCref, 0L, tx2, e);
146 			}
147 			capi.commitTransaction(tx2);
148 		} catch (Exception e) {
149 			try {
150 				capi.rollbackTransaction(tx2);
151 			} catch (Exception e2) {
152 
153 			}
154 			// e.printStackTrace();
155 			throw new AspectNotOkException(e);
156 		}
157 
158 	}
159 
160 	private String getContainerName(ICapi capi, ContainerRef cref)
161 			throws XCoreException {
162 		ContainerRef metaCref = LookupManager.lookupMetaContainer(cref);
163 		String containerName = (String) ((AtomicEntry) capi.read(metaCref, 0,
164 				null, new KeySelector<String>("system", "name"))[0]).getValue();
165 
166 		return containerName;
167 	}
168 }