fhanik 2003/12/17 20:20:15 Modified: modules/cluster/src/share/org/apache/catalina/cluster/io ObjectReader.java modules/cluster/src/share/org/apache/catalina/cluster/mcast McastService.java McastServiceImpl.java modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationListener.java ReplicationTransmitter.java SimpleTcpCluster.java SocketSender.java TcpReplicationThread.java Added: modules/cluster/src/share/org/apache/catalina/cluster/io Jdk13ObjectReader.java modules/cluster/src/share/org/apache/catalina/cluster/tcp Jdk13ReplicationListener.java Log: adding in a regular io cluster listener to be used with JDK1.3 Revision Changes Path 1.2 +14 -17 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java Index: ObjectReader.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- ObjectReader.java 19 Feb 2003 20:32:10 -0000 1.1 +++ ObjectReader.java 18 Dec 2003 04:20:14 -0000 1.2 @@ -102,25 +102,22 @@ return this.channel; } - public boolean append(byte[] data,int off,int len) { + public int append(byte[] data,int off,int len) { boolean result = false; buffer.append(data,off,len); - if ( buffer.doesPackageExist() ) { + int pkgCnt = 0; + boolean pkgExists = buffer.doesPackageExist(); + while ( pkgExists ) { byte[] b = buffer.extractPackage(true); callback.messageDataReceived(b); - result = true; + pkgCnt++; + pkgExists = buffer.doesPackageExist(); }//end if - return result; + return pkgCnt; } - public boolean execute() { - boolean result = false; - if ( buffer.doesPackageExist() ) { - byte[] data = buffer.extractPackage(true); - callback.messageDataReceived(data); - result = true; - }//end if - return result; + public int execute() { + return append(new byte[0],0,0); } public int write(ByteBuffer buf) @@ -131,4 +128,4 @@ -} \ No newline at end of file +} 1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/Jdk13ObjectReader.java Index: Jdk13ObjectReader.java =================================================================== /* * $Header: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/Jdk13ObjectReader.java,v 1.1 2003/12/18 04:20:14 fhanik Exp $ * $Revision: 1.1 $ * $Date: 2003/12/18 04:20:14 $ * * ==================================================================== * * The Apache Software License, Version 1.1 * * Copyright (c) 1999 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, if * any, must include the following acknowlegement: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowlegement may appear in the software itself, * if and wherever such third-party acknowlegements normally appear. * * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software * Foundation" must not be used to endorse or promote products derived * from this software without prior written permission. For written * permission, please contact [EMAIL PROTECTED] * * 5. Products derived from this software may not be called "Apache" * nor may "Apache" appear in their names without prior written * permission of the Apache Group. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * * [Additional notices, if required by prior licensing conditions] * */ package org.apache.catalina.cluster.io; /** * The object reader object is an object used in conjunction with * java.nio TCP messages. This object stores the message bytes in a * <code>XByteBuffer</code> until a full package has been received. * When a full package has been received, the append method will call messageDataReceived * on the callback object associated with this object reader.<BR> * This object uses an XByteBuffer which is an extendable object buffer that also allows * for message encoding and decoding. * * @author Filip Hanik * @version $Revision: 1.1 $, $Date: 2003/12/18 04:20:14 $ */ import java.net.Socket; import java.nio.ByteBuffer; import java.io.IOException; import org.apache.catalina.cluster.io.XByteBuffer; public class Jdk13ObjectReader { private Socket socket; private ListenCallback callback; private XByteBuffer buffer; public Jdk13ObjectReader( Socket socket, ListenCallback callback ) { this.socket = socket; this.callback = callback; this.buffer = new XByteBuffer(); } public int append(byte[] data,int off,int len) { boolean result = false; buffer.append(data,off,len); int pkgCnt = 0; boolean pkgExists = buffer.doesPackageExist(); while ( pkgExists ) { byte[] b = buffer.extractPackage(true); callback.messageDataReceived(b); pkgCnt++; pkgExists = buffer.doesPackageExist(); }//end if return pkgCnt; } public int execute() { return append(new byte[0],0,0); } public int write(byte[] data) throws java.io.IOException { socket.getOutputStream().write(data); return 0; } } 1.4 +6 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java Index: McastService.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- McastService.java 16 Nov 2003 22:22:45 -0000 1.3 +++ McastService.java 18 Dec 2003 04:20:14 -0000 1.4 @@ -165,7 +165,7 @@ String host = getProperties().getProperty("tcpListenHost"); int port = Integer.parseInt(getProperties().getProperty("tcpListenPort")); String name = "tcp://"+host+":"+port; - localMember = new McastMember(name,host,port,0); + localMember = new McastMember(name,host,port,100); impl = new McastServiceImpl((McastMember)localMember,Long.parseLong(properties.getProperty("msgFrequency")), Long.parseLong(properties.getProperty("memberDropTime")), Integer.parseInt(properties.getProperty("mcastPort")), @@ -244,4 +244,4 @@ service.start(); Thread.currentThread().sleep(60*1000*60); } -} \ No newline at end of file +} 1.5 +7 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java Index: McastServiceImpl.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- McastServiceImpl.java 16 Nov 2003 22:22:45 -0000 1.4 +++ McastServiceImpl.java 18 Dec 2003 04:20:14 -0000 1.5 @@ -140,7 +140,7 @@ /** * When was the service started */ - protected long serviceStartTime = 0; + protected long serviceStartTime = System.currentTimeMillis(); /** * Create a new mcast service impl @@ -186,6 +186,7 @@ */ public synchronized void start() throws IOException { if ( doRun ) throw new IllegalStateException("Service already running."); + serviceStartTime = System.currentTimeMillis(); socket.joinGroup(address); doRun = true; sender = new SenderThread(sendFrequency); @@ -194,7 +195,7 @@ receiver.setDaemon(true); receiver.start(); sender.start(); - serviceStartTime = System.currentTimeMillis(); + } /** 1.6 +40 -31 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java Index: ReplicationListener.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- ReplicationListener.java 15 Dec 2003 21:33:06 -0000 1.5 +++ ReplicationListener.java 18 Dec 2003 04:20:15 -0000 1.6 @@ -142,33 +142,46 @@ while (doListen) { // this may block for a long time, upon return the // selected set contains keys of the ready channels - int n = selector.select(timeout); - if (n == 0) { - continue; // nothing to do - } - // get an iterator over the set of selected keys - Iterator it = selector.selectedKeys().iterator(); - // look at each key in the selected set - while (it.hasNext()) { - SelectionKey key = (SelectionKey) it.next(); - // Is a new connection coming in? - if (key.isAcceptable()) { - ServerSocketChannel server = - (ServerSocketChannel) key.channel(); - SocketChannel channel = server.accept(); - registerChannel (selector, - channel, - SelectionKey.OP_READ | SelectionKey.OP_WRITE, - new ObjectReader(channel,selector,callback)); + try { + + int n = selector.select(timeout); + if (n == 0) { + continue; // nothing to do } - // is there data to read on this channel? - if (key.isReadable()) { - readDataFromSocket (key); + // get an iterator over the set of selected keys + Iterator it = selector.selectedKeys().iterator(); + // look at each key in the selected set + while (it.hasNext()) { + SelectionKey key = (SelectionKey) it.next(); + // Is a new connection coming in? + if (key.isAcceptable()) { + ServerSocketChannel server = + (ServerSocketChannel) key.channel(); + SocketChannel channel = server.accept(); + registerChannel(selector, + channel, + SelectionKey.OP_READ | + SelectionKey.OP_WRITE, + new ObjectReader(channel, selector, + callback)); + } + // is there data to read on this channel? + if (key.isReadable()) { + readDataFromSocket(key); + } + // remove key from selected set, it's been handled + it.remove(); } - // remove key from selected set, it's been handled - it.remove(); } - }//while + catch (java.nio.channels.CancelledKeyException nx) { + log.warn( + "Replication client disconnected, error when polling key. Ignoring client."); + } + catch (Exception x) { + log.error("Unable to process request in ReplicationListener", x); + } + + } //while serverChannel.close(); selector.close(); } @@ -197,10 +210,6 @@ } // ---------------------------------------------------------- - - // Use the same byte buffer for all channels. A single thread is - // servicing all the channels, so no danger of concurrent acccess. - private ByteBuffer buffer = ByteBuffer.allocateDirect (1024); /** * Sample data handler method for a channel with data ready to read. 1.8 +11 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java Index: ReplicationTransmitter.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v retrieving revision 1.7 retrieving revision 1.8 diff -u -r1.7 -r1.8 --- ReplicationTransmitter.java 15 Oct 2003 03:29:32 -0000 1.7 +++ ReplicationTransmitter.java 18 Dec 2003 04:20:15 -0000 1.8 @@ -124,7 +124,7 @@ v.copyInto(result); return result; } - + protected void sendMessageData(String sessionId, byte[] data, IDataSender sender) throws java.io.IOException { if ( sender == null ) throw new java.io.IOException("Sender not available. Make sure sender information is available to the ReplicationTransmitter."); try @@ -132,9 +132,14 @@ if (!sender.isConnected()) sender.connect(); sender.sendMessage(sessionId,data); + sender.setSuspect(false); }catch ( Exception x) { - log.warn("Unable to send replicated message, is server down?",x); + if ( !sender.getSuspect() ) { + log.warn("Unable to send replicated message, is server down?", + x); + } + sender.setSuspect(true); } } @@ -169,4 +174,4 @@ -} \ No newline at end of file +} 1.21 +42 -20 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java Index: SimpleTcpCluster.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v retrieving revision 1.20 retrieving revision 1.21 diff -u -r1.20 -r1.21 --- SimpleTcpCluster.java 15 Dec 2003 21:33:06 -0000 1.20 +++ SimpleTcpCluster.java 18 Dec 2003 04:20:15 -0000 1.21 @@ -162,10 +162,6 @@ protected int tcpThreadCount = 2; /** - * ReplicationListener to listen for incoming data - */ - protected ReplicationListener mReplicationListener; - /** * ReplicationTransmitter to send data with */ protected ReplicationTransmitter mReplicationTransmitter; @@ -272,6 +268,12 @@ * defaults to synchronous */ protected String replicationMode="synchronous"; + + private long nrOfMsgsReceived = 0; + private long msgSendTime = 0; + private long lastChecked = System.currentTimeMillis(); + private boolean isJdk13 = false; + // ------------------------------------------------------------- Properties public SimpleTcpCluster() { @@ -475,15 +477,31 @@ (sm.getString("cluster.alreadyStarted")); log.info("Cluster is about to start"); try { - mReplicationListener = - new ReplicationListener(this, - this.tcpThreadCount, - this.tcpAddress, - this.tcpPort, - this.tcpSelectorTimeout, - "synchronous".equals(this.replicationMode)); - mReplicationListener.setDaemon(true); - mReplicationListener.start(); + if ( isJdk13 ) { + Jdk13ReplicationListener mReplicationListener = + new Jdk13ReplicationListener(this, + this.tcpThreadCount, + this.tcpAddress, + this.tcpPort, + this.tcpSelectorTimeout, + "synchronous".equals(this. + replicationMode)); + Thread t = new Thread(mReplicationListener); + t.setDaemon(true); + t.start(); + } else { + ReplicationListener mReplicationListener = + new ReplicationListener(this, + this.tcpThreadCount, + this.tcpAddress, + this.tcpPort, + this.tcpSelectorTimeout, + "synchronous".equals(this. + replicationMode)); + mReplicationListener.setDaemon(true); + mReplicationListener.start(); + } + mReplicationTransmitter = new ReplicationTransmitter(new IDataSender[0]); mReplicationTransmitter.start(); @@ -777,15 +795,19 @@ // --------------------------------------------- Inner Class // --------------------------------------------- Performance - private long nrOfMsgsReceived = 0; - private long msgSendTime = 0; - private long lastChecked = System.currentTimeMillis(); + private void perfMessageRecvd(long timeSent) { nrOfMsgsReceived++; msgSendTime+=(System.currentTimeMillis()-timeSent); if ( (System.currentTimeMillis() - lastChecked) > 5000 ) { log.debug("Calc msg send time total="+msgSendTime+"ms num request="+nrOfMsgsReceived+" average per msg="+(msgSendTime/nrOfMsgsReceived)+"ms."); } + } + public boolean getIsJdk13() { + return isJdk13; + } + public void setIsJdk13(boolean isJdk13) { + this.isJdk13 = isJdk13; } } 1.6 +31 -4 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java Index: SocketSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- SocketSender.java 15 Dec 2003 22:09:10 -0000 1.5 +++ SocketSender.java 18 Dec 2003 04:20:15 -0000 1.6 @@ -82,7 +82,12 @@ private Socket sc = null; private boolean isSocketConnected = false; private boolean suspect; - private long ackTimeout = 60*1000; + private long ackTimeout = 5*1000; + private long keepAliveTimeout = 60*1000; //keep socket open for no more than one min + private int keepAliveMaxRequestCount = 100; //max 100 requests before reconnecting + private long keepAliveConnectTime = 0; + private int keepAliveCount = 0; + public SocketSender(InetAddress host, int port) { @@ -105,6 +110,8 @@ sc = new Socket(getAddress(),getPort()); sc.setSoTimeout((int)ackTimeout); isSocketConnected = true; + this.keepAliveCount = 0; + this.keepAliveConnectTime = System.currentTimeMillis(); } public void disconnect() @@ -129,9 +136,16 @@ */ public synchronized void sendMessage(String sessionId, byte[] data) throws java.io.IOException { + long ctime = System.currentTimeMillis() - this.keepAliveConnectTime; + if ( (ctime > this.keepAliveTimeout) || + (this.keepAliveCount >= this.keepAliveMaxRequestCount) ) { + disconnect(); + } if ( !isConnected() ) connect(); try { + + sc.getOutputStream().write(data); sc.getOutputStream().flush(); waitForAck(ackTimeout); @@ -144,6 +158,7 @@ sc.getOutputStream().flush(); waitForAck(ackTimeout); } + this.keepAliveCount++; } private void waitForAck(long timeout) throws java.io.IOException, @@ -169,6 +184,18 @@ public void setSuspect(boolean suspect) { this.suspect = suspect; + } + public long getKeepAliveTimeout() { + return keepAliveTimeout; + } + public void setKeepAliveTimeout(long keepAliveTimeout) { + this.keepAliveTimeout = keepAliveTimeout; + } + public int getKeepAliveMaxRequestCount() { + return keepAliveMaxRequestCount; + } + public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) { + this.keepAliveMaxRequestCount = keepAliveMaxRequestCount; } 1.3 +13 -9 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java Index: TcpReplicationThread.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- TcpReplicationThread.java 15 Dec 2003 21:33:06 -0000 1.2 +++ TcpReplicationThread.java 18 Dec 2003 04:20:15 -0000 1.3 @@ -167,26 +167,30 @@ // loop while data available, channel is non-blocking while ((count = channel.read (buffer)) > 0) { buffer.flip(); // make buffer readable - if (reader.append(buffer.array(),0,count)) { + int pkgcnt = reader.append(buffer.array(),0,count); + while ( pkgcnt > 0 ) { if (synchronous) { sendAck(key,channel); } //end if + pkgcnt--; } buffer.clear(); // make buffer empty } //check to see if any data is available - if ( reader.execute() ) { + int pkgcnt = reader.execute(); + while ( pkgcnt > 0 ) { if (synchronous) { sendAck(key,channel); - }//end if - }//end if + } //end if + pkgcnt--; + } if (count < 0) { // close channel on EOF, invalidates the key channel.close(); return; } - // resume interest in OP_READ - key.interestOps (key.interestOps() | SelectionKey.OP_READ); + // resume interest in OP_READ, OP_WRITE + key.interestOps (key.interestOps() | SelectionKey.OP_READ | SelectionKey.OP_WRITE); // cycle the selector so this key is active again key.selector().wakeup(); } 1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/Jdk13ReplicationListener.java Index: Jdk13ReplicationListener.java =================================================================== /* * $Header: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/Jdk13ReplicationListener.java,v 1.1 2003/12/18 04:20:15 fhanik Exp $ * $Revision: 1.1 $ * $Date: 2003/12/18 04:20:15 $ * * ==================================================================== * * The Apache Software License, Version 1.1 * * Copyright (c) 1999 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, if * any, must include the following acknowlegement: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowlegement may appear in the software itself, * if and wherever such third-party acknowlegements normally appear. * * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software * Foundation" must not be used to endorse or promote products derived * from this software without prior written permission. For written * permission, please contact [EMAIL PROTECTED] * * 5. Products derived from this software may not be called "Apache" * nor may "Apache" appear in their names without prior written * permission of the Apache Group. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * * [Additional notices, if required by prior licensing conditions] * */ package org.apache.catalina.cluster.tcp; import java.net.Socket; import java.net.ServerSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Iterator; import org.apache.catalina.cluster.io.ListenCallback; import org.apache.catalina.cluster.io.Jdk13ObjectReader; import org.apache.catalina.cluster.io.XByteBuffer; /** */ public class Jdk13ReplicationListener implements Runnable { private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( SimpleTcpCluster.class ); private ThreadPool pool = null; private boolean doListen = false; private ListenCallback callback; private java.net.InetAddress bind; private int port; private long timeout = 0; private boolean synchronous = false; ServerSocket serverSocket = null; public Jdk13ReplicationListener(ListenCallback callback, int poolSize, java.net.InetAddress bind, int port, long timeout, boolean synchronous) { this.synchronous=synchronous; this.callback = callback; this.bind = bind; this.port = port; this.timeout = timeout; } public void run() { try { listen(); } catch ( Exception x ) { log.fatal("Unable to start cluster listener.",x); } } public void listen () throws Exception { doListen = true; // Get the associated ServerSocket to bind it with serverSocket = new ServerSocket(); serverSocket.bind (new InetSocketAddress (bind,port)); while (doListen) { Socket socket = serverSocket.accept(); ClusterListenThread t = new ClusterListenThread(socket,new Jdk13ObjectReader(socket,callback)); t.setDaemon(true); t.start(); }//while serverSocket.close(); } public void stopListening(){ doListen = false; try { serverSocket.close(); } catch ( Exception x ) { log.error("Unable to stop the replication listen socket",x); } } protected static class ClusterListenThread extends Thread { private Socket socket; private Jdk13ObjectReader reader; private boolean keepRunning = true; private static byte[] ackMsg = new byte[] {6,2,3}; ClusterListenThread(Socket socket, Jdk13ObjectReader reader) { this.socket = socket; this.reader = reader; } public void run() { try { byte[] buffer = new byte[1024]; while (keepRunning) { java.io.InputStream in = socket.getInputStream(); int cnt = in.read(buffer); int ack = 0; if ( cnt > 0 ) { ack = reader.append(buffer, 0, cnt); } while ( ack > 0 ) { sendAck(); ack--; } } } catch ( Exception x ) { keepRunning = false; log.error("Unable to read data from client, disconnecting.",x); try { socket.close(); } catch ( Exception ignore ) {} } } private void sendAck() throws java.io.IOException { //send a reply-acknowledgement socket.getOutputStream().write(ackMsg); } } }
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]