yihua commented on code in PR #18350:
URL: https://github.com/apache/hudi/pull/18350#discussion_r3037211444
##########
hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java:
##########
@@ -38,14 +38,16 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
Review Comment:
🤖 nit: The updated Javadoc for `InProcessLockProvider` is much clearer about
the design choice to disallow re-entrancy and its consistency with distributed
locks. Good clarification.
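To make the re-entrancy point concrete, here is a minimal sketch of an in-process lock that refuses a second acquisition from the thread already holding it. It is built on the `ReentrantReadWriteLock` imported in the hunk. The class name and the `IllegalStateException` are assumptions for illustration, not the provider's actual code.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch, not InProcessLockProvider itself: a process-local lock
// that rejects a second acquisition from the thread already holding it.
class NonReentrantLockSketch {
  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

  void lock() {
    // ReentrantReadWriteLock would allow nesting, so refuse it explicitly.
    if (rwLock.isWriteLockedByCurrentThread()) {
      throw new IllegalStateException("Re-entrant lock acquisition is not allowed");
    }
    rwLock.writeLock().lock();
  }

  void unlock() {
    rwLock.writeLock().unlock();
  }

  boolean isHeldByCurrentThread() {
    return rwLock.isWriteLockedByCurrentThread();
  }
}
```

The explicit `isWriteLockedByCurrentThread()` check is what turns the otherwise re-entrant JDK lock into a non-re-entrant one.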
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java:
##########
@@ -68,15 +68,14 @@ public class HoodieInstantTimeGenerator {
* Returns next instant time in the correct format.
* Ensures each instant time is at least 1 millisecond apart since we create instant times at millisecond granularity.
*
- * @param shouldLock Whether the lock should be enabled to get the instant time.
* @param timeGenerator TimeGenerator used to generate the instant time.
* @param milliseconds Milliseconds to add to current time while generating the new instant time
*/
- public static String createNewInstantTime(boolean shouldLock, TimeGenerator timeGenerator, long milliseconds) {
+ public static String createNewInstantTime(TimeGenerator timeGenerator, long milliseconds) {
return LAST_INSTANT_TIME.updateAndGet((oldVal) -> {
String newCommitTime;
do {
- Date d = new Date(timeGenerator.generateTime(!shouldLock) + milliseconds);
+ Date d = new Date(timeGenerator.generateTime() + milliseconds);
Review Comment:
🤖 nit: Changing `ZoneId.systemDefault()` to `commitTimeZone.getZoneId()` is
a good improvement for consistency and correctness, ensuring the configured
time zone is always used.
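The following sketch shows the millisecond-granularity, strictly increasing generation loop from the hunk, with the format bound to a configured `ZoneId` rather than `ZoneId.systemDefault()`. It makes three assumptions: a `LongSupplier` stands in for `TimeGenerator.generateTime()`, the `zone` parameter stands in for `commitTimeZone.getZoneId()`, and the instant format is `yyyyMMddHHmmssSSS`.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Hypothetical sketch of millisecond-granularity, strictly increasing instant times.
class InstantTimeSketch {
  private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
  private static final AtomicReference<String> LAST_INSTANT_TIME = new AtomicReference<>("0");

  static String createNewInstantTime(LongSupplier clockMillis, ZoneId zone, long milliseconds) {
    // Format in the configured zone instead of the JVM default.
    DateTimeFormatter zoned = FORMAT.withZone(zone);
    return LAST_INSTANT_TIME.updateAndGet(oldVal -> {
      String newTime;
      do {
        newTime = zoned.format(Instant.ofEpochMilli(clockMillis.getAsLong() + milliseconds));
        // Spin until the clock moves past the previous instant, so no two calls return the same value.
      } while (newTime.compareTo(oldVal) <= 0);
      return newTime;
    });
  }
}
```

The lexicographic `compareTo` only orders instants correctly if every call uses the same zone. That is one reason to pin the zone to a single configured value.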
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -2171,30 +2172,32 @@ static HoodieActiveTimeline runPendingTableServicesOperationsAndRefreshTimeline(
* 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
* deltacommit.
*/
- void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O> writeClient, Option<String> latestDeltaCommitTimeOpt) {
+ void compactIfNecessary(final BaseHoodieWriteClient<?,I,?,O> writeClient, Option<String> latestDeltaCommitTimeOpt) {
// IMPORTANT: Trigger compaction with max instant time that is smaller than(or equals) the earliest pending instant from DT.
// The compaction planner will manage to filter out the log files that finished with greater completion time.
// see BaseHoodieCompactionPlanGenerator.generateCompactionPlan for more details.
HoodieTimeline metadataCompletedTimeline = metadataMetaClient.getActiveTimeline().filterCompletedInstants();
- final String compactionInstantTime = dataMetaClient.reloadActiveTimeline()
+ final Option<String> restrictedCompactionInstantTimeOpt = dataMetaClient.reloadActiveTimeline()
// The filtering strategy is kept in line with the rollback premise, if an instant is pending on DT but completed on MDT,
// generates a compaction time smaller than it so that the instant could then been rolled back.
.filterInflightsAndRequested().filter(instant -> metadataCompletedTimeline.containsInstant(instant.requestedTime())).firstInstant()
// minus the pending instant time by 1 millisecond to avoid conflicts on the MDT.
- .map(instant -> HoodieInstantTimeGenerator.instantTimeMinusMillis(instant.requestedTime(), 1L))
- .orElse(writeClient.createNewInstantTime(false));
+ .map(instant -> HoodieInstantTimeGenerator.instantTimeMinusMillis(instant.requestedTime(), 1L));
// we need to avoid checking compaction w/ same instant again.
// let's say we trigger compaction after C5 in MDT and so compaction completes with C4001. but C5 crashed before completing in MDT.
// and again w/ C6, we will re-attempt compaction at which point latest delta commit is C4 in MDT.
// and so we try compaction w/ instant C4001. So, we can avoid compaction if we already have compaction w/ same instant time.
- boolean skipCompactions = metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime);
+ boolean skipCompactions = restrictedCompactionInstantTimeOpt.map(
+ compactionInstantTime -> metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)).orElse(false);
try {
if (skipCompactions) {
- LOG.info("Compaction with same {} time is already present in the timeline.", compactionInstantTime);
- } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
- LOG.info("Compaction is scheduled for timestamp {}", compactionInstantTime);
- writeClient.compact(compactionInstantTime, true);
+ LOG.info("Compaction with same {} time is already present in the timeline.", restrictedCompactionInstantTimeOpt.get());
+ } else {
+ Option<String> scheduledInstantTimeOpt = scheduleCompaction(writeClient, restrictedCompactionInstantTimeOpt);
+ if (scheduledInstantTimeOpt.isPresent()) {
+ writeClient.compact(scheduledInstantTimeOpt.get(), true);
+ }
}
} catch (Exception e) {
metrics.ifPresent(m -> m.incrementMetric(HoodieMetadataMetrics.COMPACTION_FAILURES, 1));
Review Comment:
🤖 [Line 2205] nit: Similar to line 2177,
`restrictedCompactionInstantTimeOpt.get()` is used here without an
`isPresent()` check. A `NoSuchElementException` is possible if the `Option` is
empty.
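The `compactIfNecessary` change derives the restricted compaction instant by subtracting 1 ms from the earliest pending data-table instant. Below is a hypothetical stand-in for that arithmetic, not `HoodieInstantTimeGenerator.instantTimeMinusMillis` itself. It assumes the `yyyyMMddHHmmssSSS` instant format.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

// Hypothetical stand-in for HoodieInstantTimeGenerator.instantTimeMinusMillis.
class InstantMinusMillisSketch {
  // The millisecond field is appended explicitly so that parsing the
  // adjacent digits also works on Java 8.
  private static final DateTimeFormatter MILLIS_FORMAT = new DateTimeFormatterBuilder()
      .appendPattern("yyyyMMddHHmmss")
      .appendValue(ChronoField.MILLI_OF_SECOND, 3)
      .toFormatter();

  static String instantTimeMinusMillis(String instantTime, long millis) {
    LocalDateTime parsed = LocalDateTime.parse(instantTime, MILLIS_FORMAT);
    return parsed.minusNanos(millis * 1_000_000L).format(MILLIS_FORMAT);
  }
}
```

For example, `instantTimeMinusMillis("20240101000000000", 1L)` rolls back across the year boundary to `"20231231235959999"`. Because instant times have millisecond granularity, 1 ms is the smallest step that is guaranteed to sort strictly before the pending instant.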
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java:
##########
@@ -86,7 +86,7 @@ public TimelineArchiverV2(HoodieWriteConfig config, HoodieTable<T, I, K, O> tabl
this.config = config;
this.table = table;
this.metaClient = table.getMetaClient();
- this.txnManager = new TransactionManager(config, table.getStorage());
+ this.txnManager = table.getTxnManager().get();
Review Comment:
🤖 nit: Similar to `TimelineArchiverV1`, consider checking
`table.getTxnManager().isPresent()` before calling `.get()` for robustness.
##########
hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java:
##########
Review Comment:
**Line 68:** 🤖 nit: Adding `// Disallow re-entrant behavior;` explicitly
explains the design choice here, which is helpful.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java:
##########
Review Comment:
**Line 184:** 🤖 nit: Using a method reference `cleanActionExecutor::execute`
is a more concise and readable way to pass the executable to `assertThrows`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java:
##########
@@ -104,7 +104,7 @@ public TimelineArchiverV1(HoodieWriteConfig config, HoodieTable<T, I, K, O> tabl
this.table = table;
this.metaClient = table.getMetaClient();
this.archiveFilePath = ArchivedTimelineV1.getArchiveLogPath(metaClient.getArchivePath());
- this.txnManager = new TransactionManager(config, table.getMetaClient().getStorage());
+ this.txnManager = table.getTxnManager().get();
Review Comment:
🤖 nit: It would be safer to check `table.getTxnManager().isPresent()` before
calling `.get()` here, or handle the `Option` more gracefully, to avoid
potential `NoSuchElementException`.
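Two ways the `Option` from `table.getTxnManager()` could be handled instead of a bare `.get()` are sketched below. Both are hypothetical helpers, and `java.util.Optional` stands in for Hudi's `Option`, whose method names may differ.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helpers; java.util.Optional stands in for Hudi's Option.
class TxnManagerLookup {
  // Fail fast with a message that names the caller, instead of a bare NoSuchElementException.
  static <M> M requireTxnManager(Optional<M> txnManagerOpt, String caller) {
    return txnManagerOpt.orElseThrow(() ->
        new IllegalStateException(caller + " requires a transaction manager, but the table does not provide one"));
  }

  // Alternative: fall back to constructing a dedicated manager, as the pre-PR code did.
  static <M> M txnManagerOrCreate(Optional<M> txnManagerOpt, Supplier<M> factory) {
    return txnManagerOpt.orElseGet(factory);
  }
}
```

The fallback recreates the old behaviour, in which each archiver built its own `TransactionManager`. However, it would not share lock or nesting state with the table's manager. The fail-fast variant is therefore likely the safer default when sharing is the point of the change.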
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
@@ -226,11 +227,11 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
);
this.txnManager.beginStateChange(Option.of(inflightInstant), Option.empty());
writeTableMetadata(metadata, inflightInstant.requestedTime());
- table.getActiveTimeline().transitionCleanInflightToComplete(
- false,
- inflightInstant,
- Option.of(metadata),
- completedInstant -> table.getMetaClient().getTableFormat().clean(metadata, completedInstant, table.getContext(), table.getMetaClient(), table.getViewManager()));
+ TableFormatCompletionAction formatCompletionAction = completedInstant -> table.getMetaClient().getTableFormat()
+ .clean(metadata, completedInstant, table.getContext(), table.getMetaClient(), table.getViewManager());
+ HoodieInstant completedInstant = table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant, Option.of(metadata), txnManager.generateInstantTime());
+ // FIXME-vc: this is an one off..
Review Comment:
🤖 nit: This `// FIXME-vc:` comment indicates a known issue or temporary
solution. Could you clarify its purpose or if it's meant to be addressed soon?
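The rewritten `runClean` no longer passes the table-format hook into `transitionCleanInflightToComplete`. Instead, it keeps the hook in a `TableFormatCompletionAction` lambda and takes the completion time from `txnManager.generateInstantTime()`. The hunk is cut off before the hook is used, so the following is only a sketch that assumes the hook runs right after the transition. All names in it (`CompletionActionSketch`, `execute`, `completeClean`) are hypothetical, and a `Supplier<String>` stands in for the transaction manager.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for TableFormatCompletionAction.
@FunctionalInterface
interface CompletionActionSketch {
  void execute(String completedInstant);
}

class CleanCompletionSketch {
  // Moves the instant to completed using a completion time drawn from the supplier
  // (standing in for txnManager.generateInstantTime()), then runs the table-format hook.
  static String completeClean(String inflightInstant, Supplier<String> completionTime,
                              CompletionActionSketch formatCompletionAction) {
    String completedInstant = inflightInstant + "__completed@" + completionTime.get();
    formatCompletionAction.execute(completedInstant);
    return completedInstant;
  }
}
```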
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java:
##########
@@ -47,17 +46,12 @@ protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback
&& !ClusteringUtils.isClusteringOrReplaceCommitAction(instantToRollback.getAction())) {
throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback);
Review Comment:
🤖 nit: Prefer safer handling of the `Option` returned by
`table.getTxnManager()` over a bare `.get()`, to avoid a potential `NoSuchElementException`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -2171,30 +2172,32 @@ static HoodieActiveTimeline runPendingTableServicesOperationsAndRefreshTimeline(
* 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
* deltacommit.
*/
- void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O> writeClient, Option<String> latestDeltaCommitTimeOpt) {
+ void compactIfNecessary(final BaseHoodieWriteClient<?,I,?,O> writeClient, Option<String> latestDeltaCommitTimeOpt) {
// IMPORTANT: Trigger compaction with max instant time that is smaller than(or equals) the earliest pending instant from DT.
// The compaction planner will manage to filter out the log files that finished with greater completion time.
Review Comment:
🤖 nit: `restrictedCompactionInstantTimeOpt.get()` is used without an
`isPresent()` check. This could lead to a `NoSuchElementException` if the
`Option` is empty. Consider adding a check or using `orElse()`.
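In the `compactIfNecessary` change, `skipCompactions` can only be true when the `Option` is present, because `.orElse(false)` covers the empty case. The `.get()` in the skip branch is therefore reachable only with a value. Still, the check and the value can be kept together on one `Option`, as in the hypothetical sketch below. In the sketch, `java.util.Optional` stands in for Hudi's `Option`, and the returned strings stand in for the logging and scheduling calls.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch; java.util.Optional stands in for Hudi's Option and the
// returned strings stand in for the LOG.info / schedule calls in compactIfNecessary.
class CompactionDecisionSketch {
  static String decide(Optional<String> restrictedInstantOpt, Set<String> completedMdtInstants) {
    // Keep the instant only if a compaction with that exact time already completed on the MDT.
    Optional<String> alreadyCompacted = restrictedInstantOpt.filter(completedMdtInstants::contains);
    if (alreadyCompacted.isPresent()) {
      return "skip " + alreadyCompacted.get(); // safe: isPresent() checked on the same Optional
    }
    // Either schedule at the restricted instant, or let the writer pick a fresh one.
    return restrictedInstantOpt.map(t -> "schedule at " + t).orElse("schedule at new instant");
  }
}
```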
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java:
##########
@@ -96,7 +96,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I,
Review Comment:
🤖 nit: Same suggestion here regarding safer `Option.get()` usage.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java:
##########
@@ -143,19 +139,25 @@ private HoodieRestoreMetadata finishRestore(Map<String, List<HoodieRollbackMetad
table.getActiveTimeline().deletePending(instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, entry.requestedTime()));
table.getActiveTimeline().deletePending(instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, entry.requestedTime()));
});
- log.info("Commits " + instantsRolledBack + " rollback is complete. Restored table to " + savepointToRestoreTimestamp);
+ log.info("Commits {} rollback is complete. Restored table to {}", instantsRolledBack, savepointToRestoreTimestamp);
return restoreMetadata;
}
/**
* Update metadata table if available. Any update to metadata table happens within data table lock.
+ * After metadata table is updated, the data table commit is marked as complete.
*
* @param restoreMetadata instance of {@link HoodieRestoreMetadata} to be applied to metadata.
+ * @param restoreInflightInstant the inflight instant to be saved as complete.
*/
- private void writeToMetadata(HoodieRestoreMetadata restoreMetadata, HoodieInstant restoreInflightInstant) {
+ private void writeToMetadataAndCompleteCommit(HoodieRestoreMetadata restoreMetadata, HoodieInstant restoreInflightInstant) {
Review Comment:
🤖 nit: This `// FIXME-vc:` comment also suggests an outstanding item. Could
you provide more context or a plan for its resolution?
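The renamed `writeToMetadataAndCompleteCommit` documents an ordering: update the metadata table first, then mark the data-table commit complete, all within the data-table lock. The hypothetical sketch below models that ordering. Two `Runnable`s stand in for the metadata-table write and the timeline transition, and a `ReentrantLock` stands in for the data-table lock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of "update the metadata table, then complete the data-table commit",
// both under one lock; not Hudi's BaseRestoreActionExecutor.
class RestoreCompletionSketch {
  private final ReentrantLock dataTableLock = new ReentrantLock();

  void writeToMetadataAndCompleteCommit(Runnable metadataWrite, Runnable completeCommit) {
    dataTableLock.lock();
    try {
      metadataWrite.run();   // 1. apply the restore to the metadata table
      completeCommit.run();  // 2. only then mark the data-table instant complete
    } finally {
      dataTableLock.unlock();
    }
  }
}
```

If the metadata write throws, the data-table instant stays inflight. That presumably leaves it eligible for rollback, rather than exposing a completed commit that the metadata table never saw.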
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -41,40 +46,166 @@ public class TransactionManager implements Serializable, AutoCloseable {
protected final LockManager lockManager;
@Getter
protected final boolean isLockRequired;
+ private final transient TimeGenerator timeGenerator;
+ private volatile long lockHolderId; // lock holder ID
+ private int permits; // allows for nested transaction
protected Option<HoodieInstant> changeActionInstant = Option.empty();
private Option<HoodieInstant> lastCompletedActionInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
- this(new LockManager(config, storage), config.isLockRequired());
+ this(config, new LockManager(config, storage));
}
- protected TransactionManager(LockManager lockManager, boolean isLockRequired) {
+ protected TransactionManager(HoodieWriteConfig writeConfig, LockManager lockManager) {
+ this(lockManager, writeConfig.isLockRequired(), TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig()));
+ }
+
+ public TransactionManager(LockManager lockManager, boolean isLockRequired, TimeGenerator timeGenerator) {
this.lockManager = lockManager;
this.isLockRequired = isLockRequired;
+ this.timeGenerator = timeGenerator;
+ this.lockHolderId = -1;
+ this.permits = 0;
+ }
+
+ /**
+ * Caution: the invoker needs to ensure that API called within a lock context.
+ */
+ public String generateInstantTime() {
+ if (lockHolderId < 0 && isLockRequired) {
+ throw new HoodieLockException("Cannot create instant without acquiring a lock first.");
+ }
+ return HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L);
+ }
+
+ /**
+ * Generates an instant time and executes an action that requires that instant time within a lock.
+ * @param instantTimeConsumingAction a function that takes the generated instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Function<String, T> instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(Option.empty(), Option.empty(), instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the caller. If not provided, a new instant time will be generated.
+ * @param instantTimeConsumingAction a function that takes the generated instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String> providedInstantTime, Function<String, T> instantTimeConsumingAction) {
+ return executeStateChangeWithInstant(providedInstantTime, Option.empty(), instantTimeConsumingAction);
+ }
+
+ /**
+ * Uses the provided instant if present or else generates an instant time and executes an action that requires that instant time within a lock.
+ * @param providedInstantTime an optional instant time provided by the caller. If not provided, a new instant time will be generated.
+ * @param lastCompletedActionInstant optional input representing the last completed instant, used for logging purposes.
+ * @param instantTimeConsumingAction a function that takes the generated instant time and performs some action
+ * @return the result of the action
+ * @param <T> type of the result
+ */
+ public <T> T executeStateChangeWithInstant(Option<String> providedInstantTime, Option<HoodieInstant> lastCompletedActionInstant, Function<String, T> instantTimeConsumingAction) {
+ if (isLockRequired()) {
+ acquireLock();
+ }
+ String requestedInstant = providedInstantTime.orElseGet(() -> HoodieInstantTimeGenerator.createNewInstantTime(timeGenerator, 0L));
+ try {
+ if (lastCompletedActionInstant.isEmpty()) {
+ LOG.info("State change starting for {}", changeActionInstant);
+ } else {
+ LOG.info("State change starting for {} with latest completed action instant {}", changeActionInstant, lastCompletedActionInstant.get());
+ }
+ return instantTimeConsumingAction.apply(requestedInstant);
+ } finally {
+ if (isLockRequired()) {
+ releaseLock();
+ LOG.info("State change ended for {}", requestedInstant);
+ }
+ }
+ }
+
+ public void beginStateChange() {
+ beginStateChange(Option.empty(), Option.empty());
}
public void beginStateChange(Option<HoodieInstant> changeActionInstant, Option<HoodieInstant> lastCompletedActionInstant) {
if (isLockRequired) {
LOG.info("State change starting for {} with latest completed action instant {}",
changeActionInstant, lastCompletedActionInstant);
- lockManager.lock();
+ acquireLock();
reset(this.changeActionInstant, changeActionInstant, lastCompletedActionInstant);
LOG.info("State change started for {} with latest completed action instant {}",
changeActionInstant, lastCompletedActionInstant);
}
}
+ public void endStateChange() {
+ endStateChange(Option.empty());
+ }
+
Review Comment:
🤖 nit: There's a typo in this comment; `{
@.idea/codeStyles/codeStyleConfig.xml hasLock}` should be removed.
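The new `lockHolderId` and `permits` fields, together with the re-entrancy ban in `InProcessLockProvider`, suggest that `TransactionManager` itself now tracks nesting. The hunk does not show `acquireLock()`/`releaseLock()`, so the following is a simplified, hypothetical model. A `Semaphore` stands in for the non-re-entrant lock provider, and a `Supplier<String>` stands in for the time generator. Its `generateInstantTime()` also uses a stricter per-thread check than the hunk's `lockHolderId < 0` test.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, simplified model of the lockHolderId/permits fields; not Hudi's TransactionManager.
class TxnManagerSketch {
  private final Semaphore providerLock = new Semaphore(1); // non-re-entrant, like the lock providers
  private final Supplier<String> instantTimeSupplier;
  private volatile long lockHolderId = -1; // id of the owning thread, -1 when unlocked
  private int permits = 0;                 // nesting depth for the owning thread

  TxnManagerSketch(Supplier<String> instantTimeSupplier) {
    this.instantTimeSupplier = instantTimeSupplier;
  }

  void acquireLock() {
    long me = Thread.currentThread().getId();
    if (lockHolderId == me) {
      permits++; // nested transaction: do not touch the provider lock again
      return;
    }
    providerLock.acquireUninterruptibly();
    lockHolderId = me;
    permits = 1;
  }

  void releaseLock() {
    if (lockHolderId != Thread.currentThread().getId()) {
      throw new IllegalStateException("Current thread does not hold the lock");
    }
    if (--permits == 0) {
      lockHolderId = -1;
      providerLock.release();
    }
  }

  String generateInstantTime() {
    // Stricter than `lockHolderId < 0`: the calling thread itself must hold the lock.
    if (lockHolderId != Thread.currentThread().getId()) {
      throw new IllegalStateException("Cannot create instant without acquiring a lock first.");
    }
    return instantTimeSupplier.get();
  }

  <T> T executeStateChangeWithInstant(Function<String, T> action) {
    acquireLock();
    try {
      return action.apply(generateInstantTime());
    } finally {
      releaseLock();
    }
  }
}
```

With this design, a nested `executeStateChangeWithInstant` call only bumps `permits`, and the provider lock is released once, when the outermost call exits.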
##########
hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java:
##########
@@ -98,7 +98,7 @@ public T start(CheckedFunction<T, R> func) throws R {
throw e;
Review Comment:
🤖 nit: Small grammar fix from 'retried' to 'retrying' makes the log message
more natural.
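For context on where such a message sits, the sketch below is a generic, bounded retry loop that logs before each retry and rethrows once attempts run out. The names (`RetrySketch`, `start`, `maxRetries`, `intervalMs`) are assumptions and do not reflect `RetryHelper`'s actual fields or API.

```java
import java.util.concurrent.Callable;

// Hypothetical, generic retry loop; not Hudi's RetryHelper.
class RetrySketch {
  static <T> T start(Callable<T> task, int maxRetries, long intervalMs) throws Exception {
    int retries = 0;
    while (true) {
      try {
        return task.call();
      } catch (Exception e) {
        if (retries >= maxRetries) {
          throw e; // retries exhausted: surface the last failure
        }
        retries++;
        System.err.println("Task failed, retrying (" + retries + "/" + maxRetries + "): " + e.getMessage());
        Thread.sleep(intervalMs);
      }
    }
  }
}
```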
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java:
##########
@@ -51,17 +50,13 @@ protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback
default:
throw new IllegalArgumentException("invalid action name " + instantToRollback.getAction());
Review Comment:
🤖 nit: Again, for `table.getTxnManager().get()`, consider checking
`isPresent()` first for safety.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java:
##########
@@ -95,7 +95,7 @@ public BaseRollbackActionExecutor(HoodieEngineContext context,
this.deleteInstants = deleteInstants;
Review Comment:
🤖 nit: Same suggestion for safer `Option.get()` usage here as well.
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java:
##########
@@ -59,9 +59,8 @@ public void preCompact(
? instantGenerator.getCompactionInflightInstant(instantTime)
Review Comment:
🤖 nit: Ensure `table.getTxnManager()` is present before calling `.get()`, or
handle the empty case.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -202,7 +196,7 @@ protected HoodieTableMetaClient(HoodieStorage storage, String basePath, boolean
if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) {
// Ensure layout version passed in config is not lower than the one seen in hoodie.properties
checkArgument(layoutVersion.get().compareTo(tableConfigVersion.get()) >= 0,
- "Layout Version defined in hoodie properties has higher version (" + tableConfigVersion.get()
+ () -> "Layout Version defined in hoodie properties has higher version (" + tableConfigVersion.get()
+ ") than the one passed in config (" + layoutVersion.get() + ")");
Review Comment:
🤖 nit: Using a lambda for assertion messages `() -> "..."` is a good
practice as it defers string construction until the assertion actually fails,
potentially improving performance slightly if the message is complex and the
assertion rarely fails.
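A minimal stand-in for a `checkArgument(boolean, Supplier<String>)` overload shows that the message lambda runs only on the failure path. The class name is hypothetical, and Hudi's own validation helper may differ in detail.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a checkArgument overload that takes a message supplier.
class LazyMessageCheck {
  static void checkArgument(boolean condition, Supplier<String> errorMessage) {
    if (!condition) {
      // The message is built only here, on the failure path.
      throw new IllegalArgumentException(errorMessage.get());
    }
  }
}
```

The saving is mostly the skipped string concatenation. A capturing lambda is still created on each call unless the JIT eliminates it, so, as the comment says, the gain is likely slight.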
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java:
##########
Review Comment:
**Line 37:** 🤖 nit: Using a ternary operator here makes the table creation
logic more concise and readable.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
Review Comment:
**Line 62:** 🤖 nit: Similar to other instances, consider handling the
`Option` from `table.getTxnManager()` more safely than directly calling
`.get()`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java:
##########
Review Comment:
**Line 61:** 🤖 nit: Consider adding `isPresent()` check for
`table.getTxnManager()` before `.get()`.
--
This is an automated message from the Apache Git Service.