This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new f3b7a300a4 adapts new scan session tests to also run on scan servers
(#4844)
f3b7a300a4 is described below
commit f3b7a300a41b23d45991edecbc19695c1c790dbb
Author: Keith Turner <[email protected]>
AuthorDate: Fri Sep 13 12:31:17 2024 -0400
adapts new scan session tests to also run on scan servers (#4844)
New ITs were added in #4840 and #4841 for zombie scan metrics and
ensuring that scanners close scan sessions. This commit adapts these new
tests to also run against scan servers. For the zombie scan case, found a
bug in the scan server where the session manager cleanup thread would get
stuck forever in SnapshotTablet.close() preventing any further session
cleanup. Fixed this bug by changing SnapshotTablet.close() to no longer
wait forever for scans to finish.
---
.../core/client/admin/InstanceOperations.java | 1 +
.../accumulo/tserver/tablet/SnapshotTablet.java | 15 ++---
.../org/apache/accumulo/tserver/tablet/Tablet.java | 9 ---
.../apache/accumulo/tserver/tablet/TabletBase.java | 9 +++
.../org/apache/accumulo/test/CloseScannerIT.java | 9 ++-
.../org/apache/accumulo/test/ZombieScanIT.java | 61 +++++++++++++------
.../test/functional/ScanSessionTimeOutIT.java | 11 ++--
.../apache/accumulo/test/functional/ScannerIT.java | 71 +++++++++++++++++-----
8 files changed, 125 insertions(+), 61 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
index 033dacbe94..cce51b7981 100644
---
a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
+++
b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
@@ -176,6 +176,7 @@ public interface InstanceOperations {
* Returns the locations of the active scan servers
*
* @return A set of currently active scan servers.
+ * @since 2.1.0
*/
Set<String> getScanServers();
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java
index 1f0f754082..985f7339dd 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java
@@ -127,16 +127,11 @@ public class SnapshotTablet extends TabletBase {
for (ScanDataSource activeScan : activeScans) {
activeScan.interrupt();
- }
-
- // wait for reads and writes to complete
- while (!activeScans.isEmpty()) {
- try {
- log.debug("Closing tablet {} waiting for {} scans", extent,
activeScans.size());
- this.wait(50);
- } catch (InterruptedException e) {
- log.error(e.toString());
- }
+ // The tablet server will wait for this to return true because it wants
to be sure no scans
+ // will run after the tablet is unloaded to ensure immediate
consistency. Scans running in the
+ // scan server are eventually consistent, so there is no need to wait
here. Disallow future
+ // activity on the scan session so it can be cleaned up eventually, but
do not need to wait.
+ disallowNewReservations(activeScan.getScanParameters());
}
}
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index f229799f6d..70c200bb7f 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1131,15 +1131,6 @@ public class Tablet extends TabletBase {
}
}
- private boolean disallowNewReservations(ScanParameters scanParameters) {
- var scanSessId = scanParameters.getScanSessionId();
- if (scanSessId != null) {
- return
getTabletServer().getSessionManager().disallowNewReservations(scanSessId);
- } else {
- return true;
- }
- }
-
private void closeConsistencyCheck() {
long num = tabletMemory.getMemTable().getNumEntries();
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
index 12890106d4..dcc54e9da7 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
@@ -110,6 +110,15 @@ public abstract class TabletBase {
}
}
+ protected boolean disallowNewReservations(ScanParameters scanParameters) {
+ var scanSessId = scanParameters.getScanSessionId();
+ if (scanSessId != null) {
+ return server.getSessionManager().disallowNewReservations(scanSessId);
+ } else {
+ return true;
+ }
+ }
+
public abstract boolean isClosed();
public abstract SortedMap<StoredTabletFile,DataFileValue> getDatafiles();
diff --git a/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java
b/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java
index f6c157fd2b..6a4c4f6812 100644
--- a/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java
@@ -27,11 +27,13 @@ import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ReadWriteIT;
import org.apache.accumulo.test.util.Wait;
@@ -55,7 +57,8 @@ public class CloseScannerIT extends AccumuloClusterHarness {
}
/**
- * {@link
org.apache.accumulo.test.functional.ScannerIT#testSessionCleanup()} is a
similar test.
+ * {@link
org.apache.accumulo.test.functional.ScannerIT#testSessionCleanup(ScannerBase.ConsistencyLevel)}
+ * is a similar test.
*/
@Test
public void testManyScans() throws Exception {
@@ -104,8 +107,8 @@ public class CloseScannerIT extends AccumuloClusterHarness {
// time the test will allow to 3s+13s=16s which is less than the 20s
when idle session clean
// starts.
- Wait.waitFor(() -> countActiveScans(client, tableName) < 1, 13000, 250,
- "Found active scans after closing all scanners. Expected to find no
scans");
+ Wait.waitFor(() -> countActiveScans(client, ServerType.TABLET_SERVER,
tableName) < 1, 13000,
+ 250, "Found active scans after closing all scanners. Expected to
find no scans");
var elasped = timer.elapsed(TimeUnit.MILLISECONDS);
if (elasped > 20000) {
diff --git a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
index e8c3a47a3e..bd0be182c0 100644
--- a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
@@ -18,6 +18,9 @@
*/
package org.apache.accumulo.test;
+import static
org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.IMMEDIATE;
+import static org.apache.accumulo.minicluster.ServerType.SCAN_SERVER;
+import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -38,6 +41,7 @@ import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
@@ -58,6 +62,8 @@ import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
public class ZombieScanIT extends ConfigurableMacBase {
@@ -242,8 +248,9 @@ public class ZombieScanIT extends ConfigurableMacBase {
/**
* Create some zombie scans and ensure metrics for them show up.
*/
- @Test
- public void testMetrics() throws Exception {
+ @ParameterizedTest
+ @EnumSource
+ public void testMetrics(ConsistencyLevel consistency) throws Exception {
Wait.waitFor(() -> {
var zsmc = getZombieScansMetric();
@@ -252,8 +259,18 @@ public class ZombieScanIT extends ConfigurableMacBase {
String table = getUniqueNames(1)[0];
+ final ServerType serverType = consistency == IMMEDIATE ? TABLET_SERVER :
SCAN_SERVER;
+
try (AccumuloClient c =
Accumulo.newClient().from(getClientProperties()).build()) {
+ if (serverType == SCAN_SERVER) {
+ getCluster().getConfig().setNumScanServers(1);
+ getCluster().getClusterControl().startAllServers(SCAN_SERVER);
+ // Scans will fall back to tablet servers when no scan servers are
present. So wait for scan
+ // servers to show up in zookeeper. Can remove this in 3.1.
+ Wait.waitFor(() -> !c.instanceOperations().getScanServers().isEmpty());
+ }
+
c.tableOperations().create(table);
var executor = Executors.newCachedThreadPool();
@@ -262,21 +279,21 @@ public class ZombieScanIT extends ConfigurableMacBase {
List<Future<String>> futures = new ArrayList<>();
for (var row : List.of("2", "4")) {
// start a scan with an iterator that gets stuck and can not be
interrupted
- futures.add(startStuckScan(c, table, executor, row, false));
+ futures.add(startStuckScan(c, table, executor, row, false,
consistency));
// start a scan with an iterator that gets stuck and can be interrupted
- futures.add(startStuckScan(c, table, executor, row, true));
+ futures.add(startStuckScan(c, table, executor, row, true,
consistency));
}
// start four stuck scans, using a batch scanner, that should never
return data
for (var row : List.of("6", "8")) {
// start a scan with an iterator that gets stuck and can not be
interrupted
- futures.add(startStuckBatchScan(c, table, executor, row, false));
+ futures.add(startStuckBatchScan(c, table, executor, row, false,
consistency));
// start a scan with an iterator that gets stuck and can be interrupted
- futures.add(startStuckBatchScan(c, table, executor, row, true));
+ futures.add(startStuckBatchScan(c, table, executor, row, true,
consistency));
}
// should eventually see the eight stuck scans running
- Wait.waitFor(() -> countActiveScans(c, table) == 8);
+ Wait.waitFor(() -> countActiveScans(c, serverType, table) == 8);
// Cancel the scan threads. This will cause the sessions on the server
side to timeout and
// become inactive. The stuck threads on the server side related to the
timed out sessions
@@ -287,20 +304,20 @@ public class ZombieScanIT extends ConfigurableMacBase {
});
// Four of the eight running scans should respond to thread interrupts
and exit
- Wait.waitFor(() -> countActiveScans(c, table) == 4);
+ Wait.waitFor(() -> countActiveScans(c, serverType, table) == 4);
Wait.waitFor(() -> getZombieScansMetric() == 4);
- assertEquals(4, countActiveScans(c, table));
+ assertEquals(4, countActiveScans(c, serverType, table));
// start four more stuck scans with two that will ignore interrupts
futures.clear();
- futures.add(startStuckScan(c, table, executor, "0", false));
- futures.add(startStuckScan(c, table, executor, "0", true));
- futures.add(startStuckBatchScan(c, table, executor, "99", false));
- futures.add(startStuckBatchScan(c, table, executor, "0", true));
+ futures.add(startStuckScan(c, table, executor, "0", false, consistency));
+ futures.add(startStuckScan(c, table, executor, "0", true, consistency));
+ futures.add(startStuckBatchScan(c, table, executor, "99", false,
consistency));
+ futures.add(startStuckBatchScan(c, table, executor, "0", true,
consistency));
- Wait.waitFor(() -> countActiveScans(c, table) == 8);
+ Wait.waitFor(() -> countActiveScans(c, serverType, table) == 8);
// Cancel the client side scan threads. Should cause the server side
threads to be
// interrupted.
@@ -310,15 +327,19 @@ public class ZombieScanIT extends ConfigurableMacBase {
});
// Two of the stuck threads should respond to interrupts on the server
side and exit.
- Wait.waitFor(() -> countActiveScans(c, table) == 6);
+ Wait.waitFor(() -> countActiveScans(c, serverType, table) == 6);
Wait.waitFor(() -> getZombieScansMetric() == 6);
- assertEquals(6, countActiveScans(c, table));
+ assertEquals(6, countActiveScans(c, serverType, table));
executor.shutdownNow();
+ } finally {
+ if (serverType == SCAN_SERVER) {
+ getCluster().getConfig().setNumScanServers(0);
+ getCluster().getClusterControl().stopAllServers(SCAN_SERVER);
+ }
}
-
}
private static long countLocations(String table, AccumuloClient client)
throws Exception {
@@ -341,7 +362,7 @@ public class ZombieScanIT extends ConfigurableMacBase {
}
private Future<String> startStuckScan(AccumuloClient c, String table,
ExecutorService executor,
- String row, boolean canInterrupt) {
+ String row, boolean canInterrupt, ConsistencyLevel consistency) {
return executor.submit(() -> {
try (var scanner = c.createScanner(table)) {
String className;
@@ -351,6 +372,7 @@ public class ZombieScanIT extends ConfigurableMacBase {
className = ZombieIterator.class.getName();
}
IteratorSetting iter = new IteratorSetting(100, "Z", className);
+ scanner.setConsistencyLevel(consistency);
scanner.addScanIterator(iter);
scanner.setRange(new Range(row));
return scanner.stream().findFirst().map(e ->
e.getKey().getRowData().toString())
@@ -360,7 +382,7 @@ public class ZombieScanIT extends ConfigurableMacBase {
}
private Future<String> startStuckBatchScan(AccumuloClient c, String table,
- ExecutorService executor, String row, boolean canInterrupt) {
+ ExecutorService executor, String row, boolean canInterrupt,
ConsistencyLevel consistency) {
return executor.submit(() -> {
try (var scanner = c.createBatchScanner(table)) {
String className;
@@ -373,6 +395,7 @@ public class ZombieScanIT extends ConfigurableMacBase {
IteratorSetting iter = new IteratorSetting(100, "Z", className);
scanner.addScanIterator(iter);
scanner.setRanges(List.of(new Range(row)));
+ scanner.setConsistencyLevel(consistency);
return scanner.stream().findFirst().map(e ->
e.getKey().getRowData().toString())
.orElse("none");
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
index 741a02a18a..8b7bdaa7d4 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.test.functional;
import static
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
+import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -121,24 +122,24 @@ public class ScanSessionTimeOutIT extends
AccumuloClusterHarness {
verify(iter, 0, 200);
// There should be a scan session open since not all data was read
from the iterator
- assertEquals(1L, countActiveScans(c, tableName));
+ assertEquals(1L, countActiveScans(c, TABLET_SERVER, tableName));
// sleep three times the session timeout
sleepUninterruptibly(9, TimeUnit.SECONDS);
// The scan session should have timed out and the next read should
create a new one
- assertEquals(0L, countActiveScans(c, tableName));
+ assertEquals(0L, countActiveScans(c, TABLET_SERVER, tableName));
verify(iter, 200, 50000);
// Reading part of the data in the range should cause a new scan
session to be created
- assertEquals(1L, countActiveScans(c, tableName));
+ assertEquals(1L, countActiveScans(c, TABLET_SERVER, tableName));
verify(iter, 50000, 100000);
// Once all of the data in the range was read the scanner should
automatically close the
// scan session
- assertEquals(0L, countActiveScans(c, tableName));
+ assertEquals(0L, countActiveScans(c, TABLET_SERVER, tableName));
}
// Nothing should have created any ew scan sessions for the table
- assertEquals(0L, countActiveScans(c, tableName));
+ assertEquals(0L, countActiveScans(c, TABLET_SERVER, tableName));
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
index f767df4fdf..48db8bbe29 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
@@ -18,10 +18,15 @@
*/
package org.apache.accumulo.test.functional;
+import static
org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.EVENTUAL;
+import static
org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.IMMEDIATE;
+import static org.apache.accumulo.minicluster.ServerType.SCAN_SERVER;
+import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.time.Duration;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
@@ -31,6 +36,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -38,12 +44,14 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.test.CloseScannerIT;
import org.apache.accumulo.test.util.Wait;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
-public class ScannerIT extends AccumuloClusterHarness {
+public class ScannerIT extends ConfigurableMacBase {
@Override
protected Duration defaultTimeout() {
@@ -53,7 +61,7 @@ public class ScannerIT extends AccumuloClusterHarness {
@Test
public void testScannerReadaheadConfiguration() throws Exception {
final String table = getUniqueNames(1)[0];
- try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProperties()).build()) {
c.tableOperations().create(table);
try (BatchWriter bw = c.createBatchWriter(table)) {
@@ -122,10 +130,20 @@ public class ScannerIT extends AccumuloClusterHarness {
/**
* {@link CloseScannerIT#testManyScans()} is a similar test.
*/
- @Test
- public void testSessionCleanup() throws Exception {
+ @ParameterizedTest
+ @EnumSource
+ public void testSessionCleanup(ConsistencyLevel consistency) throws
Exception {
final String tableName = getUniqueNames(1)[0];
- try (AccumuloClient accumuloClient =
Accumulo.newClient().from(getClientProps()).build()) {
+ final ServerType serverType = consistency == IMMEDIATE ? TABLET_SERVER :
SCAN_SERVER;
+ try (AccumuloClient accumuloClient =
Accumulo.newClient().from(getClientProperties()).build()) {
+
+ if (serverType == SCAN_SERVER) {
+ getCluster().getConfig().setNumScanServers(1);
+ getCluster().getClusterControl().startAllServers(SCAN_SERVER);
+ // Scans will fall back to tablet servers when no scan servers are
present. So wait for scan
+ // servers to show up in zookeeper. Can remove this in 3.1.
+ Wait.waitFor(() ->
!accumuloClient.instanceOperations().getScanServers().isEmpty());
+ }
accumuloClient.tableOperations().create(tableName);
@@ -137,6 +155,10 @@ public class ScannerIT extends AccumuloClusterHarness {
}
}
+ if (consistency == EVENTUAL) {
+ accumuloClient.tableOperations().flush(tableName, null, null, true);
+ }
+
// The test assumes the session timeout is configured to 1 minute,
validate this. Later in the
// test 10s is given for session to disappear and we want this 10s to be
much smaller than the
// configured session timeout.
@@ -147,53 +169,72 @@ public class ScannerIT extends AccumuloClusterHarness {
// closed that any open sessions will be closed.
for (int i = 0; i < 3; i++) {
try (var scanner = accumuloClient.createScanner(tableName)) {
+ scanner.setConsistencyLevel(consistency);
assertEquals(10, scanner.stream().limit(10).count());
assertEquals(10000, scanner.stream().limit(10000).count());
// since not all data in the range was read from the scanner it
should leave an active
// scan session per scanner iterator created
- assertEquals(2, countActiveScans(accumuloClient, tableName));
+ assertEquals(2, countActiveScans(accumuloClient, serverType,
tableName));
}
// When close is called on on the scanner it should close the scan
session. The session
// cleanup is async on the server because task may still be running
server side, but it
// should happen in less than the session timeout. Also the server
should start working on
// it immediately.
- Wait.waitFor(() -> countActiveScans(accumuloClient, tableName) == 0,
10000);
+ Wait.waitFor(() -> countActiveScans(accumuloClient, serverType,
tableName) == 0, 10000);
try (var scanner = accumuloClient.createBatchScanner(tableName)) {
+ scanner.setConsistencyLevel(consistency);
scanner.setRanges(List.of(new Range()));
assertEquals(10, scanner.stream().limit(10).count());
assertEquals(10000, scanner.stream().limit(10000).count());
- assertEquals(2, countActiveScans(accumuloClient, tableName));
+ assertEquals(2, countActiveScans(accumuloClient, serverType,
tableName));
}
- Wait.waitFor(() -> countActiveScans(accumuloClient, tableName) == 0,
10000);
+ Wait.waitFor(() -> countActiveScans(accumuloClient, serverType,
tableName) == 0, 10000);
}
// Test the case where all data is read from a scanner. In this case the
scanner should close
// the scan session at the end of the range even before the scanner
itself is closed.
for (int i = 0; i < 3; i++) {
try (var scanner = accumuloClient.createScanner(tableName)) {
+ scanner.setConsistencyLevel(consistency);
assertEquals(100000, scanner.stream().count());
assertEquals(100000, scanner.stream().count());
// The server side cleanup of the session should be able to happen
immediately in this
// case because nothing should be running on the server side to
fetch data because all
// data in the range was fetched.
- assertEquals(0, countActiveScans(accumuloClient, tableName));
+ assertEquals(0, countActiveScans(accumuloClient, serverType,
tableName));
}
try (var scanner = accumuloClient.createBatchScanner(tableName)) {
+ scanner.setConsistencyLevel(consistency);
scanner.setRanges(List.of(new Range()));
assertEquals(100000, scanner.stream().count());
assertEquals(100000, scanner.stream().count());
- assertEquals(0, countActiveScans(accumuloClient, tableName));
+ assertEquals(0, countActiveScans(accumuloClient, serverType,
tableName));
}
}
+ } finally {
+ if (serverType == SCAN_SERVER) {
+ getCluster().getConfig().setNumScanServers(0);
+ getCluster().getClusterControl().stopAllServers(SCAN_SERVER);
+ }
}
}
- public static long countActiveScans(AccumuloClient c, String tableName)
throws Exception {
+ public static long countActiveScans(AccumuloClient c, ServerType serverType,
String tableName)
+ throws Exception {
+ final Collection<String> servers;
+ if (serverType == TABLET_SERVER) {
+ servers = c.instanceOperations().getTabletServers();
+ } else if (serverType == SCAN_SERVER) {
+ servers = c.instanceOperations().getScanServers();
+ } else {
+ throw new IllegalArgumentException("Unsupported server type " +
serverType);
+ }
+
long count = 0;
- for (String tserver : c.instanceOperations().getTabletServers()) {
- count += c.instanceOperations().getActiveScans(tserver).stream()
+ for (String server : servers) {
+ count += c.instanceOperations().getActiveScans(server).stream()
.filter(activeScan ->
activeScan.getTable().equals(tableName)).count();
}
return count;