fhanik 2004/04/07 13:43:55
Modified: modules/cluster/src/share/org/apache/catalina/cluster
SessionMessage.java
modules/cluster/src/share/org/apache/catalina/cluster/session
DeltaManager.java
modules/cluster/src/share/org/apache/catalina/cluster/tcp
AsyncSocketSender.java ReplicationValve.java
SimpleTcpCluster.java
modules/cluster/src/share/org/apache/catalina/cluster/util
SmartQueue.java
Log:
Two fixes for async replication:
1. Bug 28161 - The "smart" queue will now use the session message's getUniqueId()
instead of the sessionId. This makes sure that
no messages are overridden when the queue fills up.
2. The async replication thread will now properly exit when members join or leave
the cluster.
Revision Changes Path
1.7 +18 -0
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/SessionMessage.java
Index: SessionMessage.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/SessionMessage.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SessionMessage.java 27 Feb 2004 14:58:55 -0000 1.6
+++ SessionMessage.java 7 Apr 2004 20:43:54 -0000 1.7
@@ -104,6 +104,7 @@
private Member mSrc;
private String mContextName;
private long serializationTimestamp;
+ private String uniqueId;
/**
@@ -144,6 +145,17 @@
mSession = session;
mSessionID = sessionID;
mContextName = contextName;
+ uniqueId = sessionID;
+ }
+
+ public SessionMessage( String contextName,
+ int eventtype,
+ byte[] session,
+ String sessionID,
+ String uniqueID)
+ {
+ this(contextName,eventtype,session,sessionID);
+ uniqueId = uniqueID;
}
/**
@@ -211,5 +223,11 @@
public String getContextName() {
return mContextName;
+ }
+ public String getUniqueId() {
+ return uniqueId;
+ }
+ public void setUniqueId(String uniqueId) {
+ this.uniqueId = uniqueId;
}
}//SessionMessage
1.21 +6 -4
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java
Index: DeltaManager.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -r1.20 -r1.21
--- DeltaManager.java 7 Apr 2004 19:02:30 -0000 1.20
+++ DeltaManager.java 7 Apr 2004 20:43:54 -0000 1.21
@@ -767,13 +767,15 @@
byte[] data = unloadDeltaRequest(deltaRequest);
msg = new SessionMessage(name, SessionMessage.EVT_SESSION_DELTA,
- data, sessionId);
+ data, sessionId,
+ sessionId+System.currentTimeMillis());
session.resetDeltaRequest();
} else if ( !session.isPrimarySession() ) {
msg = new SessionMessage(getName(),
SessionMessage.EVT_SESSION_ACCESSED,
null,
- sessionId);
+ sessionId,
+ sessionId+System.currentTimeMillis());
}
session.setPrimarySession(true);
//check to see if we need to send out an access message
@@ -783,7 +785,7 @@
msg = new SessionMessage(getName(),
SessionMessage.EVT_SESSION_ACCESSED,
null,
- sessionId);
+ sessionId+System.currentTimeMillis());
}
}
1.6 +25 -5
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java
Index: AsyncSocketSender.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- AsyncSocketSender.java 27 Feb 2004 14:58:56 -0000 1.5
+++ AsyncSocketSender.java 7 Apr 2004 20:43:55 -0000 1.6
@@ -34,12 +34,12 @@
private SmartQueue queue = new SmartQueue();
private boolean suspect;
+ private QueueThread queueThread = null;
+
public AsyncSocketSender(InetAddress host, int port) {
this.address = host;
this.port = port;
- QueueThread t = new QueueThread(this);
- t.setDaemon(true);
- t.start();
+ checkThread();
log.info("Started async sender thread for TCP replication.");
}
@@ -54,6 +54,16 @@
public void connect() throws java.io.IOException {
sc = new Socket(getAddress(),getPort());
isSocketConnected = true;
+ checkThread();
+
+ }
+
+ protected void checkThread() {
+ if ( queueThread == null ) {
+ queueThread = new QueueThread(this);
+ queueThread.setDaemon(true);
+ queueThread.start();
+ }
}
public void disconnect() {
@@ -63,6 +73,11 @@
}catch ( Exception x)
{}
isSocketConnected = false;
+ if ( queueThread != null ) {
+ queueThread.stopRunning();
+ queueThread = null;
+ }
+
}
public boolean isConnected() {
@@ -112,15 +127,20 @@
private class QueueThread extends Thread {
AsyncSocketSender sender;
+ private boolean keepRunning = true;
public QueueThread(AsyncSocketSender sender) {
this.sender = sender;
setName("Cluster-AsyncSocketSender-"+(threadCounter++));
}
+ public void stopRunning() {
+ keepRunning = false;
+ }
+
public void run() {
- while (true) {
- SmartQueue.SmartEntry entry = sender.queue.remove();
+ while (keepRunning) {
+ SmartQueue.SmartEntry entry = sender.queue.remove(5000);
if ( entry != null ) {
try {
byte[] data = (byte[]) entry.getValue();
1.12 +3 -1
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java
Index: ReplicationValve.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- ReplicationValve.java 27 Feb 2004 14:58:56 -0000 1.11
+++ ReplicationValve.java 7 Apr 2004 20:43:55 -0000 1.12
@@ -132,7 +132,9 @@
throws IOException, ServletException
{
//this happens before the request
+ //long _debugstart = System.currentTimeMillis();
context.invokeNext(request, response);
+ //System.out.println("[DEBUG] Regular invoke took="+(System.currentTimeMillis()-_debugstart)+" ms.");
//this happens after the request
try
{
1.37 +3 -2
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
Index: SimpleTcpCluster.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
retrieving revision 1.36
retrieving revision 1.37
diff -u -r1.36 -r1.37
--- SimpleTcpCluster.java 27 Feb 2004 14:58:56 -0000 1.36
+++ SimpleTcpCluster.java 7 Apr 2004 20:43:55 -0000 1.37
@@ -400,7 +400,7 @@
}//end if
}
else {
- clusterSender.sendMessage(msg.getSessionID(),data);
+ clusterSender.sendMessage(msg.getUniqueId(),data);
}
} catch ( Exception x ) {
log.error("Unable to send message through cluster sender.",x);
@@ -493,6 +493,7 @@
if ( myobj != null && myobj instanceof SessionMessage ) {
SessionMessage msg = (SessionMessage)myobj;
+ log.debug("Assuming clocks are synched: Replication took="+(System.currentTimeMillis()-msg.getTimestamp())+" ms.");
String ctxname = msg.getContextName();
//check if the message is a EVT_GET_ALL_SESSIONS,
//if so, wait until we are fully started up
1.4 +9 -2
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/SmartQueue.java
Index: SmartQueue.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/SmartQueue.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SmartQueue.java 27 Feb 2004 14:58:56 -0000 1.3
+++ SmartQueue.java 7 Apr 2004 20:43:55 -0000 1.4
@@ -88,12 +88,19 @@
* @return
*/
public SmartEntry remove() {
- SmartEntry result = null;
+ return remove(0);
+ }
+ public SmartEntry remove(long timeout) {
+ SmartEntry result = null;
+ long startEntry = System.currentTimeMillis();
synchronized (mutex) {
while ( size() == 0 ) {
try {
if ( debug != 0 )
log.debug("["+Thread.currentThread().getName()+"][SmartQueue] Queue sleeping until object added size="+size()+".");
- mutex.wait();
+ if ( (timeout != 0) && ((System.currentTimeMillis()-startEntry)>timeout) ) {
+ return null;
+ }
+ mutex.wait(timeout);
if ( debug != 0 )
log.debug("["+Thread.currentThread().getName()+"][SmartQueue] Queue woke up or interrupted size="+size()+".");
}
catch(IllegalMonitorStateException ex) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]