1 package org.xvsm.remote.tcpconnection;
2
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.log4j.Logger;
import org.xvsm.internal.exceptions.FatalException;
import org.xvsm.internal.tasks.Task;
import org.xvsm.remote.TransportHandler;
import org.xvsm.remote.interfaces.IMarshaller;
import org.xvsm.remote.interfaces.ITransportListener;
import org.xvsm.remote.interfaces.ITransportSender;
26
27 public class TcpConnectedTransport implements ITransportListener<byte[]>,
28 ITransportSender<byte[]> {
29
30 private static Logger logger = Logger
31 .getLogger(TcpConnectedTransport.class);
32
33 /***
34 * The maximal amount of Threads in the TcpListenerPool.
35 */
36 private static final int MAX_THREADS = 10;
37
38 private HashMap<String, Socket> requestSockets = new HashMap<String, Socket>();
39
40 private HashMap<String, Socket> answerSockets = new HashMap<String, Socket>();
41 /***
42 * The ServerSocket of the TcpTransportListener.
43 */
44 private ServerSocket ssocket;
45
46 /***
47 * Indicates if the TcpTransportListener is running or not.
48 */
49 private boolean running = false;
50
51 /***
52 * The Marshaller that should be used.
53 */
54 private IMarshaller<byte[]> marshaller;
55
56 /***
57 * The URI where the listener should listen.
58 */
59 private URI uri;
60
61 private int socketPort;
62 /***
63 * The ExecutorService to execute the TcpListenerThreads.
64 */
65 private static ExecutorService service = Executors.newCachedThreadPool();
66
67
68
69 /***
70 * Default Constructor.
71 */
72 public TcpConnectedTransport() {
73 }
74
75 /***
76 * {@inheritDoc}
77 */
78 public void run() {
79 Thread.currentThread().setName(this.getClass().getName());
80 this.running = true;
81 try {
82 ssocket = new ServerSocket(uri.getPort());
83 this.socketPort = ssocket.getLocalPort();
84 this.uri = new URI(this.uri.getScheme() + "://"
85 + this.uri.getHost() + ":" + this.socketPort);
86 while (true) {
87 this.service.execute(new TcpListenerThread(ssocket.accept(),
88 true));
89 }
90 } catch (BindException e) {
91 throw new FatalException(e);
92 } catch (SocketException e) {
93
94 } catch (IOException e) {
95 throw new FatalException(e);
96 } catch (URISyntaxException e) {
97 throw new FatalException(e);
98 }
99 }
100
101 /***
102 * {@inheritDoc}
103 */
104
105 public void stop() {
106 this.running = false;
107 try {
108 if (ssocket != null) {
109 this.ssocket.close();
110 }
111 for (Socket s : requestSockets.values()) {
112 s.close();
113 }
114 for (Socket s : answerSockets.values()) {
115 s.close();
116 }
117 } catch (IOException e) {
118 throw new FatalException(e);
119 }
120 }
121
122 /***
123 * {@inheritDoc}
124 */
125
126 public void setMarshaller(IMarshaller<byte[]> m) {
127 this.marshaller = m;
128
129 }
130
131 public void setURI(URI nuri) {
132 this.uri = nuri;
133 }
134
135 /***
136 * {@inheritDoc}
137 */
138
139 public URI getUri() {
140 return this.uri;
141 }
142
143 public synchronized void sendRequest(URI uri, Task t) {
144 try {
145 byte[] bytes = this.marshaller.marshall(t);
146 InetAddress ip;
147
148 if (InetAddress.getLocalHost().getHostAddress().equals(
149 uri.getHost())) {
150 ip = InetAddress.getByName("localhost");
151 } else {
152 ip = InetAddress.getByName(uri.getHost());
153 }
154 int port = uri.getPort();
155
156 Socket socket;
157 boolean createdSocket = false;
158
159 if ((socket = requestSockets.get(ip.getHostAddress() + ":" + port)) == null
160 || socket.isClosed()) {
161 socket = new Socket(ip.getHostAddress(), port);
162 requestSockets.put(ip.getHostAddress() + ":" + port, socket);
163 createdSocket = true;
164 }
165
166
167 if (createdSocket) {
168 new TcpListenerThread(socket, false).start();
169 }
170
171 DataOutputStream out = new DataOutputStream(socket
172 .getOutputStream());
173 out.writeInt(bytes.length);
174 out.write(bytes);
175
176 out.flush();
177
178 } catch (UnknownHostException e) {
179 throw new FatalException(e);
180
181 } catch (IOException e) {
182 throw new FatalException("Exception during sending a request to "
183 + uri, e);
184
185 }
186
187 }
188
189 public synchronized void sendResponse(URI uri, Task t) {
190 try {
191 byte[] bytes = this.marshaller.marshall(t);
192 InetAddress ip;
193
194 int port = uri.getPort();
195
196 Socket socket = null;
197 if ((socket = answerSockets.get(uri.toString())) == null
198 || socket.isClosed()) {
199 if (InetAddress.getLocalHost().getHostAddress().equals(
200 uri.getHost())) {
201 ip = InetAddress.getByName("localhost");
202 } else {
203 ip = InetAddress.getByName(uri.getHost());
204 }
205 socket = new Socket(ip.getHostAddress(), port);
206 answerSockets.put(uri.toString(), socket);
207 }
208
209 DataOutputStream out = new DataOutputStream(socket
210 .getOutputStream());
211 out.writeInt(bytes.length);
212 out.write(bytes);
213 out.flush();
214 } catch (UnknownHostException e) {
215 throw new FatalException(e);
216
217 } catch (IOException e) {
218 throw new FatalException(e);
219
220 }
221 }
222
223 /***
224 *
225 * @author Christian Schreiber, Michael Proestler
226 *
227 */
228 class TcpListenerThread extends Thread {
229
230 /***
231 * The Socket over which the communication takes place.
232 */
233 private Socket socket;
234
235 private boolean newConnection;
236
237 private DataInputStream input;
238
239 private boolean listening;
240
241 /***
242 * Default Constructor.
243 *
244 * @param socket
245 * The socket over which the communication takes place.
246 */
247 public TcpListenerThread(Socket socket, boolean newConnection) {
248 this.socket = socket;
249 this.listening = true;
250 this.newConnection = newConnection;
251 try {
252 this.input = new DataInputStream(this.socket.getInputStream());
253 } catch (IOException e) {
254
255 e.printStackTrace();
256 }
257 }
258
259 /***
260 * {@inheritDoc}
261 */
262
263 public void run() {
264 Thread.currentThread().setName(this.getClass().getName());
265 try {
266 while (running && listening) {
267 try {
268
269 int dataSize = input.readInt();
270 int read;
271 int readTotal = 0;
272 byte[] data = new byte[dataSize];
273
274 while ((readTotal < dataSize)
275 && (read = input.read(data, readTotal, dataSize
276 - readTotal)) != -1) {
277 readTotal += read;
278 }
279 Task task = marshaller.unmarshall(data);
280
281 if (newConnection) {
282 if (task.getAnswerToContainer() != null) {
283 String key = "";
284 int lastSlash = task.getAnswerToContainer()
285 .toString().lastIndexOf("/containers");
286 key = task.getAnswerToContainer().toString()
287 .substring(0, lastSlash);
288 answerSockets.put(key, this.socket);
289 }
290 }
291 TransportHandler.getInstance().processTask(task);
292 } catch (SocketException e) {
293 if (newConnection) {
294 logger.debug("Connection reset by peer");
295 this.listening = false;
296 } else {
297 logger.warn("Connection reset by peer");
298 this.listening = false;
299 }
300 }
301 }
302 } catch (EOFException e) {
303 logger
304 .debug("Connection lost: "
305 + this.socket.getInetAddress());
306 } catch (IOException e) {
307 throw new FatalException(e);
308 } finally {
309 try {
310 this.socket.close();
311 } catch (IOException e) {
312 throw new FatalException(e);
313 }
314 }
315
316 }
317 }
318
319 /***
320 * {@inheritDoc}
321 */
322 @Override
323 public String toString() {
324 return "TcpConnectedTransport: MAX_THREADS:"
325 + MAX_THREADS
326 + " URI: "
327 + this.uri
328 + ((this.ssocket == null) ? "" : " port: "
329 + this.ssocket.getLocalPort()) + " Marshaller: "
330 + this.marshaller;
331 }
332 }