pero        2005/04/12 11:56:07

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster
                        ClusterSender.java
               modules/cluster/src/share/org/apache/catalina/cluster/io
                        XByteBuffer.java
               modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        DataSender.java FastAsyncSocketSender.java
                        ReplicationTransmitter.java SimpleTcpCluster.java
                        TcpReplicationThread.java mbeans-descriptors.xml
               modules/cluster to-do.txt
  Added:       modules/cluster/test/src/share/org/apache/catalina/cluster/tcp
                        ReplicationTransmitterTest.java
  Log:
  Optimize cluster send message
  add some usefull mbean attributes
  Refactor ReplicationTransmitter
  
  Revision  Changes    Path
  1.7       +2 -2      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSender.java
  
  Index: ClusterSender.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSender.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- ClusterSender.java        10 Apr 2005 16:20:45 -0000      1.6
  +++ ClusterSender.java        12 Apr 2005 18:56:07 -0000      1.7
  @@ -39,9 +39,9 @@
   
       public void backgroundProcess() ;
   
  -    public void sendMessage(String messageId, byte[] indata, Member member) 
throws java.io.IOException;
  +    public void sendMessage(ClusterMessage message, Member member) throws 
java.io.IOException;
   
  -    public void sendMessage(String messageId, byte[] indata) throws 
java.io.IOException;
  +    public void sendMessage(ClusterMessage message) throws 
