View Javadoc

1   package org.xvsm.remote.tcpconnection;
2   
3   import java.io.DataInputStream;
4   import java.io.DataOutputStream;
5   import java.io.EOFException;
6   import java.io.IOException;
7   import java.net.BindException;
8   import java.net.InetAddress;
9   import java.net.ServerSocket;
10  import java.net.Socket;
11  import java.net.SocketException;
12  import java.net.URI;
13  import java.net.URISyntaxException;
14  import java.net.UnknownHostException;
15  import java.util.HashMap;
16  import java.util.concurrent.ExecutorService;
17  import java.util.concurrent.Executors;
18  
19  import org.apache.log4j.Logger;
20  import org.xvsm.internal.exceptions.FatalException;
21  import org.xvsm.internal.tasks.Task;
22  import org.xvsm.remote.TransportHandler;
23  import org.xvsm.remote.interfaces.IMarshaller;
24  import org.xvsm.remote.interfaces.ITransportListener;
25  import org.xvsm.remote.interfaces.ITransportSender;
26  
27  public class TcpConnectedTransport implements ITransportListener<byte[]>,
28  		ITransportSender<byte[]> {
29  
30  	private static Logger logger = Logger
31  			.getLogger(TcpConnectedTransport.class);
32  
33  	/***
34  	 * The maximal amount of Threads in the TcpListenerPool.
35  	 */
36  	private static final int MAX_THREADS = 10; // TODO: Get MAX_THREADS from
37  
38  	private HashMap<String, Socket> requestSockets = new HashMap<String, Socket>();
39  
40  	private HashMap<String, Socket> answerSockets = new HashMap<String, Socket>();
41  	/***
42  	 * The ServerSocket of the TcpTransportListener.
43  	 */
44  	private ServerSocket ssocket;
45  
46  	/***
47  	 * Indicates if the TcpTransportListener is running or not.
48  	 */
49  	private boolean running = false;
50  
51  	/***
52  	 * The Marshaller that should be used.
53  	 */
54  	private IMarshaller<byte[]> marshaller;
55  
56  	/***
57  	 * The URI where the listener should listen.
58  	 */
59  	private URI uri;
60  
61  	private int socketPort;
62  	/***
63  	 * The ExecutorService to execute the TcpListenerThreads.
64  	 */
65  	private static ExecutorService service = Executors.newCachedThreadPool();
66  
67  	// newFixedThreadPool(MAX_THREADS);
68  
69  	/***
70  	 * Default Constructor.
71  	 */
72  	public TcpConnectedTransport() {
73  	}
74  
75  	/***
76  	 * {@inheritDoc}
77  	 */
78  	public void run() {
79  		Thread.currentThread().setName(this.getClass().getName());
80  		this.running = true;
81  		try {
82  			ssocket = new ServerSocket(uri.getPort());
83  			this.socketPort = ssocket.getLocalPort();
84  			this.uri = new URI(this.uri.getScheme() + "://"
85  					+ this.uri.getHost() + ":" + this.socketPort);
86  			while (true) {
87  				this.service.execute(new TcpListenerThread(ssocket.accept(),
88  						true));
89  			}
90  		} catch (BindException e) {
91  			throw new FatalException(e);
92  		} catch (SocketException e) {
93  			// Is thrown when the socket gets closed -> ignore
94  		} catch (IOException e) {
95  			throw new FatalException(e);
96  		} catch (URISyntaxException e) {
97  			throw new FatalException(e);
98  		}
99  	}
100 
101 	/***
102 	 * {@inheritDoc}
103 	 */
104 	// @Override
105 	public void stop() {
106 		this.running = false;
107 		try {
108 			if (ssocket != null) {
109 				this.ssocket.close();
110 			}
111 			for (Socket s : requestSockets.values()) {
112 				s.close();
113 			}
114 			for (Socket s : answerSockets.values()) {
115 				s.close();
116 			}
117 		} catch (IOException e) {
118 			throw new FatalException(e);
119 		}
120 	}
121 
122 	/***
123 	 * {@inheritDoc}
124 	 */
125 	// @Override
126 	public void setMarshaller(IMarshaller<byte[]> m) {
127 		this.marshaller = m;
128 
129 	}
130 
131 	public void setURI(URI nuri) {
132 		this.uri = nuri;
133 	}
134 
135 	/***
136 	 * {@inheritDoc}
137 	 */
138 	// @Override
139 	public URI getUri() {
140 		return this.uri;
141 	}
142 
143 	public synchronized void sendRequest(URI uri, Task t) {
144 		try {
145 			byte[] bytes = this.marshaller.marshall(t);
146 			InetAddress ip;
147 
148 			if (InetAddress.getLocalHost().getHostAddress().equals(
149 					uri.getHost())) {
150 				ip = InetAddress.getByName("localhost");
151 			} else {
152 				ip = InetAddress.getByName(uri.getHost());
153 			}
154 			int port = uri.getPort();
155 
156 			Socket socket;
157 			boolean createdSocket = false;
158 
159 			if ((socket = requestSockets.get(ip.getHostAddress() + ":" + port)) == null
160 					|| socket.isClosed()) {
161 				socket = new Socket(ip.getHostAddress(), port);
162 				requestSockets.put(ip.getHostAddress() + ":" + port, socket);
163 				createdSocket = true;
164 			}
165 
166 			// Start a Listener on this socket
167 			if (createdSocket) {
168 				new TcpListenerThread(socket, false).start();
169 			}
170 
171 			DataOutputStream out = new DataOutputStream(socket
172 					.getOutputStream());
173 			out.writeInt(bytes.length);
174 			out.write(bytes);
175 
176 			out.flush();
177 
178 		} catch (UnknownHostException e) {
179 			throw new FatalException(e);
180 
181 		} catch (IOException e) {
182 			throw new FatalException("Exception during sending a request to "
183 					+ uri, e);
184 
185 		}
186 
187 	}
188 
189 	public synchronized void sendResponse(URI uri, Task t) {
190 		try {
191 			byte[] bytes = this.marshaller.marshall(t);
192 			InetAddress ip;
193 
194 			int port = uri.getPort();
195 
196 			Socket socket = null;
197 			if ((socket = answerSockets.get(uri.toString())) == null
198 					|| socket.isClosed()) {
199 				if (InetAddress.getLocalHost().getHostAddress().equals(
200 						uri.getHost())) {
201 					ip = InetAddress.getByName("localhost");
202 				} else {
203 					ip = InetAddress.getByName(uri.getHost());
204 				}
205 				socket = new Socket(ip.getHostAddress(), port);
206 				answerSockets.put(uri.toString(), socket);
207 			}
208 
209 			DataOutputStream out = new DataOutputStream(socket
210 					.getOutputStream());
211 			out.writeInt(bytes.length);
212 			out.write(bytes);
213 			out.flush();
214 		} catch (UnknownHostException e) {
215 			throw new FatalException(e);
216 
217 		} catch (IOException e) {
218 			throw new FatalException(e);
219 
220 		}
221 	}
222 
223 	/***
224 	 * 
225 	 * @author Christian Schreiber, Michael Proestler
226 	 * 
227 	 */
228 	class TcpListenerThread extends Thread {
229 
230 		/***
231 		 * The Socket over which the communication takes place.
232 		 */
233 		private Socket socket;
234 
235 		private boolean newConnection;
236 
237 		private DataInputStream input;
238 
239 		private boolean listening;
240 
241 		/***
242 		 * Default Constructor.
243 		 * 
244 		 * @param socket
245 		 *            The socket over which the communication takes place.
246 		 */
247 		public TcpListenerThread(Socket socket, boolean newConnection) {
248 			this.socket = socket;
249 			this.listening = true;
250 			this.newConnection = newConnection;
251 			try {
252 				this.input = new DataInputStream(this.socket.getInputStream());
253 			} catch (IOException e) {
254 				// TODO Auto-generated catch block
255 				e.printStackTrace();
256 			}
257 		}
258 
259 		/***
260 		 * {@inheritDoc}
261 		 */
262 		// @Override
263 		public void run() {
264 			Thread.currentThread().setName(this.getClass().getName());
265 			try {
266 				while (running && listening) {
267 					try {
268 
269 						int dataSize = input.readInt();
270 						int read;
271 						int readTotal = 0;
272 						byte[] data = new byte[dataSize];
273 
274 						while ((readTotal < dataSize)
275 								&& (read = input.read(data, readTotal, dataSize
276 										- readTotal)) != -1) {
277 							readTotal += read;
278 						}
279 						Task task = marshaller.unmarshall(data);
280 
281 						if (newConnection) {
282 							if (task.getAnswerToContainer() != null) {
283 								String key = "";
284 								int lastSlash = task.getAnswerToContainer()
285 										.toString().lastIndexOf("/containers");
286 								key = task.getAnswerToContainer().toString()
287 										.substring(0, lastSlash);
288 								answerSockets.put(key, this.socket);
289 							}
290 						}
291 						TransportHandler.getInstance().processTask(task);
292 					} catch (SocketException e) {
293 						if (newConnection) {
294 							logger.debug("Connection reset by peer");
295 							this.listening = false;
296 						} else {
297 							logger.warn("Connection reset by peer");
298 							this.listening = false;
299 						}
300 					}
301 				}
302 			} catch (EOFException e) {
303 				logger
304 						.debug("Connection lost: "
305 								+ this.socket.getInetAddress());
306 			} catch (IOException e) {
307 				throw new FatalException(e);
308 			} finally {
309 				try {
310 					this.socket.close();
311 				} catch (IOException e) {
312 					throw new FatalException(e);
313 				}
314 			}
315 
316 		}
317 	}
318 
319 	/***
320 	 * {@inheritDoc}
321 	 */
322 	@Override
323 	public String toString() {
324 		return "TcpConnectedTransport: MAX_THREADS:"
325 				+ MAX_THREADS
326 				+ " URI: "
327 				+ this.uri
328 				+ ((this.ssocket == null) ? "" : " port: "
329 						+ this.ssocket.getLocalPort()) + " Marshaller: "
330 				+ this.marshaller;
331 	}
332 }