pero 2005/03/25 14:12:32 Modified: modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationTransmitter.java Log: Refactor Code Add compress transfer handling Add api docs Revision Changes Path 1.22 +444 -250 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java Index: ReplicationTransmitter.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v retrieving revision 1.21 retrieving revision 1.22 diff -u -r1.21 -r1.22 --- ReplicationTransmitter.java 15 Feb 2005 09:32:39 -0000 1.21 +++ ReplicationTransmitter.java 25 Mar 2005 22:12:31 -0000 1.22 @@ -16,6 +16,7 @@ package org.apache.catalina.cluster.tcp; +import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -24,18 +25,22 @@ import javax.management.ObjectName; import org.apache.catalina.cluster.ClusterSender; -import org.apache.catalina.cluster.Constants; import org.apache.catalina.cluster.Member; import org.apache.catalina.cluster.io.XByteBuffer; import org.apache.catalina.util.StringManager; import org.apache.tomcat.util.IntrospectionUtils; - /** + * Transmit message to ohter cluster members create sender from replicationMode + * type + * FIXME i18n log messages + * FIXME compress data depends on message type and size + * FIXME send very big messages at some block see FarmWarDeployer! + * TODO pause and resume senders + * * @author Peter Rossbach * @author Filip Hanik - * @version 1.2 - * + * @version $Revision$ $Date$ */ public class ReplicationTransmitter implements ClusterSender { private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory @@ -44,37 +49,69 @@ /** * The descriptive information about this implementation. */ - private static final String info = "ReplicationTransmitter/1.2"; + private static final String info = "ReplicationTransmitter/1.3"; /** * The string manager for this package. */ protected StringManager sm = StringManager.getManager(Constants.Package); - private java.util.HashMap map = new java.util.HashMap(); + private Map map = new HashMap(); public ReplicationTransmitter() { } + /** + * number of transmitted messages> + */ private long nrOfRequests = 0; + /** + * number of transmitted bytes + */ private long totalBytes = 0; + private long failureCounter = 0; + + /** + * current sender replication mode + */ private String replicationMode; + /** + * sender default ackTimeout + */ private long ackTimeout = 15000; //15 seconds by default - private boolean waitForAck = true ; - - private SimpleTcpCluster cluster; + /** + * enabled wait for ack + */ + private boolean waitForAck = true; - private ObjectName objectName; + /** + * autoConnect sender when next message send + */ + private boolean autoConnect = true; - private boolean autoConnect = true ; + /** + * Compress message data bytes + */ + private boolean compress = true; + /** + * dynamic sender <code>properties</code> + */ private Map properties = new HashMap(); - private long failureCounter = 0 ; + /** + * my cluster + */ + private SimpleTcpCluster cluster; + + /** + * Transmitter Mbean name + */ + private ObjectName objectName; // ------------------------------------------------------------- Properties @@ -89,27 +126,6 @@ } - private synchronized void addStats(int length) { - nrOfRequests++; - totalBytes += length; - if (log.isDebugEnabled() && - (nrOfRequests % 100) == 0) { - log.debug("Nr of bytes sent=" + totalBytes + " over " - + nrOfRequests + " ==" + (totalBytes / nrOfRequests) - + " bytes/request"); - } - - } - - /* - * Reset sender statistics - */ - public synchronized void resetStatistics() { - nrOfRequests = 0; - totalBytes = 0; - failureCounter = 0; - } - /** * @return Returns the nrOfRequests. */ @@ -124,6 +140,28 @@ return totalBytes; } + /** + * @return Returns the failureCounter. + */ + public long getFailureCounter() { + return failureCounter; + } + + /** + * current replication mode + * + * @return + */ + public String getReplicationMode() { + return replicationMode; + } + + /** + * set replication Mode (pooled, synchonous, asynchonous, fastasyncqueue) + * + * @see IDataSenderFactory#validateMode(String) + * @param mode + */ public void setReplicationMode(String mode) { String msg = IDataSenderFactory.validateMode(mode); if (msg == null) { @@ -134,94 +172,195 @@ throw new IllegalArgumentException(msg); } - - public synchronized void add(Member member) { - try { - String key = getKey(member); - if (!map.containsKey(key)) { - IDataSender sender = IDataSenderFactory.getIDataSender( - replicationMode, member); - transferSenderProperty(sender); - map.put(key, sender); - registerSenderMBean(member, sender); - } - } catch (java.io.IOException x) { - log.error("Unable to create and add a IDataSender object.", x); - } - }//add /** - * Transfer all properties from transmitter to concrete sender - * @param sender + * Transmitter ObjectName + * + * @param name */ - protected void transferSenderProperty(IDataSender sender) { - for (Iterator iter = getPropertyNames(); iter.hasNext();) { - String pkey = (String) iter.next(); - Object value = getProperty(pkey); - IntrospectionUtils.setProperty(sender, pkey, value.toString()); - } + public void setObjectName(ObjectName name) { + objectName = name; } - protected String getKey(Member member) { - return member.getHost() + ":" + member.getPort(); + public ObjectName getObjectName() { + return objectName; } - public synchronized void remove(Member member) { - String key = getKey(member); - IDataSender toberemoved = (IDataSender) map.get(key); - if (toberemoved == null) - return; - unregisterSenderMBean(toberemoved); - toberemoved.disconnect(); - map.remove(key); - + /** + * @return Returns the compress. + */ + public boolean isCompress() { + return compress; } - protected void unregisterSenderMBean(IDataSender sender) { - try { - MBeanServer mserver = cluster.getMBeanServer(); - if (mserver != null) { - mserver.unregisterMBean(getSenderObjectName(sender)); - } - } catch (Exception e) { - log.warn(e); - } + /** + * @param compress + * The compress to set. + */ + public void setCompress(boolean compressMessageData) { + this.compress = compressMessageData; } - protected void registerSenderMBean(Member member, IDataSender sender) { - if (member != null && cluster != null) { - try { - MBeanServer mserver = cluster.getMBeanServer(); - ObjectName senderName = getSenderObjectName(sender); - if (mserver.isRegistered(senderName)) { - if (log.isWarnEnabled()) - log.warn(sm.getString( - "cluster.mbean.register.allready", senderName)); - return; - } - mserver.registerMBean(cluster.getManagedBean(sender), - senderName); - } catch (Exception e) { - log.warn(e); - } - } + /** + * @return Returns the autoConnect. + */ + public boolean isAutoConnect() { + return autoConnect; } - protected ObjectName getSenderObjectName(IDataSender sender) { - ObjectName senderName = null; - try { - ObjectName clusterName = cluster.getObjectName(); - MBeanServer mserver = cluster.getMBeanServer(); - senderName = new ObjectName(clusterName.getDomain() - + ":type=IDataSender,host=" - + clusterName.getKeyProperty("host") + ",senderAddress=" - + sender.getAddress().getHostAddress() + ",senderPort=" + sender.getPort()); - } catch (Exception e) { - log.warn(e); + /** + * @param autoConnect + * The autoConnect to set. + */ + public void setAutoConnect(boolean autoConnect) { + this.autoConnect = autoConnect; + setProperty("autoConnect", String.valueOf(autoConnect)); + + } + + /** + * @return + */ + public long getAckTimeout() { + return ackTimeout; + } + + /** + * @param ackTimeout + */ + public void setAckTimeout(long ackTimeout) { + this.ackTimeout = ackTimeout; + setProperty("ackTimeout", String.valueOf(ackTimeout)); + } + + /** + * @return Returns the waitForAck. + */ + public boolean isWaitForAck() { + return waitForAck; + } + + /** + * @param waitForAck + * The waitForAck to set. + */ + public void setWaitForAck(boolean waitForAck) { + this.waitForAck = waitForAck; + setProperty("waitForAck", String.valueOf(waitForAck)); + } + + /* + * configured in cluster + * + * @see org.apache.catalina.cluster.ClusterSender#setCatalinaCluster(org.apache.catalina.cluster.tcp.SimpleTcpCluster) + */ + public void setCatalinaCluster(SimpleTcpCluster cluster) { + this.cluster = cluster; + + } + + /** + * @return + * @deprecated since version 5.5.7 + */ + public boolean getIsSenderSynchronized() { + return IDataSenderFactory.SYNC_MODE.equals(replicationMode) + || IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode); + } + + // ------------------------------------------------------------- dynamic + // sender property handling + + /** + * set config attributes with reflect + * + * @param name + * @param value + */ + public void setProperty(String name, Object value) { + if (log.isTraceEnabled()) + log.trace(sm.getString("ReplicationTransmitter.setProperty", name, + value)); + + properties.put(name, value); + } + + /** + * get current config + * + * @param key + * @return + */ + public Object getProperty(String key) { + if (log.isTraceEnabled()) + log.trace(sm.getString("ReplicationTransmitter.getProperty", key)); + return properties.get(key); + } + + /** + * Get all properties keys + * + * @return + */ + public Iterator getPropertyNames() { + return properties.keySet().iterator(); + } + + /** + * remove a configured property. + * + * @param key + */ + public void removeProperty(String key) { + properties.remove(key); + } + + // ------------------------------------------------------------- public + + /** + * Send data to one member + * + * @see org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String, + * byte[], org.apache.catalina.cluster.Member) + */ + public void sendMessage(String sessionId, byte[] indata, Member member) + throws java.io.IOException { + byte[] data = convertSenderData(indata); + String key = getKey(member); + IDataSender sender = (IDataSender) map.get(key); + sendMessageData(sessionId, data, sender); + } + + /** + * send message to all senders (broadcast) + * + * @see org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String, + * byte[]) + */ + public void sendMessage(String sessionId, byte[] indata) + throws java.io.IOException { + IDataSender[] senders = getSenders(); + byte[] data = convertSenderData(indata); + for (int i = 0; i < senders.length; i++) { + + IDataSender sender = senders[i]; + try { + sendMessageData(sessionId, data, sender); + } catch (Exception x) { + + if (!sender.getSuspect()) + log.warn("Unable to send replicated message to " + sender + + ", is server down?", x); + sender.setSuspect(true); + } } - return senderName; } + /** + * start the sender and register transmitter mbean + * + * @see org.apache.catalina.cluster.ClusterSender#start() + */ public void start() throws java.io.IOException { if (cluster != null) { ObjectName clusterName = cluster.getObjectName(); @@ -248,14 +387,11 @@ } - public void setObjectName(ObjectName name) { - objectName = name; - } - - public ObjectName getObjectName() { - return objectName; - } - + /* + * stop the sender and deregister mbeans (transmitter, senders) + * + * @see org.apache.catalina.cluster.ClusterSender#stop() + */ public synchronized void stop() { Iterator i = map.entrySet().iterator(); while (i.hasNext()) { @@ -279,192 +415,250 @@ } + /** + * get all current senders + * + * @return + */ public IDataSender[] getSenders() { - java.util.Iterator i = map.entrySet().iterator(); - java.util.Vector v = new java.util.Vector(); - while (i.hasNext()) { - IDataSender sender = (IDataSender) ((java.util.Map.Entry) i.next()) - .getValue(); + java.util.Iterator iter = map.entrySet().iterator(); + IDataSender[] array = new IDataSender[map.size()]; + int i = 0; + while (iter.hasNext()) { + IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter + .next()).getValue(); if (sender != null) - v.addElement(sender); + array[i] = sender; + i++; } - IDataSender[] result = new IDataSender[v.size()]; - v.copyInto(result); - return result; + return array; } /** - * Send message to concrete sender. If autoConnect is true, check is connection broken - * and the reconnect the complete sender. - * <ul> - * <li>failure the suspect flag is set true. After successfully - * sending the suspect flag is set to false.</li> - * <li>Stats is only update after sussesfull sending</li> - * </ul> + * get all current senders * - * @param sessionId Unique Message Id - * @param data message Data - * @param sender concrete message sender - * @throws java.io.IOException + * @return */ - protected void sendMessageData(String sessionId, byte[] data, - IDataSender sender) throws java.io.IOException { - if (sender == null) - throw new java.io.IOException( - "Sender not available. Make sure sender information is available to the ReplicationTransmitter."); - try { - if (autoConnect && !sender.isConnected()) - sender.connect(); - sender.sendMessage(sessionId, data); - sender.setSuspect(false); - addStats(data.length); - } catch (Exception x) { - if (log.isWarnEnabled()) { - if (!sender.getSuspect()) { - log - .warn( - "Unable to send replicated message, is server down?", - x); - } - } - sender.setSuspect(true); - failureCounter++ ; + public ObjectName[] getSenderObjectNames() { + java.util.Iterator iter = map.entrySet().iterator(); + ObjectName array[] = new ObjectName[map.size()]; + int i = 0; + while (iter.hasNext()) { + IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter + .next()).getValue(); + if (sender != null) + array[i] = getSenderObjectName(sender); + i++; } - + return array; } - public void sendMessage(String sessionId, byte[] indata, Member member) - throws java.io.IOException { - byte[] data = XByteBuffer.createDataPackage(indata); - String key = getKey(member); - IDataSender sender = (IDataSender) map.get(key); - sendMessageData(sessionId, data, sender); + /* + * Reset sender statistics + */ + public synchronized void resetStatistics() { + nrOfRequests = 0; + totalBytes = 0; + failureCounter = 0; } - public void sendMessage(String sessionId, byte[] indata) - throws java.io.IOException { - IDataSender[] senders = getSenders(); - byte[] data = XByteBuffer.createDataPackage(indata); - for (int i = 0; i < senders.length; i++) { - - IDataSender sender = senders[i]; - try { - sendMessageData(sessionId, data, sender); - } catch (Exception x) { - - if (!sender.getSuspect()) - log.warn("Unable to send replicated message to " + sender - + ", is server down?", x); - sender.setSuspect(true); + /* + * add new cluster member and create sender ( s. replicationMode) transfer + * current properties to sender + * + * @see org.apache.catalina.cluster.ClusterSender#add(org.apache.catalina.cluster.Member) + */ + public synchronized void add(Member member) { + try { + String key = getKey(member); + if (!map.containsKey(key)) { + IDataSender sender = IDataSenderFactory.getIDataSender( + replicationMode, member); + transferSenderProperty(sender); + map.put(key, sender); + registerSenderMBean(member, sender); } - }//while - } - - public String getReplicationMode() { - return replicationMode; + } catch (java.io.IOException x) { + log.error("Unable to create and add a IDataSender object.", x); + } } /** - * @return - * @deprecated since Version 1.1 + * remove sender from transmitter. ( deregister mbean and disconnect sender ) + * + * @see org.apache.catalina.cluster.ClusterSender#remove(org.apache.catalina.cluster.Member) */ - public boolean getIsSenderSynchronized() { - return IDataSenderFactory.SYNC_MODE.equals(replicationMode) - || IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode); + public synchronized void remove(Member member) { + String key = getKey(member); + IDataSender toberemoved = (IDataSender) map.get(key); + if (toberemoved == null) + return; + unregisterSenderMBean(toberemoved); + toberemoved.disconnect(); + map.remove(key); + } + // ------------------------------------------------------------- protected + /** - * @return Returns the autoConnect. + * calc number of requests and transfered bytes. Log stats all 100 requets + * + * @param length */ - public boolean isAutoConnect() { - return autoConnect; + protected synchronized void addStats(int length) { + nrOfRequests++; + totalBytes += length; + if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) { + log.debug("Nr of bytes sent=" + totalBytes + " over " + + nrOfRequests + "; avg=" + (totalBytes / nrOfRequests) + + " bytes/request; failures=" + failureCounter); + } + } + /** - * @param autoConnect The autoConnect to set. + * Transfer all properties from transmitter to concrete sender + * + * @param sender */ - public void setAutoConnect(boolean autoConnect) { - this.autoConnect = autoConnect; - setProperty("autoConnect", String.valueOf(autoConnect)); - - } - - public long getAckTimeout() { - return ackTimeout; - } - - public void setAckTimeout(long ackTimeout) { - this.ackTimeout = ackTimeout; - setProperty("ackTimeout", String.valueOf(ackTimeout)); + protected void transferSenderProperty(IDataSender sender) { + for (Iterator iter = getPropertyNames(); iter.hasNext();) { + String pkey = (String) iter.next(); + Object value = getProperty(pkey); + IntrospectionUtils.setProperty(sender, pkey, value.toString()); + } } /** - * @return Returns the waitForAck. + * set unique key to find sender + * + * @param member + * @return concat member.host:member.port */ - public boolean isWaitForAck() { - return waitForAck; + protected String getKey(Member member) { + return member.getHost() + ":" + member.getPort(); } + /** - * @param waitForAck The waitForAck to set. - */ - public void setWaitForAck(boolean waitForAck) { - this.waitForAck = waitForAck; - setProperty("waitForAck", String.valueOf(waitForAck)); - } - - /* - * (non-Javadoc) + * unregsister sendern Mbean * - * @see org.apache.catalina.cluster.ClusterSender#setCatalinaCluster(org.apache.catalina.cluster.tcp.SimpleTcpCluster) + * @see #getSenderObjectName(IDataSender) + * @param sender */ - public void setCatalinaCluster(SimpleTcpCluster cluster) { - this.cluster = cluster; - + protected void unregisterSenderMBean(IDataSender sender) { + try { + MBeanServer mserver = cluster.getMBeanServer(); + if (mserver != null) { + mserver.unregisterMBean(getSenderObjectName(sender)); + } + } catch (Exception e) { + log.warn(e); + } } - /** - * set config attributes with reflect - * @param name - * @param value + /** + * register MBean and check it exist (big problem!) + * + * @param member + * @param sender */ - public void setProperty( String name, Object value ) { - if( log.isTraceEnabled()) - log.trace(sm.getString("ReplicationTransmitter.setProperty", name, value)); - - properties.put(name, value); + protected void registerSenderMBean(Member member, IDataSender sender) { + if (member != null && cluster != null) { + try { + MBeanServer mserver = cluster.getMBeanServer(); + ObjectName senderName = getSenderObjectName(sender); + if (mserver.isRegistered(senderName)) { + if (log.isWarnEnabled()) + log.warn(sm.getString( + "cluster.mbean.register.allready", senderName)); + return; + } + mserver.registerMBean(cluster.getManagedBean(sender), + senderName); + } catch (Exception e) { + log.warn(e); + } + } } /** - * get current config - * @param key + * build sender ObjectName ( + * engine.domain:type=IDataSender,host="host",senderAddress="receiver.address",senderPort="port" ) + * + * @param sender * @return */ - public Object getProperty( String key ) { - if( log.isTraceEnabled()) - log.trace(sm.getString("ReplicationTransmitter.getProperty", key)); - return properties.get(key); + protected ObjectName getSenderObjectName(IDataSender sender) { + ObjectName senderName = null; + try { + ObjectName clusterName = cluster.getObjectName(); + MBeanServer mserver = cluster.getMBeanServer(); + senderName = new ObjectName(clusterName.getDomain() + + ":type=IDataSender,host=" + + clusterName.getKeyProperty("host") + ",senderAddress=" + + sender.getAddress().getHostAddress() + ",senderPort=" + + sender.getPort()); + } catch (Exception e) { + log.warn(e); + } + return senderName; } /** - * Get all properties keys + * compress data + * + * @see XByteBuffer#createDataPackage(byte[]) + * @param indata * @return + * @throws IOException + * FIXME get CompressMessageDate from cluster instanz */ - public Iterator getPropertyNames() { - return properties.keySet().iterator(); + protected byte[] convertSenderData(byte[] data) throws IOException { + return XByteBuffer.createDataPackage(data, isCompress()); } - - /** - * remove a configured property. - * @param key - */ - public void removeProperty(String key) { - properties.remove(key); - } - /** - * @return Returns the failureCounter. + * Send message to concrete sender. If autoConnect is true, check is + * connection broken and the reconnect the complete sender. + * <ul> + * <li>failure the suspect flag is set true. After successfully sending the + * suspect flag is set to false.</li> + * <li>Stats is only update after sussesfull sending</li> + * </ul> + * + * @param sessionId + * Unique Message Id + * @param data + * message Data + * @param sender + * concrete message sender + * @throws java.io.IOException */ - public long getFailureCounter() { - return failureCounter; + protected void sendMessageData(String sessionId, byte[] data, + IDataSender sender) throws java.io.IOException { + if (sender == null) + throw new java.io.IOException( + "Sender not available. Make sure sender information is available to the ReplicationTransmitter."); + try { + if (autoConnect && !sender.isConnected()) + sender.connect(); + sender.sendMessage(sessionId, data); + sender.setSuspect(false); + addStats(data.length); + } catch (Exception x) { + if (log.isWarnEnabled()) { + if (!sender.getSuspect()) { + log + .warn( + "Unable to send replicated message, is server down?", + x); + } + } + sender.setSuspect(true); + failureCounter++; + } + } + } \ No newline at end of file
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]