java.io.IOException;
       
       public boolean isWaitForAck();
       public void setWaitForAck(boolean isWaitForAck);
  
  
  
  1.13      +2 -1      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
  
  Index: XByteBuffer.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- XByteBuffer.java  5 Apr 2005 18:05:52 -0000       1.12
  +++ XByteBuffer.java  12 Apr 2005 18:56:07 -0000      1.13
  @@ -366,6 +366,7 @@
        * @param indata - the message data to be contained within the package
        * @param compress - compress message data or not
        * @return - a full package (header,size,data,footer)
  +     * @deprecated since 5.5.10
        */
       public static byte[] createDataPackage(byte[] indata, boolean compress)
               throws java.io.IOException {
  
  
  
  1.7       +18 -6     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
  
  Index: DataSender.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- DataSender.java   10 Apr 2005 16:20:46 -0000      1.6
  +++ DataSender.java   12 Apr 2005 18:56:07 -0000      1.7
  @@ -17,10 +17,12 @@
   package org.apache.catalina.cluster.tcp;
   
   import java.io.IOException;
  +import java.io.OutputStream;
   import java.net.InetAddress;
   import java.net.Socket;
   import java.net.SocketException;
   
  +import org.apache.catalina.cluster.io.XByteBuffer;
   import org.apache.catalina.util.StringManager;
   
   /**
  @@ -47,7 +49,7 @@
       /**
        * The descriptive information about this implementation.
        */
  -    private static final String info = "DataSender/1.4";
  +    private static final String info = "DataSender/2.0";
   
       /**
        * receiver address
  @@ -227,6 +229,13 @@
       }
   
       /**
  +     * @return Returns the avg totalBytes/nrOfRequests.
  +     */
  +    public double getAvgMessageSize() {
  +        return ((double)totalBytes) / nrOfRequests;
  +    }
  +
  +    /**
        * @return Returns the avg processingTime/nrOfRequests.
        */
       public double getAvgProcessingTime() {
  @@ -693,7 +702,7 @@
                   openSocket();
           }
           try {
  -            writeData(data);
  +             writeData(data);
           } catch (java.io.IOException x) {
               // second try with fresh connection
               dataResendCounter++;
  @@ -724,8 +733,12 @@
        * @throws IOException
        */
       protected void writeData(byte[] data) throws IOException {
  -        socket.getOutputStream().write(data);
  -        socket.getOutputStream().flush();
  +        OutputStream out = socket.getOutputStream();
  +        out.write(XByteBuffer.START_DATA);
  +        out.write(XByteBuffer.toBytes(data.length));
  +        out.write(data);
  +        out.write(XByteBuffer.END_DATA);
  +        out.flush();
           if (isWaitForAck())
               waitForAck(ackTimeout);
           
  @@ -733,7 +746,6 @@
   
       /**
        * Wait for Acknowledgement from other server
  -     * FIXME Handle SocketTimeoutException - Retry message ?
        * @param timeout
        * @throws java.io.IOException
        * @throws java.net.SocketTimeoutException
  
  
  
  1.5       +51 -28    
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java
  
  Index: FastAsyncSocketSender.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- FastAsyncSocketSender.java        10 Apr 2005 16:54:06 -0000      1.4
  +++ FastAsyncSocketSender.java        12 Apr 2005 18:56:07 -0000      1.5
  @@ -54,7 +54,7 @@
       /**
        * The descriptive information about this implementation.
        */
  -    private static final String info = "FastAsyncSocketSender/1.1";
  +    private static final String info = "FastAsyncSocketSender/2.0";
   
       // ----------------------------------------------------- Instance 
Variables
   
  @@ -109,6 +109,7 @@
   
       }
    
  +
       /**
        * get current add wait timeout 
        * @return current wait timeout
  @@ -386,6 +387,8 @@
            */
           private long queuedNrOfBytes = 0;
   
  +       
  +
           /**
            * Only use inside FastAsyncSocketSender
            * @param sender
  @@ -396,9 +399,12 @@
               this.queue = queue;
               this.sender = sender;
           }
  -
  -        protected long getQueuedNrOfBytes() {
  -            return queuedNrOfBytes ;
  +        
  +        /**
  +         * @return Returns the queuedNrOfBytes.
  +         */
  +        public long getQueuedNrOfBytes() {
  +            return queuedNrOfBytes;
           }
           
           protected synchronized void setQueuedNrOfBytes(long queuedNrOfBytes) 
{
  @@ -417,35 +423,17 @@
           public void stopRunning() {
               keepRunning = false;
           }
  -
  +        
  +        
           /* Get the objects from queue and send all mesages to the sender.
            * @see java.lang.Runnable#run()
            */
           public void run() {
               while (keepRunning) {
  -                // get a link list of all queued objects
  -             if(log.isTraceEnabled())
  -                    log.trace("Queuesize before=" + 
((FastQueue)queue).getSize());
  -                LinkObject entry = queue.remove();
  -             if(log.isTraceEnabled())
  -                log.trace("Queuesize after=" + ((FastQueue)queue).getSize());
  +                long queueSize;
  +                LinkObject entry = getQueuedMessage();
                   if (entry != null) {
  -                    do {
  -                        int messagesize = 0;
  -                        try {
  -                            byte[] data = (byte[]) entry.data();
  -                            messagesize = data.length;
  -                            sender.pushMessage((String) entry.getKey(), 
data);
  -                            outQueueCounter++;
  -                        } catch (Exception x) {
  -                            log.warn(sm.getString(
  -                                    "AsyncSocketSender.send.error", entry
  -                                            .getKey()),x);
  -                        } finally {
  -                            decQueuedNrOfBytes(messagesize);
  -                        }
  -                        entry = entry.next();
  -                    } while (entry != null);
  +                    pushQueuedMessages(entry);
                   } else {
                       if (keepRunning) {
                           
log.warn(sm.getString("AsyncSocketSender.queue.empty",
  @@ -456,6 +444,41 @@
               }
           }
   
  +        /**
  +         * @return
  +         */
  +        protected LinkObject getQueuedMessage() {
  +            // get a link list of all queued objects
  +            if (log.isTraceEnabled())
  +                log.trace("Queuesize before=" + ((FastQueue) 
queue).getSize());
  +            LinkObject entry = queue.remove();
  +            if (log.isTraceEnabled())
  +                log.trace("Queuesize after=" + ((FastQueue) 
queue).getSize());
  +            return entry;
  +        }
  +
  +        /**
  +         * @param entry
  +         */
  +        protected void pushQueuedMessages(LinkObject entry) {
  +            do {
  +                int messagesize = 0;
  +                try {
  +                    byte[] data = (byte[]) entry.data();
  +                    messagesize = data.length;
  +                    sender.pushMessage((String) entry.getKey(), data);
  +                    outQueueCounter++;
  +                } catch (Exception x) {
  +                    log.warn(sm.getString(
  +                            "AsyncSocketSender.send.error", entry
  +                                    .getKey()), x);
  +                } finally {
  +                    decQueuedNrOfBytes(messagesize);
  +                }
  +                entry = entry.next();
  +            } while (entry != null);
  +        }
  +
       }
   
   }
  \ No newline at end of file
  
  
  
  1.25      +154 -38   
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
  
  Index: ReplicationTransmitter.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v
  retrieving revision 1.24
  retrieving revision 1.25
  diff -u -r1.24 -r1.25
  --- ReplicationTransmitter.java       10 Apr 2005 16:20:46 -0000      1.24
  +++ ReplicationTransmitter.java       12 Apr 2005 18:56:07 -0000      1.25
  @@ -16,14 +16,18 @@
   
   package org.apache.catalina.cluster.tcp;
   
  +import java.io.ByteArrayOutputStream;
   import java.io.IOException;
  +import java.io.ObjectOutputStream;
   import java.util.HashMap;
   import java.util.Iterator;
   import java.util.Map;
  +import java.util.zip.GZIPOutputStream;
   
   import javax.management.MBeanServer;
   import javax.management.ObjectName;
   
  +import org.apache.catalina.cluster.ClusterMessage;
   import org.apache.catalina.cluster.ClusterSender;
   import org.apache.catalina.cluster.Member;
   import org.apache.catalina.cluster.io.XByteBuffer;
  @@ -49,7 +53,7 @@
       /**
        * The descriptive information about this implementation.
        */
  -    private static final String info = "ReplicationTransmitter/1.3";
  +    private static final String info = "ReplicationTransmitter/2.0";
   
       /**
        * The string manager for this package.
  @@ -109,9 +113,29 @@
       /**
        * Compress message data bytes
        */
  -    private boolean compress = true;
  +    private boolean compress = false;
   
       /**
  +     * doTransmitterProcessingStats
  +     */
  +    protected boolean doTransmitterProcessingStats = false;
  +
  +    /**
  +     * proessingTime
  +     */
  +    protected long processingTime = 0;
  +    
  +    /**
  +     * min proessingTime
  +     */
  +    protected long minProcessingTime = Long.MAX_VALUE ;
  +
  +    /**
  +     * max proessingTime
  +     */
  +    protected long maxProcessingTime = 0;
  +   
  +    /**
        * dynamic sender <code>properties</code>
        */
       private Map properties = new HashMap();
  @@ -187,6 +211,49 @@
       }
   
       /**
  +     * @return Returns the avg processingTime/nrOfRequests.
  +     */
  +    public double getAvgProcessingTime() {
  +        return ((double)processingTime) / nrOfRequests;
  +    }
  + 
  +    /**
  +     * @return Returns the maxProcessingTime.
  +     */
  +    public long getMaxProcessingTime() {
  +        return maxProcessingTime;
  +    }
  +    
  +    /**
  +     * @return Returns the minProcessingTime.
  +     */
  +    public long getMinProcessingTime() {
  +        return minProcessingTime;
  +    }
  +    
  +    /**
  +     * @return Returns the processingTime.
  +     */
  +    public long getProcessingTime() {
  +        return processingTime;
  +    }
  +    
  +    /**
  +     * @return Returns the doTransmitterProcessingStats.
  +     */
  +    public boolean isDoTransmitterProcessingStats() {
  +        return doTransmitterProcessingStats;
  +    }
  +    
  +    /**
  +     * @param doTransmitterProcessingStats The doTransmitterProcessingStats 
to set.
  +     */
  +    public void setDoTransmitterProcessingStats(boolean doProcessingStats) {
  +        this.doTransmitterProcessingStats = doProcessingStats;
  +    }
  + 
  +
  +    /**
        * Transmitter ObjectName
        * 
        * @param name
  @@ -344,42 +411,59 @@
       }
   
       // ------------------------------------------------------------- public
  -
  +    
       /**
        * Send data to one member
  -     * 
  -     * @see 
org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String,
  -     *      byte[], org.apache.catalina.cluster.Member)
  +     * FIXME set filtering messages
  +     * @see 
org.apache.catalina.cluster.ClusterSender#sendMessage(org.apache.catalina.cluster.ClusterMessage,
 org.apache.catalina.cluster.Member)
        */
  -    public void sendMessage(String sessionId, byte[] indata, Member member)
  -            throws java.io.IOException {
  -        byte[] data = convertSenderData(indata);
  -        String key = getKey(member);
  -        IDataSender sender = (IDataSender) map.get(key);
  -        sendMessageData(sessionId, data, sender);
  +    public void sendMessage(ClusterMessage message, Member member)
  +            throws java.io.IOException {       
  +        long time = 0 ;
  +        if(doTransmitterProcessingStats) {
  +            time = System.currentTimeMillis();
  +        }
  +        try {
  +            byte[] data = createMessageData(message);
  +            String key = getKey(member);
  +            IDataSender sender = (IDataSender) map.get(key);
  +            sendMessageData(message.getUniqueId(), data, sender);
  +        } finally {
  +            if (doTransmitterProcessingStats) {
  +                addProcessingStats(time);
  +            }
  +        }
       }
   
       /**
        * send message to all senders (broadcast)
  -     * 
  -     * @see 
org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String,
  -     *      byte[])
  +     * @see 
org.apache.catalina.cluster.ClusterSender#sendMessage(org.apache.catalina.cluster.ClusterMessage)
        */
  -    public void sendMessage(String sessionId, byte[] indata)
  +    public void sendMessage(ClusterMessage message)
               throws java.io.IOException {
  -        IDataSender[] senders = getSenders();
  -        byte[] data = convertSenderData(indata);
  -        for (int i = 0; i < senders.length; i++) {
  -
  -            IDataSender sender = senders[i];
  -            try {
  -                sendMessageData(sessionId, data, sender);
  -            } catch (Exception x) {
  -
  -                if (!sender.getSuspect())
  -                    log.warn("Unable to send replicated message to " + sender
  -                            + ", is server down?", x);
  -                sender.setSuspect(true);
  +        long time = 0;
  +        if (doTransmitterProcessingStats) {
  +            time = System.currentTimeMillis();
  +        }
  +        try {
  +            byte[] data = createMessageData(message);
  +            IDataSender[] senders = getSenders();
  +            for (int i = 0; i < senders.length; i++) {
  +
  +                IDataSender sender = senders[i];
  +                try {
  +                    sendMessageData(message.getUniqueId(), data, sender);
  +                } catch (Exception x) {
  +                    if (!sender.getSuspect()) {
  +                        log.warn("Unable to send replicated message to "
  +                                + sender + ", is server down?", x);
  +                        sender.setSuspect(true);
  +                    }
  +                }
  +            }
  +        } finally {
  +            if (doTransmitterProcessingStats) {
  +                addProcessingStats(time);
               }
           }
       }
  @@ -518,6 +602,9 @@
           nrOfRequests = 0;
           totalBytes = 0;
           failureCounter = 0;
  +        processingTime = 0;
  +        minProcessingTime = Long.MAX_VALUE;
  +        maxProcessingTime = 0;
       }
   
       /*
  @@ -640,6 +727,7 @@
           }
       }
   
  +    
       /**
        * build sender ObjectName (
        * 
engine.domain:type=IDataSender,host="host",senderAddress="receiver.address",senderPort="port"
 )
  @@ -664,17 +752,32 @@
       }
   
       /**
  -     * compress data
  -     * 
  +     * Send Message create Timestamp and generate message bytes form msg
        * @see XByteBuffer#createDataPackage(byte[])
  -     * @param indata
  -     * @return
  +     * @param msg cluster message
  +     * @return cluster message as byte array
        * @throws IOException
  -     *             FIXME get CompressMessageDate from cluster instanz
        */
  -    protected byte[] convertSenderData(byte[] data) throws IOException {
  -        return XByteBuffer.createDataPackage(data, isCompress());
  +    protected byte[] createMessageData(ClusterMessage msg) throws 
IOException {
  +        msg.setTimestamp(System.currentTimeMillis());
  +        ByteArrayOutputStream outs = new ByteArrayOutputStream();
  +        ObjectOutputStream out;
  +        GZIPOutputStream gout = null;
  +        if (isCompress()) {
  +            gout = new GZIPOutputStream(outs);
  +            out = new ObjectOutputStream(gout);
  +        } else {
  +            out = new ObjectOutputStream(outs);
  +        }
  +        out.writeObject(msg);
  +        // flush out the gzip stream to byte buffer
  +        if(gout != null) {
  +            gout.flush();
  +            gout.close();
  +        }
  +        return outs.toByteArray();
       }
  + 
   
       /**
        * Send message to concrete sender. If autoConnect is true, check is
  @@ -723,5 +826,18 @@
           }
   
       }
  -
  +    /**
  +     * Add processing stats times
  +     * @param startTime
  +     */
  +    protected void addProcessingStats(long startTime) {
  +        long time = System.currentTimeMillis() - startTime ;
  +        if(time < minProcessingTime)
  +            minProcessingTime = time ;
  +        if( time > maxProcessingTime)
  +            maxProcessingTime = time ;
  +        processingTime += time ;
  +    }
  + 
  + 
   }
  \ No newline at end of file
  
  
  
  1.62      +20 -51    
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
  
  Index: SimpleTcpCluster.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
  retrieving revision 1.61
  retrieving revision 1.62
  diff -u -r1.61 -r1.62
  --- SimpleTcpCluster.java     10 Apr 2005 16:20:46 -0000      1.61
  +++ SimpleTcpCluster.java     12 Apr 2005 18:56:07 -0000      1.62
  @@ -91,7 +91,7 @@
       /**
        * Descriptive information about this component implementation.
        */
  -    protected static final String info = "SimpleTcpCluster/1.2";
  +    protected static final String info = "SimpleTcpCluster/2.0";
   
       public static final String BEFORE_MEMBERREGISTER_EVENT = 
"before_member_register";
   
  @@ -598,8 +598,16 @@
       }
   
       /**
  -     * send a cluster message to one member
  +     * send message to all cluster members
        * 
  +     * @see 
org.apache.catalina.cluster.CatalinaCluster#send(org.apache.catalina.cluster.ClusterMessage)
  +     */
  +    public void send(ClusterMessage msg) {
  +        send(msg, null);
  +    }
  +
  +    /**
  +     * send a cluster message to one member
        * @param msg message to transfer
        * @param dest Receiver member
        * @see 
org.apache.catalina.cluster.CatalinaCluster#send(org.apache.catalina.cluster.ClusterMessage,
  @@ -608,63 +616,24 @@
       public void send(ClusterMessage msg, Member dest) {
           try {
               msg.setAddress(membershipService.getLocalMember());
  -            Member destination = dest;
  -
  -            if (msg instanceof SessionMessage) {
  -                SessionMessage smsg = (SessionMessage) msg;
  -                //if we request session state, send to the oldest of members
  -                if ((destination == null)
  -                        && (smsg.getEventType() == 
SessionMessage.EVT_GET_ALL_SESSIONS)
  -                        && (membershipService.getMembers().length > 0)) {
  -                    destination = membershipService.getMembers()[0];
  -                }
  -            }
  -            byte[] data = createMessageData(msg);
  -            if (destination != null) {
  -                Member tcpdest = dest;
  -                if ((tcpdest != null)
  -                        && 
(!membershipService.getLocalMember().equals(tcpdest))) {
  -                    clusterSender.sendMessage(msg.getUniqueId(), data, 
tcpdest);
  -                }
  +            if (dest != null) {
  +                if (!membershipService.getLocalMember().equals(dest)) {
  +                    clusterSender.sendMessage(msg, dest);
  +                } else
  +                    log.error("Unable to send message to local member " + 
msg);
               } else {
  -                clusterSender.sendMessage(msg.getUniqueId(), data);
  +                clusterSender.sendMessage(msg);
               }
           } catch (Exception x) {
  -            if(notifyLifecycleListenerOnFailure) {
  +            if (notifyLifecycleListenerOnFailure) {
                   // Notify our interested LifecycleListeners
  -                lifecycle.fireLifecycleEvent(SEND_MESSAGE_FAILURE_EVENT, 
  -                 new SendMessageData(msg,dest,x));                
  +                lifecycle.fireLifecycleEvent(SEND_MESSAGE_FAILURE_EVENT,
  +                        new SendMessageData(msg, dest, x));
               }
               log.error("Unable to send message through cluster sender.", x);
           }
       }
   
  -    /**
  -     * Send Message create Timestamp and generate message bytes form msg
  -     * @param msg cluster message
  -     * @return cluster message as byte array
  -     * @throws IOException
  -     */
  -    protected byte[] createMessageData(ClusterMessage msg) throws 
IOException {
  -        msg.setTimestamp(System.currentTimeMillis());
  -        java.io.ByteArrayOutputStream outs = new 
java.io.ByteArrayOutputStream();
  -        java.io.ObjectOutputStream out = new java.io.ObjectOutputStream(
  -                outs);
  -        out.writeObject(msg);
  -        byte[] data = outs.toByteArray();
  -        return data;
  -    }
  -
  -    /**
  -     * send message to all cluster members
  -     * 
  -     * @see 
org.apache.catalina.cluster.CatalinaCluster#send(org.apache.catalina.cluster.ClusterMessage)
  -     */
  -    public void send(ClusterMessage msg) {
  -        send(msg, null);
  -    }
  -
  -
       /* New cluster member is registered
        * FIXME notify someone (JMX(Listener)
        * @see 
org.apache.catalina.cluster.MembershipListener#memberAdded(org.apache.catalina.cluster.Member)
  
  
  
  1.17      +3 -3      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
  
  Index: TcpReplicationThread.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v
  retrieving revision 1.16
  retrieving revision 1.17
  diff -u -r1.16 -r1.17
  --- TcpReplicationThread.java 10 Apr 2005 16:20:46 -0000      1.16
  +++ TcpReplicationThread.java 12 Apr 2005 18:56:07 -0000      1.17
  @@ -113,7 +113,7 @@
        * re-enables OP_READ and calls wakeup() on the selector
        * so the selector will resume watching this channel.
        */
  -    private void drainChannel (SelectionKey key)
  +    protected void drainChannel (SelectionKey key)
           throws Exception
       {
           boolean packetReceived=false;
  @@ -162,7 +162,7 @@
        * @param key
        * @param channel
        */
  -    private void sendAck(SelectionKey key, SocketChannel channel) {
  +    protected void sendAck(SelectionKey key, SocketChannel channel) {
           
           try {
               channel.write(ByteBuffer.wrap(ACK_COMMAND));
  
  
  
  1.8       +45 -6     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml
  
  Index: mbeans-descriptors.xml
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- mbeans-descriptors.xml    10 Apr 2005 16:20:46 -0000      1.7
  +++ mbeans-descriptors.xml    12 Apr 2005 18:56:07 -0000      1.8
  @@ -9,7 +9,10 @@
                  domain="Catalina"
                   group="Cluster"
                    type="org.apache.catalina.cluster.tcp.SimpleTcpCluster">
  -
  +    <attribute   name="info"
  +          description="Class version info"
  +                 type="java.lang.String"
  +                 writeable="false"/>
       <attribute   name="notifyListenersOnReplication"
             description="notify session attribute listener at backups"
                    type="boolean"/>
  @@ -71,7 +74,10 @@
                  domain="Catalina"
                   group="ClusterSender"
                    
type="org.apache.catalina.cluster.tcp.ReplicationTransmitter">
  -
  +    <attribute   name="info"
  +          description="Class version info"
  +                 type="java.lang.String"
  +                 writeable="false"/>
       <attribute   name="replicationMode"
             description="replication mode (synchnous,pooled.asynchnous)"
                    type="java.lang.String"/>
  @@ -87,6 +93,26 @@
                             is="true"
                    type="boolean"
                    writeable="false" />
  +    <attribute   name="processingTime"
  +          description="sending processing time"
  +                 type="long"
  +                 writeable="false"/>
  +    <attribute   name="minProcessingTime"
  +          description="minimal sending processing time"
  +                 type="long"
  +                 writeable="false"/>
  +    <attribute   name="avgProcessingTime"
  +          description="processing time / nrOfRequests"
  +                 type="double"
  +                 writeable="false"/>
  +    <attribute   name="maxProcessingTime"
  +          description="maximal sending processing time"
  +                 type="long"
  +                 writeable="false"/>
  +    <attribute   name="doTransmitterProcessingStats"
  +          description="create processing time stats"
  +                          is="true"
  +                 type="boolean" />                
       <attribute   name="nrOfRequests"
             description="number of send messages to other members"
                    type="long"
  @@ -152,6 +178,10 @@
                    writeable="false"/>
       <attribute   name="ackTimeout"
             description="acknowledge timeout"
  +                 type="long"/>                 
  +    <attribute   name="avgMessageSize"
  +                 writeable="false"
  +          description="avg message size (totalbytes/nrOfRequests"
                    type="long"/>
       <attribute   name="queueSize"
                    writeable="false"
  @@ -318,6 +348,10 @@
       <attribute   name="ackTimeout"
             description="acknowledge timeout"
                    type="long"/>
  +    <attribute   name="avgMessageSize"
  +                 writeable="false"
  +          description="avg message size (totalbytes/nrOfRequests"
  +                 type="long" />
       <attribute   name="queueSize"
                    writeable="false"
             description="queue size"
  @@ -458,7 +492,7 @@
             description="queue remove wait time (queue thread waits)"
                    type="long"
                    writeable="false"/>
  -     <operation name="connect"
  +     <operation name="connect"
                  description="connect to other replication node"
                  impact="ACTION"
                  returnType="void">
  @@ -519,6 +553,10 @@
             description="socket connected"
                    type="boolean"
                    writeable="false"/>
  +    <attribute   name="avgMessageSize"
  +                 writeable="false"
  +          description="avg message size (totalbytes/nrOfRequests"
  +                 type="long"/>
       <attribute   name="nrOfRequests"
             description="number of send messages to other members"
                    type="long"
  @@ -577,9 +615,6 @@
                             is="true"
                    type="boolean"
                    writeable="false" />
  -    <attribute   name="maxPoolSocketLimit"
  -          description="Max parallel sockets"
  -                 type="int"/>
       <attribute   name="keepAliveTimeout"
             description="active socket keep alive timeout"
                    type="long"/>
  @@ -599,6 +634,10 @@
             description="socket connected"
                    type="boolean"
                    writeable="false"/>
  +    <attribute   name="avgMessageSize"
  +                 writeable="false"
  +          description="avg message size (totalbytes/nrOfRequests"
  +                 type="long"/>
       <attribute   name="nrOfRequests"
             description="number of send messages to other members"
                    type="long"
  
  
  
  1.11      +34 -8     jakarta-tomcat-catalina/modules/cluster/to-do.txt
  
  Index: to-do.txt
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/to-do.txt,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- to-do.txt 10 Apr 2005 16:20:46 -0000      1.10
  +++ to-do.txt 12 Apr 2005 18:56:07 -0000      1.11
  @@ -1,5 +1,16 @@
  -Next Steps:
  +==============================
  +Next actions
  +==============================
   
  +- reduce cpu and memory consume (Receiver)
  +    - set new compress sender flag at default=false ( < CPU usage)
  +    - Make compact algo 
  +        currently message receive data is split at 
XbyteBuffer#extractPackage and
  +        SimpleTcpCluster#messageDataReceived     
  +- when a lot of messages expire it comes to burst of messages 
  +     - all 60 Sec when ManagerBase#processExpires is called!
  +     - Better is to transfer a spezial Epxire Message with an array of 
expired messages.
  +     - This reduce message transfer and reduce waits for acks.     
   - Documentation
        wrote a complete new how-to
        add example configurations
  @@ -18,12 +29,7 @@
        detect long wait acks     
   - Implement fragmentation of large replication objects
        Compress at message level 
  -     s. FarmDeployer war handling
  -- reduce memory consume
  -    - set new compress sender flag at default=false
  -    - don't copy the buffer to add message header
  -        transfer this from SimpleTcpCluster to DataSender pushMessage
  -    - make it possible that a subclass cryp the transferd messages
  +     Splitting Messages ala FarmDeployer war handling
   - add a message type to the message header.
       - filtering at receiver that drop message before build Object
   - add test cluster project
  @@ -31,8 +37,11 @@
       automated regression testing with some standard configs
   - add support to dynamic property transfer from SimpleTcpCluster to the 
Manager
       like ReplicationTransmitter
  -                             
  +- better restart szenario after failure.
  +        
  +==============================
   Nice to have:
  +==============================
   - Implement a NonSerializable interface for session attributes that do not
   wish to be replicated
           Then we must have ClusterNonSerializable at common classloader
  @@ -71,8 +80,24 @@
                Fixed!          
        Last FileMessage fragment need longe ackTimeout 
        <Cluster ..> <Sender ... ackTimeout="60000"/> </Cluster>        
  +- ReplicationListener and SocketReplicationListener only accept data from 
cluster member (low level ip restriction)
  +- Change Message protocol (risk)
  +   Currently 6 byte header, data.length 4 bytes , data, 6 byte end header
  +   Optimized to 2 type header, data.length 4 bytes, data
  +   change at DataSender.writeData and XByteBuffer
   
  +==============================   
   COMPLETED
  +==============================
  +5.5.10
  +- reduce memory and cpu consume (send message)
  +    - set new compress sender flag at default=false ( < CPU usage)
  +    - don't copy the buffer to add message header
  +        transfer this from SimpleTcpCluster to DataSender pushMessage
  +        successfull refactored
  +    - make it possible that a subclass crypt the transfered messages
  +       sub class ReplicationTransmitter and override createMessageData
  +    - don't copy START and END Header for every message, instead send 
dirctly and DataSender.writeData.   
   - Add a flag for replicated  attribute events, to enable or disable them 
        Now can configued with notifyListenersOnReplication=false at 
SimpleTCPCluster
        Also can drop HttpSessionLsitener events
  @@ -87,6 +112,7 @@
   - Add new SocketReplicationListener
   - Add Stats to DeltaManager
   - Add single sign on support
  +5.5.9
   - Add Keep Alive and WaitForAck at async mode implementation.
          Make this feature configurable to Sender element at server.xml
          Is include with 5.5.8
  
  
  
  1.1                  
jakarta-tomcat-catalina/modules/cluster/test/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitterTest.java
  
  Index: ReplicationTransmitterTest.java
  ===================================================================
  /*
   * Copyright 1999,2005 The Apache Software Foundation.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License"); you may not
   * use this file except in compliance with the License. You may obtain a copy 
of
   * the License at
   * 
   * http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   * License for the specific language governing permissions and limitations 
under
   * the License.
   */
  package org.apache.catalina.cluster.tcp;
  
  import java.io.ByteArrayInputStream;
  import java.io.IOException;
  import java.util.zip.GZIPInputStream;
  
  import junit.framework.TestCase;
  
  import org.apache.catalina.cluster.session.ReplicationStream;
  import org.apache.catalina.cluster.session.SessionMessageImpl;
  
  /**
   * @author Peter Rossbach
   * 
   * @version $Revision: 1.1 $ $Date: 2005/04/12 18:56:07 $
   */
  public class ReplicationTransmitterTest extends TestCase {
  
      public void testCreateMessageData() throws Exception {
          ReplicationTransmitter transmitter = new ReplicationTransmitter();
          transmitter.setCompress(true);
          SessionMessageImpl message= new SessionMessageImpl();
          message.setUniqueId("test");
          byte [] data = transmitter.createMessageData(message);
          assertTrue(200 < data.length);
          Object myobj = getGZPObject(data);
          assertTrue(myobj instanceof SessionMessageImpl);
          assertEquals("test", ((SessionMessageImpl)myobj).getUniqueId());
          
      }
  
      /**
       * @param data
       * @return
       * @throws IOException
       * @throws ClassNotFoundException
       */
      private Object getGZPObject(byte[] data) throws IOException, 
ClassNotFoundException {
          ByteArrayInputStream bin = 
              new ByteArrayInputStream(data);
          GZIPInputStream gin = 
              new GZIPInputStream(bin);
          byte[] tmp = new byte[1024];
          int length = gin.read(tmp);
          byte[] result = new byte[0];
          while (length > 0) {
              byte[] tmpdata = result;
              result = new byte[result.length + length];
              System.arraycopy(tmpdata, 0, result, 0, tmpdata.length);
              System.arraycopy(tmp, 0, result, tmpdata.length, length);
              length = gin.read(tmp);
          }
          gin.close();
          ReplicationStream stream = new ReplicationStream(
                  new java.io.ByteArrayInputStream(result), getClass()
                          .getClassLoader());
          Object myobj = stream.readObject();
          return myobj;
      }
  }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to