remm 2005/03/15 03:26:58 Added: modules/cluster/src/share/org/apache/catalina/cluster/tcp FastAsyncSocketSender.java Log: - Add missing file, as Peter is behind a firewall. Revision Changes Path 1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java Index: FastAsyncSocketSender.java =================================================================== /* * Copyright 1999,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations under * the License. */ package org.apache.catalina.cluster.tcp; import java.net.InetAddress; import org.apache.catalina.cluster.util.FastQueue; import org.apache.catalina.cluster.util.LinkObject; import org.apache.catalina.cluster.util.IQueue; /** * Send cluster messages from a Message queue with only one socket. Ack and keep * Alive Handling is supported. Fast Queue can limit queue size and consume all messages at queue at one block.<br/> * Limit the queue lock contention under high load!<br/> * <ul> * <li>With autoConnect=false at ReplicationTransmitter, you can disconnect the * sender and all messages are queued. Only use this for small maintaince * isuses!</li> * <li>waitForAck=true, means that receiver ack the transfer</li> * <li>after one minute idle time, or number of request (100) the connection is * reconnected with next request. Change this for production use!</li> * <li>default ackTimeout is 15 sec: this is very low for big all session * replication messages after restart a node</li> * <li>disable keepAlive: keepAliveTimeout="-1" and * keepAliveMaxRequestCount="-1"</li> * <li>maxQueueLength: Limit the sender queue length (membership goes well, but transfer is failure!!)</li> * </ul> * FIXME: refactor code duplications with AsyncSocketSender => configurable or extract super class * @author Peter Rossbach ( idea comes form Rainer Jung) * @version $Revision: 1.1 $ $Date: 2005/03/15 11:26:58 $ * @since 5.5.9 */ public class FastAsyncSocketSender extends DataSender { private static int threadCounter = 1; private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory .getLog(FastAsyncSocketSender.class); /** * The descriptive information about this implementation. */ private static final String info = "FastAsyncSocketSender/1.1"; // ----------------------------------------------------- Instance Variables /** * Message Queue */ private FastQueue queue = new FastQueue(); /** * Active thread to push messages asynchronous to the other replication node */ private FastQueueThread queueThread = null; /** * Count number of queue message */ private long inQueueCounter = 0; /** * Count all successfull push messages from queue */ private long outQueueCounter = 0; /** * Current number of bytes from all queued messages */ private long queuedNrOfBytes = 0; // ------------------------------------------------------------- Constructor /** * start background thread to push incomming cluster messages to replication * node * * @param host * replication node tcp address * @param port * replication node tcp port */ public FastAsyncSocketSender(InetAddress host, int port) { super(host, port); checkThread(); } // ------------------------------------------------------------- Properties /** * Return descriptive information about this implementation and the * corresponding version number, in the format * <code><description>/<version></code>. */ public String getInfo() { return (info); } /** * get current add wait timeout * @return current wait timeout */ public long getQueueAddWaitTimeout() { return queue.getAddWaitTimeout(); } /** * Set add wait timeout (default 10000 msec) * @param timeout */ public void setQueueAddWaitTimeout(long timeout) { queue.setAddWaitTimeout(timeout); } /** * get current remove wait timeout * @return */ public long getQueueRemoveWaitTimeout() { return queue.getRemoveWaitTimeout(); } /** * set remove wait timeout ( default 30000 msec) * @param timeout */ public void setRemoveWaitTimeout(long timeout) { queue.setRemoveWaitTimeout(timeout); } /** * @return Returns the checkLock. */ public boolean isQueueCheckLock() { return queue.isCheckLock(); } /** * @param checkLock The checkLock to set. */ public void setQueueCheckLock(boolean checkLock) { queue.setCheckLock(checkLock); } /** * @return Returns the doStats. */ public boolean isQueueDoStats() { return queue.isDoStats(); } /** * @param doStats The doStats to set. */ public void setQueueDoStats(boolean doStats) { queue.setDoStats(doStats); } /** * @return Returns the timeWait. */ public boolean isQueueTimeWait() { return queue.isTimeWait(); } /** * @param timeWait The timeWait to set. */ public void setQueueTimeWait(boolean timeWait) { queue.setTimeWait(timeWait); } /** * @return Returns the inQueueCounter. */ public int getMaxQueueLength() { return queue.getMaxQueueLength(); } /** * @param set max * Queue length */ public void setMaxQueueLength(int length) { queue.setMaxQueueLength(length); } /** * @return Returns the add wait times. */ public long getQueueAddWaitTime() { return queue.getAddWait(); } /** * @return Returns the add wait times. */ public long getQueueRemoveWaitTime() { return queue.getRemoveWait(); } /** * @return Returns the inQueueCounter. */ public long getInQueueCounter() { return inQueueCounter; } /** * @return Returns the outQueueCounter. */ public long getOutQueueCounter() { return outQueueCounter; } /** * @return Returns the queueSize. */ public int getQueueSize() { return queue.getSize(); } /** * @return Returns the queuedNrOfBytes. */ public long getQueuedNrOfBytes() { return queuedNrOfBytes; } // --------------------------------------------------------- Public Methods /* * Connect to socket and start background thread to ppush queued messages * * @see org.apache.catalina.cluster.tcp.IDataSender#connect() */ public void connect() throws java.io.IOException { super.connect(); checkThread(); queue.start() ; } /** * Disconnect socket ad stop queue thread * * @see org.apache.catalina.cluster.tcp.IDataSender#disconnect() */ public void disconnect() { stopThread(); queue.stop() ; // FIXME what is when message in queue => auto reconnect after one sending failure? super.disconnect(); } /* * Send message to queue for later sending * * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(java.lang.String, * byte[]) */ public synchronized void sendMessage(String messageid, byte[] data) throws java.io.IOException { queue.add(messageid, data); inQueueCounter++; queuedNrOfBytes += data.length; if (log.isTraceEnabled()) log.trace(sm.getString("AsyncSocketSender.queue.message", getAddress().getHostAddress(), new Integer(getPort()), messageid, new Long( data.length))); } /* * Reset sender statistics */ public synchronized void resetStatistics() { super.resetStatistics(); inQueueCounter = queue.getSize(); outQueueCounter = 0; queue.resetStatistics(); } /** * Name of this SockerSender */ public String toString() { StringBuffer buf = new StringBuffer("FastAsyncSocketSender["); buf.append(getAddress().getHostAddress()).append(":").append(getPort()).append("]"); return buf.toString(); } // --------------------------------------------------------- Public Methods /** * Start Queue thread as daemon */ protected void checkThread() { if (queueThread == null) { if (log.isInfoEnabled()) log.info(sm.getString("AsyncSocketSender.create.thread", getAddress(), new Integer(getPort()))); queueThread = new FastQueueThread(this, queue); queueThread.setDaemon(true); queueThread.start(); } } /** * stop queue worker thread */ protected void stopThread() { if (queueThread != null) { queueThread.stopRunning(); queueThread = null; } } /* * Reduce queued message date size counter */ protected void reduceQueuedCounter(int size) { queuedNrOfBytes -= size; } // -------------------------------------------------------- Inner Class private class FastQueueThread extends Thread { /** * Sender queue */ private IQueue queue = null; /** * Active sender */ private FastAsyncSocketSender sender = null; /** * Thread is active */ private boolean keepRunning = true; /** * Only use inside FastAsyncSocketSender * @param sender * @param queue */ private FastQueueThread(FastAsyncSocketSender sender, IQueue queue) { setName("Cluster-FastAsyncSocketSender-" + (threadCounter++)); this.queue = queue; this.sender = sender; } public void stopRunning() { keepRunning = false; } /* Get the objects from queue and send all mesages to the sender. * @see java.lang.Runnable#run() */ public void run() { while (keepRunning) { // get a link list of all queued objects LinkObject entry = queue.remove(); if (entry != null) { do { int messagesize = 0; try { byte[] data = (byte[]) entry.data(); messagesize = data.length; sender.pushMessage((String) entry.getKey(), data); outQueueCounter++; } catch (Exception x) { log.warn(sm.getString( "AsyncSocketSender.send.error", entry .getKey()),x); } finally { reduceQueuedCounter(messagesize); } entry = entry.next(); } while (entry != null); } else { log.error(sm.getString("syncSocketSender.queue.empty",sender.getAddress(), new Integer(sender.getPort()))); } } } } }
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]