This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 01fd2ccb25 scan session id now available when session cleanup is
deferred (#4902)
01fd2ccb25 is described below
commit 01fd2ccb25a3bc7d673308dddb8b88dd4c032b04
Author: Kevin Rathbun <[email protected]>
AuthorDate: Thu Oct 3 09:51:59 2024 -0400
scan session id now available when session cleanup is deferred (#4902)
Previously, when cleanup of a scan session was attempted but could not complete
and was deferred for later, the scan session was still visible from things like
the `listscans` command, but its session id was reported as -1. This change:
- keeps the real session id available in this situation
- improves logging related to scan session ids and includes the ids in more
log messages
- adds a test verifying that invalid scan session ids (0 = never set,
-1 = no longer tracked) are no longer reported
closes #4842
---
.../accumulo/core/clientImpl/ThriftScanner.java | 5 +-
.../org/apache/accumulo/tserver/ScanServer.java | 19 ++++----
.../accumulo/tserver/ThriftScanClientHandler.java | 11 +++--
.../apache/accumulo/tserver/session/Session.java | 18 +++++++-
.../accumulo/tserver/session/SessionManager.java | 54 ++++++----------------
.../org/apache/accumulo/test/ZombieScanIT.java | 29 ++++++++++++
6 files changed, 77 insertions(+), 59 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index e70b0637d8..4703ac8c08 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@ -389,8 +389,8 @@ public class ThriftScanner {
// call the scan server selector and just go back to the previous scan
server
addr = scanState.prevLoc;
log.trace(
- "For tablet {} continuing scan on scan server {} without consulting
scan server selector, using busyTimeout {}",
- loc.getExtent(), addr.serverAddress, scanState.busyTimeout);
+ "For tablet {} continuing scan {} on scan server {} without
consulting scan server selector, using busyTimeout {}",
+ loc.getExtent(), scanState.scanID, addr.serverAddress,
scanState.busyTimeout);
} else {
var tabletId = new TabletIdImpl(loc.getExtent());
// obtain a snapshot once and only expose this snapshot to the plugin
for consistency
@@ -898,7 +898,6 @@ public class ThriftScanner {
}
} else {
- // log.debug("Calling continue scan : "+scanState.range+" loc = "+loc);
String msg =
"Continuing scan tserver=" + addr.serverAddress + " scanid=" +
scanState.scanID;
Thread.currentThread().setName(msg);
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 319241dc41..4dc21eb487 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -136,7 +136,7 @@ import com.google.common.net.HostAndPort;
public class ScanServer extends AbstractServer
implements TabletScanClientService.Iface, TabletHostingServer {
- private static final Logger log = LoggerFactory.getLogger(ScanServer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
private static class TabletMetadataLoader implements
CacheLoader<KeyExtent,TabletMetadata> {
@@ -173,8 +173,6 @@ public class ScanServer extends AbstractServer
}
}
- private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
-
protected ThriftScanClientHandler delegate;
private UUID serverLockUUID;
private final TabletMetadataLoader tabletMetadataLoader;
@@ -212,8 +210,8 @@ public class ScanServer extends AbstractServer
super("sserver", opts, ServerContext::new, args);
context = super.getContext();
- log.info("Version " + Constants.VERSION);
- log.info("Instance " + getContext().getInstanceID());
+ LOG.info("Version " + Constants.VERSION);
+ LOG.info("Instance " + getContext().getInstanceID());
this.sessionManager = new SessionManager(context);
this.resourceManager = new TabletServerResourceManager(context, this);
@@ -433,11 +431,11 @@ public class ScanServer extends AbstractServer
// thread to look for log sorting work in the future
logSorter.startWatchingForRecoveryLogs(threadPoolSize);
} catch (Exception ex) {
- log.error("Error starting LogSorter");
+ LOG.error("Error starting LogSorter");
throw new RuntimeException(ex);
}
} else {
- log.info(
+ LOG.info(
"Log sorting for tablet recovery is disabled,
SSERV_WAL_SORT_MAX_CONCURRENT is less than 1.");
}
@@ -987,6 +985,7 @@ public class ScanServer extends AbstractServer
batchTimeOut, classLoaderContext, executionHints,
getScanTabletResolver(tablet),
busyTimeout);
+ LOG.trace("started scan: {}", is.getScanID());
return is;
} catch (ScanServerBusyException be) {
scanServerMetrics.incrementBusy();
@@ -1058,16 +1057,16 @@ public class ScanServer extends AbstractServer
ssio, authorizations, waitForWrites, tSamplerConfig, batchTimeOut,
contextArg,
executionHints, getBatchScanTabletResolver(tablets), busyTimeout);
- LOG.trace("started scan: {}", ims.getScanID());
+ LOG.trace("started multi scan: {}", ims.getScanID());
return ims;
} catch (ScanServerBusyException be) {
scanServerMetrics.incrementBusy();
throw be;
} catch (TException e) {
- LOG.error("Error starting scan", e);
+ LOG.error("Error starting multi scan", e);
throw e;
} catch (AccumuloException e) {
- LOG.error("Error starting scan", e);
+ LOG.error("Error starting multi scan", e);
throw new RuntimeException(e);
}
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
index ee4cab0508..9344b1c4ce 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
@@ -336,8 +336,8 @@ public class ThriftScanClientHandler implements
TabletScanClientService.Iface {
long t2 = System.currentTimeMillis();
if (log.isTraceEnabled()) {
- log.trace(String.format("ScanSess tid %s %s %,d entries in %.2f secs,
nbTimes = [%s] ",
- TServerUtils.clientAddress.get(), ss.extent.tableId(),
ss.entriesReturned,
+ log.trace(String.format("ScanSess %d tid %s %s %,d entries in %.2f
secs, nbTimes = [%s] ",
+ scanID, TServerUtils.clientAddress.get(), ss.extent.tableId(),
ss.entriesReturned,
(t2 - ss.startTime) / 1000.0, ss.runStats.toString()));
}
@@ -533,10 +533,11 @@ public class ThriftScanClientHandler implements
TabletScanClientService.Iface {
if (log.isTraceEnabled()) {
log.trace(String.format(
- "MultiScanSess %s %,d entries in %.2f secs"
+ "MultiScanSess %d %s %,d entries in %.2f secs"
+ " (lookup_time:%.2f secs tablets:%,d ranges:%,d) ",
- TServerUtils.clientAddress.get(), session.numEntries, (t2 -
session.startTime) / 1000.0,
- session.totalLookupTime / 1000.0, session.numTablets,
session.numRanges));
+ scanID, TServerUtils.clientAddress.get(), session.numEntries,
+ (t2 - session.startTime) / 1000.0, session.totalLookupTime / 1000.0,
session.numTablets,
+ session.numRanges));
}
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index 29247247c3..fda1395405 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -18,12 +18,15 @@
*/
package org.apache.accumulo.tserver.session;
+import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.server.rpc.TServerUtils;
+import com.google.common.base.Preconditions;
+
public class Session {
enum State {
@@ -37,6 +40,7 @@ public class Session {
boolean allowReservation = true;
private final Timer stateChangeTimer = Timer.startNew();
private final TCredentials credentials;
+ private OptionalLong sessionId = OptionalLong.empty();
Session(TCredentials credentials) {
this.credentials = credentials;
@@ -66,13 +70,23 @@ public class Session {
return state;
}
+ public void setSessionId(long sessionId) {
+ Preconditions.checkState(this.sessionId.isEmpty());
+ this.sessionId = OptionalLong.of(sessionId);
+ }
+
+ public long getSessionId() {
+ Preconditions.checkState(this.sessionId.isPresent());
+ return sessionId.orElseThrow();
+ }
+
public long elaspedSinceStateChange(TimeUnit unit) {
return stateChangeTimer.elapsed(unit);
}
@Override
public String toString() {
- return getClass().getSimpleName() + " " + state + " startTime:" +
startTime + " lastAccessTime:"
- + lastAccessTime + " client:" + client;
+ return getClass().getSimpleName() + " " + state + " sessionId:" +
sessionId + " startTime:"
+ + startTime + " lastAccessTime:" + lastAccessTime + " client:" +
client;
}
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 564f25c019..3a7df5347d 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -24,14 +24,11 @@ import static
org.apache.accumulo.core.util.LazySingletons.RANDOM;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -61,7 +58,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
public class SessionManager {
private static final Logger log =
LoggerFactory.getLogger(SessionManager.class);
@@ -70,7 +66,6 @@ public class SessionManager {
private final long maxIdle;
private final long maxUpdateIdle;
private final BlockingQueue<Session> deferredCleanupQueue = new
ArrayBlockingQueue<>(5000);
- private final Long expiredSessionMarker = (long) -1;
private final ServerContext ctx;
private volatile LongConsumer zombieCountConsumer = null;
@@ -93,10 +88,10 @@ public class SessionManager {
Preconditions.checkArgument(session.getState() == State.NEW);
session.setState(reserve ? State.RESERVED : State.UNRESERVED);
session.startTime = session.lastAccessTime = System.currentTimeMillis();
- }
-
- while (sessions.putIfAbsent(sid, session) != null) {
- sid = RANDOM.get().nextLong();
+ while (sessions.putIfAbsent(sid, session) != null) {
+ sid = RANDOM.get().nextLong();
+ }
+ session.setSessionId(sid);
}
return sid;
@@ -312,8 +307,8 @@ public class SessionManager {
}
long idleTime = System.currentTimeMillis() - session.lastAccessTime;
if (idleTime > configuredIdle) {
- log.info("Closing idle session from user={}, client={},
idle={}ms", session.getUser(),
- session.client, idleTime);
+ log.info("Closing idle session {} from user={}, client={},
idle={}ms",
+ session.getSessionId(), session.getUser(), session.client,
idleTime);
iter.remove();
sessionsToCleanup.add(session);
session.setState(State.REMOVED);
@@ -382,8 +377,9 @@ public class SessionManager {
}
if (shouldRemove) {
- log.info("Closing not accessed session from user=" +
session2.getUser() + ", client="
- + session2.client + ", duration=" + delay + "ms");
+ log.info("Closing not accessed session " +
session2.getSessionId() + " from user="
+ + session2.getUser() + ", client=" + session2.client + ",
duration=" + delay
+ + "ms");
sessions.remove(sessionId);
cleanup(session2);
}
@@ -399,18 +395,7 @@ public class SessionManager {
public Map<TableId,MapCounter<ScanRunState>> getActiveScansPerTable() {
Map<TableId,MapCounter<ScanRunState>> counts = new HashMap<>();
- Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>();
-
- /*
- * Add sessions so that get the list returned in the active scans call
- */
- for (Session session : deferredCleanupQueue) {
- copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker,
session));
- }
-
- List.of(sessions.entrySet(), copiedIdleSessions).forEach(set ->
set.forEach(entry -> {
-
- Session session = entry.getValue();
+ Stream.concat(sessions.values().stream(),
deferredCleanupQueue.stream()).forEach(session -> {
ScanTask<?> nbt = null;
TableId tableID = null;
@@ -430,7 +415,7 @@ public class SessionManager {
counts.computeIfAbsent(tableID, unusedKey -> new
MapCounter<>()).increment(srs, 1);
}
}
- }));
+ });
return counts;
}
@@ -439,18 +424,8 @@ public class SessionManager {
final List<ActiveScan> activeScans = new ArrayList<>();
final long ct = System.currentTimeMillis();
- final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>();
-
- /*
- * Add sessions that get the list returned in the active scans call
- */
- for (Session session : deferredCleanupQueue) {
- copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker,
session));
- }
-
- List.of(sessions.entrySet(), copiedIdleSessions).forEach(s ->
s.forEach(entry -> {
- Session session = entry.getValue();
+ Stream.concat(sessions.values().stream(),
deferredCleanupQueue.stream()).forEach(session -> {
if (session instanceof ScanSession) {
ScanSession<?> scanSession = (ScanSession<?>) session;
boolean isSingle = session instanceof SingleScanSession;
@@ -459,9 +434,10 @@ public class SessionManager {
isSingle ? ((SingleScanSession) scanSession).extent
: ((MultiScanSession) scanSession).threadPoolExtent,
ct, isSingle ? ScanType.SINGLE : ScanType.BATCH,
- computeScanState(scanSession.getScanTask()),
scanSession.scanParams, entry.getKey());
+ computeScanState(scanSession.getScanTask()),
scanSession.scanParams,
+ session.getSessionId());
}
- }));
+ });
return activeScans;
}
diff --git a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
index 399a208d0a..60482e020c 100644
--- a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
@@ -24,6 +24,7 @@ import static
org.apache.accumulo.minicluster.ServerType.SCAN_SERVER;
import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
@@ -41,9 +42,12 @@ import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.ScanType;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
@@ -240,6 +244,10 @@ public class ZombieScanIT extends ConfigurableMacBase {
}
assertEquals(4, tabletSeversWithZombieScans.size());
+ // This check may be outside the scope of this test but works nicely for
this check and is
+ // simple enough to include
+ assertValidScanIds(c);
+
executor.shutdownNow();
}
@@ -400,4 +408,25 @@ public class ZombieScanIT extends ConfigurableMacBase {
.filter(metric ->
metric.getName().equals(SCAN_ZOMBIE_THREADS.getName()))
.mapToInt(metric ->
Integer.parseInt(metric.getValue())).max().orElse(-1);
}
+
+ /**
+ * Ensure that the scan session ids are valid (should not expect 0 or -1). 0
would mean the id was
+ * never set (previously existing bug). -1 previously indicated that the
scan session was no
+ * longer tracking the id (occurred for scans when cleanup was attempted but
deferred for later)
+ */
+ private void assertValidScanIds(AccumuloClient c)
+ throws AccumuloException, AccumuloSecurityException {
+ Set<Long> scanIds = new HashSet<>();
+ Set<ScanType> scanTypes = new HashSet<>();
+ for (String tserver : c.instanceOperations().getTabletServers()) {
+ c.instanceOperations().getActiveScans(tserver).forEach(activeScan -> {
+ scanIds.add(activeScan.getScanid());
+ scanTypes.add(activeScan.getType());
+ });
+ }
+ assertNotEquals(0, scanIds.size());
+ scanIds.forEach(id -> assertTrue(id != 0L && id != -1L));
+ // ensure coverage of both batch and single scans
+ assertEquals(scanTypes, Set.of(ScanType.SINGLE, ScanType.BATCH));
+ }
}