nsivabalan commented on code in PR #18302:
URL: https://github.com/apache/hudi/pull/18302#discussion_r2956976273
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1155,13 +1178,37 @@ private List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
if (!expiredInstants.isEmpty()) {
// Only return instants that haven't been completed by other writers
metaClient.reloadActiveTimeline();
- HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
- return
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+ HoodieTimeline refreshedIncompleteTimeline =
metaClient.getActiveTimeline().filterInflightsAndRequested();
+ return expiredInstants.stream().filter(instantTime ->
+ refreshedIncompleteTimeline.containsInstant(instantTime)
+ ).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
+ public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
Review Comment:
why public?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1155,13 +1178,37 @@ private List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
if (!expiredInstants.isEmpty()) {
// Only return instants that haven't been completed by other writers
metaClient.reloadActiveTimeline();
- HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
- return
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+ HoodieTimeline refreshedIncompleteTimeline =
metaClient.getActiveTimeline().filterInflightsAndRequested();
+ return expiredInstants.stream().filter(instantTime ->
+ refreshedIncompleteTimeline.containsInstant(instantTime)
+ ).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
+ public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
+ return config.isExpirationOfClusteringEnabled()
+ && hasInstantExpired(metaClient, instant.requestedTime(),
config.getClusteringExpirationTimeMins())
+ && ClusteringUtils.isClusteringInstant(
+ metaClient.getActiveTimeline(), instant,
metaClient.getInstantGenerator());
+ }
+
+ private static boolean hasInstantExpired(HoodieTableMetaClient metaClient,
String instantTime, long expirationMins) {
+ try {
+ ZoneId zoneId =
metaClient.getTableConfig().getTimelineTimezone().getZoneId();
+ LocalDateTime instantDateTime = LocalDateTime.parse(
+ HoodieInstantTimeGenerator.fixInstantTimeCompatibility(instantTime),
+ HoodieInstantTimeGenerator.MILLIS_INSTANT_TIME_FORMATTER);
+ long instantEpochMs =
instantDateTime.atZone(zoneId).toInstant().toEpochMilli();
+ long ageMs = System.currentTimeMillis() - instantEpochMs;
Review Comment:
should we do something like
```
ZonedDateTime latestDateTime =
ZonedDateTime.ofInstant(java.time.Instant.now(),
table.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId());
long currentTimeMs = latestDateTime.toInstant().toEpochMilli();
long replaceCommitInstantTime =
HoodieInstantTimeGenerator.parseDateFromInstantTime(instantTime).toInstant().toEpochMilli();
long ageMs = currentTimeMs - replaceCommitInstantTime;
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java:
##########
@@ -235,14 +237,49 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Columns to sort the data by when clustering");
+ static final String SPARK_ALLOW_UPDATE_STRATEGY_CLASS_NAME =
+
"org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy";
+
public static final ConfigProperty<String> UPDATES_STRATEGY = ConfigProperty
.key("hoodie.clustering.updates.strategy")
.defaultValue("org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy")
+ .withInferFunction(cfg -> {
+ String strategy =
cfg.getStringOrDefault(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME,
"");
+ if
(PreferWriterConflictResolutionStrategy.class.getName().equals(strategy)) {
+ return Option.of(SPARK_ALLOW_UPDATE_STRATEGY_CLASS_NAME);
+ }
+ return Option.empty();
+ })
.markAdvanced()
.sinceVersion("0.7.0")
.withDocumentation("Determines how to handle updates, deletes to file
groups that are under clustering."
+ " Default strategy just rejects the update");
+ public static final ConfigProperty<Boolean> ENABLE_EXPIRATIONS =
ConfigProperty
+ .key("hoodie.clustering.enable.expirations")
+ .defaultValue(false)
+ .withInferFunction(cfg -> {
Review Comment:
same comment as above.
let's validate whether setting both a default value and an infer function works
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java:
##########
@@ -289,6 +293,74 @@ private int doPurgePendingInstant(JavaSparkContext jsc)
throws Exception {
return 0;
}
+ /**
+ * Returns the instant times of all pending clustering plans that target any
of the given partitions.
+ *
+ * @param metaClient the table meta client
+ * @param partitions list of partition paths to check against pending
clustering plans
+ * @return list of clustering instant times targeting the given partitions
+ */
+ public static List<String> getPendingClusteringInstantsForPartitions(
+ HoodieTableMetaClient metaClient,
+ List<String> partitions) {
+ Set<String> partitionSet = partitions.stream().collect(Collectors.toSet());
+ return ClusteringUtils.getAllPendingClusteringPlans(metaClient)
+ .filter(planPair -> {
+ HoodieClusteringPlan plan = planPair.getRight();
+ return plan.getInputGroups().stream()
+ .flatMap(group -> group.getSlices().stream())
+ .map(slice -> slice.getPartitionPath())
+ .anyMatch(partitionSet::contains);
+ })
+ .map(planPair -> planPair.getLeft().requestedTime())
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Rolls back pending clustering instants that target any of the given
partitions,
+ * are eligible for rollback (config enabled, old enough, and a clustering
instant),
+ * and whose heartbeat has expired (indicating the clustering job is no
longer alive).
+ *
+ * @param client the write client to use for rollback operations
+ * @param metaClient the table meta client
+ * @param partitions list of partition paths to check against pending
clustering plans
+ */
+ public static void rollbackFailedClusteringForPartitions(
+ SparkRDDWriteClient<?> client,
+ HoodieTableMetaClient metaClient,
+ List<String> partitions) {
+ long maxAllowableHeartbeatIntervalInMs =
client.getConfig().getHoodieClientHeartbeatIntervalInMs()
+ * client.getConfig().getHoodieClientHeartbeatTolerableMisses();
+ String basePath = metaClient.getBasePath().toString();
+
+ getPendingClusteringInstantsForPartitions(metaClient, partitions).stream()
+ .filter(instantTime -> {
+ HoodieInstant instant = metaClient.getInstantGenerator()
+ .createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.CLUSTERING_ACTION, instantTime);
Review Comment:
is it not possible to reuse the writeClient to roll back an expired
clustering?
I am looking to avoid duplicate code.
For example, if we change the way we deduce the expiration of a clustering
instant, we would have to fix it in two places.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java:
##########
@@ -235,14 +237,49 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Columns to sort the data by when clustering");
+ static final String SPARK_ALLOW_UPDATE_STRATEGY_CLASS_NAME =
+
"org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy";
+
public static final ConfigProperty<String> UPDATES_STRATEGY = ConfigProperty
.key("hoodie.clustering.updates.strategy")
.defaultValue("org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy")
+ .withInferFunction(cfg -> {
+ String strategy =
cfg.getStringOrDefault(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME,
"");
+ if
(PreferWriterConflictResolutionStrategy.class.getName().equals(strategy)) {
+ return Option.of(SPARK_ALLOW_UPDATE_STRATEGY_CLASS_NAME);
+ }
+ return Option.empty();
+ })
.markAdvanced()
.sinceVersion("0.7.0")
.withDocumentation("Determines how to handle updates, deletes to file
groups that are under clustering."
+ " Default strategy just rejects the update");
+ public static final ConfigProperty<Boolean> ENABLE_EXPIRATIONS =
ConfigProperty
+ .key("hoodie.clustering.enable.expirations")
+ .defaultValue(false)
+ .withInferFunction(cfg -> {
+ String strategy =
cfg.getStringOrDefault(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME,
"");
+ if
(PreferWriterConflictResolutionStrategy.class.getName().equals(strategy)) {
+ return Option.of(true);
Review Comment:
can we leave this false for OOB users? this is a slightly orthogonal feature,
right?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1155,13 +1178,37 @@ private List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
if (!expiredInstants.isEmpty()) {
// Only return instants that haven't been completed by other writers
metaClient.reloadActiveTimeline();
- HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
- return
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+ HoodieTimeline refreshedIncompleteTimeline =
metaClient.getActiveTimeline().filterInflightsAndRequested();
+ return expiredInstants.stream().filter(instantTime ->
+ refreshedIncompleteTimeline.containsInstant(instantTime)
+ ).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
+ public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
+ return config.isExpirationOfClusteringEnabled()
Review Comment:
can we re-order the checks
`config.isExpirationOfClusteringEnabled() &&
ClusteringUtils.isClusteringInstant(
metaClient.getActiveTimeline(), instant,
metaClient.getInstantGenerator()) && hasInstantExpired(metaClient,
instant.requestedTime(), config.getClusteringExpirationTimeMins()
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1155,13 +1178,37 @@ private List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
if (!expiredInstants.isEmpty()) {
// Only return instants that haven't been completed by other writers
metaClient.reloadActiveTimeline();
- HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
- return
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+ HoodieTimeline refreshedIncompleteTimeline =
metaClient.getActiveTimeline().filterInflightsAndRequested();
+ return expiredInstants.stream().filter(instantTime ->
+ refreshedIncompleteTimeline.containsInstant(instantTime)
+ ).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
+ public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
+ return config.isExpirationOfClusteringEnabled()
+ && hasInstantExpired(metaClient, instant.requestedTime(),
config.getClusteringExpirationTimeMins())
+ && ClusteringUtils.isClusteringInstant(
+ metaClient.getActiveTimeline(), instant,
metaClient.getInstantGenerator());
+ }
+
+ private static boolean hasInstantExpired(HoodieTableMetaClient metaClient,
String instantTime, long expirationMins) {
Review Comment:
are we not checking the heartbeat expiry, but relying purely on the
instant time?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1155,13 +1178,37 @@ private List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
if (!expiredInstants.isEmpty()) {
// Only return instants that haven't been completed by other writers
metaClient.reloadActiveTimeline();
- HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
- return
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+ HoodieTimeline refreshedIncompleteTimeline =
metaClient.getActiveTimeline().filterInflightsAndRequested();
+ return expiredInstants.stream().filter(instantTime ->
+ refreshedIncompleteTimeline.containsInstant(instantTime)
+ ).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
+ public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
+ return config.isExpirationOfClusteringEnabled()
+ && hasInstantExpired(metaClient, instant.requestedTime(),
config.getClusteringExpirationTimeMins())
+ && ClusteringUtils.isClusteringInstant(
+ metaClient.getActiveTimeline(), instant,
metaClient.getInstantGenerator());
+ }
+
+ private static boolean hasInstantExpired(HoodieTableMetaClient metaClient,
String instantTime, long expirationMins) {
Review Comment:
should we check from the time the heartbeat expired, and whether we have
elapsed the expiration interval since then?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1155,13 +1178,37 @@ private List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
if (!expiredInstants.isEmpty()) {
// Only return instants that haven't been completed by other writers
metaClient.reloadActiveTimeline();
- HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
- return
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+ HoodieTimeline refreshedIncompleteTimeline =
metaClient.getActiveTimeline().filterInflightsAndRequested();
+ return expiredInstants.stream().filter(instantTime ->
+ refreshedIncompleteTimeline.containsInstant(instantTime)
+ ).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
+ public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
+ return config.isExpirationOfClusteringEnabled()
+ && hasInstantExpired(metaClient, instant.requestedTime(),
config.getClusteringExpirationTimeMins())
+ && ClusteringUtils.isClusteringInstant(
+ metaClient.getActiveTimeline(), instant,
metaClient.getInstantGenerator());
+ }
+
+ private static boolean hasInstantExpired(HoodieTableMetaClient metaClient,
String instantTime, long expirationMins) {
+ try {
+ ZoneId zoneId =
metaClient.getTableConfig().getTimelineTimezone().getZoneId();
+ LocalDateTime instantDateTime = LocalDateTime.parse(
+ HoodieInstantTimeGenerator.fixInstantTimeCompatibility(instantTime),
+ HoodieInstantTimeGenerator.MILLIS_INSTANT_TIME_FORMATTER);
+ long instantEpochMs =
instantDateTime.atZone(zoneId).toInstant().toEpochMilli();
+ long ageMs = System.currentTimeMillis() - instantEpochMs;
+ return ageMs >= TimeUnit.MINUTES.toMillis(expirationMins);
+ } catch (DateTimeParseException e) {
+ log.warn("Could not parse instant time {}, assuming it has expired",
instantTime, e);
+ return true;
Review Comment:
won't this unintentionally roll back an ongoing or in-progress clustering?
should we throw instead?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java:
##########
@@ -235,14 +237,49 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Columns to sort the data by when clustering");
+ static final String SPARK_ALLOW_UPDATE_STRATEGY_CLASS_NAME =
+
"org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy";
+
public static final ConfigProperty<String> UPDATES_STRATEGY = ConfigProperty
.key("hoodie.clustering.updates.strategy")
.defaultValue("org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy")
+ .withInferFunction(cfg -> {
+ String strategy =
cfg.getStringOrDefault(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME,
"");
Review Comment:
can you write some UTs for this?
what does it mean to have both a default value set (L245) and an infer
func (L246)?
shouldn't only one of them take effect?
If yes, then we should fix L247 to set
`org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy`
as the default
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]