This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new c27c9d8719 Fixes FateIT being flaky (#5738)
c27c9d8719 is described below
commit c27c9d8719704330f88af8db3deaff197d1e98af
Author: Keith Turner <[email protected]>
AuthorDate: Wed Jul 16 12:05:20 2025 -0400
Fixes FateIT being flaky (#5738)
In FateIT fate threads were hanging around after a test method completed
and sometimes causing problems for subsequent test. Made the test
methods wait for all threads in to stop in Fate
---
.../main/java/org/apache/accumulo/core/fate/Fate.java | 16 +++++++++++++---
.../main/java/org/apache/accumulo/manager/Manager.java | 2 +-
.../org/apache/accumulo/test/fate/zookeeper/FateIT.java | 13 ++++++++-----
3 files changed, 22 insertions(+), 9 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index aac1921914..f1c3bb9e64 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -419,9 +419,19 @@ public class Fate<T> {
/**
* Flags that FATE threadpool to clear out and end. Does not actively stop
running FATE processes.
*/
- public void shutdown() {
+ public void shutdown(boolean wait) {
keepRunning.set(false);
- executor.shutdown();
+ if (wait) {
+ executor.shutdownNow();
+ while (!executor.isTerminated()) {
+ try {
+ executor.awaitTermination(1, SECONDS);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ } else {
+ executor.shutdown();
+ }
}
-
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index c4914fc0ff..a9b861fbd4 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1458,7 +1458,7 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
sa.server.stop();
log.debug("Shutting down fate.");
- fate().shutdown();
+ fate().shutdown(false);
final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
try {
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
index 7cc90a26a9..155df5ab9c 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
@@ -42,6 +42,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
@@ -189,6 +190,8 @@ public class FateIT {
private static CountDownLatch finishCall;
private static CountDownLatch undoLatch;
+ private static AtomicInteger nextFateDir = new AtomicInteger(0);
+
private enum ExceptionLocation {
CALL, IS_READY
};
@@ -282,7 +285,7 @@ public class FateIT {
return false;
}, SECONDS.toMillis(30), 10);
} finally {
- fate.shutdown();
+ fate.shutdown(true);
}
}
@@ -325,7 +328,7 @@ public class FateIT {
fate.delete(txid);
assertThrows(KeeperException.NoNodeException.class, () ->
getTxStatus(zk, txid));
} finally {
- fate.shutdown();
+ fate.shutdown(true);
}
}
@@ -402,7 +405,7 @@ public class FateIT {
fate.delete(txid);
assertThrows(KeeperException.NoNodeException.class, () ->
getTxStatus(zk, txid));
} finally {
- fate.shutdown();
+ fate.shutdown(true);
}
}
@@ -443,7 +446,7 @@ public class FateIT {
// cancel the transaction
assertFalse(fate.cancel(txid));
} finally {
- fate.shutdown();
+ fate.shutdown(true);
}
}
@@ -507,7 +510,7 @@ public class FateIT {
assertEquals(FAILED, fate.waitForCompletion(txid));
assertTrue(fate.getException(txid).getMessage().contains("isReady()
failed"));
} finally {
- fate.shutdown();
+ fate.shutdown(true);
}
}