yihua commented on a change in pull request #4212:
URL: https://github.com/apache/hudi/pull/4212#discussion_r806223924
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -254,6 +254,12 @@
           .withDocumentation("The average record size. If not explicitly specified, hudi will compute the "
               + "record size estimate compute dynamically based on commit metadata. "
               + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");
+
+  public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = ConfigProperty
+      .key("hoodie.allow.multiple.cleans")
Review comment:
nit: rename to `hoodie.clean.allow.multiple` or `hoodie.cleaner.allow.multiple`?
Also, shouldn't all clean configs live in a new `HoodieCleanConfig` class
(can be a separate PR)?
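For illustration, a rough sketch of the grouped config (the `HoodieCleanConfig` name comes from the suggestion above; extending `HoodieConfig` and the exact doc string are assumptions, not the actual implementation):

```java
// Hypothetical grouping of clean-related configs into their own class (sketch only).
public class HoodieCleanConfig extends HoodieConfig {  // base class assumed

  public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = ConfigProperty
      .key("hoodie.clean.allow.multiple")  // renamed key per the nit above
      .defaultValue(true)
      .withDocumentation("Whether multiple clean actions can be scheduled/executed concurrently.");
}
```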
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -254,6 +254,12 @@
           .withDocumentation("The average record size. If not explicitly specified, hudi will compute the "
               + "record size estimate compute dynamically based on commit metadata. "
               + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");
+
+  public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = ConfigProperty
+      .key("hoodie.allow.multiple.cleans")
+      .defaultValue(true)
+      .withDocumentation("Allows scheduling/executing multiple cleans by enabling this config. If users prefer to strictly ensure clean requests should be mutually exclusive, "
Review comment:
nit: add `.sinceVersion("0.11.0")`?
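i.e., a sketch of the builder chain with the version annotation added (doc string elided):

```java
public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = ConfigProperty
    .key("hoodie.allow.multiple.cleans")
    .defaultValue(true)
    .sinceVersion("0.11.0")  // records the release that introduces this config
    .withDocumentation("...");
```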
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -721,21 +721,28 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean skipLocking) t
    * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
    */
   public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
-    if (scheduleInline) {
-      scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
-    }
-    LOG.info("Cleaner started");
     final Timer.Context timerContext = metrics.getCleanCtx();
-    LOG.info("Cleaned failed attempts if any");
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(), HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
-    HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime, skipLocking);
-    if (timerContext != null && metadata != null) {
-      long durationMs = metrics.getDurationInMs(timerContext.stop());
-      metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
-      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
-          + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
-          + " cleanerElapsedMs" + durationMs);
+
+    HoodieCleanMetadata metadata = null;
+    HoodieTable table = createTable(config, hadoopConf);
+    if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
+      LOG.info("Cleaner started");
+      // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
+      if (scheduleInline) {
+        scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
+        table.getMetaClient().reloadActiveTimeline();
Review comment:
Since the clean action executor does the refreshing, is this actually
needed?
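If so, a sketch of the simplified block (assuming the clean action executor reloads the timeline itself):

```java
if (scheduleInline) {
  scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
  // no explicit reloadActiveTimeline() here; the clean action executor is
  // assumed to refresh the timeline before executing the requested clean.
  metadata = table.clean(context, cleanInstantTime, skipLocking);
}
```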
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -721,21 +721,28 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean skipLocking) t
    * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
    */
   public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
-    if (scheduleInline) {
-      scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
-    }
-    LOG.info("Cleaner started");
     final Timer.Context timerContext = metrics.getCleanCtx();
-    LOG.info("Cleaned failed attempts if any");
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(), HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
-    HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime, skipLocking);
-    if (timerContext != null && metadata != null) {
-      long durationMs = metrics.getDurationInMs(timerContext.stop());
-      metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
-      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
-          + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
-          + " cleanerElapsedMs" + durationMs);
+
+    HoodieCleanMetadata metadata = null;
+    HoodieTable table = createTable(config, hadoopConf);
+    if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
+      LOG.info("Cleaner started");
+      // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
+      if (scheduleInline) {
+        scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
+        table.getMetaClient().reloadActiveTimeline();
Review comment:
Only refresh the timeline when multiple cleans are not allowed, and do it
after the clean action is done?
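i.e., roughly (sketch only, not tested):

```java
if (scheduleInline) {
  scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
  metadata = table.clean(context, cleanInstantTime, skipLocking);
  // refresh only in the mutually-exclusive case, after the clean action is done
  if (!config.allowMultipleCleans()) {
    table.getMetaClient().reloadActiveTimeline();
  }
}
```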
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -721,21 +721,28 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean skipLocking) t
    * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
    */
   public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
-    if (scheduleInline) {
-      scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
-    }
-    LOG.info("Cleaner started");
     final Timer.Context timerContext = metrics.getCleanCtx();
-    LOG.info("Cleaned failed attempts if any");
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(), HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
-    HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime, skipLocking);
-    if (timerContext != null && metadata != null) {
-      long durationMs = metrics.getDurationInMs(timerContext.stop());
-      metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
-      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
-          + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
-          + " cleanerElapsedMs" + durationMs);
+
+    HoodieCleanMetadata metadata = null;
+    HoodieTable table = createTable(config, hadoopConf);
+    if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
+      LOG.info("Cleaner started");
+      // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
+      if (scheduleInline) {
+        scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
+        table.getMetaClient().reloadActiveTimeline();
+        metadata = table.clean(context, cleanInstantTime, skipLocking);
+      }
+
+      if (timerContext != null && metadata != null) {
+        long durationMs = metrics.getDurationInMs(timerContext.stop());
+        metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
+        LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+            + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+            + " cleanerElapsedMs" + durationMs);
+      }
Review comment:
should this metrics/logging block be outside the `if` branch?
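For illustration, a sketch of the method tail with the metrics/log block hoisted out of the outer branch (the trailing `return metadata;` is an assumption about the rest of the method):

```java
    if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline()
        .filterInflightsAndRequested().firstInstant().isPresent()) {
      LOG.info("Cleaner started");
      if (scheduleInline) {
        scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
        table.getMetaClient().reloadActiveTimeline();
        metadata = table.clean(context, cleanInstantTime, skipLocking);
      }
    }
    // hoisted out of the branch: a no-op when metadata is null, so safe either way
    if (timerContext != null && metadata != null) {
      long durationMs = metrics.getDurationInMs(timerContext.stop());
      metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
          + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
          + " cleanerElapsedMs" + durationMs);
    }
    return metadata;
```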
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]