This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 368cdb36ae Update Ample to use SteadyTime for suspension (#4545)
368cdb36ae is described below
commit 368cdb36aefdc4ebe3f028feca2e425b418461dd
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Fri May 10 14:32:34 2024 -0400
Update Ample to use SteadyTime for suspension (#4545)
SteadyTime was added in #4494 which creates a strong type for tracking
the total duration for the time the cluster has had a manager. This time
is used for suspension, so this updates Ample to use the new type
instead of just a long.
---
.../org/apache/accumulo/core/logging/TabletLogger.java | 7 +++----
.../accumulo/core/metadata/SuspendingTServer.java | 17 ++++++++++-------
.../org/apache/accumulo/core/metadata/schema/Ample.java | 3 ++-
.../org/apache/accumulo/core/util/time/SteadyTime.java | 9 +++++++++
.../core/metadata/schema/TabletMetadataTest.java | 9 ++++++---
.../server/manager/state/LoggingTabletStateStore.java | 7 +++----
.../server/manager/state/MetaDataStateStore.java | 11 ++++++-----
.../accumulo/server/manager/state/TabletStateStore.java | 5 +++--
.../server/manager/state/ZooTabletStateStore.java | 3 ++-
.../accumulo/server/metadata/TabletMutatorBase.java | 3 ++-
.../org/apache/accumulo/manager/TabletGroupWatcher.java | 4 ++--
.../apache/accumulo/tserver/UnloadTabletHandler.java | 8 ++++++--
12 files changed, 54 insertions(+), 32 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
index d01356cfc7..2209e41c27 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
@@ -37,6 +36,7 @@ import
org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,10 +75,9 @@ public class TabletLogger {
locLog.debug("Loading {} on {}", extent, server);
}
- public static void suspended(KeyExtent extent, HostAndPort server, long
time, TimeUnit timeUnit,
+ public static void suspended(KeyExtent extent, HostAndPort server,
SteadyTime time,
int numWalogs) {
- locLog.debug("Suspended {} to {} at {} ms with {} walogs", extent, server,
- timeUnit.toMillis(time), numWalogs);
+ locLog.debug("Suspended {} to {} at {} ms with {} walogs", extent, server,
time, numWalogs);
}
public static void unsuspended(KeyExtent extent) {
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/SuspendingTServer.java
b/core/src/main/java/org/apache/accumulo/core/metadata/SuspendingTServer.java
index 9f59a30cd8..e481369a21 100644
---
a/core/src/main/java/org/apache/accumulo/core/metadata/SuspendingTServer.java
+++
b/core/src/main/java/org/apache/accumulo/core/metadata/SuspendingTServer.java
@@ -19,8 +19,10 @@
package org.apache.accumulo.core.metadata;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.time.SteadyTime;
import com.google.common.net.HostAndPort;
@@ -29,21 +31,22 @@ import com.google.common.net.HostAndPort;
*/
public class SuspendingTServer {
public final HostAndPort server;
- public final long suspensionTime;
+ public final SteadyTime suspensionTime;
- SuspendingTServer(HostAndPort server, long suspensionTime) {
+ SuspendingTServer(HostAndPort server, SteadyTime suspensionTime) {
this.server = Objects.requireNonNull(server);
- this.suspensionTime = suspensionTime;
+ this.suspensionTime = Objects.requireNonNull(suspensionTime);
}
public static SuspendingTServer fromValue(Value value) {
String valStr = value.toString();
String[] parts = valStr.split("[|]", 2);
- return new SuspendingTServer(HostAndPort.fromString(parts[0]),
Long.parseLong(parts[1]));
+ return new SuspendingTServer(HostAndPort.fromString(parts[0]),
+ SteadyTime.from(Long.parseLong(parts[1]), TimeUnit.MILLISECONDS));
}
- public static Value toValue(TServerInstance tServer, long suspensionTime) {
- return new Value(tServer.getHostPort() + "|" + suspensionTime);
+ public static Value toValue(TServerInstance tServer, SteadyTime
suspensionTime) {
+ return new Value(tServer.getHostPort() + "|" + suspensionTime.getMillis());
}
@Override
@@ -52,7 +55,7 @@ public class SuspendingTServer {
return false;
}
SuspendingTServer rhs = (SuspendingTServer) rhsObject;
- return server.equals(rhs.server) && suspensionTime == rhs.suspensionTime;
+ return server.equals(rhs.server) &&
suspensionTime.equals(rhs.suspensionTime);
}
@Override
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index fc71d4edf6..931e415774 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.hadoop.io.Text;
/**
@@ -299,7 +300,7 @@ public interface Ample {
TabletMutator deleteBulkFile(StoredTabletFile bulkref);
- TabletMutator putSuspension(TServerInstance tserver, long suspensionTime);
+ TabletMutator putSuspension(TServerInstance tserver, SteadyTime
suspensionTime);
TabletMutator deleteSuspension();
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/time/SteadyTime.java
b/core/src/main/java/org/apache/accumulo/core/util/time/SteadyTime.java
index a94ae0ce55..4007b514c0 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/time/SteadyTime.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/time/SteadyTime.java
@@ -52,6 +52,10 @@ public class SteadyTime implements Comparable<SteadyTime> {
return time;
}
+ public Duration minus(SteadyTime other) {
+ return time.minus(other.getDuration());
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -74,6 +78,11 @@ public class SteadyTime implements Comparable<SteadyTime> {
return time.compareTo(other.time);
}
+ @Override
+ public String toString() {
+ return "SteadyTime[" + time + "]";
+ }
+
public static SteadyTime from(long time, TimeUnit unit) {
return new SteadyTime(Duration.of(time, unit.toChronoUnit()));
}
diff --git
a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
index ac94fc8f69..3b2b4a85ac 100644
---
a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
@@ -48,6 +48,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
@@ -78,6 +79,7 @@ import
org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
+import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;
@@ -128,7 +130,7 @@ public class TabletMetadataTest {
MERGED_COLUMN.put(mutation, new Value());
OLD_PREV_ROW_COLUMN.put(mutation, TabletColumnFamily.encodePrevEndRow(new
Text("oldPrev")));
- long suspensionTime = System.currentTimeMillis();
+ SteadyTime suspensionTime = SteadyTime.from(System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
TServerInstance ser1 = new
TServerInstance(HostAndPort.fromParts("server1", 8555), "s001");
Value suspend = SuspendingTServer.toValue(ser1, suspensionTime);
SUSPEND_COLUMN.put(mutation, suspend);
@@ -289,13 +291,14 @@ public class TabletMetadataTest {
// test SUSPENDED
mutation = TabletColumnFamily.createPrevRowMutation(extent);
mutation.at().family(SUSPEND_COLUMN.getColumnFamily())
-
.qualifier(SUSPEND_COLUMN.getColumnQualifier()).put(SuspendingTServer.toValue(ser2,
1000L));
+ .qualifier(SUSPEND_COLUMN.getColumnQualifier())
+ .put(SuspendingTServer.toValue(ser2, SteadyTime.from(1000L,
TimeUnit.MILLISECONDS)));
rowMap = toRowMap(mutation);
tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(), colsToFetch,
false);
assertEquals(TabletState.SUSPENDED, tm.getTabletState(tservers));
- assertEquals(1000L, tm.getSuspend().suspensionTime);
+ assertEquals(1000L, tm.getSuspend().suspensionTime.getMillis());
assertEquals(ser2.getHostAndPort(), tm.getSuspend().server);
assertNull(tm.getLocation());
assertFalse(tm.hasCurrent());
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
index acfb363d2b..63b00064e6 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
@@ -21,12 +21,12 @@ package org.apache.accumulo.server.manager.state;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.logging.TabletLogger;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletLocationState;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.hadoop.fs.Path;
import com.google.common.net.HostAndPort;
@@ -86,7 +86,7 @@ class LoggingTabletStateStore implements TabletStateStore {
@Override
public void suspend(Collection<TabletLocationState> tablets,
- Map<TServerInstance,List<Path>> logsForDeadServers, long
suspensionTimestamp)
+ Map<TServerInstance,List<Path>> logsForDeadServers, SteadyTime
suspensionTimestamp)
throws DistributedStoreException {
wrapped.suspend(tablets, logsForDeadServers, suspensionTimestamp);
@@ -100,8 +100,7 @@ class LoggingTabletStateStore implements TabletStateStore {
if (location != null) {
server = location.getHostAndPort();
}
- TabletLogger.suspended(tls.extent, server, suspensionTimestamp,
TimeUnit.MILLISECONDS,
- logsForDeadServers.size());
+ TabletLogger.suspended(tls.extent, server, suspensionTimestamp,
logsForDeadServers.size());
}
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
index ccb5dc2747..acc37a2075 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
@@ -32,6 +32,7 @@ import
org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.server.util.ManagerMetadataUtil;
import org.apache.hadoop.fs.Path;
@@ -99,18 +100,18 @@ class MetaDataStateStore implements TabletStateStore {
@Override
public void unassign(Collection<TabletLocationState> tablets,
Map<TServerInstance,List<Path>> logsForDeadServers) throws
DistributedStoreException {
- unassign(tablets, logsForDeadServers, -1);
+ unassign(tablets, logsForDeadServers, null);
}
@Override
public void suspend(Collection<TabletLocationState> tablets,
- Map<TServerInstance,List<Path>> logsForDeadServers, long
suspensionTimestamp)
+ Map<TServerInstance,List<Path>> logsForDeadServers, SteadyTime
suspensionTimestamp)
throws DistributedStoreException {
unassign(tablets, logsForDeadServers, suspensionTimestamp);
}
private void unassign(Collection<TabletLocationState> tablets,
- Map<TServerInstance,List<Path>> logsForDeadServers, long
suspensionTimestamp)
+ Map<TServerInstance,List<Path>> logsForDeadServers, SteadyTime
suspensionTimestamp)
throws DistributedStoreException {
try (var tabletsMutator = ample.mutateTablets()) {
for (TabletLocationState tls : tablets) {
@@ -128,11 +129,11 @@ class MetaDataStateStore implements TabletStateStore {
}
}
}
- if (suspensionTimestamp >= 0) {
+ if (suspensionTimestamp != null && suspensionTimestamp.getMillis()
>= 0) {
tabletMutator.putSuspension(tls.current.getServerInstance(),
suspensionTimestamp);
}
}
- if (tls.suspend != null && suspensionTimestamp < 0) {
+ if (tls.suspend != null && suspensionTimestamp == null) {
tabletMutator.deleteSuspension();
}
if (tls.hasFuture()) {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
index 05072fd1b0..9f4302bfdb 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletLocationState;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.fs.Path;
@@ -76,7 +77,7 @@ public interface TabletStateStore extends
Iterable<TabletLocationState> {
* previous tserver.
*/
void suspend(Collection<TabletLocationState> tablets,
- Map<TServerInstance,List<Path>> logsForDeadServers, long
suspensionTimestamp)
+ Map<TServerInstance,List<Path>> logsForDeadServers, SteadyTime
suspensionTimestamp)
throws DistributedStoreException;
/**
@@ -91,7 +92,7 @@ public interface TabletStateStore extends
Iterable<TabletLocationState> {
}
public static void suspend(ServerContext context, TabletLocationState tls,
- Map<TServerInstance,List<Path>> logsForDeadServers, long
suspensionTimestamp)
+ Map<TServerInstance,List<Path>> logsForDeadServers, SteadyTime
suspensionTimestamp)
throws DistributedStoreException {
getStoreForTablet(tls.extent,
context).suspend(Collections.singletonList(tls),
logsForDeadServers, suspensionTimestamp);
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
index e18365bdf6..639c72f47f 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
@@ -36,6 +36,7 @@ import
org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.server.util.ManagerMetadataUtil;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -188,7 +189,7 @@ class ZooTabletStateStore implements TabletStateStore {
@Override
public void suspend(Collection<TabletLocationState> tablets,
- Map<TServerInstance,List<Path>> logsForDeadServers, long
suspensionTimestamp)
+ Map<TServerInstance,List<Path>> logsForDeadServers, SteadyTime
suspensionTimestamp)
throws DistributedStoreException {
// No support for suspending root tablet.
unassign(tablets, logsForDeadServers);
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
index 036c9ac342..a8b567775a 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataTime;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.io.Text;
@@ -201,7 +202,7 @@ public abstract class TabletMutatorBase implements
Ample.TabletMutator {
}
@Override
- public Ample.TabletMutator putSuspension(TServerInstance tServer, long
suspensionTime) {
+ public Ample.TabletMutator putSuspension(TServerInstance tServer, SteadyTime
suspensionTime) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after
calling mutate.");
mutation.put(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily(),
SuspendLocationColumn.SUSPEND_COLUMN.getColumnQualifier(),
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 530bd950b1..7adba90fe0 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -454,7 +454,7 @@ abstract class TabletGroupWatcher extends
AccumuloDaemonThread {
private void hostSuspendedTablet(TabletLists tLists, TabletLocationState
tls, Location location,
TableConfiguration tableConf) {
- if (manager.getSteadyTime().getMillis() - tls.suspend.suspensionTime
+ if (manager.getSteadyTime().minus(tls.suspend.suspensionTime).toMillis()
< tableConf.getTimeInMillis(Property.TABLE_SUSPEND_DURATION)) {
// Tablet is suspended. See if its tablet server is back.
TServerInstance returnInstance = null;
@@ -1386,7 +1386,7 @@ abstract class TabletGroupWatcher extends
AccumuloDaemonThread {
deadTablets.subList(0, maxServersToShow));
Manager.log.debug("logs for dead servers: {}", deadLogs);
if (canSuspendTablets()) {
- store.suspend(deadTablets, deadLogs,
manager.getSteadyTime().getMillis());
+ store.suspend(deadTablets, deadLogs, manager.getSteadyTime());
} else {
store.unassign(deadTablets, deadLogs);
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java
index 206f29b383..1458901202 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java
@@ -18,8 +18,11 @@
*/
package org.apache.accumulo.tserver;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import java.util.concurrent.TimeUnit;
+
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.manager.thrift.TabletLoadState;
@@ -28,6 +31,7 @@ import org.apache.accumulo.core.metadata.TabletLocationState;
import
org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal;
+import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.server.manager.state.DistributedStoreException;
import org.apache.accumulo.server.manager.state.TabletStateStore;
import org.apache.accumulo.tserver.managermessage.TabletStatusMessage;
@@ -120,8 +124,8 @@ class UnloadTabletHandler implements Runnable {
&&
!server.getConfiguration().getBoolean(Property.MANAGER_METADATA_SUSPENDABLE))) {
TabletStateStore.unassign(server.getContext(), tls, null);
} else {
- TabletStateStore.suspend(server.getContext(), tls, null,
- requestTimeSkew + NANOSECONDS.toMillis(System.nanoTime()));
+ TabletStateStore.suspend(server.getContext(), tls, null,
SteadyTime.from(
+ requestTimeSkew + NANOSECONDS.toMillis(System.nanoTime()),
TimeUnit.MILLISECONDS));
}
} catch (DistributedStoreException ex) {
log.warn("Unable to update storage", ex);