yihua commented on a change in pull request #4034:
URL: https://github.com/apache/hudi/pull/4034#discussion_r754743106
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -88,33 +93,41 @@
/**
* Get the last completed transaction hoodie instant and {@link
HoodieCommitMetadata#getExtraMetadata()}.
+ *
* @param metaClient
* @return
*/
public static Option<Pair<HoodieInstant, Map<String, String>>>
getLastCompletedTxnInstantAndMetadata(
HoodieTableMetaClient metaClient) {
- Option<HoodieInstant> hoodieInstantOption =
metaClient.getActiveTimeline().getCommitsTimeline()
- .filterCompletedInstants().lastInstant();
- try {
- if (hoodieInstantOption.isPresent()) {
- switch (hoodieInstantOption.get().getAction()) {
- case HoodieTimeline.REPLACE_COMMIT_ACTION:
- HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata
-
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(),
HoodieReplaceCommitMetadata.class);
- return Option.of(Pair.of(hoodieInstantOption.get(),
replaceCommitMetadata.getExtraMetadata()));
- case HoodieTimeline.DELTA_COMMIT_ACTION:
- case HoodieTimeline.COMMIT_ACTION:
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(),
HoodieCommitMetadata.class);
- return Option.of(Pair.of(hoodieInstantOption.get(),
commitMetadata.getExtraMetadata()));
- default:
- throw new IllegalArgumentException("Unknown instant action" +
hoodieInstantOption.get().getAction());
+ List<HoodieInstant> hoodieInstants =
metaClient.getActiveTimeline().getCommitsTimeline()
+
.filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toList());
+ if (!hoodieInstants.isEmpty()) {
+ for (HoodieInstant hoodieInstant : hoodieInstants) {
+ try {
+ switch (hoodieInstant.getAction()) {
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata
+
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
HoodieReplaceCommitMetadata.class);
+ return Option.of(Pair.of(hoodieInstant,
replaceCommitMetadata.getExtraMetadata()));
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ case HoodieTimeline.COMMIT_ACTION:
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
HoodieCommitMetadata.class);
+ if
(!commitMetadata.getOperationType().equals(WriteOperationType.UNKNOWN)
+ &&
!commitMetadata.getOperationType().equals(WriteOperationType.COMPACT)) { //
skip compaction instants
Review comment:
The compaction commit is a separate action. Should the logic for
skipping compaction instants be moved into its own branch, `case
HoodieTimeline.COMPACTION_ACTION`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]