This is an automated email from the ASF dual-hosted git repository.
ddanielr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 9049e22abe fixes bug with cleaning up multiple dead compactions in
single tablet (#5083)
9049e22abe is described below
commit 9049e22abefda919ae0aafff15c8339569c9899e
Author: Keith Turner <[email protected]>
AuthorDate: Wed Nov 20 08:37:32 2024 -0500
fixes bug with cleaning up multiple dead compactions in single tablet
(#5083)
When a single tablet had multiple dead compactions the dead compaction
cleanup code would attempt to write separate conditional mutations for
the same tablet. For simplicity Ample does not support this case and
throws an exception. Reworked the code to only write a single
conditional mutation per tablet for dead compaction detection.
---
.../coordinator/CompactionCoordinator.java | 62 ++++++++++++----------
1 file changed, 33 insertions(+), 29 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 941d6647d0..925da279fc 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -19,7 +19,6 @@
package org.apache.accumulo.manager.compaction.coordinator;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
@@ -37,11 +36,11 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -779,34 +778,43 @@ public class CompactionCoordinator
void compactionsFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
// Need to process each level by itself because the conditional tablet
mutator does not support
- // mutating multiple data levels at the same time
- compactions.entrySet().stream()
- .collect(groupingBy(entry -> DataLevel.of(entry.getValue().tableId()),
- Collectors.toMap(Entry::getKey, Entry::getValue)))
- .forEach((level, compactionsByLevel) ->
compactionFailedForLevel(compactionsByLevel));
+ // mutating multiple data levels at the same time. Also the conditional
tablet mutator does not
+ // support submitting multiple mutations for a single tablet, so need to
group by extent.
+
+ Map<DataLevel,Map<KeyExtent,Set<ExternalCompactionId>>> groupedCompactions
=
+ new EnumMap<>(DataLevel.class);
+
+ compactions.forEach((ecid, extent) -> {
+ groupedCompactions.computeIfAbsent(DataLevel.of(extent.tableId()), dl ->
new HashMap<>())
+ .computeIfAbsent(extent, e -> new HashSet<>()).add(ecid);
+ });
+
+ groupedCompactions
+ .forEach((dataLevel, levelCompactions) ->
compactionFailedForLevel(levelCompactions));
}
- void compactionFailedForLevel(Map<ExternalCompactionId,KeyExtent>
compactions) {
+ void compactionFailedForLevel(Map<KeyExtent,Set<ExternalCompactionId>>
compactions) {
try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
- compactions.forEach((ecid, extent) -> {
+ compactions.forEach((extent, ecids) -> {
try {
ctx.requireNotDeleted(extent.tableId());
-
tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid)
- .deleteExternalCompaction(ecid).submit(new RejectionHandler() {
-
- @Override
- public boolean callWhenTabletDoesNotExists() {
- return true;
- }
+ var mutator =
tabletsMutator.mutateTablet(extent).requireAbsentOperation();
+ ecids.forEach(mutator::requireCompaction);
+ ecids.forEach(mutator::deleteExternalCompaction);
+ mutator.submit(new RejectionHandler() {
+ @Override
+ public boolean callWhenTabletDoesNotExists() {
+ return true;
+ }
- @Override
- public boolean test(TabletMetadata tabletMetadata) {
- return tabletMetadata == null
- ||
!tabletMetadata.getExternalCompactions().containsKey(ecid);
- }
+ @Override
+ public boolean test(TabletMetadata tabletMetadata) {
+ return tabletMetadata == null
+ ||
Collections.disjoint(tabletMetadata.getExternalCompactions().keySet(), ecids);
+ }
- });
+ });
} catch (TableDeletedException e) {
LOG.warn("Table {} was deleted, unable to update metadata for
compaction failure.",
extent.tableId());
@@ -820,10 +828,7 @@ public class CompactionCoordinator
// this should try again later when the dead compaction detector
runs, lets log it in case
// its a persistent problem
if (LOG.isDebugEnabled()) {
- var ecid =
- compactions.entrySet().stream().filter(entry ->
entry.getValue().equals(extent))
- .findFirst().map(Map.Entry::getKey).orElse(null);
- LOG.debug("Unable to remove failed compaction {} {}", extent,
ecid);
+ LOG.debug("Unable to remove failed compaction {} {}", extent,
compactions.get(extent));
}
} else {
// compactionFailed is called from the Compactor when either a
compaction fails or
@@ -833,8 +838,7 @@ public class CompactionCoordinator
// that have a corresponding ecid in the name.
ecidsForTablet.clear();
- compactions.entrySet().stream().filter(e ->
e.getValue().compareTo(extent) == 0)
- .map(Entry::getKey).forEach(ecidsForTablet::add);
+ ecidsForTablet.addAll(compactions.get(extent));
if (!ecidsForTablet.isEmpty()) {
final TabletMetadata tm = ctx.getAmple().readTablet(extent,
ColumnType.DIR);
@@ -875,7 +879,7 @@ public class CompactionCoordinator
});
}
- compactions.forEach((k, v) -> recordCompletion(k));
+ compactions.values().forEach(ecids ->
ecids.forEach(this::recordCompletion));
}
/**