This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new e8357c4d7f fixes CloseScannerIT to work with changes in #4840 (#4846)
e8357c4d7f is described below
commit e8357c4d7fc84f609f27ac530ea7b3554d4ea0ff
Author: Keith Turner <[email protected]>
AuthorDate: Thu Aug 29 16:54:28 2024 -0700
fixes CloseScannerIT to work with changes in #4840 (#4846)
After the changes in #4840 a scan session with an active thread would
not be discarded. The change made CloseScannerIT start failing in 3.1.
Adjusted the test to account for this by giving time for deferred
session cleanup that happens when there is an active thread associated
with a scan session.
The test was not failing in 2.1 because the test was less strict in this
branch. Applied this fix starting in 2.1 to make the test consistent
across branches.
---
.../org/apache/accumulo/test/CloseScannerIT.java | 72 ++++++++++++++++++----
.../apache/accumulo/test/functional/ScannerIT.java | 4 ++
2 files changed, 64 insertions(+), 12 deletions(-)
diff --git a/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java
b/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java
index 9f0bd56bb1..f6c157fd2b 100644
--- a/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java
@@ -18,25 +18,45 @@
*/
package org.apache.accumulo.test;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans;
-import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class CloseScannerIT extends AccumuloClusterHarness {
static final int ROWS = 1000;
static final int COLS = 1000;
+ private static final Logger log =
LoggerFactory.getLogger(CloseScannerIT.class);
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
+ Map<String,String> siteConfig = cfg.getSiteConfig();
+ siteConfig.put(Property.TSERV_SESSION_MAXIDLE.getKey(), "20s");
+ cfg.setSiteConfig(siteConfig);
+ }
+
+ /**
+ * {@link
org.apache.accumulo.test.functional.ScannerIT#testSessionCleanup()} is a
similar test.
+ */
@Test
public void testManyScans() throws Exception {
@@ -49,25 +69,53 @@ public class CloseScannerIT extends AccumuloClusterHarness {
client.tableOperations().flush(tableName, null, null, true);
- for (int i = 0; i < 200; i++) {
- try (Scanner scanner = createScanner(client, tableName, i)) {
+ Timer timer = Timer.startNew();
+
+ int count = 0;
+ while (count < 200 && timer.elapsed(TimeUnit.MILLISECONDS) < 3000) {
+ try (Scanner scanner = createScanner(client, tableName, count)) {
scanner.setRange(new Range());
- scanner.setReadaheadThreshold(i % 2 == 0 ? 0 : 3);
+ scanner.setReadaheadThreshold(count % 2 == 0 ? 0 : 3);
- for (int j = 0; j < i % 7 + 1; j++) {
+ for (int j = 0; j < count % 7 + 1; j++) {
// only read a little data and quit, this should leave a session
open on the tserver
scanner.stream().limit(10).forEach(e -> {});
}
} // when the scanner is closed, all open sessions should be closed
+ count++;
}
- List<String> tservers = client.instanceOperations().getTabletServers();
- int activeScans = 0;
- for (String tserver : tservers) {
- activeScans +=
client.instanceOperations().getActiveScans(tserver).size();
- }
+ log.debug("Ran {} scans in {} ms", count,
timer.elapsed(TimeUnit.MILLISECONDS));
+
+ // The goal of this test it to ensure the scanner client object closes
server side scan
+ // sessions and not idle session cleanup. To do this the test is making
the following
+ // assumptions about how Accumulo works to set the timings in this test :
+ // 1. Sessions not closed by the scanner will be cleaned up in 20s based
on config set before
+ // starting test
+ // 2. This test creates readahead threads for some scans. The presence
of a thread will
+ // prevent immediate cleanup of the server side scan session. So when
the scanner sends the
+ // RPC to close the session if a thread is present, then cleanup will be
deferred. A scheduled
+ // task in the tserver runs deferred cleanup every
TSERV_SESSION_MAXIDLE/2 which is 10s.
+ //
+ // Putting the assumptions above together we know that if sessions are
closed in less than
+ // 20s, then they were closed as result of the scanner.close() method
initiating an RPC to
+ // remove the scan session. The 13s below allows time for the 10s
deferred cleanup to run in
+ // the case when a thread is present. The 3s cap the test puts on
running scans sets the total
+ // time the test will allow to 3s+13s=16s which is less than the 20s
when idle session clean
+ // starts.
- assertTrue(activeScans < 3);
+ Wait.waitFor(() -> countActiveScans(client, tableName) < 1, 13000, 250,
+ "Found active scans after closing all scanners. Expected to find no
scans");
+
+ var elasped = timer.elapsed(TimeUnit.MILLISECONDS);
+ if (elasped > 20000) {
+ log.warn(
+ "Total time since first scan was run {}ms. Unable to verify that
scanner RPC closed "
+ + "sessions, could have been closed by idle session cleanup.",
+ elasped);
+ } else {
+ log.debug("Total time since first scan was run {}ms.", elasped);
+ }
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
index 57c511f6ae..f767df4fdf 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.CloseScannerIT;
import org.apache.accumulo.test.util.Wait;
import org.junit.jupiter.api.Test;
@@ -118,6 +119,9 @@ public class ScannerIT extends AccumuloClusterHarness {
}
}
+ /**
+ * {@link CloseScannerIT#testManyScans()} is a similar test.
+ */
@Test
public void testSessionCleanup() throws Exception {
final String tableName = getUniqueNames(1)[0];