This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 2bec74ad83 Bug fix no scan id for batch scans (#4871)
2bec74ad83 is described below
commit 2bec74ad83b0c4cbdf98ba67337f62aaf5806ce7
Author: Kevin Rathbun <[email protected]>
AuthorDate: Fri Sep 13 11:39:25 2024 -0400
Bug fix no scan id for batch scans (#4871)
* Bug fix no scan id for batch scans
closes #4868
- Batch scans' scan ids are now set
- At the same time, got rid of code duplication and improved readability
of SessionManager.getActiveScans()
- Added functionality to test this case in ScanIdIT
---
.../accumulo/tserver/session/SessionManager.java | 109 +++++++---------
.../apache/accumulo/test/functional/ScanIdIT.java | 139 +++++++++++++++------
2 files changed, 144 insertions(+), 104 deletions(-)
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index d32dbcd14c..497287cda6 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -47,7 +47,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
import org.apache.accumulo.core.tabletserver.thrift.ScanState;
import org.apache.accumulo.core.tabletserver.thrift.ScanType;
@@ -55,10 +55,10 @@ import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.tserver.scan.ScanParameters;
import org.apache.accumulo.tserver.scan.ScanRunState;
import org.apache.accumulo.tserver.scan.ScanTask;
import org.apache.accumulo.tserver.session.Session.State;
-import org.apache.accumulo.tserver.tablet.ScanBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -453,7 +453,7 @@ public class SessionManager {
final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>();
/**
- * Add sessions so that get the list returned in the active scans call
+ * Add sessions that get the list returned in the active scans call
*/
for (Session session : deferredCleanupQueue) {
copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker,
session));
@@ -461,75 +461,56 @@ public class SessionManager {
List.of(sessions.entrySet(), copiedIdleSessions).forEach(s ->
s.forEach(entry -> {
Session session = entry.getValue();
- if (session instanceof SingleScanSession) {
- SingleScanSession ss = (SingleScanSession) session;
-
- ScanState state = ScanState.RUNNING;
-
- ScanTask<ScanBatch> nbt = ss.getScanTask();
- if (nbt == null) {
- state = ScanState.IDLE;
- } else {
- switch (nbt.getScanRunState()) {
- case QUEUED:
- state = ScanState.QUEUED;
- break;
- case FINISHED:
- state = ScanState.IDLE;
- break;
- case RUNNING:
- default:
- /* do nothing */
- break;
- }
- }
- var params = ss.scanParams;
- ActiveScan activeScan = new ActiveScan(ss.client, ss.getUser(),
- ss.extent.tableId().canonical(), ct - ss.startTime, ct -
ss.lastAccessTime,
- ScanType.SINGLE, state, ss.extent.toThrift(),
-
params.getColumnSet().stream().map(Column::toThrift).collect(Collectors.toList()),
- params.getSsiList(), params.getSsio(),
params.getAuthorizations().getAuthorizationsBB(),
- params.getClassLoaderContext());
+ if (session instanceof ScanSession) {
+ ScanSession<?> scanSession = (ScanSession<?>) session;
+ boolean isSingle = session instanceof SingleScanSession;
- // scanId added by ACCUMULO-2641 is an optional thrift argument and
not available in
- // ActiveScan constructor
- activeScan.setScanId(entry.getKey());
- activeScans.add(activeScan);
+ addActiveScan(activeScans, scanSession,
+ isSingle ? ((SingleScanSession) scanSession).extent
+ : ((MultiScanSession) scanSession).threadPoolExtent,
+ ct, isSingle ? ScanType.SINGLE : ScanType.BATCH,
+ computeScanState(scanSession.getScanTask()),
scanSession.scanParams, entry.getKey());
+ }
+ }));
- } else if (session instanceof MultiScanSession) {
- MultiScanSession mss = (MultiScanSession) session;
+ return activeScans;
+ }
- ScanState state = ScanState.RUNNING;
+ private ScanState computeScanState(ScanTask<?> scanTask) {
+ ScanState state = ScanState.RUNNING;
- ScanTask<MultiScanResult> nbt = mss.getScanTask();
- if (nbt == null) {
+ if (scanTask == null) {
+ state = ScanState.IDLE;
+ } else {
+ switch (scanTask.getScanRunState()) {
+ case QUEUED:
+ state = ScanState.QUEUED;
+ break;
+ case FINISHED:
state = ScanState.IDLE;
- } else {
- switch (nbt.getScanRunState()) {
- case QUEUED:
- state = ScanState.QUEUED;
- break;
- case FINISHED:
- state = ScanState.IDLE;
- break;
- case RUNNING:
- default:
- /* do nothing */
- break;
- }
- }
+ break;
+ case RUNNING:
+ default:
+ /* do nothing */
+ break;
+ }
+ }
- var params = mss.scanParams;
- activeScans.add(new ActiveScan(mss.client, mss.getUser(),
- mss.threadPoolExtent.tableId().canonical(), ct - mss.startTime, ct
- mss.lastAccessTime,
- ScanType.BATCH, state, mss.threadPoolExtent.toThrift(),
+ return state;
+ }
+
+ private void addActiveScan(List<ActiveScan> activeScans, Session session,
KeyExtent extent,
+ long ct, ScanType scanType, ScanState state, ScanParameters params, long
scanId) {
+ ActiveScan activeScan =
+ new ActiveScan(session.client, session.getUser(),
extent.tableId().canonical(),
+ ct - session.startTime, ct - session.lastAccessTime, scanType,
state, extent.toThrift(),
params.getColumnSet().stream().map(Column::toThrift).collect(Collectors.toList()),
params.getSsiList(), params.getSsio(),
params.getAuthorizations().getAuthorizationsBB(),
- params.getClassLoaderContext()));
- }
- }));
-
- return activeScans;
+ params.getClassLoaderContext());
+ // scanId added by ACCUMULO-2641 is an optional thrift argument and not
available in
+ // ActiveScan constructor
+ activeScan.setScanId(scanId);
+ activeScans.add(activeScan);
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
index c880052a1d..4f7139c6a8 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@ -19,13 +19,14 @@
package org.apache.accumulo.test.functional;
import static
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
@@ -44,6 +45,7 @@ import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -86,20 +88,17 @@ import org.slf4j.LoggerFactory;
public class ScanIdIT extends AccumuloClusterHarness {
private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class);
-
- private static final int NUM_SCANNERS = 8;
-
+ private static final int NUM_SINGLE_SCANNERS = 8;
+ private static final int NUM_BATCH_SCANNERS = 1;
+ private static final int NUM_TOTAL_SCANNERS = NUM_SINGLE_SCANNERS +
NUM_BATCH_SCANNERS;
private static final int NUM_DATA_ROWS = 100;
-
- private static final ExecutorService pool =
Executors.newFixedThreadPool(NUM_SCANNERS);
-
+ private static final ExecutorService pool =
Executors.newFixedThreadPool(NUM_TOTAL_SCANNERS);
private static final AtomicBoolean testInProgress = new AtomicBoolean(true);
-
private static final Map<Integer,Value> resultsByWorker = new
ConcurrentHashMap<>();
@Override
protected Duration defaultTimeout() {
- return Duration.ofMinutes(1);
+ return Duration.ofMinutes(2);
}
/**
@@ -122,19 +121,28 @@ public class ScanIdIT extends AccumuloClusterHarness {
attachSlowIterator(client, tableName);
- CountDownLatch latch = new CountDownLatch(NUM_SCANNERS);
+ CountDownLatch latch = new CountDownLatch(NUM_TOTAL_SCANNERS);
- List<ScannerThread> scanThreadsToClose = new ArrayList<>(NUM_SCANNERS);
- for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) {
+ // Ensure that scan ids are working as expected for both Scanner and
BatchScanner
+ List<ScannerThread> scanThreadsToClose = new
ArrayList<>(NUM_SINGLE_SCANNERS);
+ List<BatchScannerThread> batchScanThreadsToClose = new
ArrayList<>(NUM_BATCH_SCANNERS);
+ // workers 0 through NUM_SINGLE_SCANNERS - 1 use Scanner
+ for (int scannerIndex = 0; scannerIndex < NUM_SINGLE_SCANNERS;
scannerIndex++) {
ScannerThread st = new ScannerThread(client, scannerIndex, tableName,
latch);
scanThreadsToClose.add(st);
pool.execute(st);
}
+ // workers NUM_SINGLE_SCANNERS through NUM_TOTAL_SCANNERS - 1 use
BatchScanner
+ for (int bsIndex = NUM_SINGLE_SCANNERS; bsIndex < NUM_TOTAL_SCANNERS;
bsIndex++) {
+ BatchScannerThread bst = new BatchScannerThread(client, bsIndex,
tableName, latch);
+ batchScanThreadsToClose.add(bst);
+ pool.execute(bst);
+ }
// wait for scanners to report a result.
while (testInProgress.get()) {
- if (resultsByWorker.size() < NUM_SCANNERS) {
+ if (resultsByWorker.size() < NUM_TOTAL_SCANNERS) {
log.trace("Results reported {}", resultsByWorker.size());
sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
} else {
@@ -150,21 +158,19 @@ public class ScanIdIT extends AccumuloClusterHarness {
}
Set<Long> scanIds = getScanIds(client);
- assertTrue(scanIds.size() >= NUM_SCANNERS,
- "Expected at least " + NUM_SCANNERS + " scanIds, but saw " +
scanIds.size());
+ assertTrue(scanIds.size() >= NUM_TOTAL_SCANNERS,
+ "Expected at least " + NUM_TOTAL_SCANNERS + " scanIds, but saw " +
scanIds.size());
+ // A scan id should have been set regardless of whether a Scanner or
BatchScanner was used
+ scanIds.forEach(scanId -> assertNotEquals(0L, scanId, "saw a scanId that
was never set"));
- scanThreadsToClose.forEach(st -> {
- if (st.scanner != null) {
- st.scanner.close();
- }
- });
+ // Close all scanners. All should be non-null, test should fail (NPE)
otherwise
+ scanThreadsToClose.forEach(st -> st.scanner.close());
+ batchScanThreadsToClose.forEach(bst -> bst.bs.close());
- while (!(scanIds = getScanIds(client)).isEmpty()) {
+ while (!getScanIds(client).isEmpty()) {
log.debug("Waiting for active scans to stop...");
Thread.sleep(200);
}
- assertEquals(0, scanIds.size(), "Expected no scanIds after closing
scanners");
-
}
}
@@ -236,33 +242,26 @@ public class ScanIdIT extends AccumuloClusterHarness {
*/
@Override
public void run() {
-
latch.countDown();
try {
latch.await();
} catch (InterruptedException e) {
- log.error("Thread interrupted with id {}", workerIndex);
+ log.error("ScannerThread interrupted with id {}", workerIndex);
Thread.currentThread().interrupt();
return;
}
- log.debug("Creating scanner in worker thread {}", workerIndex);
-
+ log.debug("Creating Scanner in ScannerThread worker {}", workerIndex);
try {
-
scanner = accumuloClient.createScanner(tablename, new
Authorizations());
-
// Never start readahead
scanner.setReadaheadThreshold(Long.MAX_VALUE);
scanner.setBatchSize(1);
-
// create different ranges to try to hit more than one tablet.
scanner.setRange(new Range(new Text(Integer.toString(workerIndex)),
new Text("9")));
-
scanner.fetchColumnFamily(new Text("fam1"));
for (Map.Entry<Key,Value> entry : scanner) {
-
// exit when success condition is met.
if (!testInProgress.get()) {
scanner.clearScanIterators();
@@ -271,29 +270,89 @@ public class ScanIdIT extends AccumuloClusterHarness {
Text row = entry.getKey().getRow();
- log.debug("worker {}, row {}", workerIndex, row);
+ log.debug("ScannerThread worker {}, row {}", workerIndex, row);
if (entry.getValue() != null) {
-
Value prevValue = resultsByWorker.put(workerIndex,
entry.getValue());
-
// value should always being increasing
if (prevValue != null) {
-
- log.trace("worker {} values {}", workerIndex,
+ log.trace("ScannerThread worker {} values {}", workerIndex,
String.format("%1$s < %2$s", prevValue, entry.getValue()));
-
assertTrue(prevValue.compareTo(entry.getValue()) > 0);
}
} else {
log.info("Scanner returned null");
fail("Scanner returned unexpected null value");
}
-
}
log.debug("Scanner ran out of data. (info only, not an error) ");
} catch (TableNotFoundException e) {
- throw new IllegalStateException("Initialization failure. Could not
create scanner", e);
+ throw new IllegalStateException("Initialization failure. Could not
create Scanner", e);
+ } finally {
+ // don't close scanner here, because it will clean up the scan ids
we're checking for
+ }
+ }
+ }
+
+ /**
+ * Runs BatchScanner in separate thread to allow multiple scanners to
execute in parallel.
+ * <p>
+ * The thread run method is terminated when the testInProgress flag is set
to false.
+ */
+ private static class BatchScannerThread implements Runnable {
+ private final AccumuloClient accumuloClient;
+ private BatchScanner bs;
+ private final int workerIndex;
+ private final String tableName;
+ private final CountDownLatch latch;
+
+ public BatchScannerThread(AccumuloClient accumuloClient, int workerIndex,
String tableName,
+ CountDownLatch latch) {
+ this.accumuloClient = accumuloClient;
+ this.workerIndex = workerIndex;
+ this.tableName = tableName;
+ this.latch = latch;
+ }
+
+ @Override
+ public void run() {
+ latch.countDown();
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ log.error("BatchScannerThread interrupted with id {}", workerIndex);
+ Thread.currentThread().interrupt();
+ return;
+ }
+
+ log.debug("Creating BatchScanner in BatchScannerThread worker {}",
workerIndex);
+ try {
+ bs = accumuloClient.createBatchScanner(tableName);
+ bs.setRanges(Collections.singletonList(new Range()));
+ bs.fetchColumnFamily(new Text("fam1"));
+
+ for (Map.Entry<Key,Value> entry : bs) {
+ // exit when success condition is met.
+ if (!testInProgress.get()) {
+ bs.clearScanIterators();
+ return;
+ }
+
+ Text row = entry.getKey().getRow();
+
+ log.debug("BatchScannerThread worker {}, row {}", workerIndex, row);
+
+ if (entry.getValue() != null) {
+ resultsByWorker.put(workerIndex, entry.getValue());
+ // should not check that the values are increasing since this is a
BatchScanner
+ } else {
+ log.info("BatchScanner returned null");
+ fail("BatchScanner returned unexpected null value");
+ }
+ }
+ log.debug("BatchScanner ran out of data. (info only, not an error) ");
+ } catch (TableNotFoundException | AccumuloSecurityException |
AccumuloException e) {
+ throw new IllegalStateException("Initialization failure. Could not
create BatchScanner", e);
} finally {
// don't close scanner here, because it will clean up the scan ids
we're checking for
}