This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new d90f85b8cb updates FateCleaner to use SteadyTime (#4670)
d90f85b8cb is described below
commit d90f85b8cb86c9ca72064b8bcf1158ea9923dff5
Author: Keith Turner <[email protected]>
AuthorDate: Fri Jun 14 09:36:38 2024 -0400
updates FateCleaner to use SteadyTime (#4670)
fixes #4481
---
.../org/apache/accumulo/core/fate/FateCleaner.java | 49 ++++++++++------------
.../apache/accumulo/core/fate/FateCleanerTest.java | 48 +++++++++++++++------
.../java/org/apache/accumulo/manager/Manager.java | 2 +-
3 files changed, 59 insertions(+), 40 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java
index e1738c2167..20e6ef691d 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java
@@ -20,10 +20,12 @@ package org.apache.accumulo.core.fate;
import java.time.Duration;
import java.util.EnumSet;
-import java.util.UUID;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.util.time.SteadyTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,54 +38,45 @@ import com.google.common.base.Preconditions;
* field is used to track fate transactions that are candidates for cleanup.
*
* <p>
- * No external time source is used. It starts tracking idle time when its
created.
- *
- * <p>
* The {@link #ageOff()} method on this class must be periodically called
in order for cleanup to
* happen.
*/
public class FateCleaner<T> {
public interface TimeSource {
- long currentTimeNanos();
+ SteadyTime steadyTime();
}
// Statuses that can be aged off if idle for a prolonged period.
private static final EnumSet<TStatus> AGE_OFF_STATUSES =
EnumSet.of(TStatus.NEW, TStatus.FAILED, TStatus.SUCCESSFUL);
- // This is used to determine if age off data was persisted by another
instance of this object.
- private final UUID instanceId = UUID.randomUUID();
-
private static final Logger log = LoggerFactory.getLogger(FateCleaner.class);
private final FateStore<T> store;
- private final long ageOffTime;
+ private final Duration ageOffTime;
private final TimeSource timeSource;
private static class AgeOffInfo {
- final UUID instanceId;
- final long setTime;
+ final SteadyTime setTime;
final TStatus status;
public AgeOffInfo(String ageOffStr) {
var tokens = ageOffStr.split(":");
- Preconditions.checkArgument(tokens.length == 3, "Malformed input %s",
ageOffStr);
- instanceId = UUID.fromString(tokens[0]);
- setTime = Long.parseLong(tokens[1]);
- status = TStatus.valueOf(tokens[2]);
+ Preconditions.checkArgument(tokens.length == 2, "Malformed input %s",
ageOffStr);
+ setTime = SteadyTime.from(Long.parseLong(tokens[0]),
TimeUnit.NANOSECONDS);
+ status = TStatus.valueOf(tokens[1]);
}
- public AgeOffInfo(UUID instanceId, long time, TStatus status) {
- this.instanceId = instanceId;
+ public AgeOffInfo(SteadyTime time, TStatus status) {
this.setTime = time;
this.status = status;
}
@Override
public String toString() {
- return instanceId + ":" + setTime + ":" + status;
+ return setTime.getNanos() + ":" + status;
}
}
@@ -97,9 +90,12 @@ public class FateCleaner<T> {
}
private boolean shouldAgeOff(TStatus currStatus, AgeOffInfo ageOffInfo) {
+ SteadyTime currSteadyTime = timeSource.steadyTime();
+ Duration elapsed = currSteadyTime.minus(ageOffInfo.setTime);
+ Preconditions.checkState(!elapsed.isNegative(), "Elapsed steady time is
negative : %s %s %s",
+ currSteadyTime, ageOffInfo.setTime, elapsed);
return AGE_OFF_STATUSES.contains(currStatus) && currStatus ==
ageOffInfo.status
- && ageOffInfo.instanceId.equals(instanceId)
- && timeSource.currentTimeNanos() - ageOffInfo.setTime >= ageOffTime;
+ && elapsed.compareTo(ageOffTime) > 0;
}
public void ageOff() {
@@ -108,12 +104,10 @@ public class FateCleaner<T> {
try {
AgeOffInfo ageOffInfo = readAgeOffInfo(txStore);
TStatus currStatus = txStore.getStatus();
- if (ageOffInfo == null || !ageOffInfo.instanceId.equals(instanceId)
- || currStatus != ageOffInfo.status) {
+ if (ageOffInfo == null || currStatus != ageOffInfo.status) {
// set or reset the age off info because it does not exist or
it exists but is no
// longer valid
- var newAgeOffInfo =
- new AgeOffInfo(instanceId, timeSource.currentTimeNanos(),
currStatus);
+ var newAgeOffInfo = new AgeOffInfo(timeSource.steadyTime(),
currStatus);
txStore.setTransactionInfo(Fate.TxInfo.TX_AGEOFF,
newAgeOffInfo.toString());
log.trace("Set age off data {} {}", idStatus.getFateId(),
newAgeOffInfo);
} else if (shouldAgeOff(currStatus, ageOffInfo)) {
@@ -127,8 +121,9 @@ public class FateCleaner<T> {
}
public FateCleaner(FateStore<T> store, Duration duration, TimeSource
timeSource) {
- this.store = store;
- this.ageOffTime = duration.toNanos();
- this.timeSource = timeSource;
+ this.store = Objects.requireNonNull(store);
+ this.ageOffTime = Objects.requireNonNull(duration);
+ this.timeSource = Objects.requireNonNull(timeSource);
+ Preconditions.checkArgument(!duration.isNegative() && !duration.isZero());
}
}
diff --git
a/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java
b/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java
index eb0d1dc748..8a6a69c70d 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java
@@ -20,13 +20,16 @@ package org.apache.accumulo.core.fate;
import static java.util.stream.Collectors.toSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.time.Duration;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.fate.FateCleaner.TimeSource;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.zookeeper.KeeperException;
import org.junit.jupiter.api.Test;
@@ -36,10 +39,9 @@ public class FateCleanerTest {
long time = 0;
@Override
- public long currentTimeNanos() {
- return time;
+ public SteadyTime steadyTime() {
+ return SteadyTime.from(time, TimeUnit.NANOSECONDS);
}
-
}
@Test
@@ -225,7 +227,8 @@ public class FateCleanerTest {
@Test
public void testNewCleaner() {
- // this test ensures that a new cleaner instance ignores data from another
cleaner instance
+ // this test ensures that a new cleaner instance uses persisted data from
a previous cleaner
+ // instance
TestTimeSource tts = new TestTimeSource();
TestStore testStore = new TestStore();
@@ -250,24 +253,45 @@ public class FateCleanerTest {
assertEquals(Set.of(fateId2, fateId3),
testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
- // create a new cleaner, it should ignore any data stored by previous
cleaner
+ // create a new cleaner, it should use the steady times persisted by
previous cleaner instance
FateCleaner<String> cleaner2 = new FateCleaner<>(testStore,
Duration.ofHours(10), tts);
tts.time += Duration.ofHours(5).toNanos();
- // since this is a new cleaner instance, it should reset the clock
+
cleaner2.ageOff();
- assertEquals(Set.of(fateId2, fateId3),
- testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
+ assertEquals(Set.of(fateId3),
testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
- // since the clock was reset, advancing time should not age anything off
- tts.time += Duration.ofHours(9).toNanos();
+ tts.time += Duration.ofHours(4).toNanos();
cleaner2.ageOff();
- assertEquals(Set.of(fateId2, fateId3),
- testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
+ assertEquals(Set.of(fateId3),
testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
// this should advance time enough to age everything off
tts.time += Duration.ofHours(2).toNanos();
cleaner2.ageOff();
assertEquals(Set.of(),
testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
}
+
+ @Test
+ public void testErrors() {
+ TestTimeSource tts = new TestTimeSource();
+ TestStore testStore = new TestStore();
+ assertThrows(IllegalArgumentException.class,
+ () -> new FateCleaner<>(testStore, Duration.ofHours(-10), tts));
+ assertThrows(IllegalArgumentException.class,
+ () -> new FateCleaner<>(testStore, Duration.ZERO, tts));
+ assertThrows(NullPointerException.class,
+ () -> new FateCleaner<>(null, Duration.ofHours(10), tts));
+ assertThrows(NullPointerException.class, () -> new
FateCleaner<>(testStore, null, tts));
+ assertThrows(NullPointerException.class,
+ () -> new FateCleaner<>(testStore, Duration.ofHours(10), null));
+
+ tts.time += Duration.ofHours(6).toNanos();
+
+ FateCleaner<String> cleaner1 = new FateCleaner<>(testStore,
Duration.ofHours(10), tts);
+ FateId fateId1 = testStore.create();
+ cleaner1.ageOff();
+ tts.time -= Duration.ofHours(3).toNanos();
+ // steady time going backwards should cause an error
+ assertThrows(IllegalStateException.class, () -> cleaner1.ageOff());
+ }
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index e5deae2b26..c7e6339a22 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1262,7 +1262,7 @@ public class Manager extends AbstractServer
final Fate<Manager> fateInstance =
new Fate<>(this, store, TraceRepo::toLogString, getConfiguration());
- var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8),
System::nanoTime);
+ var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8),
this::getSteadyTime);
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
.scheduleWithFixedDelay(fateCleaner::ageOff, 10, 4 * 60, MINUTES));