pero 2005/02/15 01:31:46 Modified: modules/cluster/src/share/org/apache/catalina/cluster ClusterReceiver.java ClusterSender.java modules/cluster/src/share/org/apache/catalina/cluster/tcp AsyncSocketSender.java IDataSender.java PooledSocketSender.java ReplicationListener.java SimpleTcpCluster.java SocketSender.java TcpReplicationThread.java Added: modules/cluster/src/share/org/apache/catalina/cluster/tcp DataSender.java Log: Refactor all IDataSender and factor out a base class Add some statistics attributes i18n support to senders Add KeepAlive and Ack Handling to AsyncSocketSender Revision Changes Path 1.3 +9 -2 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java Index: ClusterReceiver.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- ClusterReceiver.java 27 Feb 2004 14:58:55 -0000 1.2 +++ ClusterReceiver.java 15 Feb 2005 09:31:45 -0000 1.3 @@ -17,7 +17,13 @@ package org.apache.catalina.cluster; - +/** + * + * @author Filip Hanik + * @author Peter Rossbach + * @version 1.1 + * + */ public interface ClusterReceiver { @@ -27,7 +33,8 @@ public void setCatalinaCluster(CatalinaCluster cluster); - public void setIsSenderSynchronized(boolean isSenderSynchronized); + public boolean isWaitForAck(); + public void setWaitForAck(boolean isWaitForAck); public String getHost(); 1.4 +9 -2 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSender.java Index: ClusterSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSender.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- ClusterSender.java 27 Dec 2004 09:30:36 -0000 1.3 +++ 
ClusterSender.java 15 Feb 2005 09:31:45 -0000 1.4 @@ -19,7 +19,13 @@ import org.apache.catalina.cluster.tcp.SimpleTcpCluster; - +/** + * + * @author Filip Hanik + * @author Peter Rossbach + * @version 1.1 + * + */ public interface ClusterSender { @@ -35,7 +41,8 @@ public void sendMessage(String messageId, byte[] indata) throws java.io.IOException; - public boolean getIsSenderSynchronized(); + public boolean isWaitForAck(); + public void setWaitForAck(boolean isWaitForAck); /** * @param cluster 1.10 +157 -105 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java Index: AsyncSocketSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java,v retrieving revision 1.9 retrieving revision 1.10 diff -u -r1.9 -r1.10 --- AsyncSocketSender.java 27 Dec 2004 09:30:36 -0000 1.9 +++ AsyncSocketSender.java 15 Feb 2005 09:31:45 -0000 1.10 @@ -17,173 +17,215 @@ package org.apache.catalina.cluster.tcp; import java.net.InetAddress; -import java.net.Socket; + import org.apache.catalina.cluster.util.SmartQueue; /** - * Send cluster messages from a Message queue with only one socket. + * Send cluster messages from a Message queue with only one socket. Ack and keep + * Alive Handling is supported. + * <ul> + * <li>With autoConnect=false at ReplicationTransmitter, you can disconnect the + * sender and all messages are queued. Only use this for small maintaince + * isuses!</li> + * <li>waitForAck=true, means that receiver ack the transfer</li> + * <li>after one minute idle time, or number of request (100) the connection is + * reconnected with next request. 
Change this for production use!</li> + * <li>default ackTimeout is 15 sec: this is very low for big all-session replication messages after a node restart</li> + * <li>disable keepAlive: keepAliveTimeout="-1" and keepAliveMaxRequestCount="-1"</li> + * </ul> * * @author Filip Hanik * @author Peter Rossbach - * @version 1.1 + * @version 1.2 */ -public class AsyncSocketSender implements IDataSender { +public class AsyncSocketSender extends DataSender { + private static int threadCounter = 1; private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory .getLog(AsyncSocketSender.class); - private InetAddress address; - - private int port; - - private Socket sc = null; + /** + * The descriptive information about this implementation. + */ + private static final String info = "AsyncSocketSender/1.2"; - private boolean isSocketConnected = false; + // ----------------------------------------------------- Instance Variables + /** + * Message Queue + */ private SmartQueue queue = new SmartQueue(); - private boolean suspect; - + /** + * Active thread to push messages asynchronously to the other replication node + */ private QueueThread queueThread = null; - private long ackTimeout; - - private long nrOfRequests = 0; - - private long totalBytes = 0; - - private synchronized void addStats(int length) { - nrOfRequests++; - totalBytes += length; - if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) { - log.debug("Send stats from " + getAddress().getHostAddress() + ":" + getPort() - + "Nr of bytes sent=" + totalBytes + " over " - + nrOfRequests + " ==" + (totalBytes / nrOfRequests) - + " bytes/request"); - } - - } + /** + * Count number of queued messages + */ + private long inQueueCounter = 0; /** - * @return Returns the nrOfRequests. + * Count all successfully pushed messages from queue */ - public long getNrOfRequests() { - return nrOfRequests; - } + private long outQueueCounter = 0; /** - * @return Returns the totalBytes. 
+ * Current number of bytes from all queued messages */ - public long getTotalBytes() { - return totalBytes; - } + private long queuedNrOfBytes = 0; + // ------------------------------------------------------------- Constructor + + /** + * start background thread to push incoming cluster messages to replication + * node + * + * @param host replication node tcp address + * @param port replication node tcp port + */ public AsyncSocketSender(InetAddress host, int port) { - this.address = host; - this.port = port; + super(host, port); checkThread(); - if (log.isInfoEnabled()) - log.info("Started async sender thread for TCP replication."); + long a = Long.MAX_VALUE; } - public InetAddress getAddress() { - return address; - } + // ------------------------------------------------------------- Properties - public int getPort() { - return port; - } + /** + * Return descriptive information about this implementation and the + * corresponding version number, in the format + * <code>&lt;description&gt;/&lt;version&gt;</code>. + */ + public String getInfo() { - public void connect() throws java.io.IOException { - sc = new Socket(getAddress(), getPort()); - isSocketConnected = true; - checkThread(); + return (info); } - protected void checkThread() { - if (queueThread == null) { - queueThread = new QueueThread(this); - queueThread.setDaemon(true); - queueThread.start(); - } + /** + * @return Returns the inQueueCounter. + */ + public long getInQueueCounter() { + return inQueueCounter; } - public void disconnect() { - try { - sc.close(); - } catch (Exception x) { - } - isSocketConnected = false; - if (queueThread != null) { - queueThread.stopRunning(); - queueThread = null; - } + /** + * @return Returns the outQueueCounter. + */ + public long getOutQueueCounter() { + return outQueueCounter; + } + /** + * @return Returns the queueSize. 
+ */ + public int getQueueSize() { + return queue.size(); } - public boolean isConnected() { - return isSocketConnected; + /** + * @return Returns the queuedNrOfBytes. + */ + public long getQueuedNrOfBytes() { + return queuedNrOfBytes; } - public int getQueueSize() { - return queue.size(); + // --------------------------------------------------------- Public Methods + + /* + * Connect to socket and start background thread to push queued messages + * + * @see org.apache.catalina.cluster.tcp.IDataSender#connect() + */ + public void connect() throws java.io.IOException { + super.connect(); + checkThread(); } /** - * Blocking send + * Disconnect socket and stop queue thread * - * @param data - * @throws java.io.IOException + * @see org.apache.catalina.cluster.tcp.IDataSender#disconnect() */ - private synchronized void sendMessage(byte[] data) - throws java.io.IOException { - if (!isConnected()) - connect(); - try { - sc.getOutputStream().write(data); - sc.getOutputStream().flush(); - } catch (java.io.IOException x) { - disconnect(); - connect(); - sc.getOutputStream().write(data); - sc.getOutputStream().flush(); - } - addStats(data.length); + public void disconnect() { + stopThread(); + super.disconnect(); } - public synchronized void sendMessage(String sessionId, byte[] data) + /* + * Send message to queue for later sending + * + * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(java.lang.String, + * byte[]) + */ + public synchronized void sendMessage(String messageid, byte[] data) throws java.io.IOException { - SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(sessionId, data); + SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(messageid, data); queue.add(entry); + inQueueCounter++; + queuedNrOfBytes += data.length; + if (log.isTraceEnabled()) + log.trace(sm.getString("AsyncSocketSender.queue.message", + getAddress(), new Integer(getPort()), messageid, new Long( + data.length))); + } + + /* + * Reset sender statistics + */ + public 
synchronized void resetStatistics() { + super.resetStatistics(); + inQueueCounter = queue.size(); + outQueueCounter = 0; + } + /** + * Name of this SocketSender + */ public String toString() { StringBuffer buf = new StringBuffer("AsyncSocketSender["); buf.append(getAddress()).append(":").append(getPort()).append("]"); return buf.toString(); } - public boolean isSuspect() { - return suspect; - } + // --------------------------------------------------------- Public Methods - public boolean getSuspect() { - return suspect; + /** + * Start Queue thread as daemon + */ + protected void checkThread() { + if (queueThread == null) { + if (log.isInfoEnabled()) + log.info(sm.getString("AsyncSocketSender.create.thread", + getAddress(), new Integer(getPort()))); + queueThread = new QueueThread(this); + queueThread.setDaemon(true); + queueThread.start(); + } } - public void setSuspect(boolean suspect) { - this.suspect = suspect; + /** + * stop queue worker thread + */ + protected void stopThread() { + if (queueThread != null) { + queueThread.stopRunning(); + queueThread = null; + } } - public long getAckTimeout() { - return ackTimeout; + /* + * Reduce queued message data size counter + */ + protected void reduceQueuedCounter(int size) { + queuedNrOfBytes -= size; } - public void setAckTimeout(long ackTimeout) { - this.ackTimeout = ackTimeout; - } + // -------------------------------------------------------- Inner Class private class QueueThread extends Thread { AsyncSocketSender sender; @@ -199,16 +241,26 @@ keepRunning = false; } + /** + * Get one queued message and push it to the replication node + * + * @see DataSender#pushMessage(String, byte[]) + */ public void run() { while (keepRunning) { SmartQueue.SmartEntry entry = sender.queue.remove(5000); if (entry != null) { + int messagesize = 0; try { byte[] data = (byte[]) entry.getValue(); - sender.sendMessage(data); + messagesize = data.length; + sender.pushMessage((String) entry.getKey(), data); + outQueueCounter++; } catch 
(Exception x) { - log.warn("Unable to asynchronously send session w/ id=" - + entry.getKey() + " message will be ignored."); + log.warn(sm.getString("AsyncSocketSender.send.error", + entry.getKey())); + } finally { + reduceQueuedCounter(messagesize); } } } 1.6 +4 -0 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSender.java Index: IDataSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSender.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- IDataSender.java 29 Sep 2004 18:23:55 -0000 1.5 +++ IDataSender.java 15 Feb 2005 09:31:45 -0000 1.6 @@ -36,4 +36,8 @@ public void setSuspect(boolean suspect); public boolean getSuspect(); public void setAckTimeout(long timeout); + public long getAckTimeout(); + public boolean isWaitForAck(); + public void setWaitForAck(boolean isWaitForAck); + } 1.9 +117 -133 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java Index: PooledSocketSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java,v retrieving revision 1.8 retrieving revision 1.9 diff -u -r1.8 -r1.9 --- PooledSocketSender.java 27 Dec 2004 09:30:36 -0000 1.8 +++ PooledSocketSender.java 15 Feb 2005 09:31:45 -0000 1.9 @@ -15,8 +15,9 @@ */ package org.apache.catalina.cluster.tcp; -import java.net.InetAddress ; -import java.net.Socket; + +import java.io.IOException; +import java.net.InetAddress; import java.util.LinkedList; /** @@ -24,129 +25,99 @@ * * @author Filip Hanik * @author Peter Rossbach - * @version 1.1 + * @version 1.2 */ +public class PooledSocketSender extends DataSender { + + private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 
+ .getLog(org.apache.catalina.cluster.tcp.PooledSocketSender.class); -public class PooledSocketSender implements IDataSender -{ + /** + * The descriptive information about this implementation. + */ + private static final String info = "PooledSocketSender/1.2"; - private static org.apache.commons.logging.Log log = - org.apache.commons.logging.LogFactory.getLog( org.apache.catalina.cluster.CatalinaCluster.class ); + // ----------------------------------------------------- Instance Variables - private InetAddress address; - private int port; - private Socket sc = null; - private boolean isSocketConnected = true; - private boolean suspect; - private long ackTimeout = 15*1000; //15 seconds socket read timeout (for acknowledgement) - private long keepAliveTimeout = 60*1000; //keep socket open for no more than one min - private int keepAliveMaxRequestCount = 100; //max 100 requests before reconnecting - private long keepAliveConnectTime = 0; - private int keepAliveCount = 0; private int maxPoolSocketLimit = 25; private SenderQueue senderQueue = null; - private long nrOfRequests = 0; - private long totalBytes = 0; - - public PooledSocketSender(InetAddress host, int port) - { - this.address = host; - this.port = port; - senderQueue = new SenderQueue(this,maxPoolSocketLimit); - } - - private synchronized void addStats(int length) { - nrOfRequests++; - totalBytes += length; - if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) { - log.debug("Send stats from " + getAddress().getHostAddress() + ":" + getPort() - + "Nr of bytes sent=" + totalBytes + " over " - + nrOfRequests + " ==" + (totalBytes / nrOfRequests) - + " bytes/request"); - } + // ----------------------------------------------------- Constructor + public PooledSocketSender(InetAddress host, int port) { + super(host, port); + senderQueue = new SenderQueue(this, maxPoolSocketLimit); } - /** - * @return Returns the nrOfRequests. 
- */ - public long getNrOfRequests() { - return nrOfRequests; - } + // ----------------------------------------------------- Public Properties /** - * @return Returns the totalBytes. + * Return descriptive information about this implementation and the + * corresponding version number, in the format + * <code><description>/<version></code>. */ - public long getTotalBytes() { - return totalBytes; - } + public String getInfo() { + return (info); - public InetAddress getAddress() - { - return address; - } - - public int getPort() - { - return port; } - public void connect() throws java.io.IOException - { - //do nothing, happens in the socket sender itself - senderQueue.open(); - isSocketConnected = true; + public void setMaxPoolSocketLimit(int limit) { + maxPoolSocketLimit = limit; + senderQueue.setLimit(limit); } - public void disconnect() - { - senderQueue.close(); - isSocketConnected = false; + public int getMaxPoolSocketLimit() { + return maxPoolSocketLimit; } - public boolean isConnected() - { - return isSocketConnected; + public int getInPoolSize() { + return senderQueue.getInPoolSize(); } - public void setAckTimeout(long timeout) { - this.ackTimeout = timeout; + public int getInUsePoolSize() { + return senderQueue.getInUsePoolSize(); } - public long getAckTimeout() { - return ackTimeout; - } + // ----------------------------------------------------- Public Methode - public void setMaxPoolSocketLimit(int limit) { - maxPoolSocketLimit = limit; + public void connect() throws java.io.IOException { + //do nothing, happens in the socket sender itself + senderQueue.open(); + setSocketConnected(true); + connectCounter++; } - public int getMaxPoolSocketLimit() { - return maxPoolSocketLimit; + public void disconnect() { + senderQueue.close(); + setSocketConnected(false); + disconnectCounter++; } - /** - * Blocking send - * @param data + * send Message and use a pool of SocketSenders + * + * @param messageId Message unique identifier + * @param data Message data * @throws 
java.io.IOException */ - public void sendMessage(String sessionId, byte[] data) throws java.io.IOException - { + public void sendMessage(String messageId, byte[] data) throws IOException { //get a socket sender from the pool SocketSender sender = senderQueue.getSender(0); - if ( sender == null ) { - log.warn("No socket sender available for client="+this.getAddress()+":"+this.getPort()+" did it disappear?"); + if (sender == null) { + log.warn(sm.getString("PoolSocketSender.noMoreSender", this + .getAddress(), new Integer(this.getPort()))); return; - }//end if + } //send the message - sender.sendMessage(sessionId,data); - //return the connection to the pool - senderQueue.returnSender(sender); + try { + sender.sendMessage(messageId, data); + } finally { + //return the connection to the pool + senderQueue.returnSender(sender); + } addStats(data.length); } @@ -156,46 +127,19 @@ return buf.toString(); } - public boolean getSuspect() { - return suspect; - } - - public void setSuspect(boolean suspect) { - this.suspect = suspect; - } - - public long getKeepAliveTimeout() { - return keepAliveTimeout; - } - public void setKeepAliveTimeout(long keepAliveTimeout) { - this.keepAliveTimeout = keepAliveTimeout; - } - public int getKeepAliveMaxRequestCount() { - return keepAliveMaxRequestCount; - } - public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) { - this.keepAliveMaxRequestCount = keepAliveMaxRequestCount; - } - - /** - * @return Returns the keepAliveConnectTime. - */ - public long getKeepAliveConnectTime() { - return keepAliveConnectTime; - } - /** - * @return Returns the keepAliveCount. 
- */ - public int getKeepAliveCount() { - return keepAliveCount; - } + // ----------------------------------------------------- Inner Class private class SenderQueue { private int limit = 25; + PooledSocketSender parent = null; + private LinkedList queue = new LinkedList(); + private LinkedList inuse = new LinkedList(); + private Object mutex = new Object(); + private boolean isOpen = true; public SenderQueue(PooledSocketSender parent, int limit) { @@ -203,30 +147,67 @@ this.parent = parent; } + /** + * @return Returns the limit. + */ + public int getLimit() { + return limit; + } + /** + * @param limit The limit to set. + */ + public void setLimit(int limit) { + this.limit = limit; + } + /** + * @return + */ + public int getInUsePoolSize() { + return inuse.size(); + } + + /** + * @return + */ + public int getInPoolSize() { + return queue.size(); + } + public SocketSender getSender(long timeout) { SocketSender sender = null; long start = System.currentTimeMillis(); long delta = 0; do { synchronized (mutex) { - if ( !isOpen ) throw new IllegalStateException("Socket pool is closed."); - if ( queue.size() > 0 ) { + if (!isOpen) + throw new IllegalStateException( + "Socket pool is closed."); + if (queue.size() > 0) { sender = (SocketSender) queue.removeFirst(); - } else if ( inuse.size() < limit ) { + } else if (inuse.size() < limit) { sender = getNewSocketSender(); } else { try { mutex.wait(timeout); - }catch ( Exception x ) { - PooledSocketSender.log.warn("PoolSocketSender.senderQueue.getSender failed",x); + } catch (Exception x) { + PooledSocketSender.log + .warn( + sm + .getString( + "PoolSocketSender.senderQueue.sender.failed", + parent.getAddress(), + new Integer(parent + .getPort())), + x); }//catch }//end if - if ( sender != null ) { + if (sender != null) { inuse.add(sender); } }//synchronized delta = System.currentTimeMillis() - start; - } while ( (isOpen) && (sender == null) && (timeout==0?true:(delta<timeout)) ); + } while ((isOpen) && (sender == null) + && 
(timeout == 0 ? true : (delta < timeout))); //to do return sender; } @@ -242,21 +223,24 @@ private SocketSender getNewSocketSender() { //new SocketSender( - SocketSender sender = new SocketSender(parent.getAddress(),parent.getPort()); - sender.setKeepAliveMaxRequestCount(parent.getKeepAliveMaxRequestCount()); + SocketSender sender = new SocketSender(parent.getAddress(), parent + .getPort()); + sender.setKeepAliveMaxRequestCount(parent + .getKeepAliveMaxRequestCount()); sender.setKeepAliveTimeout(parent.getKeepAliveTimeout()); sender.setAckTimeout(parent.getAckTimeout()); + sender.setWaitForAck(parent.isWaitForAck()); return sender; } public void close() { synchronized (mutex) { - for ( int i=0; i<queue.size(); i++ ) { - SocketSender sender = (SocketSender)queue.get(i); + for (int i = 0; i < queue.size(); i++) { + SocketSender sender = (SocketSender) queue.get(i); sender.disconnect(); }//for - for ( int i=0; i<inuse.size(); i++ ) { + for (int i = 0; i < inuse.size(); i++) { SocketSender sender = (SocketSender) inuse.get(i); sender.disconnect(); }//for @@ -266,7 +250,7 @@ mutex.notifyAll(); } } - + public void open() { synchronized (mutex) { isOpen = true; @@ -274,4 +258,4 @@ } } } -} +} \ No newline at end of file 1.19 +6 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java Index: ReplicationListener.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java,v retrieving revision 1.18 retrieving revision 1.19 diff -u -r1.18 -r1.19 --- ReplicationListener.java 27 Dec 2004 09:30:36 -0000 1.18 +++ ReplicationListener.java 15 Feb 2005 09:31:45 -0000 1.19 @@ -45,7 +45,7 @@ private int tcpThreadCount; private long tcpSelectorTimeout; private int tcpListenPort; - private boolean isSenderSynchronized; + private boolean waitForAck; private Selector selector = null; private Object 
interestOpsMutex = new Object(); @@ -221,7 +221,7 @@ return; } else { // invoking this wakes up the worker thread then returns - worker.serviceChannel(key, isSenderSynchronized); + worker.serviceChannel(key, waitForAck); return; } } @@ -249,11 +249,11 @@ public void setTcpThreadCount(int tcpThreadCount) { this.tcpThreadCount = tcpThreadCount; } - public boolean getIsSenderSynchronized() { - return isSenderSynchronized; + public boolean isWaitForAck() { + return waitForAck; } - public void setIsSenderSynchronized(boolean isSenderSynchronized) { - this.isSenderSynchronized = isSenderSynchronized; + public void setWaitForAck(boolean waitForAck) { + this.waitForAck = waitForAck; } public String getHost() { 1.58 +2 -3 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java Index: SimpleTcpCluster.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v retrieving revision 1.57 retrieving revision 1.58 diff -u -r1.57 -r1.58 --- SimpleTcpCluster.java 27 Dec 2004 09:30:36 -0000 1.57 +++ SimpleTcpCluster.java 15 Feb 2005 09:31:45 -0000 1.58 @@ -389,8 +389,7 @@ } registerMBeans(); - clusterReceiver.setIsSenderSynchronized(clusterSender - .getIsSenderSynchronized()); + clusterReceiver.setWaitForAck(clusterSender.isWaitForAck()); clusterReceiver.setCatalinaCluster(this); clusterReceiver.start(); clusterSender.setCatalinaCluster(this); 1.15 +18 -187 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java Index: SocketSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java,v retrieving revision 1.14 retrieving revision 1.15 diff -u -r1.14 -r1.15 --- SocketSender.java 27 Dec 2004 09:30:36 -0000 1.14 +++ 
SocketSender.java 15 Feb 2005 09:31:45 -0000 1.15 @@ -15,176 +15,42 @@ */ package org.apache.catalina.cluster.tcp; -import java.net.InetAddress ; -import java.net.Socket; + +import java.net.InetAddress; /** * Send cluster messages sync to request with only one socket. * * @author Filip Hanik * @author Peter Rossbach - * @version 1.1 + * @version 1.2 */ -public class SocketSender implements IDataSender -{ - - private static org.apache.commons.logging.Log log = - org.apache.commons.logging.LogFactory.getLog( SocketSender.class ); - - private InetAddress address; - private int port; - private Socket sc = null; - private boolean isSocketConnected = false; - /** - * Flag socket as suspect - */ - private boolean suspect; - /** - * 15 seconds socket read timeout (for acknowledgement) - */ - private long ackTimeout = 15*1000; - /** - * keep socket open for no more than one min - */ - private long keepAliveTimeout = 60*1000; - - /** - * max 100 requests before reconnecting - */ - private int keepAliveMaxRequestCount = 100; - - private long keepAliveConnectTime = 0; - private int keepAliveCount = 0; - - private long nrOfRequests = 0; - - private long totalBytes = 0; - - private synchronized void addStats(int length) { - nrOfRequests++; - totalBytes += length; - if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) { - log.debug("Send stats from " + getAddress().getHostAddress() + ":" + getPort() - + "Nr of bytes sent=" + totalBytes + " over " - + nrOfRequests + " ==" + (totalBytes / nrOfRequests) - + " bytes/request"); - } - - } - - /** - * get number of messages that send - * @return Returns the nrOfRequests. - */ - public long getNrOfRequests() { - return nrOfRequests; - } +public class SocketSender extends DataSender { + // ----------------------------------------------------- Instance Variables /** - * get total num bytes send with this socket. - * @return Returns the totalBytes. + * The descriptive information about this implementation. 
*/ - public long getTotalBytes() { - return totalBytes; - } - - public SocketSender(InetAddress host, int port) - { - this.address = host; - this.port = port; - } - - public InetAddress getAddress() - { - return address; - } - - public int getPort() - { - return port; - } - - public void connect() throws java.io.IOException - { - sc = new Socket(getAddress(),getPort()); - sc.setSoTimeout((int)ackTimeout); - isSocketConnected = true; - this.keepAliveCount = 0; - this.keepAliveConnectTime = System.currentTimeMillis(); - } - - public void disconnect() - { - try - { - sc.close(); - }catch ( Exception x) - {} - isSocketConnected = false; - } + private static final String info = "SocketSender/1.2"; - public boolean isConnected() - { - return isSocketConnected; - } - - public void checkIfDisconnect() { - long ctime = System.currentTimeMillis() - this.keepAliveConnectTime; - if ( (ctime > this.keepAliveTimeout) || - (this.keepAliveCount >= this.keepAliveMaxRequestCount) ) { - disconnect(); - } - } + // ------------------------------------------------------------- Constructor - public void setAckTimeout(long timeout) { - this.ackTimeout = timeout; + public SocketSender(InetAddress host, int port) { + super(host, port); } - public long getAckTimeout() { - return ackTimeout; - } + // ------------------------------------------------------------- Properties /** - * send with only one socket at a time - * @param sessionid unique message id - * @param data data to send - * @throws java.io.IOException + * Return descriptive information about this implementation and the + * corresponding version number, in the format + * <code><description>/<version></code>. 
*/ - public synchronized void sendMessage(String sessionId, byte[] data) throws java.io.IOException - { - checkIfDisconnect(); - if ( !isConnected() ) connect(); - try - { - sc.getOutputStream().write(data); - sc.getOutputStream().flush(); - waitForAck(ackTimeout); - } - catch ( java.io.IOException x ) - { - disconnect(); - connect(); - sc.getOutputStream().write(data); - sc.getOutputStream().flush(); - waitForAck(ackTimeout); - } - this.keepAliveCount++; - checkIfDisconnect(); - addStats(data.length); - } + public String getInfo() { + + return (info); - private void waitForAck(long timeout) throws java.io.IOException { - try { - int i = sc.getInputStream().read(); - while ( (i != -1) && (i != 3)) { - i = sc.getInputStream().read(); - } - } catch (java.net.SocketTimeoutException x ) { - log.warn("Wasn't able to read acknowledgement from server["+getAddress()+":"+getPort()+"] in "+this.ackTimeout+" ms."+ - " Disconnecting socket, and trying again."); - throw x; - } } public String toString() { @@ -192,40 +58,5 @@ buf.append(getAddress()).append(":").append(getPort()).append("]"); return buf.toString(); } - public boolean isSuspect() { - return suspect; - } - - public boolean getSuspect() { - return suspect; - } - public void setSuspect(boolean suspect) { - this.suspect = suspect; - } - public long getKeepAliveTimeout() { - return keepAliveTimeout; - } - public void setKeepAliveTimeout(long keepAliveTimeout) { - this.keepAliveTimeout = keepAliveTimeout; - } - public int getKeepAliveMaxRequestCount() { - return keepAliveMaxRequestCount; - } - public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) { - this.keepAliveMaxRequestCount = keepAliveMaxRequestCount; - } - - /** - * @return Returns the keepAliveConnectTime. - */ - public long getKeepAliveConnectTime() { - return keepAliveConnectTime; - } - /** - * @return Returns the keepAliveCount. 
- */ - public int getKeepAliveCount() { - return keepAliveCount; - } -} +} \ No newline at end of file 1.13 +5 -4 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java Index: TcpReplicationThread.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v retrieving revision 1.12 retrieving revision 1.13 diff -u -r1.12 -r1.13 --- TcpReplicationThread.java 13 Jul 2004 09:43:58 -0000 1.12 +++ TcpReplicationThread.java 15 Feb 2005 09:31:45 -0000 1.13 @@ -39,7 +39,7 @@ org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class ); private ByteBuffer buffer = ByteBuffer.allocate (1024); private SelectionKey key; - private boolean synchronous=false; + private boolean waitForAck=true; TcpReplicationThread () { @@ -91,10 +91,10 @@ * to ignore read-readiness for this channel while the * worker thread is servicing it. */ - synchronized void serviceChannel (SelectionKey key, boolean synchronous) + synchronized void serviceChannel (SelectionKey key, boolean waitForAck) { this.key = key; - this.synchronous=synchronous; + this.waitForAck=waitForAck; key.interestOps (key.interestOps() & (~SelectionKey.OP_READ)); key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE)); this.notify(); // awaken the thread @@ -125,11 +125,12 @@ //check to see if any data is available int pkgcnt = reader.execute(); while ( pkgcnt > 0 ) { - if (synchronous) { + if (waitForAck) { sendAck(key,channel); } //end if pkgcnt--; } + if (count < 0) { // close channel on EOF, invalidates the key channel.close(); 1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java Index: DataSender.java =================================================================== /* * Copyright 1999,2005 The Apache Software Foundation. 
*
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.catalina.cluster.tcp;

import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;

import org.apache.catalina.util.StringManager;

/**
 * Send cluster messages with only one socket. Ack and keep alive handling is
 * supported.
 * <p>
 * A message is written to the socket and, when <code>waitForAck</code> is
 * set, the sender blocks until the acknowledge byte arrives. If the first
 * attempt fails with an <code>IOException</code> the socket is reopened and
 * the message is sent a second time.
 *
 * @author Peter Rossbach
 * @author Filip Hanik
 * @version 1.2
 */
public class DataSender implements IDataSender {

    private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
            .getLog(DataSender.class);

    /**
     * The string manager for this package.
     */
    protected static StringManager sm = StringManager
            .getManager(Constants.Package);

    /**
     * Byte value that {@link #waitForAck(long)} accepts as acknowledgement.
     */
    private static final int ACK_COMMAND = 3;

    // ----------------------------------------------------- Instance Variables

    /**
     * The descriptive information about this implementation.
     */
    private static final String info = "DataSender/1.2";

    private InetAddress address;

    private int port;

    private Socket sc = null;

    private boolean isSocketConnected = false;

    private boolean suspect;

    private long ackTimeout;

    protected long nrOfRequests = 0;

    protected long totalBytes = 0;

    protected long connectCounter = 0;

    protected long disconnectCounter = 0;

    protected long missingAckCounter = 0;

    protected long dataResendCounter = 0;

    /**
     * keep socket open for no more than one min
     */
    private long keepAliveTimeout = 60 * 1000;

    /**
     * max 100 requests before reconnecting
     */
    private int keepAliveMaxRequestCount = 100;

    /**
     * Last connect timestamp
     */
    private long keepAliveConnectTime = 0;

    /**
     * keepalive counter
     */
    private int keepAliveCount = 0;

    private boolean waitForAck = true;

    private int socketCloseCounter;

    private int socketOpenCounter;

    // ------------------------------------------------------------- Constructor

    /**
     * Create a sender for the given target. No connection is opened here.
     *
     * @param host
     *            address of the receiving node
     * @param port
     *            port of the receiving node
     */
    public DataSender(InetAddress host, int port) {
        this.address = host;
        this.port = port;
        if (log.isInfoEnabled())
            log.info(sm.getString("IDataSender.create", address, new Integer(
                    port)));
    }

    // ------------------------------------------------------------- Properties

    /**
     * Return descriptive information about this implementation and the
     * corresponding version number, in the format
     * <code>&lt;description&gt;/&lt;version&gt;</code>.
     */
    public String getInfo() {
        return (info);
    }

    /**
     * @return Returns the nrOfRequests.
     */
    public long getNrOfRequests() {
        return nrOfRequests;
    }

    /**
     * @return Returns the totalBytes.
     */
    public long getTotalBytes() {
        return totalBytes;
    }

    /**
     * @return Returns the connectCounter.
     */
    public long getConnectCounter() {
        return connectCounter;
    }

    /**
     * @return Returns the disconnectCounter.
     */
    public long getDisconnectCounter() {
        return disconnectCounter;
    }

    /**
     * @return Returns the missingAckCounter.
     */
    public long getMissingAckCounter() {
        return missingAckCounter;
    }

    /**
     * @return Returns the socketOpenCounter.
     */
    public int getSocketOpenCounter() {
        return socketOpenCounter;
    }

    /**
     * @return Returns the socketCloseCounter.
     */
    public int getSocketCloseCounter() {
        return socketCloseCounter;
    }

    /**
     * @return Returns the dataResendCounter.
     */
    public long getDataResendCounter() {
        return dataResendCounter;
    }

    public InetAddress getAddress() {
        return address;
    }

    public int getPort() {
        return port;
    }

    public boolean isConnected() {
        return isSocketConnected;
    }

    /**
     * @param isSocketConnected
     *            The isSocketConnected to set.
     */
    protected void setSocketConnected(boolean isSocketConnected) {
        this.isSocketConnected = isSocketConnected;
    }

    public boolean isSuspect() {
        return suspect;
    }

    public boolean getSuspect() {
        return suspect;
    }

    public void setSuspect(boolean suspect) {
        this.suspect = suspect;
    }

    public long getAckTimeout() {
        return ackTimeout;
    }

    public void setAckTimeout(long ackTimeout) {
        this.ackTimeout = ackTimeout;
    }

    public long getKeepAliveTimeout() {
        return keepAliveTimeout;
    }

    public void setKeepAliveTimeout(long keepAliveTimeout) {
        this.keepAliveTimeout = keepAliveTimeout;
    }

    public int getKeepAliveMaxRequestCount() {
        return keepAliveMaxRequestCount;
    }

    public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
        this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
    }

    /**
     * @return Returns the keepAliveConnectTime.
     */
    public long getKeepAliveConnectTime() {
        return keepAliveConnectTime;
    }

    /**
     * @return Returns the keepAliveCount.
     */
    public int getKeepAliveCount() {
        return keepAliveCount;
    }

    /**
     * @return Returns the waitForAck.
     */
    public boolean isWaitForAck() {
        return waitForAck;
    }

    /**
     * @param waitForAck
     *            The waitForAck to set.
     */
    public void setWaitForAck(boolean waitForAck) {
        this.waitForAck = waitForAck;
    }

    // --------------------------------------------------------- Public Methods

    /**
     * Count the connect request and open the socket.
     *
     * @throws java.io.IOException
     *             if the socket cannot be opened
     * @see DataSender#openSocket()
     */
    public void connect() throws java.io.IOException {
        connectCounter++;
        if (log.isDebugEnabled())
            log.debug(sm.getString("IDataSender.connect", address,
                    new Integer(port)));
        openSocket();
    }

    /**
     * Count the disconnect request and close the socket.
     *
     * @see org.apache.catalina.cluster.tcp.IDataSender#disconnect()
     * @see DataSender#closeSocket()
     */
    public void disconnect() {
        disconnectCounter++;
        if (log.isDebugEnabled())
            log.debug(sm.getString("IDataSender.disconnect", address,
                    new Integer(port)));
        closeSocket();
    }

    /**
     * Check whether it is time to close the socket, and close it if so. The
     * socket is closed when it has been open longer than
     * <code>keepAliveTimeout</code> or has served at least
     * <code>keepAliveMaxRequestCount</code> requests. <b>Each limit is only
     * checked when its value is greater than -1.</b>
     *
     * @return <code>true</code> if a limit was reached and
     *         {@link #closeSocket()} was called
     * @see DataSender#closeSocket()
     */
    public boolean checkIfCloseSocket() {
        long ctime = System.currentTimeMillis() - this.keepAliveConnectTime;
        boolean timedOut = keepAliveTimeout > -1
                && ctime > this.keepAliveTimeout;
        boolean maxRequestsReached = keepAliveMaxRequestCount > -1
                && this.keepAliveCount >= this.keepAliveMaxRequestCount;
        if (timedOut || maxRequestsReached) {
            closeSocket();
            return true;
        }
        return false;
    }

    /**
     * Send a message by delegating to
     * {@link #pushMessage(String, byte[])}.
     *
     * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(java.lang.String,
     *      byte[])
     */
    public synchronized void sendMessage(String messageid, byte[] data)
            throws java.io.IOException {
        pushMessage(messageid, data);
    }

    /**
     * Reset sender statistics. The connect and socket-open counters restart
     * at 1 when the sender is currently connected, otherwise at 0.
     */
    public synchronized void resetStatistics() {
        nrOfRequests = 0;
        totalBytes = 0;
        disconnectCounter = 0;
        connectCounter = isConnected() ? 1 : 0;
        missingAckCounter = 0;
        dataResendCounter = 0;
        socketOpenCounter = isConnected() ? 1 : 0;
        socketCloseCounter = 0;
    }

    /**
     * Name of this sender, including target address and port.
     */
    public String toString() {
        StringBuffer buf = new StringBuffer("DataSender[");
        buf.append(getAddress()).append(":").append(getPort()).append("]");
        return buf.toString();
    }

    // ------------------------------------------------------ Protected Methods

    /**
     * Open a new socket to the target and reset the keep alive bookkeeping.
     * When <code>waitForAck</code> is set, the read timeout of the socket is
     * set to <code>ackTimeout</code>.
     *
     * @throws IOException
     *             if the socket cannot be created
     * @throws SocketException
     *             if the read timeout cannot be set
     */
    protected void openSocket() throws IOException, SocketException {
        socketOpenCounter++;
        if (log.isDebugEnabled())
            log.debug(sm.getString("IDataSender.openSocket", address,
                    new Integer(port)));
        Socket socket = new Socket(getAddress(), getPort());
        boolean configured = false;
        try {
            if (isWaitForAck()) {
                // setSoTimeout takes an int; clamp instead of letting the
                // cast wrap around to a negative (illegal) value
                int soTimeout = ackTimeout > Integer.MAX_VALUE ? Integer.MAX_VALUE
                        : (int) ackTimeout;
                socket.setSoTimeout(soTimeout);
            }
            configured = true;
        } finally {
            // do not leak the freshly opened socket when configuration fails
            if (!configured) {
                try {
                    socket.close();
                } catch (IOException ignore) {
                    // the original failure is the one reported to the caller
                }
            }
        }
        sc = socket;
        isSocketConnected = true;
        this.keepAliveCount = 0;
        this.keepAliveConnectTime = System.currentTimeMillis();
    }

    /**
     * Close the socket if it is marked as connected. A failure while closing
     * is logged at debug level; the sender is marked as not connected in any
     * case.
     *
     * @see DataSender#disconnect()
     * @see DataSender#checkIfCloseSocket()
     */
    protected void closeSocket() {
        if (!isSocketConnected)
            return;
        socketCloseCounter++;
        if (log.isDebugEnabled())
            log.debug(sm.getString("IDataSender.socketclose", address,
                    new Integer(port)));
        try {
            sc.close();
        } catch (IOException x) {
            // best effort: the socket is dropped either way
            if (log.isDebugEnabled())
                log.debug("Unable to close socket to " + address + ":" + port,
                        x);
        }
        isSocketConnected = false;
    }

    /**
     * Add statistic for this socket instance. Every 100 requests a summary is
     * written to the debug log.
     *
     * @param length
     *            number of bytes sent with the current request
     */
    protected void addStats(int length) {
        nrOfRequests++;
        totalBytes += length;
        if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
            log.debug(sm.getString("IDataSender.stats", new Object[] {
                    getAddress().getHostAddress(), new Integer(getPort()),
                    new Long(totalBytes), new Long(nrOfRequests),
                    new Long(totalBytes / nrOfRequests) }));
        }
    }

    /**
     * Push messages with only one socket at a time. If the first attempt
     * fails with an <code>IOException</code> the socket is reopened and the
     * data is sent once more; a failure of the second attempt is propagated.
     *
     * @param messageid
     *            unique message id
     * @param data
     *            data to send
     * @throws java.io.IOException
     *             if the socket cannot be opened or the second attempt fails
     */
    protected synchronized void pushMessage(String messageid, byte[] data)
            throws java.io.IOException {
        checkIfCloseSocket();
        if (!isConnected())
            openSocket();
        try {
            writeData(data);
        } catch (java.io.IOException x) {
            // second try with fresh connection
            dataResendCounter++;
            if (log.isTraceEnabled())
                log.trace(sm.getString("IDataSender.send.again", address,
                        new Integer(port)));
            closeSocket();
            openSocket();
            writeData(data);
        }
        this.keepAliveCount++;
        checkIfCloseSocket();
        addStats(data.length);
        if (log.isTraceEnabled())
            log.trace(sm.getString("IDataSender.send.message", address,
                    new Integer(port), messageid, new Long(data.length)));
    }

    /**
     * Wait for acknowledgement from other server. Bytes are read and
     * discarded until the acknowledge byte is seen.
     *
     * @param timeout
     *            not used by this implementation; the read timeout is the one
     *            set on the socket in {@link #openSocket()}
     * @throws java.io.IOException
     *             if the read times out, the stream ends before the
     *             acknowledge byte was received, or reading fails
     */
    protected void waitForAck(long timeout) throws java.io.IOException {
        try {
            java.io.InputStream in = sc.getInputStream();
            int i = in.read();
            while ((i != -1) && (i != ACK_COMMAND)) {
                i = in.read();
            }
            if (i == -1) {
                // end of stream is not an acknowledgement: the peer closed
                // the connection and may never have processed the message
                missingAckCounter++;
                throw new java.io.EOFException("Connection to " + getAddress()
                        + ":" + getPort()
                        + " was closed before the acknowledgement was received");
            }
        } catch (java.net.SocketTimeoutException x) {
            missingAckCounter++;
            log.warn(sm.getString("IDataSender.missing.ack", getAddress(),
                    new Integer(getPort()), new Long(this.ackTimeout)));
            throw x;
        }
    }

    // -------------------------------------------------------- Private Methods

    /**
     * Write the data to the socket, flush, and wait for the acknowledgement
     * when <code>waitForAck</code> is set.
     */
    private void writeData(byte[] data) throws java.io.IOException {
        java.io.OutputStream out = sc.getOutputStream();
        out.write(data);
        out.flush();
        if (isWaitForAck())
            waitForAck(ackTimeout);
    }

}
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]