This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 7cf5c73222 Modified coordinator restart test to test compaction
duration update (#4766)
7cf5c73222 is described below
commit 7cf5c732229894034ed05e177db587e4bb89c263
Author: Dave Marion <[email protected]>
AuthorDate: Mon Jul 29 07:59:55 2024 -0400
Modified coordinator restart test to test compaction duration update (#4766)
Closes #4680
---
.../coordinator/CompactionCoordinator.java | 5 +-
.../test/compaction/ExternalCompaction_3_IT.java | 115 +++++++++++++++------
2 files changed, 86 insertions(+), 34 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 355d5a8a5e..34e9d9191b 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -148,6 +148,9 @@ public class CompactionCoordinator
private static final Logger LOG =
LoggerFactory.getLogger(CompactionCoordinator.class);
+ public static final String RESTART_UPDATE_MSG =
+ "Coordinator restarted, compaction found in progress";
+
/*
* Map of compactionId to RunningCompactions. This is an informational cache
of what external
* compactions may be running. Its possible it may contain external
compactions that are not
@@ -301,7 +304,7 @@ public class CompactionCoordinator
running.forEach(rc -> {
TCompactionStatusUpdate update = new TCompactionStatusUpdate();
update.setState(TCompactionState.IN_PROGRESS);
- update.setMessage("Coordinator restarted, compaction found in
progress");
+ update.setMessage(RESTART_UPDATE_MSG);
rc.addUpdate(System.currentTimeMillis(), update);
RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()),
rc);
});
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
index b1d7128b59..6d80c970e3 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
@@ -22,7 +22,6 @@ import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GR
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
-import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getLastState;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getRunningCompactions;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData;
@@ -31,29 +30,40 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
+import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
+import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
+import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
+import
org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.util.FindCompactionTmpFiles;
+import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -76,9 +86,6 @@ public class ExternalCompaction_3_IT extends
SharedMiniClusterBase {
@BeforeAll
public static void beforeTests() throws Exception {
startMiniClusterWithConfig(new ExternalCompaction3Config());
- getCluster().getClusterControl().stop(ServerType.COMPACTOR);
- getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
- ExternalDoNothingCompactor.class);
}
@Test
@@ -88,6 +95,10 @@ public class ExternalCompaction_3_IT extends
SharedMiniClusterBase {
try (AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
+ getCluster().getClusterControl().stop(ServerType.COMPACTOR);
+ getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
+ ExternalDoNothingCompactor.class);
+
createTable(client, table1, "cs1", 2);
// set compaction ratio to 1 so that majc occurs naturally, not user
compaction
// user compaction blocks merge
@@ -143,6 +154,10 @@ public class ExternalCompaction_3_IT extends
SharedMiniClusterBase {
// Verify that the tmp file are cleaned up
Wait.waitFor(() -> FindCompactionTmpFiles
.findTempFiles(getCluster().getServerContext(),
tid.canonical()).size() == 0);
+ } finally {
+ getCluster().getClusterControl().stop(ServerType.COMPACTOR);
+ getCluster().getClusterControl().start(ServerType.COMPACTOR);
+
}
}
@@ -155,6 +170,12 @@ public class ExternalCompaction_3_IT extends
SharedMiniClusterBase {
createTable(client, table1, "cs2", 2);
writeData(client, table1);
+
+ IteratorSetting setting = new IteratorSetting(50, "slow",
SlowIterator.class);
+ SlowIterator.setSeekSleepTime(setting, 5000);
+ SlowIterator.setSleepTime(setting, 5000);
+ client.tableOperations().attachIterator(table1, setting,
EnumSet.of(IteratorScope.majc));
+
compact(client, table1, 2, GROUP2, false);
TableId tid = getCluster().getServerContext().getTableId(table1);
@@ -163,44 +184,33 @@ public class ExternalCompaction_3_IT extends
SharedMiniClusterBase {
Set<ExternalCompactionId> ecids =
waitForCompactionStartAndReturnEcids(getCluster().getServerContext(), tid);
+ ServerContext ctx = getCluster().getServerContext();
+
+ // Wait for all compactions to start
+ Map<ExternalCompactionId,RunningCompactionInfo> originalRunningInfo =
null;
+ do {
+ originalRunningInfo = getRunningCompactionInformation(ctx, ecids);
+ } while (originalRunningInfo == null
+ || originalRunningInfo.values().stream().allMatch(rci ->
rci.duration == 0));
+
// Stop the Manager (Coordinator)
getCluster().getClusterControl().stop(ServerType.MANAGER);
// Restart the Manager while the compaction is running
getCluster().getClusterControl().start(ServerType.MANAGER);
- ServerContext ctx = getCluster().getServerContext();
+ Map<ExternalCompactionId,RunningCompactionInfo> postRestartRunningInfo =
+ getRunningCompactionInformation(ctx, ecids);
- // Confirm compaction is still running
- int matches = 0;
- while (matches == 0) {
- TExternalCompactionList running = null;
- while (running == null) {
- try {
- Optional<HostAndPort> coordinatorHost =
- ExternalCompactionUtil.findCompactionCoordinator(ctx);
- if (coordinatorHost.isEmpty()) {
- throw new TTransportException(
- "Unable to get CompactionCoordinator address from
ZooKeeper");
- }
- running = getRunningCompactions(ctx, coordinatorHost);
- } catch (TException t) {
- running = null;
- Thread.sleep(2000);
- }
- }
- if (running.getCompactions() != null) {
- for (ExternalCompactionId ecid : ecids) {
- TExternalCompaction tec =
running.getCompactions().get(ecid.canonical());
- if (tec != null && tec.getUpdates() != null &&
!tec.getUpdates().isEmpty()) {
- matches++;
- assertEquals(TCompactionState.IN_PROGRESS, getLastState(tec));
- }
- }
+ for (Entry<ExternalCompactionId,RunningCompactionInfo> post :
postRestartRunningInfo
+ .entrySet()) {
+ if (originalRunningInfo.containsKey(post.getKey())) {
+ assertTrue(
+ (post.getValue().duration -
originalRunningInfo.get(post.getKey()).duration) > 0);
}
- UtilWaitThread.sleep(250);
+ final String lastState = post.getValue().status;
+ assertTrue(lastState.equals(TCompactionState.IN_PROGRESS.name()));
}
- assertTrue(matches > 0);
// We need to cancel the compaction or delete the table here because we
initiate a user
// compaction above in the test. Even though the external compaction was
cancelled
@@ -210,4 +220,43 @@ public class ExternalCompaction_3_IT extends
SharedMiniClusterBase {
}
}
+ private Map<ExternalCompactionId,RunningCompactionInfo>
getRunningCompactionInformation(
+ ServerContext ctx, Set<ExternalCompactionId> ecids) throws
InterruptedException {
+
+ final Map<ExternalCompactionId,RunningCompactionInfo> results = new
HashMap<>();
+
+ while (results.isEmpty()) {
+ TExternalCompactionList running = null;
+ while (running == null || running.getCompactions() == null) {
+ try {
+ Optional<HostAndPort> coordinatorHost =
+ ExternalCompactionUtil.findCompactionCoordinator(ctx);
+ if (coordinatorHost.isEmpty()) {
+ throw new TTransportException(
+ "Unable to get CompactionCoordinator address from ZooKeeper");
+ }
+ running = getRunningCompactions(ctx, coordinatorHost);
+ } catch (TException t) {
+ running = null;
+ Thread.sleep(2000);
+ }
+ }
+ for (ExternalCompactionId ecid : ecids) {
+ final TExternalCompaction tec =
running.getCompactions().get(ecid.canonical());
+ if (tec != null && tec.getUpdatesSize() > 0) {
+ // When the coordinator restarts it inserts a message into the
updates. If this
+ // is the last message, then don't insert this into the results. We
want to get
+ // an actual update from the Compactor.
+ TreeMap<Long,TCompactionStatusUpdate> sorted = new
TreeMap<>(tec.getUpdates());
+ var lastEntry = sorted.lastEntry();
+ if
(lastEntry.getValue().getMessage().equals(CompactionCoordinator.RESTART_UPDATE_MSG))
{
+ continue;
+ }
+ results.put(ecid, new RunningCompactionInfo(tec));
+ }
+ }
+ }
+ return results;
+ }
+
}