pero 2005/04/12 11:56:07 Modified: modules/cluster/src/share/org/apache/catalina/cluster ClusterSender.java modules/cluster/src/share/org/apache/catalina/cluster/io XByteBuffer.java modules/cluster/src/share/org/apache/catalina/cluster/tcp DataSender.java FastAsyncSocketSender.java ReplicationTransmitter.java SimpleTcpCluster.java TcpReplicationThread.java mbeans-descriptors.xml modules/cluster to-do.txt Added: modules/cluster/test/src/share/org/apache/catalina/cluster/tcp ReplicationTransmitterTest.java Log: Optimize cluster send message; add some useful mbean attributes; refactor ReplicationTransmitter. Revision Changes Path 1.7 +2 -2 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSender.java Index: ClusterSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSender.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- ClusterSender.java 10 Apr 2005 16:20:45 -0000 1.6 +++ ClusterSender.java 12 Apr 2005 18:56:07 -0000 1.7 @@ -39,9 +39,9 @@ public void backgroundProcess() ; - public void sendMessage(String messageId, byte[] indata, Member member) throws java.io.IOException; + public void sendMessage(ClusterMessage message, Member member) throws java.io.IOException; - public void sendMessage(String messageId, byte[] indata) throws java.io.IOException; + public void sendMessage(ClusterMessage message) throws java.io.IOException; public boolean isWaitForAck(); public void setWaitForAck(boolean isWaitForAck); 1.13 +2 -1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java Index: XByteBuffer.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java,v retrieving revision 1.12 retrieving revision 1.13 diff -u 
-r1.12 -r1.13 --- XByteBuffer.java 5 Apr 2005 18:05:52 -0000 1.12 +++ XByteBuffer.java 12 Apr 2005 18:56:07 -0000 1.13 @@ -366,6 +366,7 @@ * @param indata - the message data to be contained within the package * @param compress - compress message data or not * @return - a full package (header,size,data,footer) + * @deprecated since 5.5.10 */ public static byte[] createDataPackage(byte[] indata, boolean compress) throws java.io.IOException { 1.7 +18 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java Index: DataSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- DataSender.java 10 Apr 2005 16:20:46 -0000 1.6 +++ DataSender.java 12 Apr 2005 18:56:07 -0000 1.7 @@ -17,10 +17,12 @@ package org.apache.catalina.cluster.tcp; import java.io.IOException; +import java.io.OutputStream; import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; +import org.apache.catalina.cluster.io.XByteBuffer; import org.apache.catalina.util.StringManager; /** @@ -47,7 +49,7 @@ /** * The descriptive information about this implementation. */ - private static final String info = "DataSender/1.4"; + private static final String info = "DataSender/2.0"; /** * receiver address @@ -227,6 +229,13 @@ } /** + * @return Returns the avg totalBytes/nrOfRequests. + */ + public double getAvgMessageSize() { + return ((double)totalBytes) / nrOfRequests; + } + + /** * @return Returns the avg processingTime/nrOfRequests. 
*/ public double getAvgProcessingTime() { @@ -693,7 +702,7 @@ openSocket(); } try { - writeData(data); + writeData(data); } catch (java.io.IOException x) { // second try with fresh connection dataResendCounter++; @@ -724,8 +733,12 @@ * @throws IOException */ protected void writeData(byte[] data) throws IOException { - socket.getOutputStream().write(data); - socket.getOutputStream().flush(); + OutputStream out = socket.getOutputStream(); + out.write(XByteBuffer.START_DATA); + out.write(XByteBuffer.toBytes(data.length)); + out.write(data); + out.write(XByteBuffer.END_DATA); + out.flush(); if (isWaitForAck()) waitForAck(ackTimeout); @@ -733,7 +746,6 @@ /** * Wait for Acknowledgement from other server - * FIXME Handle SocketTimeoutException - Retry message ? * @param timeout * @throws java.io.IOException * @throws java.net.SocketTimeoutException 1.5 +51 -28 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java Index: FastAsyncSocketSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- FastAsyncSocketSender.java 10 Apr 2005 16:54:06 -0000 1.4 +++ FastAsyncSocketSender.java 12 Apr 2005 18:56:07 -0000 1.5 @@ -54,7 +54,7 @@ /** * The descriptive information about this implementation. 
*/ - private static final String info = "FastAsyncSocketSender/1.1"; + private static final String info = "FastAsyncSocketSender/2.0"; // ----------------------------------------------------- Instance Variables @@ -109,6 +109,7 @@ } + /** * get current add wait timeout * @return current wait timeout @@ -386,6 +387,8 @@ */ private long queuedNrOfBytes = 0; + + /** * Only use inside FastAsyncSocketSender * @param sender @@ -396,9 +399,12 @@ this.queue = queue; this.sender = sender; } - - protected long getQueuedNrOfBytes() { - return queuedNrOfBytes ; + + /** + * @return Returns the queuedNrOfBytes. + */ + public long getQueuedNrOfBytes() { + return queuedNrOfBytes; } protected synchronized void setQueuedNrOfBytes(long queuedNrOfBytes) { @@ -417,35 +423,17 @@ public void stopRunning() { keepRunning = false; } - + + /* Get the objects from queue and send all mesages to the sender. * @see java.lang.Runnable#run() */ public void run() { while (keepRunning) { - // get a link list of all queued objects - if(log.isTraceEnabled()) - log.trace("Queuesize before=" + ((FastQueue)queue).getSize()); - LinkObject entry = queue.remove(); - if(log.isTraceEnabled()) - log.trace("Queuesize after=" + ((FastQueue)queue).getSize()); + long queueSize; + LinkObject entry = getQueuedMessage(); if (entry != null) { - do { - int messagesize = 0; - try { - byte[] data = (byte[]) entry.data(); - messagesize = data.length; - sender.pushMessage((String) entry.getKey(), data); - outQueueCounter++; - } catch (Exception x) { - log.warn(sm.getString( - "AsyncSocketSender.send.error", entry - .getKey()),x); - } finally { - decQueuedNrOfBytes(messagesize); - } - entry = entry.next(); - } while (entry != null); + pushQueuedMessages(entry); } else { if (keepRunning) { log.warn(sm.getString("AsyncSocketSender.queue.empty", @@ -456,6 +444,41 @@ } } + /** + * @return + */ + protected LinkObject getQueuedMessage() { + // get a link list of all queued objects + if (log.isTraceEnabled()) + 
log.trace("Queuesize before=" + ((FastQueue) queue).getSize()); + LinkObject entry = queue.remove(); + if (log.isTraceEnabled()) + log.trace("Queuesize after=" + ((FastQueue) queue).getSize()); + return entry; + } + + /** + * @param entry + */ + protected void pushQueuedMessages(LinkObject entry) { + do { + int messagesize = 0; + try { + byte[] data = (byte[]) entry.data(); + messagesize = data.length; + sender.pushMessage((String) entry.getKey(), data); + outQueueCounter++; + } catch (Exception x) { + log.warn(sm.getString( + "AsyncSocketSender.send.error", entry + .getKey()), x); + } finally { + decQueuedNrOfBytes(messagesize); + } + entry = entry.next(); + } while (entry != null); + } + } } \ No newline at end of file 1.25 +154 -38 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java Index: ReplicationTransmitter.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v retrieving revision 1.24 retrieving revision 1.25 diff -u -r1.24 -r1.25 --- ReplicationTransmitter.java 10 Apr 2005 16:20:46 -0000 1.24 +++ ReplicationTransmitter.java 12 Apr 2005 18:56:07 -0000 1.25 @@ -16,14 +16,18 @@ package org.apache.catalina.cluster.tcp; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.zip.GZIPOutputStream; import javax.management.MBeanServer; import javax.management.ObjectName; +import org.apache.catalina.cluster.ClusterMessage; import org.apache.catalina.cluster.ClusterSender; import org.apache.catalina.cluster.Member; import org.apache.catalina.cluster.io.XByteBuffer; @@ -49,7 +53,7 @@ /** * The descriptive information about this implementation. 
*/ - private static final String info = "ReplicationTransmitter/1.3"; + private static final String info = "ReplicationTransmitter/2.0"; /** * The string manager for this package. @@ -109,9 +113,29 @@ /** * Compress message data bytes */ - private boolean compress = true; + private boolean compress = false; /** + * doTransmitterProcessingStats + */ + protected boolean doTransmitterProcessingStats = false; + + /** + * proessingTime + */ + protected long processingTime = 0; + + /** + * min proessingTime + */ + protected long minProcessingTime = Long.MAX_VALUE ; + + /** + * max proessingTime + */ + protected long maxProcessingTime = 0; + + /** * dynamic sender <code>properties</code> */ private Map properties = new HashMap(); @@ -187,6 +211,49 @@ } /** + * @return Returns the avg processingTime/nrOfRequests. + */ + public double getAvgProcessingTime() { + return ((double)processingTime) / nrOfRequests; + } + + /** + * @return Returns the maxProcessingTime. + */ + public long getMaxProcessingTime() { + return maxProcessingTime; + } + + /** + * @return Returns the minProcessingTime. + */ + public long getMinProcessingTime() { + return minProcessingTime; + } + + /** + * @return Returns the processingTime. + */ + public long getProcessingTime() { + return processingTime; + } + + /** + * @return Returns the doTransmitterProcessingStats. + */ + public boolean isDoTransmitterProcessingStats() { + return doTransmitterProcessingStats; + } + + /** + * @param doTransmitterProcessingStats The doTransmitterProcessingStats to set. 
+ */ + public void setDoTransmitterProcessingStats(boolean doProcessingStats) { + this.doTransmitterProcessingStats = doProcessingStats; + } + + + /** * Transmitter ObjectName * * @param name @@ -344,42 +411,59 @@ } // ------------------------------------------------------------- public - + /** * Send data to one member - * - * @see org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String, - * byte[], org.apache.catalina.cluster.Member) + * FIXME set filtering messages + * @see org.apache.catalina.cluster.ClusterSender#sendMessage(org.apache.catalina.cluster.ClusterMessage, org.apache.catalina.cluster.Member) */ - public void sendMessage(String sessionId, byte[] indata, Member member) - throws java.io.IOException { - byte[] data = convertSenderData(indata); - String key = getKey(member); - IDataSender sender = (IDataSender) map.get(key); - sendMessageData(sessionId, data, sender); + public void sendMessage(ClusterMessage message, Member member) + throws java.io.IOException { + long time = 0 ; + if(doTransmitterProcessingStats) { + time = System.currentTimeMillis(); + } + try { + byte[] data = createMessageData(message); + String key = getKey(member); + IDataSender sender = (IDataSender) map.get(key); + sendMessageData(message.getUniqueId(), data, sender); + } finally { + if (doTransmitterProcessingStats) { + addProcessingStats(time); + } + } } /** * send message to all senders (broadcast) - * - * @see org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String, - * byte[]) + * @see org.apache.catalina.cluster.ClusterSender#sendMessage(org.apache.catalina.cluster.ClusterMessage) */ - public void sendMessage(String sessionId, byte[] indata) + public void sendMessage(ClusterMessage message) throws java.io.IOException { - IDataSender[] senders = getSenders(); - byte[] data = convertSenderData(indata); - for (int i = 0; i < senders.length; i++) { - - IDataSender sender = senders[i]; - try { - sendMessageData(sessionId, data, sender); - } catch 
(Exception x) { - - if (!sender.getSuspect()) - log.warn("Unable to send replicated message to " + sender - + ", is server down?", x); - sender.setSuspect(true); + long time = 0; + if (doTransmitterProcessingStats) { + time = System.currentTimeMillis(); + } + try { + byte[] data = createMessageData(message); + IDataSender[] senders = getSenders(); + for (int i = 0; i < senders.length; i++) { + + IDataSender sender = senders[i]; + try { + sendMessageData(message.getUniqueId(), data, sender); + } catch (Exception x) { + if (!sender.getSuspect()) { + log.warn("Unable to send replicated message to " + + sender + ", is server down?", x); + sender.setSuspect(true); + } + } + } + } finally { + if (doTransmitterProcessingStats) { + addProcessingStats(time); } } } @@ -518,6 +602,9 @@ nrOfRequests = 0; totalBytes = 0; failureCounter = 0; + processingTime = 0; + minProcessingTime = Long.MAX_VALUE; + maxProcessingTime = 0; } /* @@ -640,6 +727,7 @@ } } + /** * build sender ObjectName ( * engine.domain:type=IDataSender,host="host",senderAddress="receiver.address",senderPort="port" ) @@ -664,17 +752,32 @@ } /** - * compress data - * + * Send Message create Timestamp and generate message bytes form msg * @see XByteBuffer#createDataPackage(byte[]) - * @param indata - * @return + * @param msg cluster message + * @return cluster message as byte array * @throws IOException - * FIXME get CompressMessageDate from cluster instanz */ - protected byte[] convertSenderData(byte[] data) throws IOException { - return XByteBuffer.createDataPackage(data, isCompress()); + protected byte[] createMessageData(ClusterMessage msg) throws IOException { + msg.setTimestamp(System.currentTimeMillis()); + ByteArrayOutputStream outs = new ByteArrayOutputStream(); + ObjectOutputStream out; + GZIPOutputStream gout = null; + if (isCompress()) { + gout = new GZIPOutputStream(outs); + out = new ObjectOutputStream(gout); + } else { + out = new ObjectOutputStream(outs); + } + out.writeObject(msg); + // flush out 
the gzip stream to byte buffer + if(gout != null) { + gout.flush(); + gout.close(); + } + return outs.toByteArray(); } + /** * Send message to concrete sender. If autoConnect is true, check is @@ -723,5 +826,18 @@ } } - + /** + * Add processing stats times + * @param startTime + */ + protected void addProcessingStats(long startTime) { + long time = System.currentTimeMillis() - startTime ; + if(time < minProcessingTime) + minProcessingTime = time ; + if( time > maxProcessingTime) + maxProcessingTime = time ; + processingTime += time ; + } + + } \ No newline at end of file 1.62 +20 -51 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java Index: SimpleTcpCluster.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v retrieving revision 1.61 retrieving revision 1.62 diff -u -r1.61 -r1.62 --- SimpleTcpCluster.java 10 Apr 2005 16:20:46 -0000 1.61 +++ SimpleTcpCluster.java 12 Apr 2005 18:56:07 -0000 1.62 @@ -91,7 +91,7 @@ /** * Descriptive information about this component implementation. 
*/ - protected static final String info = "SimpleTcpCluster/1.2"; + protected static final String info = "SimpleTcpCluster/2.0"; public static final String BEFORE_MEMBERREGISTER_EVENT = "before_member_register"; @@ -598,8 +598,16 @@ } /** - * send a cluster message to one member + * send message to all cluster members * + * @see org.apache.catalina.cluster.CatalinaCluster#send(org.apache.catalina.cluster.ClusterMessage) + */ + public void send(ClusterMessage msg) { + send(msg, null); + } + + /** + * send a cluster message to one member * @param msg message to transfer * @param dest Receiver member * @see org.apache.catalina.cluster.CatalinaCluster#send(org.apache.catalina.cluster.ClusterMessage, @@ -608,63 +616,24 @@ public void send(ClusterMessage msg, Member dest) { try { msg.setAddress(membershipService.getLocalMember()); - Member destination = dest; - - if (msg instanceof SessionMessage) { - SessionMessage smsg = (SessionMessage) msg; - //if we request session state, send to the oldest of members - if ((destination == null) - && (smsg.getEventType() == SessionMessage.EVT_GET_ALL_SESSIONS) - && (membershipService.getMembers().length > 0)) { - destination = membershipService.getMembers()[0]; - } - } - byte[] data = createMessageData(msg); - if (destination != null) { - Member tcpdest = dest; - if ((tcpdest != null) - && (!membershipService.getLocalMember().equals(tcpdest))) { - clusterSender.sendMessage(msg.getUniqueId(), data, tcpdest); - } + if (dest != null) { + if (!membershipService.getLocalMember().equals(dest)) { + clusterSender.sendMessage(msg, dest); + } else + log.error("Unable to send message to local member " + msg); } else { - clusterSender.sendMessage(msg.getUniqueId(), data); + clusterSender.sendMessage(msg); } } catch (Exception x) { - if(notifyLifecycleListenerOnFailure) { + if (notifyLifecycleListenerOnFailure) { // Notify our interested LifecycleListeners - lifecycle.fireLifecycleEvent(SEND_MESSAGE_FAILURE_EVENT, - new 
SendMessageData(msg,dest,x)); + lifecycle.fireLifecycleEvent(SEND_MESSAGE_FAILURE_EVENT, + new SendMessageData(msg, dest, x)); } log.error("Unable to send message through cluster sender.", x); } } - /** - * Send Message create Timestamp and generate message bytes form msg - * @param msg cluster message - * @return cluster message as byte array - * @throws IOException - */ - protected byte[] createMessageData(ClusterMessage msg) throws IOException { - msg.setTimestamp(System.currentTimeMillis()); - java.io.ByteArrayOutputStream outs = new java.io.ByteArrayOutputStream(); - java.io.ObjectOutputStream out = new java.io.ObjectOutputStream( - outs); - out.writeObject(msg); - byte[] data = outs.toByteArray(); - return data; - } - - /** - * send message to all cluster members - * - * @see org.apache.catalina.cluster.CatalinaCluster#send(org.apache.catalina.cluster.ClusterMessage) - */ - public void send(ClusterMessage msg) { - send(msg, null); - } - - /* New cluster member is registered * FIXME notify someone (JMX(Listener) * @see org.apache.catalina.cluster.MembershipListener#memberAdded(org.apache.catalina.cluster.Member) 1.17 +3 -3 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java Index: TcpReplicationThread.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v retrieving revision 1.16 retrieving revision 1.17 diff -u -r1.16 -r1.17 --- TcpReplicationThread.java 10 Apr 2005 16:20:46 -0000 1.16 +++ TcpReplicationThread.java 12 Apr 2005 18:56:07 -0000 1.17 @@ -113,7 +113,7 @@ * re-enables OP_READ and calls wakeup() on the selector * so the selector will resume watching this channel. 
*/ - private void drainChannel (SelectionKey key) + protected void drainChannel (SelectionKey key) throws Exception { boolean packetReceived=false; @@ -162,7 +162,7 @@ * @param key * @param channel */ - private void sendAck(SelectionKey key, SocketChannel channel) { + protected void sendAck(SelectionKey key, SocketChannel channel) { try { channel.write(ByteBuffer.wrap(ACK_COMMAND)); 1.8 +45 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml Index: mbeans-descriptors.xml =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml,v retrieving revision 1.7 retrieving revision 1.8 diff -u -r1.7 -r1.8 --- mbeans-descriptors.xml 10 Apr 2005 16:20:46 -0000 1.7 +++ mbeans-descriptors.xml 12 Apr 2005 18:56:07 -0000 1.8 @@ -9,7 +9,10 @@ domain="Catalina" group="Cluster" type="org.apache.catalina.cluster.tcp.SimpleTcpCluster"> - + <attribute name="info" + description="Class version info" + type="java.lang.String" + writeable="false"/> <attribute name="notifyListenersOnReplication" description="notify session attribute listener at backups" type="boolean"/> @@ -71,7 +74,10 @@ domain="Catalina" group="ClusterSender" type="org.apache.catalina.cluster.tcp.ReplicationTransmitter"> - + <attribute name="info" + description="Class version info" + type="java.lang.String" + writeable="false"/> <attribute name="replicationMode" description="replication mode (synchnous,pooled.asynchnous)" type="java.lang.String"/> @@ -87,6 +93,26 @@ is="true" type="boolean" writeable="false" /> + <attribute name="processingTime" + description="sending processing time" + type="long" + writeable="false"/> + <attribute name="minProcessingTime" + description="minimal sending processing time" + type="long" + writeable="false"/> + <attribute name="avgProcessingTime" + description="processing time / nrOfRequests" + 
type="double" + writeable="false"/> + <attribute name="maxProcessingTime" + description="maximal sending processing time" + type="long" + writeable="false"/> + <attribute name="doTransmitterProcessingStats" + description="create processing time stats" + is="true" + type="boolean" /> <attribute name="nrOfRequests" description="number of send messages to other members" type="long" @@ -152,6 +178,10 @@ writeable="false"/> <attribute name="ackTimeout" description="acknowledge timeout" + type="long"/> + <attribute name="avgMessageSize" + writeable="false" + description="avg message size (totalbytes/nrOfRequests" type="long"/> <attribute name="queueSize" writeable="false" @@ -318,6 +348,10 @@ <attribute name="ackTimeout" description="acknowledge timeout" type="long"/> + <attribute name="avgMessageSize" + writeable="false" + description="avg message size (totalbytes/nrOfRequests" + type="long" /> <attribute name="queueSize" writeable="false" description="queue size" @@ -458,7 +492,7 @@ description="queue remove wait time (queue thread waits)" type="long" writeable="false"/> - <operation name="connect" + <operation name="connect" description="connect to other replication node" impact="ACTION" returnType="void"> @@ -519,6 +553,10 @@ description="socket connected" type="boolean" writeable="false"/> + <attribute name="avgMessageSize" + writeable="false" + description="avg message size (totalbytes/nrOfRequests" + type="long"/> <attribute name="nrOfRequests" description="number of send messages to other members" type="long" @@ -577,9 +615,6 @@ is="true" type="boolean" writeable="false" /> - <attribute name="maxPoolSocketLimit" - description="Max parallel sockets" - type="int"/> <attribute name="keepAliveTimeout" description="active socket keep alive timeout" type="long"/> @@ -599,6 +634,10 @@ description="socket connected" type="boolean" writeable="false"/> + <attribute name="avgMessageSize" + writeable="false" + description="avg message size (totalbytes/nrOfRequests" + 
type="long"/> <attribute name="nrOfRequests" description="number of send messages to other members" type="long" 1.11 +34 -8 jakarta-tomcat-catalina/modules/cluster/to-do.txt Index: to-do.txt =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/to-do.txt,v retrieving revision 1.10 retrieving revision 1.11 diff -u -r1.10 -r1.11 --- to-do.txt 10 Apr 2005 16:20:46 -0000 1.10 +++ to-do.txt 12 Apr 2005 18:56:07 -0000 1.11 @@ -1,5 +1,16 @@ -Next Steps: +============================== +Next actions +============================== +- reduce cpu and memory consume (Receiver) + - set new compress sender flag at default=false ( < CPU usage) + - Make compact algo + currently message receive data is split at XbyteBuffer#extractPackage and + SimpleTcpCluster#messageDataReceived +- when a lot of messages expire it comes to burst of messages + - all 60 Sec when ManagerBase#processExpires is called! + - Better is to transfer a spezial Epxire Message with an array of expired messages. + - This reduce message transfer and reduce waits for acks. - Documentation wrote a complete new how-to add example configurations @@ -18,12 +29,7 @@ detect long wait acks - Implement fragmentation of large replication objects Compress at message level - s. FarmDeployer war handling -- reduce memory consume - - set new compress sender flag at default=false - - don't copy the buffer to add message header - transfer this from SimpleTcpCluster to DataSender pushMessage - - make it possible that a subclass cryp the transferd messages + Splitting Messages ala FarmDeployer war handling - add a message type to the message header. 
- filtering at receiver that drop message before build Object - add test cluster project @@ -31,8 +37,11 @@ automated regression testing with some standard configs - add support to dynamic property transfer from SimpleTcpCluster to the Manager like ReplicationTransmitter - +- better restart szenario after failure. + +============================== Nice to have: +============================== - Implement a NonSerializable interface for session attributes that do not wish to be replicated Then we must have ClusterNonSerializable at common classloader @@ -71,8 +80,24 @@ Fixed! Last FileMessage fragment need longe ackTimeout <Cluster ..> <Sender ... ackTimeout="60000"/> </Cluster> +- ReplicationListener and SocketReplicationListener only accept data from cluster member (low level ip restriction) +- Change Message protocol (risk) + Currently 6 byte header, data.length 4 bytes , data, 6 byte end header + Optimized to 2 type header, data.length 4 bytes, data + change at DataSender.writeData and XByteBuffer +============================== COMPLETED +============================== +5.5.10 +- reduce memory and cpu consume (send message) + - set new compress sender flag at default=false ( < CPU usage) + - don't copy the buffer to add message header + transfer this from SimpleTcpCluster to DataSender pushMessage + successfull refactored + - make it possible that a subclass crypt the transfered messages + sub class ReplicationTransmitter and override createMessageData + - don't copy START and END Header for every message, instead send dirctly and DataSender.writeData. - Add a flag for replicated attribute events, to enable or disable them Now can configued with notifyListenersOnReplication=false at SimpleTCPCluster Also can drop HttpSessionLsitener events @@ -87,6 +112,7 @@ - Add new SocketReplicationListener - Add Stats to DeltaManager - Add single sign on support +5.5.9 - Add Keep Alive and WaitForAck at async mode implementation. 
Make this feature configurable to Sender element at server.xml Is include with 5.5.8 1.1 jakarta-tomcat-catalina/modules/cluster/test/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitterTest.java Index: ReplicationTransmitterTest.java =================================================================== /* * Copyright 1999,2005 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations under * the License. */ package org.apache.catalina.cluster.tcp; import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.zip.GZIPInputStream; import junit.framework.TestCase; import org.apache.catalina.cluster.session.ReplicationStream; import org.apache.catalina.cluster.session.SessionMessageImpl; /** * @author Peter Rossbach * * @version $Revision: 1.1 $ $Date: 2005/04/12 18:56:07 $ */ public class ReplicationTransmitterTest extends TestCase { public void testCreateMessageData() throws Exception { ReplicationTransmitter transmitter = new ReplicationTransmitter(); transmitter.setCompress(true); SessionMessageImpl message= new SessionMessageImpl(); message.setUniqueId("test"); byte [] data = transmitter.createMessageData(message); assertTrue(200 < data.length); Object myobj = getGZPObject(data); assertTrue(myobj instanceof SessionMessageImpl); assertEquals("test", ((SessionMessageImpl)myobj).getUniqueId()); } /** * @param data * @return * @throws IOException * @throws ClassNotFoundException */ private Object getGZPObject(byte[] data) throws IOException, 
ClassNotFoundException { ByteArrayInputStream bin = new ByteArrayInputStream(data); GZIPInputStream gin = new GZIPInputStream(bin); byte[] tmp = new byte[1024]; int length = gin.read(tmp); byte[] result = new byte[0]; while (length > 0) { byte[] tmpdata = result; result = new byte[result.length + length]; System.arraycopy(tmpdata, 0, result, 0, tmpdata.length); System.arraycopy(tmp, 0, result, tmpdata.length, length); length = gin.read(tmp); } gin.close(); ReplicationStream stream = new ReplicationStream( new java.io.ByteArrayInputStream(result), getClass() .getClassLoader()); Object myobj = stream.readObject(); return myobj; } }
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]