pero 2005/03/14 13:24:30 Modified: modules/cluster/src/share/org/apache/catalina/cluster/tcp AsyncSocketSender.java Constants.java DataSender.java IDataSender.java IDataSenderFactory.java LocalStrings.properties mbeans-descriptors.xml Added: modules/cluster/src/share/org/apache/catalina/cluster/util FastQueue.java IQueue.java LinkObject.java SingleRemoveSynchronizedAddLock.java Log: Add support to new fast async mode submitted by Rainer Jung Revision Changes Path 1.11 +3 -4 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java Index: AsyncSocketSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java,v retrieving revision 1.10 retrieving revision 1.11 diff -u -r1.10 -r1.11 --- AsyncSocketSender.java 15 Feb 2005 09:31:45 -0000 1.10 +++ AsyncSocketSender.java 14 Mar 2005 21:24:30 -0000 1.11 @@ -36,7 +36,7 @@ * * @author Filip Hanik * @author Peter Rossbach - * @version 1.2 + * @version $Revision$ $Date$ */ public class AsyncSocketSender extends DataSender { @@ -89,7 +89,6 @@ public AsyncSocketSender(InetAddress host, int port) { super(host, port); checkThread(); - long a = Long.MAX_VALUE; } // ------------------------------------------------------------- Properties @@ -169,7 +168,7 @@ queuedNrOfBytes += data.length; if (log.isTraceEnabled()) log.trace(sm.getString("AsyncSocketSender.queue.message", - getAddress(), new Integer(getPort()), messageid, new Long( + getAddress().getHostAddress(), new Integer(getPort()), messageid, new Long( data.length))); } @@ -188,7 +187,7 @@ */ public String toString() { StringBuffer buf = new StringBuffer("AsyncSocketSender["); - buf.append(getAddress()).append(":").append(getPort()).append("]"); + buf.append(getAddress().getHostAddress()).append(":").append(getPort()).append("]"); return buf.toString(); } 1.2 +1 -0 
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/Constants.java Index: Constants.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/Constants.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- Constants.java 15 Feb 2005 09:26:19 -0000 1.1 +++ Constants.java 14 Mar 2005 21:24:30 -0000 1.2 @@ -22,6 +22,7 @@ * package. * * @author Peter Rossbach + * @version $Revision$ $Date$ */ public class Constants { 1.3 +7 -7 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java Index: DataSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- DataSender.java 26 Feb 2005 09:12:26 -0000 1.2 +++ DataSender.java 14 Mar 2005 21:24:30 -0000 1.3 @@ -29,7 +29,7 @@ * * @author Peter Rossbach * @author Filip Hanik - * @version 1.2 + * @version $Revision$ $Date$ */ public class DataSender implements IDataSender { @@ -268,7 +268,7 @@ public void connect() throws java.io.IOException { connectCounter++; if (log.isDebugEnabled()) - log.debug(sm.getString("IDataSender.connect", address, + log.debug(sm.getString("IDataSender.connect", address.getHostAddress(), new Integer(port))); openSocket(); } @@ -283,7 +283,7 @@ public void disconnect() { disconnectCounter++; if (log.isDebugEnabled()) - log.debug(sm.getString("IDataSender.disconnect", address, + log.debug(sm.getString("IDataSender.disconnect", address.getHostAddress(), new Integer(port))); closeSocket(); } @@ -350,7 +350,7 @@ protected void openSocket() throws IOException, SocketException { socketOpenCounter++; if (log.isDebugEnabled()) - log.debug(sm.getString("IDataSender.openSocket", address, new 
Integer( + log.debug(sm.getString("IDataSender.openSocket", address.getHostAddress(), new Integer( port))); sc = new Socket(getAddress(), getPort()); if (isWaitForAck()) @@ -371,7 +371,7 @@ socketCloseCounter++; if (log.isDebugEnabled()) log.debug(sm.getString("IDataSender.closeSocket", - address, new Integer(port))); + address.getHostAddress(), new Integer(port))); try { sc.close(); } catch (Exception x) { @@ -419,7 +419,7 @@ // second try with fresh connection dataResendCounter++; if (log.isTraceEnabled()) - log.trace(sm.getString("IDataSender.send.again", address, + log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(), new Integer(port))); closeSocket(); openSocket(); @@ -432,7 +432,7 @@ checkIfCloseSocket(); addStats(data.length); if (log.isTraceEnabled()) - log.trace(sm.getString("IDataSender.send.message", address, + log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(), new Integer(port), messageid, new Long(data.length))); } 1.7 +2 -5 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSender.java Index: IDataSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSender.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- IDataSender.java 15 Feb 2005 09:31:45 -0000 1.6 +++ IDataSender.java 14 Mar 2005 21:24:30 -0000 1.7 @@ -17,12 +17,9 @@ package org.apache.catalina.cluster.tcp; /** - * <p>Title: </p> - * <p>Description: </p> - * <p>Copyright: Copyright (c) 2002</p> - * <p>Company: </p> - * @author not attributable + * @author Peter Rossbach * @version 1.0 + * @since 5.5.7 */ public interface IDataSender 1.4 +12 -2 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java Index: IDataSenderFactory.java =================================================================== RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- IDataSenderFactory.java 27 Feb 2004 14:58:56 -0000 1.3 +++ IDataSenderFactory.java 14 Mar 2005 21:24:30 -0000 1.4 @@ -17,12 +17,19 @@ package org.apache.catalina.cluster.tcp; import org.apache.catalina.cluster.Member; import java.net.InetAddress; + +/** + * @author Peter Rossbach + * @version 1.0 + * @since 5.5.7 + */ public class IDataSenderFactory { private IDataSenderFactory() { } public static final String SYNC_MODE="synchronous"; public static final String ASYNC_MODE="asynchronous"; public static final String POOLED_SYNC_MODE="pooled"; + public static final String FAST_ASYNC_QUEUE_MODE="fastasyncqueue"; public synchronized static IDataSender getIDataSender(String mode, Member mbr) throws java.io.IOException { @@ -30,7 +37,9 @@ return new SocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort()); else if ( ASYNC_MODE.equals(mode) ) return new AsyncSocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort()); - if (POOLED_SYNC_MODE.equals(mode) ) + else if ( FAST_ASYNC_QUEUE_MODE.equals(mode) ) + return new FastAsyncSocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort()); + else if (POOLED_SYNC_MODE.equals(mode) ) return new PooledSocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort()); else throw new java.io.IOException("Invalid replication mode="+mode); @@ -39,10 +48,11 @@ public static String validateMode(String mode) { if (SYNC_MODE.equals(mode) || ASYNC_MODE.equals(mode) || + FAST_ASYNC_QUEUE_MODE.equals(mode) || POOLED_SYNC_MODE.equals(mode) ) { return null; } else { - return "Replication mode has to be '"+SYNC_MODE+"', '"+ASYNC_MODE+"' or '"+POOLED_SYNC_MODE+"'"; + return "Replication mode has to be '"+SYNC_MODE+"', '" + FAST_ASYNC_QUEUE_MODE +"', '"+ASYNC_MODE+"' or '"+POOLED_SYNC_MODE+"'"; } } 1.3 +1 -0 
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties Index: LocalStrings.properties =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- LocalStrings.properties 26 Feb 2005 09:12:26 -0000 1.2 +++ LocalStrings.properties 14 Mar 2005 21:24:30 -0000 1.3 @@ -1,6 +1,7 @@ AsyncSocketSender.create.thread=Create sender [{0}:{1,number,integer}] queue thread to tcp background replication AsyncSocketSender.queue.message=Queue message to [{0}:{1,number,integer}] id=[{2}] size={3} AsyncSocketSender.send.error=Unable to asynchronously send session w/ id=[{0}] message will be ignored. +AsyncSocketSender.queue.empty=Queue in sender [{0}:{1,number,integer}] returned null element! IDataSender.connect=Sender connect to [{0}:{1,number,integer}] IDataSender.create=Create sender [{0}:{1,number,integer}] IDataSender.disconnect=Sender disconnect from [{0}:{1,number,integer}] 1.4 +147 -0 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml Index: mbeans-descriptors.xml =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- mbeans-descriptors.xml 15 Feb 2005 09:33:05 -0000 1.3 +++ mbeans-descriptors.xml 14 Mar 2005 21:24:30 -0000 1.4 @@ -230,6 +230,153 @@ </mbean> + <mbean name="FastAsyncSocketSender" + description="Fast Async Cluster Sender" + domain="Catalina" + group="IDataSender" + type="org.apache.catalina.cluster.tcp.FastAsyncSocketSender"> + <attribute name="info" + description="Class version info" + type="java.lang.String" + writeable="false"/> + <attribute 
name="address" + description="sender ip address" + type="java.net.InetAddress" + writeable="false"/> + <attribute name="port" + description="sender port" + type="int" + writeable="false" /> + <attribute name="suspect" + description="Socket is gone" + type="boolean"/> + <attribute name="waitForAck" + description="Wait for ack after data send" + is="true" + type="boolean" + writeable="false" /> + <attribute name="ackTimeout" + description="acknowledge timeout" + type="long"/> + <attribute name="queueSize" + writeable="false" + description="queue size" + type="int"/> + <attribute name="queuedNrOfBytes" + writeable="false" + description="number of bytes over all queued messages" + type="long"/> + <attribute name="keepAliveTimeout" + description="active socket keep alive timeout" + type="long"/> + <attribute name="keepAliveMaxRequestCount" + description="max request over this socket" + type="int"/> + <attribute name="queueAddWaitTimeout" + description="add wait timeout (default 10000 msec)" + type="long"/> + <attribute name="queueRemoveWaitTimeout" + description="remove wait timeout (default 30000 msec)" + type="long"/> + <attribute name="maxQueueLength" + description="max queue length" + type="int"/> + <attribute name="queueTimeWait" + description="remember queue wait times" + is="true" + type="boolean"/> + <attribute name="queueCheckLock" + description="check to lost locks" + is="true" + type="boolean"/> + <attribute name="queueDoStats" + description="activated queue stats" + is="true" + type="boolean"/> + <attribute name="keepAliveCount" + description="keep Alive request count" + type="int" + writeable="false"/> + <attribute name="keepAliveConnectTime" + description="Connect time for keep alive" + type="long" + writeable="false"/> + <attribute name="connected" + is="true" + description="socket connected" + type="boolean" + writeable="false"/> + <attribute name="nrOfRequests" + description="number of send messages to other members" + type="long" + writeable="false"/> 
+ <attribute name="totalBytes" + description="number of bytes transfered" + type="long" + writeable="false"/> + <attribute name="connectCounter" + description="counts connects" + type="long" + writeable="false"/> + <attribute name="disconnectCounter" + description="counts disconnects" + type="long" + writeable="false"/> + <attribute name="socketOpenCounter" + description="counts open socket (KeepAlive and connects)" + type="long" + writeable="false"/> + <attribute name="socketCloseCounter" + description="counts closed socket (KeepAlive and disconnects)" + type="long" + writeable="false"/> + <attribute name="missingAckCounter" + description="counts missing ack" + type="long" + writeable="false"/> + <attribute name="dataResendCounter" + description="counts data resends" + type="long" + writeable="false"/> + <attribute name="inQueueCounter" + description="counts all queued messages" + type="long" + writeable="false"/> + <attribute name="outQueueCounter" + description="counts all successfully sended messages" + type="long" + writeable="false"/> + <attribute name="queueAddWaitTime" + description="queue add wait time (tomcat thread waits)" + type="long" + writeable="false"/> + <attribute name="queueRemoveWaitTime" + description="queue remove wait time (queue thread waits)" + type="long" + writeable="false"/> + <operation name="connect" + description="connect to other replication node" + impact="ACTION" + returnType="void"> + </operation> + <operation name="disconnect" + description="disconnect to other replication node" + impact="ACTION" + returnType="void"> + </operation> + <operation name="checkIfCloseSocket" + description="Check connection for close socket" + impact="ACTION" + returnType="boolean"> + </operation> + <operation name="resetStatistics" + description="Reset all statistics" + impact="ACTION" + returnType="void"> + </operation> + + </mbean> + <mbean name="PooledSocketSender" description="Pooled Cluster Sender" domain="Catalina" 1.1 
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/FastQueue.java Index: FastQueue.java ===================================================================
/*
 * Copyright 1999,2004 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.cluster.util;

/**
 * A queue built as a singly linked chain of {@link LinkObject} elements.
 * Producers append one element per {@link #add(String, Object)} call; the
 * consumer detaches the <em>whole</em> chain with a single {@link #remove()}
 * call and walks it via {@link LinkObject#next()}.
 * <br/>
 * Access to the chain is serialized through a
 * {@link SingleRemoveSynchronizedAddLock}. Set a maximum queue length when
 * producer threads may outrun the consumer.
 * <p>
 * The statistic and configuration accessors do not take the lock; they read
 * and write plain fields.
 *
 * FIXME add i18n support to log messages
 *
 * @author Rainer Jung
 * @author Peter Rossbach
 * @version $Revision: 1.1 $ $Date: 2005/03/14 21:24:30 $
 */
public class FastQueue implements IQueue {

    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
            .getLog(FastQueue.class);

    /**
     * Lock that every add() and remove() call acquires before touching
     * first, last and size.
     */
    private SingleRemoveSynchronizedAddLock lock = null;

    /**
     * First element of the chain (the one handed to the consumer)
     */
    private LinkObject first = null;

    /**
     * Last element of the chain (where producers append)
     */
    private LinkObject last = null;

    /**
     * Current number of queued elements
     */
    private int size = 0;

    /**
     * When true, add() and remove() maintain the inAdd / inRemove / inMutex
     * flags and log a warning if they find them in an unexpected state.
     */
    private boolean checkLock = false;

    /**
     * When true, the time spent around lock acquisition and release is
     * accumulated in addWait / removeWait.
     */
    private boolean timeWait = false;

    /**
     * When true, the counters and size statistics are updated.
     */
    private boolean doStats = false;

    /**
     * Diagnostic flag: set while an add() is inside the locked section
     * (only maintained when checkLock is true).
     */
    private boolean inAdd = false;

    /**
     * Diagnostic flag: set while a remove() is inside the locked section
     * (only maintained when checkLock is true).
     */
    private boolean inRemove = false;

    /**
     * Diagnostic flag: set while either add() or remove() is inside the
     * locked section (only maintained when checkLock is true).
     */
    private boolean inMutex = false;

    /**
     * Limit for the queue length; values &lt;= 0 disable the limit
     * (default is unlimited)
     */
    private int maxQueueLength = 0;

    /**
     * Wait timeout handed to the lock for producers (msec)
     */
    private long addWaitTimeout = 10000L;

    /**
     * Wait timeout handed to the lock for the consumer (msec)
     */
    private long removeWaitTimeout = 30000L;

    /**
     * Queue accepts add() / remove() only while enabled
     */
    private boolean enabled = true;

    /**
     * Number of successful add() calls (only counted when doStats is true)
     */
    private long addCounter = 0;

    /**
     * Number of failed add() calls, e.g. because the queue length limit was
     * reached (only counted when doStats is true)
     */
    private long addErrorCounter = 0;

    /**
     * Number of remove() calls that returned a chain
     * (only counted when doStats is true)
     */
    private long removeCounter = 0;

    /**
     * Number of remove() calls that failed
     * (only counted when doStats is true)
     */
    private long removeErrorCounter = 0;

    /**
     * Accumulated time (msec) measured in add() when timeWait is true
     */
    private long addWait = 0;

    /**
     * Accumulated time (msec) measured in remove() when timeWait is true
     */
    private long removeWait = 0;

    /**
     * Largest queue size seen by add()
     */
    private int maxSize = 0;

    /**
     * Running sum of the queue size after each successful add();
     * getAvgSize() divides it by addCounter.
     */
    private long avgSize = 0;

    /**
     * Largest queue size seen in the current sample window
     */
    private int maxSizeSample = 0;

    /**
     * Running sum of queue sizes in the current sample window
     */
    private long avgSizeSample = 0;

    /**
     * Length of a sample window, counted in successful add() calls.
     * NOTE(review): add() and the sample getters compute
     * "addCounter % sampleInterval"; a value of 0 would raise an
     * ArithmeticException there. Nothing visible here validates the setter.
     */
    private int sampleInterval = 100;

    /**
     * Create the SingleRemoveSynchronizedAddLock and hand it the default add
     * and remove wait timeouts.
     */
    public FastQueue() {
        lock = new SingleRemoveSynchronizedAddLock();
        lock.setAddWaitTimeout(addWaitTimeout);
        lock.setRemoveWaitTimeout(removeWaitTimeout);
    }

    /**
     * Get the current add wait timeout. The value is read back from the lock
     * and cached in the local field.
     *
     * @return current add wait timeout
     */
    public long getAddWaitTimeout() {
        addWaitTimeout = lock.getAddWaitTimeout();
        return addWaitTimeout;
    }

    /**
     * Set add wait timeout (default 10000 msec) here and on the lock.
     *
     * @param timeout new add wait timeout
     */
    public void setAddWaitTimeout(long timeout) {
        addWaitTimeout = timeout;
        lock.setAddWaitTimeout(addWaitTimeout);
    }

    /**
     * Get the current remove wait timeout. The value is read back from the
     * lock and cached in the local field.
     *
     * @return current remove wait timeout
     */
    public long getRemoveWaitTimeout() {
        removeWaitTimeout = lock.getRemoveWaitTimeout();
        return removeWaitTimeout;
    }

    /**
     * Set remove wait timeout (default 30000 msec) here and on the lock.
     *
     * @param timeout new remove wait timeout
     */
    public void setRemoveWaitTimeout(long timeout) {
        removeWaitTimeout = timeout;
        lock.setRemoveWaitTimeout(removeWaitTimeout);
    }

    /**
     * Get the maximum queue length (values &lt;= 0 mean unlimited).
     *
     * @see org.apache.catalina.cluster.util.IQueue#getMaxQueueLength()
     */
    public int getMaxQueueLength() {
        return maxQueueLength;
    }

    /**
     * Set the maximum queue length; add() rejects elements once size has
     * reached a positive limit.
     *
     * @param length new limit
     */
    public void setMaxQueueLength(int length) {
        maxQueueLength = length;
    }

    /**
     * @return true while the queue accepts add() and remove()
     */
    public boolean isEnabled() {
        return enabled;
    }

    /**
     * Enable or disable the queue. Disabling additionally aborts a waiting
     * remover via lock.abortRemove().
     * NOTE(review): enabling does not touch the lock, so an abort issued by
     * an earlier disable is not visibly undone here - confirm that
     * re-enabling a stopped queue is supported.
     *
     * @param enable new state
     */
    public void setEnabled(boolean enable) {
        enabled = enable;
        if (!enabled) {
            lock.abortRemove();
        }
    }

    /**
     * @return Returns the checkLock.
     */
    public boolean isCheckLock() {
        return checkLock;
    }

    /**
     * @param checkLock The checkLock to set.
     */
    public void setCheckLock(boolean checkLock) {
        this.checkLock = checkLock;
    }

    /**
     * @return Returns the doStats.
     */
    public boolean isDoStats() {
        return doStats;
    }

    /**
     * @param doStats The doStats to set.
     */
    public void setDoStats(boolean doStats) {
        this.doStats = doStats;
    }

    /**
     * @return Returns the timeWait.
     */
    public boolean isTimeWait() {
        return timeWait;
    }

    /**
     * @param timeWait The timeWait to set.
     */
    public void setTimeWait(boolean timeWait) {
        this.timeWait = timeWait;
    }

    /**
     * @return length of the sample window (in successful adds)
     */
    public int getSampleInterval() {
        return sampleInterval;
    }

    /**
     * @param interval new length of the sample window; see the note on the
     *            field about a value of 0
     */
    public void setSampleInterval(int interval) {
        sampleInterval = interval;
    }

    /**
     * @return number of successful adds counted so far
     */
    public long getAddCounter() {
        return addCounter;
    }

    /**
     * @param counter value to overwrite the add counter with
     */
    public void setAddCounter(long counter) {
        addCounter = counter;
    }

    /**
     * @return number of failed adds counted so far
     */
    public long getAddErrorCounter() {
        return addErrorCounter;
    }

    /**
     * @param counter value to overwrite the add error counter with
     */
    public void setAddErrorCounter(long counter) {
        addErrorCounter = counter;
    }

    /**
     * @return number of successful removes counted so far
     */
    public long getRemoveCounter() {
        return removeCounter;
    }

    /**
     * @param counter value to overwrite the remove counter with
     */
    public void setRemoveCounter(long counter) {
        removeCounter = counter;
    }

    /**
     * @return number of failed removes counted so far
     */
    public long getRemoveErrorCounter() {
        return removeErrorCounter;
    }

    /**
     * @param counter value to overwrite the remove error counter with
     */
    public void setRemoveErrorCounter(long counter) {
        removeErrorCounter = counter;
    }

    /**
     * @return accumulated add wait time (msec)
     */
    public long getAddWait() {
        return addWait;
    }

    /**
     * @param wait value to overwrite the accumulated add wait time with
     */
    public void setAddWait(long wait) {
        addWait = wait;
    }

    /**
     * @return accumulated remove wait time (msec)
     */
    public long getRemoveWait() {
        return removeWait;
    }

    /**
     * @param wait value to overwrite the accumulated remove wait time with
     */
    public void setRemoveWait(long wait) {
        removeWait = wait;
    }

    /**
     * @return largest queue size recorded by add()
     */
    public int getMaxSize() {
        return maxSize;
    }

    /**
     * @param size value to overwrite the recorded maximum size with
     */
    public void setMaxSize(int size) {
        maxSize = size;
    }

    /**
     * Average queue size: the accumulated size sum divided by the number of
     * successful adds.
     *
     * @return the average, or 0 when nothing has been counted yet
     */
    public long getAvgSize() {
        if (addCounter > 0) {
            return avgSize / addCounter;
        } else {
            return 0;
        }
    }

    /**
     * Reset the counters, the size statistics and the wait times.
     * NOTE(review): maxSizeSample and avgSizeSample are not reset here;
     * they are only cleared by add() at the start of a sample window.
     */
    public void resetStatistics() {
        addCounter = 0;
        addErrorCounter = 0;
        removeCounter = 0;
        removeErrorCounter = 0;
        avgSize = 0;
        maxSize = 0;
        addWait = 0;
        removeWait = 0;
    }

    /**
     * Release the add lock, reporting data as available when the queue is
     * not empty.
     */
    public void unlockAdd() {
        lock.unlockAdd(size > 0 ? true : false);
    }

    /**
     * Release the remove lock.
     */
    public void unlockRemove() {
        lock.unlockRemove();
    }

    /**
     * start queuing (enables the queue)
     */
    public void start() {
        setEnabled(true);
    }

    /**
     * stop queuing (disables the queue and aborts a waiting remover)
     */
    public void stop() {
        setEnabled(false);
    }

    /**
     * @return position inside the current sample window
     *         (addCounter modulo sampleInterval)
     */
    public long getSample() {
        return addCounter % sampleInterval;
    }

    /**
     * @return largest queue size seen in the current sample window
     */
    public int getMaxSizeSample() {
        return maxSizeSample;
    }

    /**
     * @param size value to overwrite the sample maximum with
     */
    public void setMaxSizeSample(int size) {
        maxSizeSample = size;
    }

    /**
     * Average queue size of the current sample window. Inside a window the
     * sum is divided by the number of adds in it; exactly at a window
     * boundary it is divided by the full interval.
     *
     * @return the sample average, or 0 when nothing has been counted yet
     */
    public long getAvgSizeSample() {
        long sample = addCounter % sampleInterval;
        if (sample > 0) {
            return avgSizeSample / sample;
        } else if (addCounter > 0) {
            return avgSizeSample / sampleInterval;
        } else {
            return 0;
        }
    }

    /**
     * @return current number of queued elements (read without the lock)
     */
    public int getSize() {
        int sz;
        sz = size;
        return sz;
    }

    /**
     * Append a new element to the queue.
     * <p>
     * Returns false without locking when the queue is disabled. Otherwise
     * the add lock is taken, the element is appended unless the length limit
     * is reached, statistics are updated when doStats is set, and the lock
     * is released with "data available" in a finally block.
     *
     * @param key key stored with the element
     * @param data payload stored with the element
     * @return true when the element was queued
     * @see org.apache.catalina.cluster.util.IQueue#add(java.lang.String, java.lang.Object)
     * FIXME extract some method
     */
    public boolean add(String key, Object data) {
        boolean ok = true;
        long time = 0;

        if (!enabled) {
            if (log.isInfoEnabled())
                log.info("FastQueue: queue disabled, add aborted");
            return false;
        }

        if (timeWait) {
            time = System.currentTimeMillis();
        }
        lock.lockAdd();
        try {
            // first measurement: time spent waiting for the add lock
            if (timeWait) {
                addWait += (System.currentTimeMillis() - time);
            }

            if (log.isTraceEnabled()) {
                log.trace("FastQueue: add starting with size " + size);
            }
            // diagnostics only: the flags should all be false on entry
            if (checkLock) {
                if (inAdd)
                    log.warn("FastQueue.add: Detected other add");
                inAdd = true;
                if (inMutex)
                    log.warn("FastQueue.add: Detected other mutex in add");
                inMutex = true;
            }

            if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
                // length limit reached: reject, the element is dropped
                ok = false;
                if (log.isTraceEnabled()) {
                    log.trace("FastQueue: Could not add, since queue is full (" + size + ">=" + maxQueueLength + ")");
                }
            } else {
                LinkObject element = new LinkObject(key, data);
                if (size == 0) {
                    first = last = element;
                    size = 1;
                } else {
                    if (last == null) {
                        // inconsistent state: size > 0 but no tail element
                        ok = false;
                        log
                                .error("FastQueue: Could not add, since last is null although size is "
                                        + size + " (>0)");
                    } else {
                        last.append(element);
                        last = element;
                        size++;
                    }
                }
            }

            if (doStats) {
                if (ok) {
                    // a new sample window starts every sampleInterval adds
                    if (addCounter % sampleInterval == 0) {
                        maxSizeSample = 0;
                        avgSizeSample = 0;
                    }
                    addCounter++;
                    if (size > maxSize) {
                        maxSize = size;
                    }
                    if (size > maxSizeSample) {
                        maxSizeSample = size;
                    }
                    avgSize += size;
                    avgSizeSample += size;
                } else {
                    addErrorCounter++;
                }
            }

            // sanity checks, logged unconditionally
            if (first == null) {
                log.error("FastQueue: first is null, size is " + size + " at end of add");
            }
            if (last == null) {
                log.error("FastQueue: last is null, size is " + size + " at end of add");
            }

            if (checkLock) {
                if (!inMutex)
                    log.warn("FastQueue: Cancelled by other mutex in add");
                inMutex = false;
                if (!inAdd)
                    log.warn("FastQueue: Cancelled by other add");
                inAdd = false;
            }
            if (log.isTraceEnabled()) {
                log.trace("FastQueue: add ending with size " + size);
            }

            if (timeWait) {
                time = System.currentTimeMillis();
            }
        } finally {
            lock.unlockAdd(true);
        }
        // second measurement: time spent releasing the add lock
        if (timeWait) {
            addWait += (System.currentTimeMillis() - time);
        }
        return ok;
    }

    /**
     * Remove the complete list of queued elements.
     * <p>
     * Returns null without locking when the queue is disabled, and null when
     * the remove lock could not be obtained. Otherwise the chain is detached
     * (first and last are cleared, size becomes 0) and its first element is
     * returned; the caller walks the rest via LinkObject.next().
     * <p>
     * NOTE(review): lock.unlockRemove() in the finally block also runs on
     * the path where lockRemove() returned false, i.e. when this thread does
     * not hold the lock - confirm this is intended.
     *
     * @return first element of the detached chain, or null
     * @see org.apache.catalina.cluster.util.IQueue#remove()
     * FIXME extract some method
     */
    public LinkObject remove() {
        LinkObject element;
        boolean gotLock;
        long time = 0;

        if (!enabled) {
            if (log.isInfoEnabled())
                log.info("FastQueue: queue disabled, remove aborted");
            return null;
        }

        if (timeWait) {
            time = System.currentTimeMillis();
        }
        gotLock = lock.lockRemove();
        try {

            if (!gotLock) {
                // distinguish an unexpected abort from a regular shutdown
                if (enabled) {
                    if (timeWait) {
                        removeWait += (System.currentTimeMillis() - time);
                    }
                    if (doStats) {
                        removeErrorCounter++;
                    }
                    if (log.isInfoEnabled())
                        log
                                .info("FastQueue: Remove aborted although queue enabled");
                } else {
                    if (log.isInfoEnabled())
                        log.info("FastQueue: queue disabled, remove aborted");
                }
                return null;
            }

            // first measurement: time spent waiting for the remove lock
            if (timeWait) {
                removeWait += (System.currentTimeMillis() - time);
            }

            if (log.isTraceEnabled()) {
                log.trace("FastQueue: remove starting with size " + size);
            }
            // diagnostics only: the flags should all be false on entry
            if (checkLock) {
                if (inRemove)
                    log.warn("FastQueue: Detected other remove");
                inRemove = true;
                if (inMutex)
                    log.warn("FastQueue: Detected other mutex in remove");
                inMutex = true;
            }

            element = first;

            if (doStats) {
                if (element != null) {
                    removeCounter++;
                } else {
                    removeErrorCounter++;
                    log
                            .error("FastQueue: Could not remove, since first is null although size is "
                                    + size + " (>0)");
                }
            }

            // detach the whole chain; the caller now owns it
            first = last = null;
            size = 0;

            if (checkLock) {
                if (!inMutex)
                    log.warn("FastQueue: Cancelled by other mutex in remove");
                inMutex = false;
                if (!inRemove)
                    log.warn("FastQueue: Cancelled by other remove");
                inRemove = false;
            }
            if (log.isTraceEnabled()) {
                log.trace("FastQueue: remove ending with size " + size);
            }

            if (timeWait) {
                time = System.currentTimeMillis();
            }
        } finally {
            lock.unlockRemove();
        }
        // second measurement: time spent releasing the remove lock
        if (timeWait) {
            removeWait += (System.currentTimeMillis() - time);
        }
        return element;
    }

}
 1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/IQueue.java Index: IQueue.java =================================================================== /* * Copyright 1999,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
 */
package org.apache.catalina.cluster.util;

/**
 * A queue interface<BR>
 * Elements are added one at a time with a key and a payload; remove()
 * hands back a {@link LinkObject}, whose next() link leads to further
 * elements.
 *
 * @author Rainer Jung
 * @author Peter Rossbach
 * @version $Revision: 1.1 $ $Date: 2005/03/14 21:24:30 $
 */
public interface IQueue {

    /**
     * Remove queued data.
     *
     * @return a queued element (possibly linked to further elements);
     *         presumably null when nothing could be removed - confirm
     *         against the implementation in use
     */
    public LinkObject remove();

    /**
     * Add a new element.
     *
     * @param key key stored with the element
     * @param data payload stored with the element
     * @return whether the element was accepted
     */
    public boolean add(String key,Object data);

    /**
     * @return the configured maximum queue length
     */
    public int getMaxQueueLength();

    /**
     * @param length the maximum queue length to configure
     */
    public void setMaxQueueLength(int length);

    /**
     * Start queuing.
     */
    public void start();

    /**
     * Stop queuing.
     */
    public void stop();

}
 1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/LinkObject.java Index: LinkObject.java ===================================================================
/*
 * Copyright 1999,2004 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.cluster.util;

/**
 * The class <b>LinkObject</b> implements an element
 * for a linked list, consisting of a key, a general
 * data object and a pointer to the next element.
 *
 * @author Rainer Jung
 * @author Peter Rossbach
 * @version $Revision: 1.1 $ $Date: 2005/03/14 21:24:30 $
 */
public class LinkObject {

    /** The data object carried by this element. */
    private Object payload;

    /** The following element, or null when none has been appended. */
    private LinkObject next;

    /** The key given at construction. */
    private String key ;

    /**
     * Construct a new element from the key and the data object.
     * Sets the pointer to the next element to null.
     *
     * @param key The key stored with the element.
     * @param payload The data object.
     */
    public LinkObject(String key,Object payload) {
        this.payload = payload;
        this.next = null;
        this.key = key ;
    }

    /**
     * Set the next element. A previously set next element is replaced,
     * not chained.
     *
     * @param next The next element.
     */
    public void append(LinkObject next) {
        this.next = next;
    }

    /**
     * Get the next element.
     *
     * @return The next element, or null when none was set.
     */
    public LinkObject next() {
        return next;
    }

    /**
     * Get the data object from the element.
     *
     * @return The data object from the element.
     */
    public Object data() {
        return payload;
    }

    /**
     * Get the key of the element (described by the original author as the
     * unique message id).
     *
     * @return the key given at construction
     */
    public Object getKey() {
        return key;
    }

}
 1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/SingleRemoveSynchronizedAddLock.java Index: SingleRemoveSynchronizedAddLock.java ===================================================================
/*
 * Copyright 1999,2004 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.cluster.util;

/** * The class <b>SingleRemoveSynchronizedAddLock</b> implement locking for accessing the queue * by a single remove thread and multiple add threads. * * A thread is only allowed to be either the remove or * an add thread. * * The lock can either be owned by the remove thread * or by a single add thread. * * If the remove thread tries to get the lock, * but the queue is empty, it will block (poll) * until an add threads adds an entry to the queue and * releases the lock. * * If the remove thread and add threads compete for * the lock and an add thread releases the lock, then * the remove thread will get the lock first. * * The remove thread removes all entries in the queue * at once and proceeses them without further * polling the queue.
 * * The lock is not reentrant, in the sense, that all * threads must release an owned lock before competing * for the lock again! * * @author Rainer Jung * @author Peter Rossbach * @version 1.1 */
public class SingleRemoveSynchronizedAddLock {

    /*
     * Implementation notes.
     *
     * All state is guarded by the monitor of this object: every public
     * method is synchronized, and waiting is done with wait(timeout) on
     * this object.
     *
     * Waiting add threads are woken with notifyAll(). A waiting remove
     * thread is woken either by notifyAll() or by Thread.interrupt() on the
     * thread recorded in "remover"; both wait loops swallow the resulting
     * InterruptedException and simply re-check their condition.
     */

    /**
     * Create a lock in its initial state: unlocked, no data available,
     * remove enabled.
     */
    public SingleRemoveSynchronizedAddLock() {
    }

    /**
     * Create an unlocked lock with the given initial "data available" state.
     *
     * @param dataAvailable true when the guarded object already holds data,
     *            so that the first lockRemove() does not have to wait for
     *            an add
     */
    public SingleRemoveSynchronizedAddLock(boolean dataAvailable) {
        this.dataAvailable=dataAvailable;
    }

    /**
     * Time in milliseconds after which threads
     * waiting for an add lock are woken up.
     * This is used as a safety measure in case
     * thread notification via the unlock methods
     * has a bug.
     */
    private long addWaitTimeout = 10000L;

    /**
     * Time in milliseconds after which threads
     * waiting for a remove lock are woken up.
     * This is used as a safety measure in case
     * thread notification via the unlock methods
     * has a bug.
     */
    private long removeWaitTimeout = 30000L;

    /**
     * The current remove thread.
     * It is set to the remove thread while that thread waits inside
     * lockRemove(), and reset to null as soon as the wait loop there
     * ends.
     */
    private Thread remover = null;

    /**
     * A flag indicating, if an add thread owns the lock.
     */
    private boolean addLocked = false;

    /**
     * A flag indicating, if the remove thread owns the lock.
     */
    private boolean removeLocked = false;

    /**
     * A flag indicating, if the remove thread is allowed
     * to wait for the lock. The flag is set to false, when aborting.
     * Nothing in this class sets it back to true.
     */
    private boolean removeEnabled = true;

    /**
     * A flag indicating, if the remover needs polling.
     * It indicates, if the locked object has data available
     * to be removed.
     */
    private boolean dataAvailable = false;

    /**
     * @return Value of addWaitTimeout
     */
    public synchronized long getAddWaitTimeout() {
        return addWaitTimeout;
    }

    /**
     * Set value of addWaitTimeout
     *
     * @param timeout wait timeout in milliseconds for add threads
     */
    public synchronized void setAddWaitTimeout(long timeout) {
        addWaitTimeout = timeout;
    }

    /**
     * @return Value of removeWaitTimeout
     */
    public synchronized long getRemoveWaitTimeout() {
        return removeWaitTimeout;
    }

    /**
     * Set value of removeWaitTimeout
     *
     * @param timeout wait timeout in milliseconds for the remove thread
     */
    public synchronized void setRemoveWaitTimeout(long timeout) {
        removeWaitTimeout = timeout;
    }

    /**
     * Check if the locked object has data available
     * i.e. the remover can stop polling and get the lock.
     * @return True iff the lock Object has data available.
     */
    public synchronized boolean isDataAvailable() {
        return dataAvailable;
    }

    /**
     * Check if an add thread owns the lock.
     * @return True iff an add thread owns the lock.
     */
    public synchronized boolean isAddLocked() {
        return addLocked;
    }

    /**
     * Check if the remove thread owns the lock.
     * @return True iff the remove thread owns the lock.
     */
    public synchronized boolean isRemoveLocked() {
        return removeLocked;
    }

    /**
     * Check if the remove thread is polling.
     * @return True iff the remove thread is polling.
     */
    public synchronized boolean isRemovePolling() {
        if ( remover != null ) {
            return true;
        }
        return false;
    }

    /**
     * Acquires the lock by an add thread and sets the add flag.
     * If any add thread or the remove thread already acquired the lock
     * this add thread will block until the lock is released.
     * <p>
     * The wait is repeated with addWaitTimeout until both lock flags are
     * clear; there is no overall timeout and no way to abort the wait.
     * NOTE(review): an InterruptedException raised while waiting is
     * swallowed and the interrupt status is not restored, so the calling
     * thread does not learn that it was interrupted - confirm whether
     * callers rely on interruption.
     */
    public synchronized void lockAdd() {
        if ( addLocked || removeLocked ) {
            do {
                try {
                    wait(addWaitTimeout);
                } catch ( InterruptedException e ) {
                }
            } while ( addLocked || removeLocked );
        }
        addLocked=true;
    }

    /**
     * Acquires the lock by the remove thread and sets the remove flag.
     * If any add thread already acquired the lock or the queue is
     * empty, the remove thread will block until the lock is released
     * and the queue is not empty.
     * <p>
     * While waiting, the calling thread is recorded in "remover" so that
     * unlockAdd() and abortRemove() can interrupt it. The wait also ends
     * when remove has been disabled via abortRemove(); in that case the
     * lock is not taken.
     *
     * @return true when the lock was acquired, false when remove is
     *         disabled
     */
    public synchronized boolean lockRemove() {
        removeLocked=false;
        if ( ( addLocked || ! dataAvailable ) && removeEnabled ) {
            remover=Thread.currentThread();
            do {
                try {
                    wait(removeWaitTimeout);
                } catch ( InterruptedException e ) {
                    // interrupt is one of the wake-up signals; the loop
                    // condition below decides whether to keep waiting
                }
            } while ( ( addLocked || ! dataAvailable ) && removeEnabled );
            remover=null;
        }
        if ( removeEnabled ) {
            removeLocked=true;
        }
        return removeLocked;
    }

    /**
     * Releases the lock by an add thread and records whether data is
     * available.
     * If a remove thread is waiting and either data is available or remove
     * has been disabled, only that thread is woken (by interrupting it);
     * waiting add threads are not notified on that path. Otherwise all
     * waiting threads are notified.
     *
     * @param dataAvailable whether the guarded object now holds data
     */
    public synchronized void unlockAdd(boolean dataAvailable) {
        addLocked=false;
        this.dataAvailable=dataAvailable;
        if ( ( remover != null ) && ( dataAvailable || ! removeEnabled ) ) {
            remover.interrupt();
        } else {
            notifyAll();
        }
    }

    /**
     * Releases the lock by the remove thread and clears the
     * "data available" flag.
     * Notify all waiting add threads,
     * that the lock has been released by the remove thread.
     */
    public synchronized void unlockRemove() {
        removeLocked=false;
        dataAvailable=false;
        notifyAll();
    }

    /**
     * Abort any polling remover thread: disables remove and interrupts the
     * remove thread if one is currently waiting. Later lockRemove() calls
     * return false without waiting.
     */
    public synchronized void abortRemove() {
        removeEnabled=false;
        if ( remover != null ) {
            remover.interrupt();
        }
    }

}
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]