This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new c06fa70459 Improve Ample logging for rejected conditional mutations
(#5219)
c06fa70459 is described below
commit c06fa70459af4eac086cbc5c00b664a63fdbbb3d
Author: Dom G. <[email protected]>
AuthorDate: Mon Jan 6 15:00:07 2025 -0500
Improve Ample logging for rejected conditional mutations (#5219)
* Improve Ample logging for rejected conditional mutations
---------
Co-authored-by: Keith Turner <[email protected]>
---
.../accumulo/core/metadata/schema/Ample.java | 11 ++++++++++
.../metadata/ConditionalTabletMutatorImpl.java | 12 ++++++++++-
.../metadata/ConditionalTabletsMutatorImpl.java | 19 +++++++++++------
.../accumulo/manager/TabletGroupWatcher.java | 12 +++++------
.../coordinator/CompactionCoordinator.java | 5 +++--
.../coordinator/commit/CommitCompaction.java | 5 +++--
.../availability/SetTabletAvailability.java | 3 ++-
.../tableOps/bulkVer2/CleanUpBulkImport.java | 2 +-
.../manager/tableOps/bulkVer2/LoadFiles.java | 2 +-
.../manager/tableOps/compact/CompactionDriver.java | 20 +++++++++++-------
.../manager/tableOps/delete/ReserveTablets.java | 3 ++-
.../manager/tableOps/merge/DeleteRows.java | 24 +++++++++++++---------
.../test/functional/AmpleConditionalWriterIT.java | 6 ++++--
13 files changed, 83 insertions(+), 41 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 7135b5a978..367ee6fe64 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.admin.TabletAvailability;
@@ -623,6 +624,16 @@ public interface Ample {
* let the rejected status carry forward in this case.
*/
void submit(RejectionHandler rejectionHandler);
+
+ /**
+ * Overloaded version of {@link #submit(RejectionHandler)} that takes a
short description of the
+ * operation to assist with debugging.
+ *
+ * @param rejectionHandler The rejection handler
+ * @param description A short description of the operation (e.g., "bulk
import", "compaction")
+ */
+ void submit(RejectionHandler rejectionHandler, Supplier<String>
description);
+
}
/**
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
index 4ae4667602..ac0dc4b112 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
@@ -36,6 +36,7 @@ import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.admin.TabletAvailability;
@@ -86,17 +87,20 @@ public class ConditionalTabletMutatorImpl extends
TabletMutatorBase<Ample.Condit
private final ServerContext context;
private final ServiceLock lock;
private final KeyExtent extent;
+ private final BiConsumer<KeyExtent,Supplier<String>> descriptionConsumer;
private boolean sawOperationRequirement = false;
private boolean checkPrevEndRow = true;
protected ConditionalTabletMutatorImpl(ServerContext context, KeyExtent
extent,
Consumer<ConditionalMutation> mutationConsumer,
- BiConsumer<KeyExtent,Ample.RejectionHandler> rejectionHandlerConsumer) {
+ BiConsumer<KeyExtent,Ample.RejectionHandler> rejectionHandlerConsumer,
+ BiConsumer<KeyExtent,Supplier<String>> descriptionConsumer) {
super(new ConditionalMutation(extent.toMetaRow()));
this.mutation = (ConditionalMutation) super.mutation;
this.mutationConsumer = mutationConsumer;
this.rejectionHandlerConsumer = rejectionHandlerConsumer;
+ this.descriptionConsumer = descriptionConsumer;
this.extent = extent;
this.context = context;
this.lock = this.context.getServiceLock();
@@ -390,4 +394,10 @@ public class ConditionalTabletMutatorImpl extends
TabletMutatorBase<Ample.Condit
mutationConsumer.accept(mutation);
rejectionHandlerConsumer.accept(extent, rejectionCheck);
}
+
+ @Override
+ public void submit(Ample.RejectionHandler rejectionHandler, Supplier<String>
description) {
+ descriptionConsumer.accept(extent, description);
+ this.submit(rejectionHandler);
+ }
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
index 660453f299..777364e0c4 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
@@ -30,6 +30,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.AccumuloException;
@@ -66,6 +67,7 @@ public class ConditionalTabletsMutatorImpl implements
Ample.ConditionalTabletsMu
private boolean active = true;
final Map<KeyExtent,Ample.RejectionHandler> rejectedHandlers = new
HashMap<>();
+ private final Map<KeyExtent,Supplier<String>> operationDescriptions = new
HashMap<>();
private final Function<DataLevel,String> tableMapper;
public ConditionalTabletsMutatorImpl(ServerContext context) {
@@ -93,7 +95,8 @@ public class ConditionalTabletsMutatorImpl implements
Ample.ConditionalTabletsMu
Preconditions.checkState(extents.putIfAbsent(extent.toMetaRow(), extent)
== null,
"Duplicate extents not handled %s", extent);
- return new ConditionalTabletMutatorImpl(context, extent, mutations::add,
rejectedHandlers::put);
+ return new ConditionalTabletMutatorImpl(context, extent, mutations::add,
rejectedHandlers::put,
+ operationDescriptions::put);
}
protected ConditionalWriter createConditionalWriter(Ample.DataLevel
dataLevel)
@@ -262,16 +265,20 @@ public class ConditionalTabletsMutatorImpl implements
Ample.ConditionalTabletsMu
status = Status.ACCEPTED;
}
+ Supplier<String> descSupplier =
operationDescriptions.get(extent);
+ String desc = (descSupplier == null) ? null : descSupplier.get();
+
if (log.isTraceEnabled()) {
// log detailed info about tablet metadata and mutation
- log.trace("Mutation was rejected, status:{} {} {}", status,
tabletMetadata,
- result.getMutation().prettyPrint());
+ log.trace("Mutation was rejected, status:{}. Operation
description: {} {} {}",
+ status, desc, tabletMetadata,
result.getMutation().prettyPrint());
} else if (log.isDebugEnabled()) {
// log a single line of info that makes it apparent this
happened and gives enough
// information to investigate
- log.debug("Mutation was rejected, status:{} extent:{} row:{}",
status,
- tabletMetadata == null ? null : tabletMetadata.getExtent(),
- new String(result.getMutation().getRow(), UTF_8));
+ log.debug(
+ "Mutation was rejected, status:{} extent:{} row:{}
operation description: {}",
+ status, tabletMetadata == null ? null :
tabletMetadata.getExtent(),
+ new String(result.getMutation().getRow(), UTF_8), desc);
}
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 6496f500e9..e29413e82a 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -350,12 +350,10 @@ abstract class TabletGroupWatcher extends
AccumuloDaemonThread {
// Do not add any code here, it may interfere with the finally block
removing extents from
// hostingRequestInProgress
try (var mutator =
manager.getContext().getAmple().conditionallyMutateTablets()) {
- inProgress.forEach(ke -> {
- mutator.mutateTablet(ke).requireAbsentOperation()
-
.requireTabletAvailability(TabletAvailability.ONDEMAND).requireAbsentLocation()
- .setHostingRequested().submit(TabletMetadata::getHostingRequested);
-
- });
+ inProgress.forEach(ke ->
mutator.mutateTablet(ke).requireAbsentOperation()
+
.requireTabletAvailability(TabletAvailability.ONDEMAND).requireAbsentLocation()
+ .setHostingRequested()
+ .submit(TabletMetadata::getHostingRequested, () -> "host ondemand"));
List<Range> ranges = new ArrayList<>();
@@ -1094,7 +1092,7 @@ abstract class TabletGroupWatcher extends
AccumuloDaemonThread {
"replaceVolume conditional mutation rejection check {}
logsRemoved:{} filesRemoved:{}",
tm.getExtent(), logsRemoved, filesRemoved);
return logsRemoved && filesRemoved;
- });
+ }, () -> "replace volume");
}
tabletsMutator.process().forEach((extent, result) -> {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 6f2eca3756..fce920e4cb 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -635,8 +635,9 @@ public class CompactionCoordinator
}
tabletMutator.putExternalCompaction(externalCompactionId, ecm);
- tabletMutator
- .submit(tm ->
tm.getExternalCompactions().containsKey(externalCompactionId));
+ tabletMutator.submit(
+ tm ->
tm.getExternalCompactions().containsKey(externalCompactionId),
+ () -> "compaction reservation");
var result = tabletsMutator.process().get(extent);
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
index cb9492d7e3..29e626d954 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
@@ -128,8 +128,9 @@ public class CommitCompaction extends ManagerRepo {
// make the needed updates to the tablet
updateTabletForCompaction(commitData.stats, ecid, tablet, newDatafile,
ecm, tabletMutator);
- tabletMutator
- .submit(tabletMetadata ->
!tabletMetadata.getExternalCompactions().containsKey(ecid));
+ tabletMutator.submit(
+ tabletMetadata ->
!tabletMetadata.getExternalCompactions().containsKey(ecid),
+ () -> "commit compaction " + ecid);
if (LOG.isDebugEnabled()) {
LOG.debug("Compaction completed {} added {} removed {}",
tablet.getExtent(), newDatafile,
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java
index bd94afd15b..db4fcdbeaf 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java
@@ -136,7 +136,8 @@ public class SetTabletAvailability extends ManagerRepo {
tabletExtent);
mutator.mutateTablet(tabletExtent).requireAbsentOperation()
.putTabletAvailability(tabletAvailability)
- .submit(tabletMeta -> tabletMeta.getTabletAvailability() ==
tabletAvailability);
+ .submit(tabletMeta -> tabletMeta.getTabletAvailability() ==
tabletAvailability,
+ () -> "set tablet availability");
}
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
index 3200369525..e5cdfdb085 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
@@ -111,7 +111,7 @@ public class CleanUpBulkImport extends ManagerRepo {
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation();
tablet.getLoaded().entrySet().stream().filter(entry ->
entry.getValue().equals(fateId))
.map(Map.Entry::getKey).forEach(tabletMutator::deleteBulkFile);
- tabletMutator.submit(tm -> false);
+ tabletMutator.submit(tm -> false, () -> "remove bulk load entries
" + fateId);
}
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index 5ff2ca3ec3..c38fb54ce6 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -251,7 +251,7 @@ class LoadFiles extends ManagerRepo {
Preconditions.checkState(
loadingFiles.put(tablet.getExtent(),
List.copyOf(filesToLoad.keySet())) == null);
- tabletMutator.submit(tm -> false);
+ tabletMutator.submit(tm -> false, () -> "bulk load files " + fateId);
}
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index 464f365121..51c4186bf9 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -211,7 +211,8 @@ public class CompactionDriver extends ManagerRepo {
// this tablet has no files try to mark it as done
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
.requireSame(tablet, FILES, COMPACTED).putCompacted(fateId)
- .submit(tabletMetadata ->
tabletMetadata.getCompacted().contains(fateId));
+ .submit(tabletMetadata ->
tabletMetadata.getCompacted().contains(fateId),
+ () -> "no files, attempting to mark as compacted. " +
fateId);
noFiles++;
} else if (tablet.getSelectedFiles() == null &&
tablet.getExternalCompactions().isEmpty()) {
// there are no selected files
@@ -242,7 +243,8 @@ public class CompactionDriver extends ManagerRepo {
// no files were selected so mark the tablet as compacted
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
.requireSame(tablet, FILES, SELECTED, ECOMP,
COMPACTED).putCompacted(fateId)
- .submit(tabletMetadata ->
tabletMetadata.getCompacted().contains(fateId));
+ .submit(tabletMetadata ->
tabletMetadata.getCompacted().contains(fateId),
+ () -> "no files, attempting to mark as compacted. " +
fateId);
noneSelected++;
} else {
@@ -260,9 +262,11 @@ public class CompactionDriver extends ManagerRepo {
selectionsSubmitted.put(tablet.getExtent(), filesToCompact);
- mutator.submit(tabletMetadata -> tabletMetadata.getSelectedFiles()
!= null
- && tabletMetadata.getSelectedFiles().getFateId().equals(fateId)
- || tabletMetadata.getCompacted().contains(fateId));
+ mutator.submit(
+ tabletMetadata -> tabletMetadata.getSelectedFiles() != null
+ &&
tabletMetadata.getSelectedFiles().getFateId().equals(fateId)
+ || tabletMetadata.getCompacted().contains(fateId),
+ () -> "selecting files for compaction. " + fateId);
if (minSelected == null ||
tablet.getExtent().compareTo(minSelected) < 0) {
minSelected = tablet.getExtent();
@@ -298,7 +302,8 @@ public class CompactionDriver extends ManagerRepo {
var mutator =
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
.requireSame(tablet, ECOMP, USER_COMPACTION_REQUESTED)
.putUserCompactionRequested(fateId);
- mutator.submit(tm ->
tm.getUserCompactionsRequested().contains(fateId));
+ mutator.submit(tm ->
tm.getUserCompactionsRequested().contains(fateId),
+ () -> "marking as needing a user requested compaction. " +
fateId);
userCompactionRequested++;
} else {
// Marker was already added and we are waiting
@@ -400,7 +405,8 @@ public class CompactionDriver extends ManagerRepo {
mutator.deleteUserCompactionRequested(fateId);
}
- mutator.submit(needsNoUpdate::test);
+ mutator.submit(needsNoUpdate::test,
+ () -> "cleanup metadata for failed compaction. " + fateId);
}
}
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java
index f065206b2f..6f9d65d0b8 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java
@@ -96,7 +96,8 @@ public class ReserveTablets extends ManagerRepo {
// must wait for the tablet to have no location before proceeding to
actually delete. See
// the documentation about the opid column in the MetadataSchema
class for more details.
conditionalMutator.mutateTablet(tabletMeta.getExtent()).requireAbsentOperation()
- .putOperation(opid).submit(tm ->
opid.equals(tm.getOperationId()));
+ .putOperation(opid)
+ .submit(tm -> opid.equals(tm.getOperationId()), () -> "put opid
" + opid);
submitted++;
}
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java
index 2c5221b7ce..3bf6bce841 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java
@@ -178,16 +178,20 @@ public class DeleteRows extends ManagerRepo {
}
}
- filesToDelete.forEach(file -> log.debug("{} deleting file {} for {}",
fateId, file,
- tabletMetadata.getExtent()));
- filesToAddMap.forEach((file, dfv) -> log.debug("{} adding file {} {}
for {}", fateId, file,
- dfv, tabletMetadata.getExtent()));
-
- filesToDelete.forEach(tabletMutator::deleteFile);
- filesToAddMap.forEach(tabletMutator::putFile);
-
- tabletMutator.submit(tm ->
tm.getFiles().containsAll(filesToAddMap.keySet())
- && Collections.disjoint(tm.getFiles(), filesToDelete));
+ filesToDelete.forEach(file -> {
+ log.debug("{} deleting file {} for {}", fateId, file,
tabletMetadata.getExtent());
+ tabletMutator.deleteFile(file);
+ });
+
+ filesToAddMap.forEach((file, dfv) -> {
+ log.debug("{} adding file {} {} for {}", fateId, file, dfv,
tabletMetadata.getExtent());
+ tabletMutator.putFile(file, dfv);
+ });
+
+ tabletMutator.submit(
+ tm -> tm.getFiles().containsAll(filesToAddMap.keySet())
+ && Collections.disjoint(tm.getFiles(), filesToDelete),
+ () -> "delete tablet files (as part of merge or deleterow
operation) " + fateId);
}
var results = tabletsMutator.process();
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index d8e0e9d2e1..f594f7b9ec 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -173,7 +173,8 @@ public class AmpleConditionalWriterIT extends
AccumuloClusterHarness {
// test require absent with a future location set
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
- .putLocation(Location.future(ts2)).submit(tm -> false);
+ .putLocation(Location.future(ts2)).submit(tm -> false,
+ () -> "Testing that requireAbsentLocation() fails when a
future location is set");
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
}
assertEquals(Location.future(ts1),
context.getAmple().readTablet(e1).getLocation());
@@ -196,7 +197,8 @@ public class AmpleConditionalWriterIT extends
AccumuloClusterHarness {
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
.putLocation(Location.future(ts2)).submit(tm -> false);
- assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
+ assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus(),
+ () -> "Testing that requireAbsentLocation() fails when a current
location is set");
}
assertEquals(Location.current(ts1),
context.getAmple().readTablet(e1).getLocation());