pero 2005/02/15 01:32:39 Modified: modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationTransmitter.java Log: Add dynamic property handling to replicationtransmitter to transfer attributes to senders Add autoReconnect and WaitForAck handling Revision Changes Path 1.21 +157 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java Index: ReplicationTransmitter.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v retrieving revision 1.20 retrieving revision 1.21 diff -u -r1.20 -r1.21 --- ReplicationTransmitter.java 27 Dec 2004 09:30:36 -0000 1.20 +++ ReplicationTransmitter.java 15 Feb 2005 09:32:39 -0000 1.21 @@ -16,7 +16,9 @@ package org.apache.catalina.cluster.tcp; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -26,12 +28,25 @@ import org.apache.catalina.cluster.Member; import org.apache.catalina.cluster.io.XByteBuffer; import org.apache.catalina.util.StringManager; +import org.apache.tomcat.util.IntrospectionUtils; + +/** + * @author Peter Rossbach + * @author Filip Hanik + * @version 1.2 + * + */ public class ReplicationTransmitter implements ClusterSender { private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory .getLog(ReplicationTransmitter.class); /** + * The descriptive information about this implementation. + */ + private static final String info = "ReplicationTransmitter/1.2"; + + /** * The string manager for this package. 
*/ protected StringManager sm = StringManager.getManager(Constants.Package); @@ -49,10 +64,31 @@ private long ackTimeout = 15000; //15 seconds by default + private boolean waitForAck = true ; + private SimpleTcpCluster cluster; private ObjectName objectName; + private boolean autoConnect = true ; + + private Map properties = new HashMap(); + + private long failureCounter = 0 ; + + // ------------------------------------------------------------- Properties + + /** + * Return descriptive information about this implementation and the + * corresponding version number, in the format + * <code>&lt;description&gt;/&lt;version&gt;</code>. + */ + public String getInfo() { + + return (info); + + } + private synchronized void addStats(int length) { nrOfRequests++; totalBytes += length; @@ -64,6 +100,16 @@ } } + + /* + * Reset sender statistics + */ + public synchronized void resetStatistics() { + nrOfRequests = 0; + totalBytes = 0; + failureCounter = 0; + } + /** * @return Returns the nrOfRequests. */ @@ -88,13 +134,14 @@ throw new IllegalArgumentException(msg); } - + public synchronized void add(Member member) { try { String key = getKey(member); if (!map.containsKey(key)) { IDataSender sender = IDataSenderFactory.getIDataSender( replicationMode, member); + transferSenderProperty(sender); map.put(key, sender); registerSenderMBean(member, sender); } @@ -103,7 +150,19 @@ } }//add - private String getKey(Member member) { + /** + * Transfer all properties from transmitter to concrete sender + * @param sender + */ + protected void transferSenderProperty(IDataSender sender) { + for (Iterator iter = getPropertyNames(); iter.hasNext();) { + String pkey = (String) iter.next(); + Object value = getProperty(pkey); + IntrospectionUtils.setProperty(sender, pkey, value.toString()); + } + } + + protected String getKey(Member member) { return member.getHost() + ":" + member.getPort(); } @@ -234,16 +293,28 @@ return result; } + /** + * Send message to concrete sender. 
If autoConnect is true, check whether the connection is broken + * and then reconnect the complete sender. + * <ul> + * <li>On failure the suspect flag is set to true. After successfully + * sending the suspect flag is set to false.</li> + * <li>Stats are only updated after successful sending</li> + * </ul> + * + * @param sessionId Unique Message Id + * @param data message Data + * @param sender concrete message sender + * @throws java.io.IOException + */ protected void sendMessageData(String sessionId, byte[] data, IDataSender sender) throws java.io.IOException { if (sender == null) throw new java.io.IOException( "Sender not available. Make sure sender information is available to the ReplicationTransmitter."); try { - if (!sender.isConnected()) + if (autoConnect && !sender.isConnected()) sender.connect(); - //set the timeout, will be ignored by async senders - sender.setAckTimeout(getAckTimeout()); sender.sendMessage(sessionId, data); sender.setSuspect(false); addStats(data.length); @@ -257,7 +328,7 @@ } } sender.setSuspect(true); - + failureCounter++ ; } } @@ -293,19 +364,53 @@ return replicationMode; } + /** + * @return + * @deprecated since Version 1.1 + */ public boolean getIsSenderSynchronized() { return IDataSenderFactory.SYNC_MODE.equals(replicationMode) || IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode); } + /** + * @return Returns the autoConnect. + */ + public boolean isAutoConnect() { + return autoConnect; + } + /** + * @param autoConnect The autoConnect to set. + */ + public void setAutoConnect(boolean autoConnect) { + this.autoConnect = autoConnect; + setProperty("autoConnect", String.valueOf(autoConnect)); + + } + public long getAckTimeout() { return ackTimeout; } public void setAckTimeout(long ackTimeout) { this.ackTimeout = ackTimeout; + setProperty("ackTimeout", String.valueOf(ackTimeout)); } + /** + * @return Returns the waitForAck. + */ + public boolean isWaitForAck() { + return waitForAck; + } + /** + * @param waitForAck The waitForAck to set. 
+ */ + public void setWaitForAck(boolean waitForAck) { + this.waitForAck = waitForAck; + setProperty("waitForAck", String.valueOf(waitForAck)); + } + /* * (non-Javadoc) * @@ -316,4 +421,50 @@ } + /** + * set config attributes with reflect + * @param name + * @param value + */ + public void setProperty( String name, Object value ) { + if( log.isTraceEnabled()) + log.trace(sm.getString("ReplicationTransmitter.setProperty", name, value)); + + properties.put(name, value); + } + + /** + * get current config + * @param key + * @return + */ + public Object getProperty( String key ) { + if( log.isTraceEnabled()) + log.trace(sm.getString("ReplicationTransmitter.getProperty", key)); + return properties.get(key); + } + + /** + * Get all properties keys + * @return + */ + public Iterator getPropertyNames() { + return properties.keySet().iterator(); + } + + + /** + * remove a configured property. + * @param key + */ + public void removeProperty(String key) { + properties.remove(key); + } + + /** + * @return Returns the failureCounter. + */ + public long getFailureCounter() { + return failureCounter; + } } \ No newline at end of file
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]