This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f763da2bc197 feat(conflict-resolution): Allow
PreferWriterConflictResolutionStrategy to abort clustering if there is an
ongoing write that is in requested state. (#18280)
f763da2bc197 is described below
commit f763da2bc19750f96aa0a1cf57a322726bf05d51
Author: Krishen <[email protected]>
AuthorDate: Fri Mar 13 14:39:52 2026 -0700
feat(conflict-resolution): Allow PreferWriterConflictResolutionStrategy to
abort clustering if there is an ongoing write that is in requested state.
(#18280)
#17907
Summary and Changelog
Summary: When PreferWriterConflictResolutionStrategy is used for write
conflict resolution, clustering commits can now be configured to self-abort if
there are any pending ingestion instants (in .requested state) that have an
active heartbeat but have not yet transitioned to .inflight. This prevents
clustering from committing and potentially causing a conflict with an ongoing
ingestion write, since ingestion should have precedence over clustering (with
this strategy). This is only rec [...]
The heartbeat timeout is inferred from the write config's heartbeat
interval and tolerable misses
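The timeout inference above can be sketched in plain Java, mirroring the arithmetic in this commit's PreferWriterConflictResolutionStrategy change (the class and method names here are illustrative stand-ins, not Hudi APIs):

```java
public class HeartbeatTimeout {
    // A heartbeat is treated as expired once the writer has missed
    // (tolerableMisses + 1) consecutive beats, i.e. after
    // interval * (tolerableMisses + 1) milliseconds without a refresh.
    public static long maxHeartbeatIntervalMs(long heartbeatIntervalMs, int tolerableMisses) {
        return heartbeatIntervalMs * (tolerableMisses + 1);
    }

    public static void main(String[] args) {
        // With the settings used in the new tests (60s interval, 2 tolerable
        // misses), a .requested ingestion instant is only ignored after
        // 180,000 ms of silence.
        System.out.println(maxHeartbeatIntervalMs(60_000L, 2)); // prints 180000
    }
}
```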
A conflict category (TABLE_SERVICE_VS_INGESTION, carried on the existing
HoodieWriteConflictException) and a dedicated metric
(conflict_resolution.table_service_vs_ingestion.counter) are emitted when this
scenario occurs, allowing users to distinguish this specific failure mode from
general write conflicts.
Changelog:
hudi-common
HoodieWriteConflictException: Extended with an optional ConflictCategory
(e.g. TABLE_SERVICE_VS_INGESTION), set when clustering detects a pending
ingestion instant with an active heartbeat.
hudi-client-common
ConflictResolutionStrategy: Added a new default overload
getCandidateInstants(metaClient, currentInstant, lastSuccessfulInstant,
Option<HoodieWriteConfig>) that delegates to the existing method.
Implementations can override to use the write config for additional behavior.
PreferWriterConflictResolutionStrategy: Overrides the new
getCandidateInstants overload. When the current operation is clustering and
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution is
enabled, checks ingestion .requested instants for active heartbeats. If an
active heartbeat is found, hasConflict reports a conflict and resolveConflict
throws HoodieWriteConflictException with category TABLE_SERVICE_VS_INGESTION.
If the heartbeat is expired, the instant is ignored (assumed to be a failed
write).
HoodieWriteConfig: Added CLUSTERING_BLOCK_FOR_PENDING_INGESTION config
property (default false), getter isClusteringBlockForPendingIngestion(), and
builder method withClusteringBlockForPendingIngestion(boolean).
TransactionUtils: Updated resolveWriteConflictIfAny to pass
Option.of(config) to the new getCandidateInstants overload, threading the write
config through.
BaseHoodieClient: Updated resolveWriteConflict to emit a per-category
conflict metric (via HoodieWriteConflictException.getCategory()) before
re-throwing.
HoodieMetrics: Added emitConflictResolutionByCategory(ConflictCategory) and
per-category counter metrics.
TestPreferWriterConflictResolutionStrategy: Added 3 tests covering: (1)
clustering self-aborts with active heartbeat ingestion .requested instant, (2)
no blocking when config is disabled, (3) old method signature delegates with
defaults (blocking disabled).
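A minimal properties sketch of the opt-in described above (the concurrency-mode and strategy values are assumptions based on this commit's config documentation, not verified against a full deployment):

```properties
# Both of these are prerequisites per the new config's documentation:
hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL
hoodie.write.conflict.resolution.strategy=org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy

# The new opt-in (default false): abort clustering commits while any
# ingestion write is still in .requested state with a live heartbeat.
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution=true
```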
Impact
User-facing: New opt-in config
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution
(default false). No change to existing behavior unless explicitly enabled.
Behavior: When enabled, clustering writes using
PreferWriterConflictResolutionStrategy will fail with
HoodieWriteConflictException (category TABLE_SERVICE_VS_INGESTION) if they
detect an ingestion .requested instant with an active heartbeat, preventing
clustering from committing and risking a conflict with ongoing ingestion.
Performance: Negligible. When enabled, adds heartbeat file existence checks
for .requested ingestion instants during clustering commit conflict resolution.
Risk Level
Low. Config defaults to false, so existing behavior is unchanged. The conflict
category is carried on the existing HoodieWriteConflictException, so existing
catch blocks continue to work. All existing tests pass; 3 new tests validate
the new behavior.
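The backward-compatibility claim above can be illustrated with a self-contained sketch (these are minimal stand-in types, NOT the real Hudi classes: the real exception and category live in org.apache.hudi.exception.HoodieWriteConflictException):

```java
import java.util.Optional;

public class ConflictCategoryDemo {
    enum ConflictCategory { TABLE_SERVICE_VS_INGESTION }

    // Stand-in for HoodieWriteConflictException: the category is optional,
    // so conflicts raised by pre-existing code paths carry no category.
    static class WriteConflictException extends RuntimeException {
        private final ConflictCategory category; // may be null for legacy conflicts
        WriteConflictException(ConflictCategory category, String msg) {
            super(msg);
            this.category = category;
        }
        Optional<ConflictCategory> getCategory() { return Optional.ofNullable(category); }
    }

    static String handle(Runnable commit) {
        try {
            commit.run();
            return "committed";
        } catch (WriteConflictException e) { // existing catch block still matches
            return e.getCategory()
                    .map(c -> "aborted: " + c)     // new: category-aware handling
                    .orElse("aborted: generic conflict");
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(() -> {
            throw new WriteConflictException(ConflictCategory.TABLE_SERVICE_VS_INGESTION,
                "pending ingestion with active heartbeat");
        }));
        // prints: aborted: TABLE_SERVICE_VS_INGESTION
    }
}
```

Because the category rides on the same exception type rather than a new subclass, callers that catch the base exception need no change, and callers that want the new failure mode can branch on the category.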
---------
Co-authored-by: Krishen Bhan <[email protected]>
---
.../org/apache/hudi/client/BaseHoodieClient.java | 1 +
.../client/transaction/ConcurrentOperation.java | 12 +-
.../transaction/ConflictResolutionStrategy.java | 13 +
.../PreferWriterConflictResolutionStrategy.java | 114 ++++++--
.../apache/hudi/client/utils/TransactionUtils.java | 2 +-
.../org/apache/hudi/config/HoodieWriteConfig.java | 19 ++
.../org/apache/hudi/metrics/HoodieMetrics.java | 42 +++
...TestPreferWriterConflictResolutionStrategy.java | 309 +++++++++++++++++++++
.../org/apache/hudi/metrics/TestHoodieMetrics.java | 35 +++
.../exception/HoodieWriteConflictException.java | 31 +++
.../TestHoodieClientOnCopyOnWriteStorage.java | 62 +++++
11 files changed, 619 insertions(+), 21 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index 6fe7771d7252..b0875672b5e9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -231,6 +231,7 @@ public abstract class BaseHoodieClient implements
Serializable, AutoCloseable {
metrics.emitConflictResolutionSuccessful();
} catch (HoodieWriteConflictException e) {
metrics.emitConflictResolutionFailed();
+ e.getCategory().ifPresent(metrics::emitConflictResolutionByCategory);
throw e;
} finally {
if (conflictResolutionTimer != null) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
index dc3bd59c6211..177119a789ab 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
@@ -118,9 +118,15 @@ public class ConcurrentOperation {
break;
case COMMIT_ACTION:
case DELTA_COMMIT_ACTION:
- this.mutatedPartitionAndFileIds =
getPartitionAndFileIdWithoutSuffixFromSpecificRecord(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata()
- .getPartitionToWriteStats());
- this.operationType =
WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
+ // Ingestion .requested instants may have empty plan files, so the
deserialized
+ // commit metadata will be null. In that case we leave
mutatedPartitionAndFileIds
+ // empty and operationType unset, since the write has not started
yet.
+ org.apache.hudi.avro.model.HoodieCommitMetadata avroCommitMeta =
+
this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata();
+ if (avroCommitMeta != null) {
+ this.mutatedPartitionAndFileIds =
getPartitionAndFileIdWithoutSuffixFromSpecificRecord(avroCommitMeta.getPartitionToWriteStats());
+ this.operationType =
WriteOperationType.fromValue(avroCommitMeta.getOperationType());
+ }
break;
case REPLACE_COMMIT_ACTION:
case CLUSTERING_ACTION:
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java
index 1deba86fe1df..629cde07ec05 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.table.HoodieTable;
@@ -42,6 +43,18 @@ public interface ConflictResolutionStrategy {
*/
Stream<HoodieInstant> getCandidateInstants(HoodieTableMetaClient metaClient,
HoodieInstant currentInstant, Option<HoodieInstant> lastSuccessfulInstant);
+ /**
+ * Overload of {@link #getCandidateInstants(HoodieTableMetaClient,
HoodieInstant, Option)} that also accepts an
+ * optional {@link HoodieWriteConfig}. Implementations may use the write
config to infer additional behavior
+ * (e.g., heartbeat-based blocking of clustering when pending ingestion
instants exist).
+ *
+ * <p>The default implementation simply delegates to the existing method,
ignoring the write config.</p>
+ */
+ default Stream<HoodieInstant> getCandidateInstants(HoodieTableMetaClient
metaClient, HoodieInstant currentInstant,
+ Option<HoodieInstant>
lastSuccessfulInstant, Option<HoodieWriteConfig> writeConfigOpt) {
+ return getCandidateInstants(metaClient, currentInstant,
lastSuccessfulInstant);
+ }
+
/**
* Implementations of this method will determine whether a conflict exists
between 2 commits.
* @param thisOperation
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
index 9b02e26aecc0..2c3942f7169e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
@@ -18,15 +18,22 @@
package org.apache.hudi.client.transaction;
+import org.apache.hudi.common.heartbeat.HoodieHeartbeatUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.table.HoodieTable;
import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
@@ -45,16 +52,27 @@ import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMI
public class PreferWriterConflictResolutionStrategy
extends SimpleConcurrentFileWritesConflictResolutionStrategy {
+ private boolean isClusteringBlockForPendingIngestion;
+
/**
* For tableservices like replacecommit and compaction commits this method
also returns ingestion inflight commits.
*/
@Override
public Stream<HoodieInstant> getCandidateInstants(HoodieTableMetaClient
metaClient, HoodieInstant currentInstant,
Option<HoodieInstant>
lastSuccessfulInstant) {
+ return getCandidateInstants(metaClient, currentInstant,
lastSuccessfulInstant, Option.empty());
+ }
+
+ @Override
+ public Stream<HoodieInstant> getCandidateInstants(HoodieTableMetaClient
metaClient, HoodieInstant currentInstant,
+ Option<HoodieInstant>
lastSuccessfulInstant, Option<HoodieWriteConfig> writeConfigOpt) {
HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline();
- if (ClusteringUtils.isClusteringInstant(activeTimeline, currentInstant,
metaClient.getInstantGenerator())
- || COMPACTION_ACTION.equals(currentInstant.getAction())) {
- return getCandidateInstantsForTableServicesCommits(activeTimeline,
currentInstant);
+ boolean isCurrentOperationClustering =
ClusteringUtils.isClusteringInstant(activeTimeline, currentInstant,
metaClient.getInstantGenerator());
+ this.isClusteringBlockForPendingIngestion = isCurrentOperationClustering
+ && writeConfigOpt.isPresent() &&
writeConfigOpt.get().isClusteringBlockForPendingIngestion();
+
+ if (isCurrentOperationClustering ||
COMPACTION_ACTION.equals(currentInstant.getAction())) {
+ return getCandidateInstantsForTableServicesCommits(activeTimeline,
currentInstant, isCurrentOperationClustering, metaClient, writeConfigOpt);
} else {
return getCandidateInstantsForNonTableServicesCommits(activeTimeline,
currentInstant);
}
@@ -78,13 +96,20 @@ public class PreferWriterConflictResolutionStrategy
}
/**
- * To find which instants are conflicting, we apply the following logic
- * Get both completed instants and ingestion inflight commits that have
happened since the last successful write.
- * We need to check for write conflicts since they may have mutated the same
files
- * that are being newly created by the current write.
+ * Returns candidate instants for table service commits (clustering or
compaction).
+ * Includes both completed instants and ingestion inflight commits that have
happened
+ * since the current write started.
+ *
+ * <p>If the current write is clustering and
+ * {@code
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution} is
enabled,
+ * also includes ingestion {@code .requested} instants (filtering out those
with expired heartbeats)
+ * so they can be evaluated by {@link #hasConflict} and {@link
#resolveConflict}.</p>
*/
- private Stream<HoodieInstant>
getCandidateInstantsForTableServicesCommits(HoodieActiveTimeline
activeTimeline, HoodieInstant currentInstant) {
- // Fetch list of completed commits.
+ private Stream<HoodieInstant> getCandidateInstantsForTableServicesCommits(
+ HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant,
+ boolean isCurrentOperationClustering, HoodieTableMetaClient metaClient,
+ Option<HoodieWriteConfig> writeConfigOpt) {
+
Stream<HoodieInstant> completedCommitsStream =
activeTimeline
.getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION,
REPLACE_COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION))
@@ -92,21 +117,76 @@ public class PreferWriterConflictResolutionStrategy
.findInstantsModifiedAfterByCompletionTime(currentInstant.requestedTime())
.getInstantsAsStream();
- // Fetch list of ingestion inflight commits.
- Stream<HoodieInstant> inflightIngestionCommitsStream =
- activeTimeline
- .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION,
DELTA_COMMIT_ACTION))
- .filterInflights()
- .getInstantsAsStream();
+ Stream<HoodieInstant> inflightIngestionCommitsStream;
+ if (isClusteringBlockForPendingIngestion) {
+ HoodieWriteConfig writeConfig = writeConfigOpt.get();
+ long maxHeartbeatIntervalMs =
writeConfig.getHoodieClientHeartbeatIntervalInMs()
+ * (writeConfig.getHoodieClientHeartbeatTolerableMisses() + 1);
+ inflightIngestionCommitsStream = activeTimeline
+ .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION,
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION))
+ .filterInflightsAndRequested()
+ .getInstantsAsStream()
+ .filter(i -> !ClusteringUtils.isClusteringInstant(activeTimeline, i,
metaClient.getInstantGenerator()))
+ .filter(i -> {
+ if (i.isRequested()) {
+ try {
+ return
!HoodieHeartbeatUtils.isHeartbeatExpired(i.requestedTime(),
maxHeartbeatIntervalMs,
+ metaClient.getStorage(),
metaClient.getBasePath().toString());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return i.isInflight();
+ });
+ } else {
+ inflightIngestionCommitsStream = activeTimeline
+ .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION,
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION))
+ .filterInflights()
+ .getInstantsAsStream();
+ }
- // Merge and sort the instants and return.
List<HoodieInstant> instantsToConsider =
Stream.concat(completedCommitsStream, inflightIngestionCommitsStream)
- .sorted(Comparator.comparing(o -> o.getCompletionTime()))
+ .sorted(Comparator.comparing(HoodieInstant::getCompletionTime,
Comparator.nullsLast(Comparator.naturalOrder())))
.collect(Collectors.toList());
log.info("Instants that may have conflict with {} are {}", currentInstant,
instantsToConsider);
return instantsToConsider.stream();
}
+ @Override
+ public boolean hasConflict(ConcurrentOperation thisOperation,
ConcurrentOperation otherOperation) {
+ if (isClusteringBlockForPendingIngestion
+ && WriteOperationType.CLUSTER.equals(thisOperation.getOperationType())
+ && isRequestedIngestionInstant(otherOperation)) {
+ log.info("Clustering operation {} conflicts with pending ingestion
instant {} "
+ + "that has an active heartbeat", thisOperation, otherOperation);
+ return true;
+ }
+ return super.hasConflict(thisOperation, otherOperation);
+ }
+
+ @Override
+ public Option<HoodieCommitMetadata> resolveConflict(HoodieTable table,
+ ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) {
+ if (isClusteringBlockForPendingIngestion
+ && WriteOperationType.CLUSTER.equals(thisOperation.getOperationType())
+ && isRequestedIngestionInstant(otherOperation)) {
+ throw new HoodieWriteConflictException(
+
HoodieWriteConflictException.ConflictCategory.TABLE_SERVICE_VS_INGESTION,
+ String.format("Pending ingestion instant %s with active heartbeat
has not transitioned to "
+ + "inflight yet but may potentially conflict with current
clustering operation %s",
+ otherOperation, thisOperation));
+ }
+ return super.resolveConflict(table, thisOperation, otherOperation);
+ }
+
+ private boolean isRequestedIngestionInstant(ConcurrentOperation operation) {
+ String state = operation.getInstantActionState();
+ String actionType = operation.getInstantActionType();
+ return HoodieInstant.State.REQUESTED.name().equals(state)
+ && (COMMIT_ACTION.equals(actionType) ||
DELTA_COMMIT_ACTION.equals(actionType)
+ || (REPLACE_COMMIT_ACTION.equals(actionType) &&
!WriteOperationType.CLUSTER.equals(operation.getOperationType())));
+ }
+
@Override
public boolean isPreCommitRequired() {
return true;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
index e8cf8e7f9d28..6b5ac8c575aa 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ -81,7 +81,7 @@ public class TransactionUtils {
Option<HoodieSchema> newTableSchema =
resolveSchemaConflictIfNeeded(table, config, lastCompletedTxnOwnerInstant,
currentTxnOwnerInstant);
Stream<HoodieInstant> instantStream =
Stream.concat(resolutionStrategy.getCandidateInstants(
- table.getMetaClient(), currentTxnOwnerInstant.get(),
lastCompletedTxnOwnerInstant),
+ table.getMetaClient(), currentTxnOwnerInstant.get(),
lastCompletedTxnOwnerInstant, Option.of(config)),
completedInstantsDuringCurrentWriteOperation);
final ConcurrentOperation thisOperation = new
ConcurrentOperation(currentTxnOwnerInstant.get(),
thisCommitMetadata.orElseGet(HoodieCommitMetadata::new));
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d68e3e6a94a7..2486a398292a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -675,6 +675,16 @@ public class HoodieWriteConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Number of heartbeat misses, before a writer is
deemed not alive and all pending writes are aborted.");
+ public static final ConfigProperty<Boolean>
CLUSTERING_BLOCK_FOR_PENDING_INGESTION = ConfigProperty
+
.key("hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution")
+ .defaultValue(false)
+ .markAdvanced()
+ .withDocumentation("Only applicable when
\"hoodie.write.concurrency.mode\" is set to OCC or NBCC and the conflict "
+ + "resolution strategy
(\"hoodie.write.conflict.resolution.strategy\") is set to "
+ + "PreferWriterConflictResolutionStrategy. When enabled, proactively
prevents clustering from committing if "
+ + "there are any ongoing ingestion writes that have not transitioned
from requested to inflight yet and have "
+ + "an active heartbeat, since ingestion may be targeting the same
files and should have precedence.");
+
public static final ConfigProperty<String> WRITE_CONCURRENCY_MODE =
ConfigProperty
.key("hoodie.write.concurrency.mode")
.defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name())
@@ -2665,6 +2675,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getInt(CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES);
}
+ public boolean isClusteringBlockForPendingIngestion() {
+ return getBooleanOrDefault(CLUSTERING_BLOCK_FOR_PENDING_INGESTION);
+ }
+
/**
* File listing metadata configs.
*/
@@ -3455,6 +3469,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withClusteringBlockForPendingIngestion(boolean enable) {
+ writeConfig.setValue(CLUSTERING_BLOCK_FOR_PENDING_INGESTION,
String.valueOf(enable));
+ return this;
+ }
+
public Builder withWriteConcurrencyMode(WriteConcurrencyMode
concurrencyMode) {
writeConfig.setValue(WRITE_CONCURRENCY_MODE, concurrencyMode.name());
return this;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 3fa85d5c7878..0decbe290c3d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.storage.HoodieStorage;
import com.codahale.metrics.Counter;
@@ -116,6 +117,10 @@ public class HoodieMetrics {
private String conflictResolutionTimerName = null;
private String conflictResolutionSuccessCounterName = null;
private String conflictResolutionFailureCounterName = null;
+ private String conflictResolutionIngestionVsIngestionCounterName = null;
+ private String conflictResolutionIngestionVsTableServiceCounterName = null;
+ private String conflictResolutionTableServiceVsIngestionCounterName = null;
+ private String conflictResolutionTableServiceVsTableServiceCounterName =
null;
private String compactionRequestedCounterName = null;
private String compactionCompletedCounterName = null;
private String rollbackFailureCounterName = null;
@@ -135,6 +140,10 @@ public class HoodieMetrics {
private Timer conflictResolutionTimer = null;
private Counter conflictResolutionSuccessCounter = null;
private Counter conflictResolutionFailureCounter = null;
+ private Counter conflictResolutionIngestionVsIngestionCounter = null;
+ private Counter conflictResolutionIngestionVsTableServiceCounter = null;
+ private Counter conflictResolutionTableServiceVsIngestionCounter = null;
+ private Counter conflictResolutionTableServiceVsTableServiceCounter = null;
private Counter compactionRequestedCounter = null;
private Counter compactionCompletedCounter = null;
private Counter rollbackFailureCounter = null;
@@ -158,6 +167,10 @@ public class HoodieMetrics {
this.conflictResolutionTimerName =
getMetricsName(CONFLICT_RESOLUTION_STR, TIMER_METRIC);
this.conflictResolutionSuccessCounterName =
getMetricsName(CONFLICT_RESOLUTION_STR, SUCCESS_COUNTER);
this.conflictResolutionFailureCounterName =
getMetricsName(CONFLICT_RESOLUTION_STR, FAILURE_COUNTER);
+ this.conflictResolutionIngestionVsIngestionCounterName =
getMetricsName(CONFLICT_RESOLUTION_STR, "ingestion_vs_ingestion" +
COUNTER_METRIC_EXTENSION);
+ this.conflictResolutionIngestionVsTableServiceCounterName =
getMetricsName(CONFLICT_RESOLUTION_STR, "ingestion_vs_table_service" +
COUNTER_METRIC_EXTENSION);
+ this.conflictResolutionTableServiceVsIngestionCounterName =
getMetricsName(CONFLICT_RESOLUTION_STR, "table_service_vs_ingestion" +
COUNTER_METRIC_EXTENSION);
+ this.conflictResolutionTableServiceVsTableServiceCounterName =
getMetricsName(CONFLICT_RESOLUTION_STR, "table_service_vs_table_service" +
COUNTER_METRIC_EXTENSION);
this.compactionRequestedCounterName =
getMetricsName(HoodieTimeline.COMPACTION_ACTION,
HoodieTimeline.REQUESTED_COMPACTION_SUFFIX + COUNTER_METRIC_EXTENSION);
this.compactionCompletedCounterName =
getMetricsName(HoodieTimeline.COMPACTION_ACTION,
HoodieTimeline.COMPLETED_COMPACTION_SUFFIX + COUNTER_METRIC_EXTENSION);
this.rollbackFailureCounterName = getMetricsName("rollback",
FAILURE_COUNTER);
@@ -548,6 +561,35 @@ public class HoodieMetrics {
}
}
+ public void
emitConflictResolutionByCategory(HoodieWriteConflictException.ConflictCategory
category) {
+ if (config.isLockingMetricsEnabled()) {
+ switch (category) {
+ case INGESTION_VS_INGESTION:
+ conflictResolutionIngestionVsIngestionCounter = getCounter(
+ conflictResolutionIngestionVsIngestionCounter,
conflictResolutionIngestionVsIngestionCounterName);
+ conflictResolutionIngestionVsIngestionCounter.inc();
+ break;
+ case INGESTION_VS_TABLE_SERVICE:
+ conflictResolutionIngestionVsTableServiceCounter = getCounter(
+ conflictResolutionIngestionVsTableServiceCounter,
conflictResolutionIngestionVsTableServiceCounterName);
+ conflictResolutionIngestionVsTableServiceCounter.inc();
+ break;
+ case TABLE_SERVICE_VS_INGESTION:
+ conflictResolutionTableServiceVsIngestionCounter = getCounter(
+ conflictResolutionTableServiceVsIngestionCounter,
conflictResolutionTableServiceVsIngestionCounterName);
+ conflictResolutionTableServiceVsIngestionCounter.inc();
+ break;
+ case TABLE_SERVICE_VS_TABLE_SERVICE:
+ conflictResolutionTableServiceVsTableServiceCounter = getCounter(
+ conflictResolutionTableServiceVsTableServiceCounter,
conflictResolutionTableServiceVsTableServiceCounterName);
+ conflictResolutionTableServiceVsTableServiceCounter.inc();
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
public void emitCompactionRequested() {
if (config.isMetricsOn()) {
compactionRequestedCounter = getCounter(compactionRequestedCounter,
compactionRequestedCounterName);
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
index 80561f74e14d..f59490c77aef 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
@@ -19,13 +19,17 @@
package org.apache.hudi.client.transaction;
import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.junit.jupiter.api.Assertions;
@@ -251,4 +255,309 @@ public class TestPreferWriterConflictResolutionStrategy
extends HoodieCommonTest
// expected
}
}
+
+ /**
+ * Confirms that when {@code
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution}
+ * is enabled, clustering will detect a conflict with an ingestion
.requested instant
+ * that has an active heartbeat, via hasConflict/resolveConflict.
+ */
+ @Test
+ public void
testClusterConflictingWithIngestionRequestedInstantWithActiveHeartbeat() throws
Exception {
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withClusteringBlockForPendingIngestion(true)
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withHeartbeatTolerableMisses(2)
+ .build();
+
+ createCommit(WriteClientTestUtils.createNewInstantTime(), metaClient);
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ Option<HoodieInstant> lastSuccessfulInstant =
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+ // clustering gets scheduled and goes inflight
+ String currentWriterInstant = WriteClientTestUtils.createNewInstantTime();
+ createClusterRequested(currentWriterInstant, metaClient);
+ createClusterInflight(currentWriterInstant, metaClient);
+
+ // ingestion writer creates a .requested instant with active heartbeat
+ String activeIngestionInstantTime =
WriteClientTestUtils.createNewInstantTime();
+
HoodieTestTable.of(metaClient).addRequestedCommit(activeIngestionInstantTime);
+ HoodieHeartbeatClient heartbeatClient = new HoodieHeartbeatClient(
+ metaClient.getStorage(), metaClient.getBasePath().toString(),
+ (long) (1000 * 60), 5);
+ heartbeatClient.start(activeIngestionInstantTime);
+
+ Option<HoodieInstant> currentInstant = Option.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.CLUSTERING_ACTION, currentWriterInstant));
+ PreferWriterConflictResolutionStrategy strategy = new
PreferWriterConflictResolutionStrategy();
+
+ try {
+ List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(
+ metaClient, currentInstant.get(), lastSuccessfulInstant,
Option.of(writeConfig))
+ .collect(Collectors.toList());
+ // The .requested instant with active heartbeat should be returned as a
candidate
+ Assertions.assertEquals(1, candidateInstants.size());
+ Assertions.assertEquals(activeIngestionInstantTime,
candidateInstants.get(0).requestedTime());
+
+ HoodieReplaceCommitMetadata clusteringMetadata = new
HoodieReplaceCommitMetadata();
+ clusteringMetadata.setOperationType(WriteOperationType.CLUSTER);
+ ConcurrentOperation thisOperation = new
ConcurrentOperation(currentInstant.get(), clusteringMetadata);
+ ConcurrentOperation otherOperation = new
ConcurrentOperation(candidateInstants.get(0), metaClient);
+
+ // hasConflict should detect the conflict
+ Assertions.assertTrue(strategy.hasConflict(thisOperation,
otherOperation));
+
+ // resolveConflict should throw with TABLE_SERVICE_VS_INGESTION category
+ HoodieWriteConflictException thrown = Assertions.assertThrows(
+ HoodieWriteConflictException.class,
+ () -> strategy.resolveConflict(null, thisOperation, otherOperation));
+ Assertions.assertTrue(thrown.getCategory().isPresent());
+
Assertions.assertEquals(HoodieWriteConflictException.ConflictCategory.TABLE_SERVICE_VS_INGESTION,
+ thrown.getCategory().get());
+ } finally {
+ heartbeatClient.stop(activeIngestionInstantTime);
+ heartbeatClient.close();
+ }
+ }
+
+ /**
+ * Confirms that clustering does NOT fail for pending ingestion .requested
instants
+ * when {@code
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution}
+ * is disabled (default behavior).
+ */
+ @Test
+ public void testClusterDoesNotBlockWithoutConfigEnabled() throws Exception {
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withClusteringBlockForPendingIngestion(false)
+ .build();
+
+ createCommit(WriteClientTestUtils.createNewInstantTime(), metaClient);
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ Option<HoodieInstant> lastSuccessfulInstant =
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+ // clustering gets scheduled and goes inflight
+ String currentWriterInstant = WriteClientTestUtils.createNewInstantTime();
+ createClusterRequested(currentWriterInstant, metaClient);
+ createClusterInflight(currentWriterInstant, metaClient);
+
+ // ingestion writer creates a .requested instant with active heartbeat
+ String ingestionInstantTime = WriteClientTestUtils.createNewInstantTime();
+ HoodieTestTable.of(metaClient).addRequestedCommit(ingestionInstantTime);
+ HoodieHeartbeatClient heartbeatClient = new HoodieHeartbeatClient(
+ metaClient.getStorage(), metaClient.getBasePath().toString(),
+ (long) (1000 * 60), 5);
+ heartbeatClient.start(ingestionInstantTime);
+
+ Option<HoodieInstant> currentInstant = Option.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.CLUSTERING_ACTION, currentWriterInstant));
+ PreferWriterConflictResolutionStrategy strategy = new
PreferWriterConflictResolutionStrategy();
+
+ // With
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution disabled,
+ // clustering should NOT fail even though there's an active heartbeat
+ List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(
+ metaClient, currentInstant.get(), lastSuccessfulInstant,
Option.of(writeConfig))
+ .collect(Collectors.toList());
+ Assertions.assertEquals(0, candidateInstants.size());
+
+ heartbeatClient.stop(ingestionInstantTime);
+ heartbeatClient.close();
+ }
+
+ /**
+ * Confirms that when getCandidateInstants is called without a write config,
+ * it delegates properly and uses defaults
+ * ({@code
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution}
+ * disabled by default).
+ */
+ @Test
+ public void testClusterOldMethodDoesNotBlockByDefault() throws Exception {
+ createCommit(WriteClientTestUtils.createNewInstantTime(), metaClient);
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ Option<HoodieInstant> lastSuccessfulInstant =
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+ // clustering gets scheduled and goes inflight
+ String currentWriterInstant = WriteClientTestUtils.createNewInstantTime();
+ createClusterRequested(currentWriterInstant, metaClient);
+ createClusterInflight(currentWriterInstant, metaClient);
+
+ // ingestion writer creates a .requested instant with active heartbeat
+ String ingestionInstantTime = WriteClientTestUtils.createNewInstantTime();
+ HoodieTestTable.of(metaClient).addRequestedCommit(ingestionInstantTime);
+ HoodieHeartbeatClient heartbeatClient = new HoodieHeartbeatClient(
+ metaClient.getStorage(), metaClient.getBasePath().toString(),
+ (long) (1000 * 60), 5);
+ heartbeatClient.start(ingestionInstantTime);
+
+ Option<HoodieInstant> currentInstant = Option.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.CLUSTERING_ACTION, currentWriterInstant));
+ PreferWriterConflictResolutionStrategy strategy = new
PreferWriterConflictResolutionStrategy();
+
+ // Without write config, should NOT throw since the default is
+ // hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution
= false
+ List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(
+ metaClient, currentInstant.get(), lastSuccessfulInstant)
+ .collect(Collectors.toList());
+ Assertions.assertEquals(0, candidateInstants.size());
+
+ heartbeatClient.stop(ingestionInstantTime);
+ heartbeatClient.close();
+ }
+
+ /**
+ * Confirms that when {@code
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution}
+ * is enabled and there is an inflight ingestion instant, it is returned as
a candidate
+ * (exercises the i.isInflight() return path in the filter).
+ */
+ @Test
+ public void testClusterWithBlockingEnabledAndInflightIngestion() throws
Exception {
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withClusteringBlockForPendingIngestion(true)
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withHeartbeatTolerableMisses(2)
+ .build();
+
+ createCommit(WriteClientTestUtils.createNewInstantTime(), metaClient);
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ Option<HoodieInstant> lastSuccessfulInstant =
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+ // clustering gets scheduled and goes inflight
+ String currentWriterInstant = WriteClientTestUtils.createNewInstantTime();
+ createClusterRequested(currentWriterInstant, metaClient);
+ createClusterInflight(currentWriterInstant, metaClient);
+
+ // ingestion writer creates an inflight commit
+ String ingestionInstantTime = WriteClientTestUtils.createNewInstantTime();
+ createInflightCommit(ingestionInstantTime, metaClient);
+
+ Option<HoodieInstant> currentInstant = Option.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.CLUSTERING_ACTION, currentWriterInstant));
+ PreferWriterConflictResolutionStrategy strategy = new
PreferWriterConflictResolutionStrategy();
+
+ List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(
+ metaClient, currentInstant.get(), lastSuccessfulInstant,
Option.of(writeConfig))
+ .collect(Collectors.toList());
+ // The inflight ingestion instant should be returned as a candidate
+ Assertions.assertEquals(1, candidateInstants.size());
+ Assertions.assertEquals(ingestionInstantTime,
candidateInstants.get(0).requestedTime());
+ }
+
+ /**
+ * Confirms that when {@code
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution}
+ * is enabled and there is both an inflight and an expired-heartbeat
requested ingestion instant,
+ * only the inflight is returned as a candidate.
+ */
+ @Test
+ public void testClusterWithBlockingEnabledInflightAndExpiredRequested()
throws Exception {
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withClusteringBlockForPendingIngestion(true)
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withHeartbeatTolerableMisses(2)
+ .build();
+
+ createCommit(WriteClientTestUtils.createNewInstantTime(), metaClient);
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ Option<HoodieInstant> lastSuccessfulInstant =
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+ // clustering gets scheduled and goes inflight
+ String currentWriterInstant = WriteClientTestUtils.createNewInstantTime();
+ createClusterRequested(currentWriterInstant, metaClient);
+ createClusterInflight(currentWriterInstant, metaClient);
+
+ // expired requested ingestion instant (no heartbeat)
+ String expiredInstantTime = WriteClientTestUtils.createNewInstantTime();
+ HoodieTestTable.of(metaClient).addRequestedCommit(expiredInstantTime);
+
+ // active inflight ingestion instant
+ String inflightInstantTime = WriteClientTestUtils.createNewInstantTime();
+ createInflightCommit(inflightInstantTime, metaClient);
+
+ Option<HoodieInstant> currentInstant = Option.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.CLUSTERING_ACTION, currentWriterInstant));
+ PreferWriterConflictResolutionStrategy strategy = new
PreferWriterConflictResolutionStrategy();
+
+ List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(
+ metaClient, currentInstant.get(), lastSuccessfulInstant,
Option.of(writeConfig))
+ .collect(Collectors.toList());
+ // Only the inflight should be returned; expired requested should be
filtered out
+ Assertions.assertEquals(1, candidateInstants.size());
+ Assertions.assertEquals(inflightInstantTime,
candidateInstants.get(0).requestedTime());
+ }
+
+ /**
+ * Confirms that when the .requested instant has an expired heartbeat (no
heartbeat file),
+ * clustering does NOT treat it as a conflict even when
+ * {@code
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution} is
enabled.
+ */
+ @Test
+ public void testClusterWithBlockingEnabledAndExpiredHeartbeatRequested()
throws Exception {
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withClusteringBlockForPendingIngestion(true)
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withHeartbeatTolerableMisses(2)
+ .build();
+
+ createCommit(WriteClientTestUtils.createNewInstantTime(), metaClient);
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ Option<HoodieInstant> lastSuccessfulInstant =
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+ // clustering gets scheduled and goes inflight
+ String currentWriterInstant = WriteClientTestUtils.createNewInstantTime();
+ createClusterRequested(currentWriterInstant, metaClient);
+ createClusterInflight(currentWriterInstant, metaClient);
+
+ // ingestion writer creates a .requested instant but never starts a
heartbeat (simulates expired/dead writer)
+ String expiredIngestionInstantTime =
WriteClientTestUtils.createNewInstantTime();
+
HoodieTestTable.of(metaClient).addRequestedCommit(expiredIngestionInstantTime);
+
+ Option<HoodieInstant> currentInstant = Option.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.CLUSTERING_ACTION, currentWriterInstant));
+ PreferWriterConflictResolutionStrategy strategy = new
PreferWriterConflictResolutionStrategy();
+
+ // The .requested instant with expired heartbeat should be filtered out
+ List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(
+ metaClient, currentInstant.get(), lastSuccessfulInstant,
Option.of(writeConfig))
+ .collect(Collectors.toList());
+ Assertions.assertEquals(0, candidateInstants.size());
+ }
+
+ /**
+ * Confirms that compaction (non-clustering table service) when write config
is provided
+ * still picks up inflight ingestion instants as candidates.
+ */
+ @Test
+ public void testCompactionWithInflightIngestionViaNewOverload() throws
Exception {
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withClusteringBlockForPendingIngestion(true)
+ .build();
+
+ createCommit(WriteClientTestUtils.createNewInstantTime(), metaClient);
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ Option<HoodieInstant> lastSuccessfulInstant =
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+ // writer 1 starts (inflight ingestion)
+ String currentWriterInstant = WriteClientTestUtils.createNewInstantTime();
+ createInflightCommit(currentWriterInstant, metaClient);
+
+ // compaction gets scheduled
+ String compactionInstantTime = WriteClientTestUtils.createNewInstantTime();
+ createCompactionRequested(compactionInstantTime, metaClient);
+
+ Option<HoodieInstant> currentInstant = Option.of(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMPACTION_ACTION, compactionInstantTime));
+ PreferWriterConflictResolutionStrategy strategy = new
PreferWriterConflictResolutionStrategy();
+
+ // Compaction is not clustering, so .requested instants are not included
even when
+ // hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution
is enabled
+ List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(
+ metaClient, currentInstant.get(), lastSuccessfulInstant,
Option.of(writeConfig))
+ .collect(Collectors.toList());
+ Assertions.assertEquals(1, candidateInstants.size());
+ Assertions.assertEquals(currentWriterInstant,
candidateInstants.get(0).requestedTime());
+ }
+
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index f28f80672e1b..b785d9025288 100755
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import com.codahale.metrics.Timer;
@@ -354,4 +355,38 @@ public class TestHoodieMetrics {
// Verify the original exception type counter is unchanged
assertEquals(2,
metrics.getRegistry().getCounters().get(exceptionMetricName).getCount());
}
+
+ @Test
+ public void testConflictResolutionByCategoryMetrics() {
+ when(writeConfig.isLockingMetricsEnabled()).thenReturn(true);
+
+ String tableServiceVsIngestion = hoodieMetrics.getMetricsName(
+ HoodieMetrics.CONFLICT_RESOLUTION_STR, "table_service_vs_ingestion" +
COUNTER_METRIC_EXTENSION);
+ String ingestionVsIngestion = hoodieMetrics.getMetricsName(
+ HoodieMetrics.CONFLICT_RESOLUTION_STR, "ingestion_vs_ingestion" +
COUNTER_METRIC_EXTENSION);
+ String ingestionVsTableService = hoodieMetrics.getMetricsName(
+ HoodieMetrics.CONFLICT_RESOLUTION_STR, "ingestion_vs_table_service" +
COUNTER_METRIC_EXTENSION);
+ String tableServiceVsTableService = hoodieMetrics.getMetricsName(
+ HoodieMetrics.CONFLICT_RESOLUTION_STR,
"table_service_vs_table_service" + COUNTER_METRIC_EXTENSION);
+
+ hoodieMetrics.emitConflictResolutionByCategory(
+
HoodieWriteConflictException.ConflictCategory.TABLE_SERVICE_VS_INGESTION);
+ assertEquals(1,
metrics.getRegistry().getCounters().get(tableServiceVsIngestion).getCount());
+
+ hoodieMetrics.emitConflictResolutionByCategory(
+
HoodieWriteConflictException.ConflictCategory.TABLE_SERVICE_VS_INGESTION);
+ assertEquals(2,
metrics.getRegistry().getCounters().get(tableServiceVsIngestion).getCount());
+
+ hoodieMetrics.emitConflictResolutionByCategory(
+ HoodieWriteConflictException.ConflictCategory.INGESTION_VS_INGESTION);
+ assertEquals(1,
metrics.getRegistry().getCounters().get(ingestionVsIngestion).getCount());
+
+ hoodieMetrics.emitConflictResolutionByCategory(
+
HoodieWriteConflictException.ConflictCategory.INGESTION_VS_TABLE_SERVICE);
+ assertEquals(1,
metrics.getRegistry().getCounters().get(ingestionVsTableService).getCount());
+
+ hoodieMetrics.emitConflictResolutionByCategory(
+
HoodieWriteConflictException.ConflictCategory.TABLE_SERVICE_VS_TABLE_SERVICE);
+ assertEquals(1,
metrics.getRegistry().getCounters().get(tableServiceVsTableService).getCount());
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieWriteConflictException.java
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieWriteConflictException.java
index f0f6dcbf0ab1..f2685e7854dc 100644
---
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieWriteConflictException.java
+++
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieWriteConflictException.java
@@ -18,6 +18,8 @@
package org.apache.hudi.exception;
+import org.apache.hudi.common.util.Option;
+
/**
* <p>
* Exception thrown for Hoodie failures. The root of the exception hierarchy.
@@ -29,15 +31,44 @@ package org.apache.hudi.exception;
*/
public class HoodieWriteConflictException extends HoodieException {
+ /**
+ * Categorizes the two sides of a write conflict for metrics and diagnostics.
+ */
+ public enum ConflictCategory {
+ INGESTION_VS_INGESTION,
+ INGESTION_VS_TABLE_SERVICE,
+ TABLE_SERVICE_VS_INGESTION,
+ TABLE_SERVICE_VS_TABLE_SERVICE
+ }
+
+ private final Option<ConflictCategory> category;
+
public HoodieWriteConflictException(String msg) {
super(msg);
+ this.category = Option.empty();
}
public HoodieWriteConflictException(Throwable e) {
super(e);
+ this.category = Option.empty();
}
public HoodieWriteConflictException(String msg, Throwable e) {
super(msg, e);
+ this.category = Option.empty();
+ }
+
+ public HoodieWriteConflictException(ConflictCategory category, String msg) {
+ super(msg);
+ this.category = Option.of(category);
+ }
+
+ public HoodieWriteConflictException(ConflictCategory category, String msg,
Throwable e) {
+ super(msg, e);
+ this.category = Option.of(category);
+ }
+
+ public Option<ConflictCategory> getCategory() {
+ return category;
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 83ccf54b5079..fcb08a2e2706 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.WriteClientTestUtils;
import org.apache.hudi.client.WriteStatus;
import
org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
@@ -62,6 +63,7 @@ import
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
@@ -1893,6 +1895,66 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
return clusteringInstant;
}
+ /**
+ * Verifies that clustering fails with {@link HoodieWriteConflictException}
when
+ * {@code
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution} is
enabled
+ * and an ingestion .requested instant with an active heartbeat exists.
+ */
+ @Test
+ public void testClusteringFailsOnPendingIngestionRequestedInstant() throws
Exception {
+ Properties properties = getDisabledRowWriterProperties();
+ properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
+ HoodieCleanConfig cleanConfig =
createCleanConfig(HoodieFailedWritesCleaningPolicy.LAZY, false);
+
+ // Insert base data with a regular ingestion writer
+ HoodieWriteConfig insertWriteConfig = getConfigBuilder()
+ .withCleanConfig(cleanConfig)
+ .withLockConfig(createLockConfig(new
PreferWriterConflictResolutionStrategy()))
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withProperties(properties)
+ .build();
+ SparkRDDWriteClient client = getHoodieWriteClient(insertWriteConfig);
+
+ int numRecords = 200;
+ String firstCommit = WriteClientTestUtils.createNewInstantTime();
+ String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new
String[] {partitionStr});
+ writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")),
"000",
+ numRecords, dataGenerator::generateInserts,
SparkRDDWriteClient::insert, true, numRecords, numRecords,
+ 1, INSTANT_GENERATOR);
+
+ // Simulate an ingestion writer that has created a .requested commit with
an active heartbeat
+ String ingestionRequestedTime =
WriteClientTestUtils.createNewInstantTime();
+ HoodieTestTable.of(metaClient).addRequestedCommit(ingestionRequestedTime);
+ HoodieHeartbeatClient heartbeatClient = new HoodieHeartbeatClient(
+ metaClient.getStorage(), metaClient.getBasePath().toString(),
+ (long) (1000 * 60), 5);
+ heartbeatClient.start(ingestionRequestedTime);
+
+ try {
+ // Schedule and execute clustering with blocking enabled — should fail
during conflict
+ // resolution because it detects the active ingestion .requested instant
+ HoodieWriteConfig clusteringWriteConfig = getConfigBuilder()
+ .withCleanConfig(cleanConfig)
+ .withClusteringConfig(createClusteringBuilder(true, 1).build())
+
.withPreCommitValidatorConfig(createPreCommitValidatorConfig(numRecords))
+ .withLockConfig(createLockConfig(new
PreferWriterConflictResolutionStrategy()))
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withClusteringBlockForPendingIngestion(true)
+ .withProperties(properties)
+ .build();
+
+ SparkRDDWriteClient<?> clusteringWriteClient =
getHoodieWriteClient(clusteringWriteConfig);
+ String clusteringCommitTime =
clusteringWriteClient.scheduleClustering(Option.empty()).get();
+ HoodieClusteringException exception =
assertThrows(HoodieClusteringException.class,
+ () -> clusteringWriteClient.cluster(clusteringCommitTime, true));
+ assertTrue(exception.getCause() instanceof HoodieWriteConflictException);
+ } finally {
+ heartbeatClient.stop(ingestionRequestedTime);
+ heartbeatClient.close();
+ }
+ }
+
public static class FailingPreCommitValidator<T extends HoodieRecordPayload,
I, K, O extends HoodieData<WriteStatus>> extends SparkPreCommitValidator<T, I,
K, O> {
public FailingPreCommitValidator(HoodieSparkTable table,
HoodieEngineContext context, HoodieWriteConfig config) {