pero        2005/03/14 13:24:30

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        AsyncSocketSender.java Constants.java
                        DataSender.java IDataSender.java
                        IDataSenderFactory.java LocalStrings.properties
                        mbeans-descriptors.xml
  Added:       modules/cluster/src/share/org/apache/catalina/cluster/util
                        FastQueue.java IQueue.java LinkObject.java
                        SingleRemoveSynchronizedAddLock.java
  Log:
  Add support for the new fast async mode
  submitted by Rainer Jung
  
  Revision  Changes    Path
  1.11      +3 -4      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java
  
  Index: AsyncSocketSender.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- AsyncSocketSender.java    15 Feb 2005 09:31:45 -0000      1.10
  +++ AsyncSocketSender.java    14 Mar 2005 21:24:30 -0000      1.11
  @@ -36,7 +36,7 @@
    * 
    * @author Filip Hanik
    * @author Peter Rossbach
  - * @version 1.2
  + * @version $Revision$ $Date$
    */
   public class AsyncSocketSender extends DataSender {
       
  @@ -89,7 +89,6 @@
       public AsyncSocketSender(InetAddress host, int port) {
           super(host, port);
           checkThread();
  -        long a = Long.MAX_VALUE;
       }
   
       // ------------------------------------------------------------- 
Properties
  @@ -169,7 +168,7 @@
           queuedNrOfBytes += data.length;
           if (log.isTraceEnabled())
               log.trace(sm.getString("AsyncSocketSender.queue.message",
  -                    getAddress(), new Integer(getPort()), messageid, new 
Long(
  +                    getAddress().getHostAddress(), new Integer(getPort()), 
messageid, new Long(
                               data.length)));
       }
   
  @@ -188,7 +187,7 @@
        */
       public String toString() {
           StringBuffer buf = new StringBuffer("AsyncSocketSender[");
  -        buf.append(getAddress()).append(":").append(getPort()).append("]");
  +        
buf.append(getAddress().getHostAddress()).append(":").append(getPort()).append("]");
           return buf.toString();
       }
   
  
  
  
  1.2       +1 -0      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/Constants.java
  
  Index: Constants.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/Constants.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Constants.java    15 Feb 2005 09:26:19 -0000      1.1
  +++ Constants.java    14 Mar 2005 21:24:30 -0000      1.2
  @@ -22,6 +22,7 @@
    * package.
    *
    * @author Peter Rossbach
  + * @version $Revision$ $Date$
    */
   
   public class Constants {
  
  
  
  1.3       +7 -7      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
  
  Index: DataSender.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- DataSender.java   26 Feb 2005 09:12:26 -0000      1.2
  +++ DataSender.java   14 Mar 2005 21:24:30 -0000      1.3
  @@ -29,7 +29,7 @@
    * 
    * @author Peter Rossbach
    * @author Filip Hanik
  - * @version 1.2
  + * @version $Revision$ $Date$
    */
   public class DataSender implements IDataSender {
   
  @@ -268,7 +268,7 @@
       public void connect() throws java.io.IOException {
           connectCounter++;
           if (log.isDebugEnabled())
  -            log.debug(sm.getString("IDataSender.connect", address,
  +            log.debug(sm.getString("IDataSender.connect", 
address.getHostAddress(),
                       new Integer(port)));
           openSocket();
       }
  @@ -283,7 +283,7 @@
       public void disconnect() {
           disconnectCounter++;
           if (log.isDebugEnabled())
  -            log.debug(sm.getString("IDataSender.disconnect", address,
  +            log.debug(sm.getString("IDataSender.disconnect", 
address.getHostAddress(),
                       new Integer(port)));
           closeSocket();
       }
  @@ -350,7 +350,7 @@
       protected void openSocket() throws IOException, SocketException {
           socketOpenCounter++;
           if (log.isDebugEnabled())
  -            log.debug(sm.getString("IDataSender.openSocket", address, new 
Integer(
  +            log.debug(sm.getString("IDataSender.openSocket", 
address.getHostAddress(), new Integer(
                       port)));
           sc = new Socket(getAddress(), getPort());
           if (isWaitForAck())
  @@ -371,7 +371,7 @@
               socketCloseCounter++;
               if (log.isDebugEnabled())
                   log.debug(sm.getString("IDataSender.closeSocket",
  -                        address, new Integer(port)));
  +                        address.getHostAddress(), new Integer(port)));
               try {
                   sc.close();
               } catch (Exception x) {
  @@ -419,7 +419,7 @@
               // second try with fresh connection
               dataResendCounter++;
               if (log.isTraceEnabled())
  -                log.trace(sm.getString("IDataSender.send.again", address,
  +                log.trace(sm.getString("IDataSender.send.again", 
address.getHostAddress(),
                           new Integer(port)));
               closeSocket();
               openSocket();
  @@ -432,7 +432,7 @@
           checkIfCloseSocket();
           addStats(data.length);
           if (log.isTraceEnabled())
  -            log.trace(sm.getString("IDataSender.send.message", address,
  +            log.trace(sm.getString("IDataSender.send.message", 
address.getHostAddress(),
                       new Integer(port), messageid, new Long(data.length)));
   
       }
  
  
  
  1.7       +2 -5      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSender.java
  
  Index: IDataSender.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSender.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- IDataSender.java  15 Feb 2005 09:31:45 -0000      1.6
  +++ IDataSender.java  14 Mar 2005 21:24:30 -0000      1.7
  @@ -17,12 +17,9 @@
   package org.apache.catalina.cluster.tcp;
   
   /**
  - * <p>Title: </p>
  - * <p>Description: </p>
  - * <p>Copyright: Copyright (c) 2002</p>
  - * <p>Company: </p>
  - * @author not attributable
  + * @author Peter Rossbach
    * @version 1.0
  + * @since 5.5.7
    */
   
   public interface IDataSender
  
  
  
  1.4       +12 -2     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java
  
  Index: IDataSenderFactory.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- IDataSenderFactory.java   27 Feb 2004 14:58:56 -0000      1.3
  +++ IDataSenderFactory.java   14 Mar 2005 21:24:30 -0000      1.4
  @@ -17,12 +17,19 @@
   package org.apache.catalina.cluster.tcp;
   import org.apache.catalina.cluster.Member;
   import java.net.InetAddress;
  +
  +/**
  + * @author Peter Rossbach
  + * @version 1.0
  + * @since 5.5.7
  + */
   public class IDataSenderFactory {
       private IDataSenderFactory() {
       }
       public static final String SYNC_MODE="synchronous";
       public static final String ASYNC_MODE="asynchronous";
       public static final String POOLED_SYNC_MODE="pooled";
  +    public static final String FAST_ASYNC_QUEUE_MODE="fastasyncqueue";
   
       public synchronized static IDataSender getIDataSender(String mode, 
Member mbr)
       throws java.io.IOException {
  @@ -30,7 +37,9 @@
               return new 
SocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort());
           else if ( ASYNC_MODE.equals(mode) )
               return new 
AsyncSocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort());
  -        if (POOLED_SYNC_MODE.equals(mode) )
  +        else if ( FAST_ASYNC_QUEUE_MODE.equals(mode) )
  +            return new 
FastAsyncSocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort());
  +        else if (POOLED_SYNC_MODE.equals(mode) )
               return new 
PooledSocketSender(InetAddress.getByName(mbr.getHost()),mbr.getPort());
           else
               throw new java.io.IOException("Invalid replication mode="+mode);
  @@ -39,10 +48,11 @@
       public static String validateMode(String mode) {
           if (SYNC_MODE.equals(mode) ||
               ASYNC_MODE.equals(mode) ||
  +            FAST_ASYNC_QUEUE_MODE.equals(mode) ||
               POOLED_SYNC_MODE.equals(mode) ) {
               return null;
           } else {
  -            return "Replication mode has to be '"+SYNC_MODE+"', 
'"+ASYNC_MODE+"' or '"+POOLED_SYNC_MODE+"'";
  +            return "Replication mode has to be '"+SYNC_MODE+"', '" + 
FAST_ASYNC_QUEUE_MODE +"', '"+ASYNC_MODE+"' or '"+POOLED_SYNC_MODE+"'";
           }
       }
   
  
  
  
  1.3       +1 -0      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
  
  Index: LocalStrings.properties
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- LocalStrings.properties   26 Feb 2005 09:12:26 -0000      1.2
  +++ LocalStrings.properties   14 Mar 2005 21:24:30 -0000      1.3
  @@ -1,6 +1,7 @@
   AsyncSocketSender.create.thread=Create sender [{0}:{1,number,integer}] queue 
thread to tcp background replication
   AsyncSocketSender.queue.message=Queue message to [{0}:{1,number,integer}] 
id=[{2}] size={3}
   AsyncSocketSender.send.error=Unable to asynchronously send session w/ 
id=[{0}] message will be ignored.
  +AsyncSocketSender.queue.empty=Queue in sender [{0}:{1,number,integer}] 
returned null element!
   IDataSender.connect=Sender connect to [{0}:{1,number,integer}]
   IDataSender.create=Create sender [{0}:{1,number,integer}]
   IDataSender.disconnect=Sender disconnect from [{0}:{1,number,integer}]
  
  
  
  1.4       +147 -0    
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml
  
  Index: mbeans-descriptors.xml
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- mbeans-descriptors.xml    15 Feb 2005 09:33:05 -0000      1.3
  +++ mbeans-descriptors.xml    14 Mar 2005 21:24:30 -0000      1.4
  @@ -230,6 +230,153 @@
                                 
     </mbean>
   
  + <mbean         name="FastAsyncSocketSender"
  +          description="Fast Async Cluster Sender"
  +               domain="Catalina"
  +                group="IDataSender"
  +                 
type="org.apache.catalina.cluster.tcp.FastAsyncSocketSender">
  +    <attribute   name="info"
  +          description="Class version info"
  +                 type="java.lang.String"
  +                 writeable="false"/>
  +    <attribute   name="address"
  +          description="sender ip address"
  +                 type="java.net.InetAddress"
  +                 writeable="false"/>
  +    <attribute   name="port"
  +          description="sender port"
  +                 type="int"
  +                 writeable="false" />
  +    <attribute   name="suspect"
  +          description="Socket is gone"
  +                 type="boolean"/>
  +    <attribute   name="waitForAck"
  +          description="Wait for ack after data send"
  +                          is="true"
  +                 type="boolean"
  +                 writeable="false" />
  +    <attribute   name="ackTimeout"
  +          description="acknowledge timeout"
  +                 type="long"/>
  +    <attribute   name="queueSize"
  +                 writeable="false"
  +          description="queue size"
  +                 type="int"/>
  +    <attribute   name="queuedNrOfBytes"
  +                 writeable="false"
  +          description="number of bytes over all queued messages"
  +                 type="long"/>
  +    <attribute   name="keepAliveTimeout"
  +          description="active socket keep alive timeout"
  +                 type="long"/>
  +    <attribute   name="keepAliveMaxRequestCount"
  +          description="max request over this socket"
  +                 type="int"/>
  +    <attribute   name="queueAddWaitTimeout"
  +          description="add wait timeout (default 10000 msec)"
  +                 type="long"/>
  +    <attribute   name="queueRemoveWaitTimeout"
  +          description="remove wait timeout (default 30000 msec)"
  +                 type="long"/>
  +    <attribute   name="maxQueueLength"
  +          description="max queue length"
  +                 type="int"/>
  +    <attribute   name="queueTimeWait"
  +          description="remember queue wait times"
  +                 is="true"
  +                 type="boolean"/>
  +    <attribute   name="queueCheckLock"
  +          description="check to lost locks"
  +                 is="true"
  +                 type="boolean"/>
  +    <attribute   name="queueDoStats"
  +          description="activated queue stats"
  +                 is="true"
  +                 type="boolean"/>
  +    <attribute   name="keepAliveCount"
  +          description="keep Alive request count"
  +                 type="int"
  +                 writeable="false"/>
  +    <attribute   name="keepAliveConnectTime"
  +          description="Connect time for keep alive"
  +                 type="long"
  +                 writeable="false"/>
  +    <attribute   name="connected"
  +                 is="true"
  +          description="socket connected"
  +                 type="boolean"
  +                 writeable="false"/>
  +    <attribute   name="nrOfRequests"
  +          description="number of send messages to other members"
  +                 type="long"
  +                 writeable="false"/>
  +    <attribute   name="totalBytes"
  +          description="number of bytes transfered"
  +                 type="long"
  +                 writeable="false"/>
  +    <attribute   name="connectCounter"
  +          description="counts connects"
  +                 type="long"
  +                 writeable="false"/>
  +    <attribute   name="disconnectCounter"
  +          description="counts disconnects"
  +                 type="long"
  +                 writeable="false"/>
  +    <attribute   name="socketOpenCounter"
  +          description="counts open socket (KeepAlive and connects)"
  +                 type="long"
  +                 writeable="false"/>                          
  +    <attribute   name="socketCloseCounter"
  +          description="counts closed socket (KeepAlive and disconnects)"
  +                 type="long"
  +                 writeable="false"/>
  +    <attribute   name="missingAckCounter"
  +          description="counts missing ack"
  +                 type="long"
  +                 writeable="false"/>
  +    <attribute   name="dataResendCounter"
  +          description="counts data resends"
  +                 type="long"
  +                 writeable="false"/>
  +    <attribute   name="inQueueCounter"
  +          description="counts all queued messages"
  +                 type="long"
  +                 writeable="false"/>
  +    <attribute   name="outQueueCounter"
  +          description="counts all successfully sended messages"
  +                 type="long"
  +                 writeable="false"/>
  +    <attribute   name="queueAddWaitTime"
  +          description="queue add wait time (tomcat thread waits)"
  +                 type="long"
  +                 writeable="false"/>
  +    <attribute   name="queueRemoveWaitTime"
  +          description="queue remove wait time (queue thread waits)"
  +                 type="long"
  +                 writeable="false"/>
  +     <operation name="connect"
  +               description="connect to other replication node"
  +               impact="ACTION"
  +               returnType="void">
  +    </operation>
  +     <operation name="disconnect"
  +               description="disconnect to other replication node"
  +               impact="ACTION"
  +               returnType="void">
  +    </operation>
  +     <operation name="checkIfCloseSocket"
  +               description="Check connection for close socket"
  +               impact="ACTION"
  +               returnType="boolean">
  +    </operation>
  +     <operation name="resetStatistics"
  +               description="Reset all statistics"
  +               impact="ACTION"
  +               returnType="void">
  +    </operation>
  +                              
  +  </mbean>
  +
     <mbean         name="PooledSocketSender"
             description="Pooled Cluster Sender"
                  domain="Catalina"
  
  
  
  1.1                  
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/FastQueue.java
  
  Index: FastQueue.java
  ===================================================================
  /*
   * Copyright 1999,2004 The Apache Software Foundation.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *      http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  package org.apache.catalina.cluster.util;
  
  /**
   * A fast queue in which the remover thread locks out the adder threads.
   * <br/>Limit the queue length when you have strange producer thread
   * problems.
   * 
   * FIXME add i18n support to log messages
   * @author Rainer Jung
   * @author Peter Rossbach
   * @version $Revision: 1.1 $ $Date: 2005/03/14 21:24:30 $
   */
  public class FastQueue implements IQueue {
  
      private static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory
              .getLog(FastQueue.class);
  
      /**
       * Lock used by add() and remove() to coordinate the adding threads and
       * the removing thread; it also carries the add/remove wait timeouts.
       */
      private SingleRemoveSynchronizedAddLock lock = null;
  
      /**
       * First object in the queue (head of the linked list; remove() starts
       * from it)
       */
      private LinkObject first = null;
  
      /**
       * Last object in the queue (tail of the linked list; add() appends to it)
       */
      private LinkObject last = null;
  
      /**
       * Current number of queued elements
       */
      private int size = 0;
  
      /**
       * When true, add() and remove() log a warning if they detect that another
       * add/remove is running at the same time (diagnostic for unexpected
       * threading problems)
       */
      private boolean checkLock = false;
  
      /**
       * When true, the time spent in the lock calls is accumulated in addWait
       * and removeWait
       */
      private boolean timeWait = false;
  
      /**
       * When true, the counters and queue size statistics are maintained
       */
      private boolean doStats = false;
  
      /**
       * Diagnostic flag: an add is in progress (only maintained when checkLock
       * is set)
       */
      private boolean inAdd = false;
  
      /**
       * Diagnostic flag: a remove is in progress (only maintained when checkLock
       * is set)
       */
      private boolean inRemove = false;
  
      /**
       * Diagnostic flag: an add or a remove is inside its locked section (only
       * maintained when checkLock is set)
       */
      private boolean inMutex = false;
  
      /**
       * Limit for the queue length; 0 (the default) means unlimited
       */
      private int maxQueueLength = 0;
  
      /**
       * Add wait timeout for producers, handed to the lock (default 10000 msec)
       */
      private long addWaitTimeout = 10000L;
  
      
      /**
       * Remove wait timeout for the consumer, handed to the lock (default 30000
       * msec)
       */
      private long removeWaitTimeout = 30000L;
  
      /**
       * Whether the queue is enabled; add() and remove() return immediately
       * while this is false
       */
      private boolean enabled = true;
  
      /**
       * Number of successfully added objects (only counted when doStats is set)
       */
      private long addCounter = 0;
  
      /**
       * Number of adds that failed, e.g. because the queue length limit was
       * reached (only counted when doStats is set)
       */
      private long addErrorCounter = 0;
  
      /**
       * Number of removes that returned an element (only counted when doStats
       * is set)
       */
      private long removeCounter = 0;
  
      /**
       * Number of removes that failed, i.e. the lock was not obtained or the
       * queue was empty (only counted when doStats is set)
       */
      private long removeErrorCounter = 0;
  
      /**
       * Accumulated time in msec that add() spent in the lock and unlock calls
       * (only measured when timeWait is set)
       */
      private long addWait = 0;
  
      /**
       * Accumulated time in msec that remove() spent waiting for the lock (only
       * measured when timeWait is set)
       */
      private long removeWait = 0;
  
      /**
       * Largest queue size observed directly after an add
       */
      private int maxSize = 0;
  
      /**
       * Sum of the queue sizes observed directly after each add; getAvgSize()
       * divides it by addCounter to get the average
       */
      private long avgSize = 0;
  
      /**
       * Largest queue size observed in the current sample window (reset by
       * add() every sampleInterval adds)
       */
      private int maxSizeSample = 0;
  
      /**
       * Sum of the queue sizes observed in the current sample window (reset by
       * add() every sampleInterval adds)
       */
      private long avgSizeSample = 0;
  
      /**
       * Number of adds that form one sample window for the sample statistics
       */
      private int sampleInterval = 100;
  
      /**
       * Create the queue: instantiate the SingleRemoveSynchronizedAddLock and
       * hand it the current (default) add and remove wait timeouts.
       */
      public FastQueue() {
          lock = new SingleRemoveSynchronizedAddLock();
          lock.setAddWaitTimeout(addWaitTimeout);
          lock.setRemoveWaitTimeout(removeWaitTimeout);
      }
  
      /**
       * Get the current add wait timeout. The value is read back from the lock
       * and the locally cached field is refreshed with it.
       * 
       * @return current add wait timeout as reported by the lock
       */
      public long getAddWaitTimeout() {
          addWaitTimeout = lock.getAddWaitTimeout();
          return addWaitTimeout;
      }
  
      /**
       * Set the add wait timeout (default 10000 msec). The value is stored and
       * passed on to the lock.
       * 
       * @param timeout new add wait timeout
       */
      public void setAddWaitTimeout(long timeout) {
          addWaitTimeout = timeout;
          lock.setAddWaitTimeout(addWaitTimeout);
      }
  
      /**
       * Get the current remove wait timeout. The value is read back from the
       * lock and the locally cached field is refreshed with it.
       * 
       * @return current remove wait timeout as reported by the lock
       */
      public long getRemoveWaitTimeout() {
          removeWaitTimeout = lock.getRemoveWaitTimeout();
          return removeWaitTimeout;
      }
  
      /**
       * Set the remove wait timeout (default 30000 msec). The value is stored
       * and passed on to the lock.
       * 
       * @param timeout new remove wait timeout
       */
      public void setRemoveWaitTimeout(long timeout) {
          removeWaitTimeout = timeout;
          lock.setRemoveWaitTimeout(removeWaitTimeout);
      }
  
      /**
       * Get the maximum queue length. add() only enforces the limit when the
       * value is greater than zero.
       * 
       * @return the configured maximum queue length
       * @see org.apache.catalina.cluster.util.IQueue#getMaxQueueLength()
       */
      public int getMaxQueueLength() {
          return maxQueueLength;
      }
  
      /**
       * Set the maximum queue length.
       * 
       * @param length new limit; add() treats values that are not greater than
       *            zero as "no limit"
       */
      public void setMaxQueueLength(int length) {
          maxQueueLength = length;
      }
  
      /**
       * @return true when the queue is enabled; add() and remove() return
       *         immediately while it is disabled
       */
      public boolean isEnabled() {
          return enabled;
      }
  
      /**
       * Enable or disable the queue. When the queue ends up disabled,
       * <code>abortRemove()</code> is called on the lock.
       * 
       * @param enable true to enable the queue, false to disable it
       */
      public void setEnabled(boolean enable) {
          enabled = enable;
          if (enabled) {
              return;
          }
          lock.abortRemove();
      }
  
      /**
       * @return true when add() and remove() check for overlapping execution
       *         and log a warning if they detect it
       */
      public boolean isCheckLock() {
          return checkLock;
      }
  
      /**
       * @param checkLock true to switch on the overlap diagnostics in add() and
       *            remove()
       */
      public void setCheckLock(boolean checkLock) {
          this.checkLock = checkLock;
      }
  
      /**
       * @return true when counters and queue size statistics are maintained
       */
      public boolean isDoStats() {
          return doStats;
      }
  
      /**
       * @param doStats true to maintain counters and queue size statistics
       */
      public void setDoStats(boolean doStats) {
          this.doStats = doStats;
      }
  
      /**
       * @return true when the time spent in the lock calls is accumulated
       */
      public boolean isTimeWait() {
          return timeWait;
      }
  
      /**
       * @param timeWait true to accumulate the time spent in the lock calls in
       *            the add/remove wait totals
       */
      public void setTimeWait(boolean timeWait) {
          this.timeWait = timeWait;
      }
  
      /**
       * @return number of adds that form one sample window for the sample
       *         statistics
       */
      public int getSampleInterval() {
          return sampleInterval;
      }
  
      /**
       * Set the number of adds that form one sample window.
       * <p>
       * The interval is used as a divisor (<code>addCounter % sampleInterval</code>
       * and <code>avgSizeSample / sampleInterval</code>), so zero would raise an
       * ArithmeticException later on, inside add() among other places, and a
       * negative value would produce meaningless averages. Such values are
       * therefore rejected here.
       * 
       * @param interval new sample interval, must be greater than zero
       * @throws IllegalArgumentException if interval is not greater than zero
       */
      public void setSampleInterval(int interval) {
          if (interval <= 0) {
              throw new IllegalArgumentException(
                      "FastQueue: sampleInterval must be > 0 but was " + interval);
          }
          sampleInterval = interval;
      }
  
      /**
       * @return number of successful adds counted so far
       */
      public long getAddCounter() {
          return addCounter;
      }
  
      /**
       * @param counter value to overwrite the add counter with
       */
      public void setAddCounter(long counter) {
          addCounter = counter;
      }
  
      /**
       * @return number of failed adds counted so far
       */
      public long getAddErrorCounter() {
          return addErrorCounter;
      }
  
      /**
       * @param counter value to overwrite the add error counter with
       */
      public void setAddErrorCounter(long counter) {
          addErrorCounter = counter;
      }
  
      /**
       * @return number of successful removes counted so far
       */
      public long getRemoveCounter() {
          return removeCounter;
      }
  
      /**
       * @param counter value to overwrite the remove counter with
       */
      public void setRemoveCounter(long counter) {
          removeCounter = counter;
      }
  
      /**
       * @return number of failed removes counted so far
       */
      public long getRemoveErrorCounter() {
          return removeErrorCounter;
      }
  
      /**
       * @param counter value to overwrite the remove error counter with
       */
      public void setRemoveErrorCounter(long counter) {
          removeErrorCounter = counter;
      }
  
      /**
       * @return accumulated time in msec that add() spent in the lock calls
       */
      public long getAddWait() {
          return addWait;
      }
  
      /**
       * @param wait value to overwrite the accumulated add wait time with
       */
      public void setAddWait(long wait) {
          addWait = wait;
      }
  
      /**
       * @return accumulated time in msec that remove() spent waiting for the
       *         lock
       */
      public long getRemoveWait() {
          return removeWait;
      }
  
      /**
       * @param wait value to overwrite the accumulated remove wait time with
       */
      public void setRemoveWait(long wait) {
          removeWait = wait;
      }
  
      /**
       * @return largest queue size observed directly after an add
       */
      public int getMaxSize() {
          return maxSize;
      }
  
      /**
       * @param size value to overwrite the recorded maximum queue size with
       */
      public void setMaxSize(int size) {
          maxSize = size;
      }
  
      
      /**
       * Average queue size: the sum of the sizes recorded after each add,
       * divided by the number of counted adds.
       * 
       * @return the average queue size, or 0 when no add has been counted yet
       */
      public long getAvgSize() {
          return (addCounter > 0) ? (avgSize / addCounter) : 0;
      }
  
      /**
       * Reset all statistics: the add/remove counters, the error counters, the
       * overall and per-sample-window size statistics, and the accumulated wait
       * times.
       */
      public void resetStatistics() {
          addCounter = 0;
          addErrorCounter = 0;
          removeCounter = 0;
          removeErrorCounter = 0;
          avgSize = 0;
          maxSize = 0;
          // The sample window values are part of the statistics as well;
          // without this getMaxSizeSample() keeps reporting the old window
          // until the next add() starts a new one.
          avgSizeSample = 0;
          maxSizeSample = 0;
          addWait = 0;
          removeWait = 0;
      }
  
      /**
       * Release the add lock, telling the lock whether the queue currently
       * holds any elements.
       */
      public void unlockAdd() {
          final boolean hasElements = size > 0;
          lock.unlockAdd(hasElements);
      }
  
      /**
       * Release the remove lock so that the next remove can proceed.
       */
      public void unlockRemove() {
          lock.unlockRemove();
      }
  
      /**
       * Start queuing: enable the queue.
       */
      public void start() {
          setEnabled(true);
      }
  
      /**
       * Stop queuing: disable the queue (see setEnabled for what disabling
       * does to the lock).
       */
      public void stop() {
          setEnabled(false);
      }
  
      /**
       * @return position inside the current sample window, i.e. the add counter
       *         modulo the sample interval
       */
      public long getSample() {
          return addCounter % sampleInterval;
      }
  
      /**
       * @return largest queue size recorded in the current sample window
       */
      public int getMaxSizeSample() {
          return maxSizeSample;
      }
  
      /**
       * @param size value to overwrite the sample window maximum with
       */
      public void setMaxSizeSample(int size) {
          maxSizeSample = size;
      }
  
      /**
       * Average queue size inside the current sample window.
       * <p>
       * While the window is partly filled the sum is divided by the number of
       * adds in it; when the add counter sits exactly on a window boundary the
       * sum is divided by the full sample interval.
       * 
       * @return the average size for the sample window, or 0 when no add has
       *         been counted
       */
      public long getAvgSizeSample() {
          final long positionInWindow = addCounter % sampleInterval;
          if (positionInWindow > 0) {
              return avgSizeSample / positionInWindow;
          }
          if (addCounter > 0) {
              return avgSizeSample / sampleInterval;
          }
          return 0;
      }
  
      /**
       * @return current number of queued elements
       */
      public int getSize() {
          return size;
      }
  
      /* Add new data to the queue
       * @see org.apache.catalina.cluster.util.IQueue#add(java.lang.String, 
java.lang.Object)
       * FIXME extract some method
       */
      public boolean add(String key, Object data) {
          boolean ok = true;
          long time = 0;
  
          if (!enabled) {
              if (log.isInfoEnabled())
                  log.info("FastQueue: queue disabled, add aborted");
              return false;
          }
  
          if (timeWait) {
              time = System.currentTimeMillis();
          }
          lock.lockAdd();
          try {
              if (timeWait) {
                  addWait += (System.currentTimeMillis() - time);
              }
  
              if (log.isTraceEnabled()) {
                  log.trace("FastQueue: add starting with size " + size);
              }
              if (checkLock) {
                  if (inAdd)
                      log.warn("FastQueue.add: Detected other add");
                  inAdd = true;
                  if (inMutex)
                      log.warn("FastQueue.add: Detected other mutex in add");
                  inMutex = true;
              }
  
              if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
                  ok = false;
                  if (log.isTraceEnabled()) {
                      log.trace("FastQueue: Could not add, since queue is full 
("
                              + size + ">=" + maxQueueLength + ")");
                  }
  
              } else {
                  LinkObject element = new LinkObject(key, data);
                  if (size == 0) {
                      first = last = element;
                      size = 1;
                  } else {
                      if (last == null) {
                          ok = false;
                          log
                                  .error("FastQueue: Could not add, since last 
is null although size is "
                                          + size + " (>0)");
                      } else {
                          last.append(element);
                          last = element;
                          size++;
                      }
                  }
  
              }
  
              if (doStats) {
                  if (ok) {
                      if (addCounter % sampleInterval == 0) {
                          maxSizeSample = 0;
                          avgSizeSample = 0;
                      }
                      addCounter++;
                      if (size > maxSize) {
                          maxSize = size;
                      }
                      if (size > maxSizeSample) {
                          maxSizeSample = size;
                      }
                      avgSize += size;
                      avgSizeSample += size;
                  } else {
                      addErrorCounter++;
                  }
              }
  
              if (first == null) {
                  log.error("FastQueue: first is null, size is " + size
                          + " at end of add");
              }
              if (last == null) {
                  log.error("FastQueue: last is null, size is " + size
                          + " at end of add");
              }
  
              if (checkLock) {
                  if (!inMutex)
                      log.warn("FastQueue: Cancelled by other mutex in add");
                  inMutex = false;
                  if (!inAdd)
                      log.warn("FastQueue: Cancelled by other add");
                  inAdd = false;
              }
              if (log.isTraceEnabled()) {
                  log.trace("FastQueue: add ending with size " + size);
              }
  
              if (timeWait) {
                  time = System.currentTimeMillis();
              }
          } finally {
              lock.unlockAdd(true);
          }
          if (timeWait) {
              addWait += (System.currentTimeMillis() - time);
          }
          return ok;
      }
  
      /* remove the complete queued object list
       * @see org.apache.catalina.cluster.util.IQueue#remove()
       * FIXME extract some method
       */
      public LinkObject remove() {
          LinkObject element;
          boolean gotLock;
          long time = 0;
  
          if (!enabled) {
              if (log.isInfoEnabled())
                  log.info("FastQueue: queue disabled, remove aborted");
              return null;
          }
  
          if (timeWait) {
              time = System.currentTimeMillis();
          }
          gotLock = lock.lockRemove();
          try {
  
              if (!gotLock) {
                  if (enabled) {
                      if (timeWait) {
                          removeWait += (System.currentTimeMillis() - time);
                      }
                      if (doStats) {
                          removeErrorCounter++;
                      }
                      if (log.isInfoEnabled())
                          log
                                  .info("FastQueue: Remove aborted although 
queue enabled");
                  } else {
                      if (log.isInfoEnabled())
                          log.info("FastQueue: queue disabled, remove aborted");
                  }
                  return null;
              }
  
              if (timeWait) {
                  removeWait += (System.currentTimeMillis() - time);
              }
  
              if (log.isTraceEnabled()) {
                  log.trace("FastQueue: remove starting with size " + size);
              }
              if (checkLock) {
                  if (inRemove)
                      log.warn("FastQueue: Detected other remove");
                  inRemove = true;
                  if (inMutex)
                      log.warn("FastQueue: Detected other mutex in remove");
                  inMutex = true;
              }
  
              element = first;
  
              if (doStats) {
                  if (element != null) {
                      removeCounter++;
                  } else {
                      removeErrorCounter++;
                      log
                              .error("FastQueue: Could not remove, since first 
is null although size is "
                                      + size + " (>0)");
                  }
              }
  
              first = last = null;
              size = 0;
  
              if (checkLock) {
                  if (!inMutex)
                      log.warn("FastQueue: Cancelled by other mutex in remove");
                  inMutex = false;
                  if (!inRemove)
                      log.warn("FastQueue: Cancelled by other remove");
                  inRemove = false;
              }
              if (log.isTraceEnabled()) {
                  log.trace("FastQueue: remove ending with size " + size);
              }
  
              if (timeWait) {
                  time = System.currentTimeMillis();
              }
          } finally {
              lock.unlockRemove();
          }
          if (timeWait) {
              removeWait += (System.currentTimeMillis() - time);
          }
          return element;
      }
  
  }
  
  
  1.1                  
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/IQueue.java
  
  Index: IQueue.java
  ===================================================================
  /*
   * Copyright 1999,2004 The Apache Software Foundation.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *      http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  package org.apache.catalina.cluster.util;
  
  /**
   * Minimal contract of a queue whose elements are handed out as
   * {@link LinkObject} instances.
   *
   * @author Rainer Jung
   * @author Peter Rossbach
   * @version $Revision: 1.1 $ $Date: 2005/03/14 21:24:30 $
   */
  public interface IQueue {

      // ----------------------------------------------------------- lifecycle

      /**
       * Start the queue.
       * NOTE(review): presumably enables add/remove processing - confirm
       * against the implementations.
       */
      void start();

      /**
       * Stop the queue.
       * NOTE(review): presumably disables add/remove processing - confirm
       * against the implementations.
       */
      void stop();

      // ------------------------------------------------------- configuration

      /**
       * @return the configured maximum queue length
       */
      int getMaxQueueLength();

      /**
       * Set the maximum queue length.
       *
       * @param length the new maximum queue length
       */
      void setMaxQueueLength(int length);

      // ---------------------------------------------------------- operations

      /**
       * Add a data object to the queue.
       *
       * @param key  key stored together with the data object
       * @param data the data object to queue
       * @return success indicator of the add operation
       *         (presumably <code>true</code> if the object was queued)
       */
      boolean add(String key, Object data);

      /**
       * Remove queued data from the queue.
       *
       * @return a {@link LinkObject}; further elements may be reachable
       *         through {@link LinkObject#next()}
       */
      LinkObject remove();
  }
  
  
  
  1.1                  
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/LinkObject.java
  
  Index: LinkObject.java
  ===================================================================
  /*
   * Copyright 1999,2004 The Apache Software Foundation.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *      http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  package org.apache.catalina.cluster.util;
  
  /**
   * The class <b>LinkObject</b> implements an element
   * for a singly linked list, consisting of a key, a general
   * data object and a pointer to the next element.
   * <p>
   * Key and data object are fixed at construction time; only the
   * pointer to the next element can be changed afterwards.
   *
   * @author Rainer Jung
   * @author Peter Rossbach
   * @version $Revision: 1.1 $ $Date: 2005/03/14 21:24:30 $
   */

  public class LinkObject {

      /** The data object carried by this element; never reassigned. */
      private final Object payload;

      /** The key (message id) of this element; never reassigned. */
      private final String key;

      /** The successor in the list, or null if there is none. */
      private LinkObject next;

      /**
       * Construct a new element from the key and the data object.
       * Sets the pointer to the next element to null.
       *
       * @param key     The key (message id) of the data object.
       * @param payload The data object.
       */
      public LinkObject(String key, Object payload) {
          this.payload = payload;
          this.next = null;
          this.key = key;
      }

      /**
       * Set the next element. An already set successor is replaced.
       *
       * @param next The next element.
       */
      public void append(LinkObject next) {
          this.next = next;
      }

      /**
       * Get the next element.
       *
       * @return The next element, or null if none has been appended.
       */
      public LinkObject next() {
          return next;
      }

      /**
       * Get the data object from the element.
       *
       * @return The data object from the element.
       */
      public Object data() {
          return payload;
      }

      /**
       * Get the unique message id.
       *
       * @return the key passed to the constructor
       */
      public Object getKey() {
          return key;
      }

  }
  
  
  
  1.1                  
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/SingleRemoveSynchronizedAddLock.java
  
  Index: SingleRemoveSynchronizedAddLock.java
  ===================================================================
  /*
   * Copyright 1999,2004 The Apache Software Foundation.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *      http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  package org.apache.catalina.cluster.util;
  
  /**
   * The class <b>SingleRemoveSynchronizedAddLock</b> implements the locking
   * used to access a queue from a single remove thread and multiple add
   * threads.
   *
   * A thread is only allowed to be either the remove thread or
   * an add thread.
   *
   * At any time the lock is owned either by the remove thread
   * or by a single add thread.
   *
   * When the remove thread asks for the lock while no data is
   * available, it blocks (polls) until an add thread reports data
   * and releases the lock.
   *
   * When an add thread releases the lock while the remove thread is
   * polling and data is available, only the remove thread is woken up,
   * so that it is preferred over waiting add threads.
   *
   * The remove thread is expected to remove all entries at once and to
   * process them without polling the queue again.
   *
   * The lock is not reentrant: every thread must release an owned lock
   * before competing for the lock again!
   *
   * @author Rainer Jung
   * @author Peter Rossbach
   * @version 1.1
   */

  public class SingleRemoveSynchronizedAddLock {

      /**
       * Maximum time in milliseconds a thread sleeps in one wait
       * cycle while waiting for the add lock. Acts as a safety net
       * in case a notification from the unlock methods is missed.
       */
      private long addWaitTimeout = 10000L;

      /**
       * Maximum time in milliseconds the remove thread sleeps in one
       * wait cycle while waiting for the remove lock. Acts as a safety
       * net in case a notification from the unlock methods is missed.
       */
      private long removeWaitTimeout = 30000L;

      /**
       * The remove thread while it is polling for the lock,
       * null otherwise.
       */
      private Thread remover = null;

      /**
       * True while an add thread owns the lock.
       */
      private boolean addLocked = false;

      /**
       * True while the remove thread owns the lock.
       */
      private boolean removeLocked = false;

      /**
       * True as long as the remove thread may wait for the lock.
       * Switched to false by {@link #abortRemove()}.
       */
      private boolean removeEnabled = true;

      /**
       * True if the guarded object has data that can be removed,
       * i.e. the remover does not need to poll.
       */
      private boolean dataAvailable = false;

      /**
       * Create a lock that starts without available data.
       */
      public SingleRemoveSynchronizedAddLock() {
      }

      /**
       * Create a lock with an explicit initial data state.
       *
       * @param dataAvailable initial value of the data available flag
       */
      public SingleRemoveSynchronizedAddLock(boolean dataAvailable) {
          this.dataAvailable = dataAvailable;
      }

      /**
       * @return Value of addWaitTimeout
       */
      public synchronized long getAddWaitTimeout() {
          return addWaitTimeout;
      }

      /**
       * Set value of addWaitTimeout
       */
      public synchronized void setAddWaitTimeout(long timeout) {
          addWaitTimeout = timeout;
      }

      /**
       * @return Value of removeWaitTimeout
       */
      public synchronized long getRemoveWaitTimeout() {
          return removeWaitTimeout;
      }

      /**
       * Set value of removeWaitTimeout
       */
      public synchronized void setRemoveWaitTimeout(long timeout) {
          removeWaitTimeout = timeout;
      }

      /**
       * Check if the locked object has data available,
       * i.e. the remover can stop polling and get the lock.
       * @return True iff the locked object has data available.
       */
      public synchronized boolean isDataAvailable() {
          return dataAvailable;
      }

      /**
       * Check if an add thread owns the lock.
       * @return True iff an add thread owns the lock.
       */
      public synchronized boolean isAddLocked() {
          return addLocked;
      }

      /**
       * Check if the remove thread owns the lock.
       * @return True iff the remove thread owns the lock.
       */
      public synchronized boolean isRemoveLocked() {
          return removeLocked;
      }

      /**
       * Check if the remove thread is polling.
       * @return True iff the remove thread is polling.
       */
      public synchronized boolean isRemovePolling() {
          return remover != null;
      }

      /**
       * Acquire the lock for an add thread and set the add flag.
       * Blocks as long as another add thread or the remove thread
       * owns the lock.
       */
      public synchronized void lockAdd() {
          while ( addLocked || removeLocked ) {
              try {
                  wait(addWaitTimeout);
              } catch ( InterruptedException ignored ) {
                  // deliberately ignored: the loop condition is re-checked
              }
          }
          addLocked = true;
      }

      /**
       * Acquire the lock for the remove thread and set the remove flag.
       * Blocks while an add thread owns the lock or no data is
       * available, unless removing has been aborted.
       *
       * @return True iff the remove thread now owns the lock; false if
       *         removing has been aborted.
       */
      public synchronized boolean lockRemove() {
          removeLocked = false;
          if ( removerMustWait() ) {
              // register as polling remover, so that unlockAdd and
              // abortRemove can wake this thread via interrupt
              remover = Thread.currentThread();
              while ( removerMustWait() ) {
                  try {
                      wait(removeWaitTimeout);
                  } catch ( InterruptedException ignored ) {
                      // interrupt is the wake-up signal; re-check condition
                  }
              }
              remover = null;
          }
          removeLocked = removeEnabled;
          return removeLocked;
      }

      /**
       * Release the lock held by an add thread and record whether data
       * is available. A polling remove thread is interrupted if data
       * is available or removing has been aborted; in every other case
       * all waiting threads are notified.
       *
       * @param dataAvailable new value of the data available flag
       */
      public synchronized void unlockAdd(boolean dataAvailable) {
          addLocked = false;
          this.dataAvailable = dataAvailable;
          boolean wakeRemover =
              remover != null && ( dataAvailable || ! removeEnabled );
          if ( wakeRemover ) {
              remover.interrupt();
          } else {
              notifyAll();
          }
      }

      /**
       * Release the lock held by the remove thread, clear the data
       * available flag and notify all waiting threads.
       */
      public synchronized void unlockRemove() {
          removeLocked = false;
          dataAvailable = false;
          notifyAll();
      }

      /**
       * Abort any polling remover thread
       */
      public synchronized void abortRemove() {
          removeEnabled = false;
          if ( remover != null ) {
              remover.interrupt();
          }
      }

      /**
       * Tell whether the remove thread has to keep waiting. Must only
       * be called while holding the monitor of this lock.
       */
      private boolean removerMustWait() {
          return removeEnabled && ( addLocked || ! dataAvailable );
      }

  }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to