vinothchandar commented on a change in pull request #4034:
URL: https://github.com/apache/hudi/pull/4034#discussion_r756350588



##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -339,11 +340,17 @@ public void refreshTimeline() throws IOException {
           resumeCheckpointStr = Option.empty();
         } else if 
(HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
             HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
-          throw new HoodieDeltaStreamerException(
-              "Unable to find previous checkpoint. Please double check if this 
table "
-                  + "was indeed built via delta streamer. Last Commit :" + 
lastCommit + ", Instants :"
-                  + 
commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", 
CommitMetadata="
-                  + commitMetadata.toJsonString());
+          // if previous commit metadata did not have the checkpoint key, try 
traversing previous commits until we find one.
+          Option<String> prevCheckpoint = 
getPreviousCheckpoint(commitTimelineOpt.get());

Review comment:
       Is there any way to use `Option.orThrow` or a similar pattern here?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -88,33 +93,42 @@
 
   /**
    * Get the last completed transaction hoodie instant and {@link 
HoodieCommitMetadata#getExtraMetadata()}.
+   *
    * @param metaClient
    * @return
    */
   public static Option<Pair<HoodieInstant, Map<String, String>>> 
getLastCompletedTxnInstantAndMetadata(
       HoodieTableMetaClient metaClient) {
-    Option<HoodieInstant> hoodieInstantOption = 
metaClient.getActiveTimeline().getCommitsTimeline()
-        .filterCompletedInstants().lastInstant();
-    try {
-      if (hoodieInstantOption.isPresent()) {
-        switch (hoodieInstantOption.get().getAction()) {
-          case HoodieTimeline.REPLACE_COMMIT_ACTION:
-            HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata
-                
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(),
 HoodieReplaceCommitMetadata.class);
-            return Option.of(Pair.of(hoodieInstantOption.get(), 
replaceCommitMetadata.getExtraMetadata()));
-          case HoodieTimeline.DELTA_COMMIT_ACTION:
-          case HoodieTimeline.COMMIT_ACTION:
-            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-                
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(),
 HoodieCommitMetadata.class);
-            return Option.of(Pair.of(hoodieInstantOption.get(), 
commitMetadata.getExtraMetadata()));
-          default:
-            throw new IllegalArgumentException("Unknown instant action" + 
hoodieInstantOption.get().getAction());
+    List<HoodieInstant> hoodieInstants = 
metaClient.getActiveTimeline().getCommitsTimeline()
+        
.filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toList());
+    if (!hoodieInstants.isEmpty()) {
+      for (HoodieInstant hoodieInstant : hoodieInstants) {
+        try {
+          switch (hoodieInstant.getAction()) {
+            case HoodieTimeline.REPLACE_COMMIT_ACTION:
+              HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata
+                  
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
 HoodieReplaceCommitMetadata.class);
+              return Option.of(Pair.of(hoodieInstant, 
replaceCommitMetadata.getExtraMetadata()));
+            case HoodieTimeline.DELTA_COMMIT_ACTION:
+            case HoodieTimeline.COMMIT_ACTION:
+              HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+                  
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
 HoodieCommitMetadata.class);
+              if (commitMetadata.getOperationType() != null
+                  && 
!commitMetadata.getOperationType().equals(WriteOperationType.UNKNOWN)
+                  && 
!commitMetadata.getOperationType().equals(WriteOperationType.COMPACT)) { // 
skip compaction instants
+                return Option.of(Pair.of(hoodieInstant, 
commitMetadata.getExtraMetadata()));
+              }
+              break;
+            default:
+              throw new IllegalArgumentException("Unknown instant action" + 
hoodieInstant.getAction());
+          }
+        } catch (IOException io) {
+          throw new HoodieIOException("Unable to read metadata for instant " + 
hoodieInstant, io);
         }
-      } else {
-        return Option.empty();
       }
-    } catch (IOException io) {
-      throw new HoodieIOException("Unable to read metadata for instant " + 
hoodieInstantOption.get(), io);
+      return Option.empty();

Review comment:
       Is there any way to avoid these nested empty returns?

##########
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
##########
@@ -196,34 +206,59 @@ void 
testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType)
     HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, 
jsc());
     backfillJob.sync();
 
+    meta.reloadActiveTimeline();
+    int totalCommits = 
meta.getCommitsTimeline().filterCompletedInstants().countInstants();
+
     // Save the checkpoint information from the deltastreamer run and perform 
next write
     String checkpointAfterDeltaSync = 
getLatestMetadata(meta).getMetadata(CHECKPOINT_KEY);
     // this writer will enable 
HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key() so that 
deltastreamer checkpoint will be carried over.
-    performWriteWithDeltastreamerStateMerge();
+    doSparkWriteWithDeltastreamerStateMerge(true);// Verify that the 
checkpoint is carried over

Review comment:
       I feel you can just unit test this without actually doing the writes? 
Just add the commit metadata without checkpoints?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -88,33 +93,42 @@
 
   /**
    * Get the last completed transaction hoodie instant and {@link 
HoodieCommitMetadata#getExtraMetadata()}.
+   *
    * @param metaClient
    * @return
    */
   public static Option<Pair<HoodieInstant, Map<String, String>>> 
getLastCompletedTxnInstantAndMetadata(
       HoodieTableMetaClient metaClient) {
-    Option<HoodieInstant> hoodieInstantOption = 
metaClient.getActiveTimeline().getCommitsTimeline()
-        .filterCompletedInstants().lastInstant();
-    try {
-      if (hoodieInstantOption.isPresent()) {
-        switch (hoodieInstantOption.get().getAction()) {
-          case HoodieTimeline.REPLACE_COMMIT_ACTION:
-            HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata
-                
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(),
 HoodieReplaceCommitMetadata.class);
-            return Option.of(Pair.of(hoodieInstantOption.get(), 
replaceCommitMetadata.getExtraMetadata()));
-          case HoodieTimeline.DELTA_COMMIT_ACTION:
-          case HoodieTimeline.COMMIT_ACTION:
-            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-                
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(),
 HoodieCommitMetadata.class);
-            return Option.of(Pair.of(hoodieInstantOption.get(), 
commitMetadata.getExtraMetadata()));
-          default:
-            throw new IllegalArgumentException("Unknown instant action" + 
hoodieInstantOption.get().getAction());
+    List<HoodieInstant> hoodieInstants = 
metaClient.getActiveTimeline().getCommitsTimeline()
+        
.filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toList());
+    if (!hoodieInstants.isEmpty()) {
+      for (HoodieInstant hoodieInstant : hoodieInstants) {
+        try {
+          switch (hoodieInstant.getAction()) {
+            case HoodieTimeline.REPLACE_COMMIT_ACTION:
+              HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata
+                  
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
 HoodieReplaceCommitMetadata.class);
+              return Option.of(Pair.of(hoodieInstant, 
replaceCommitMetadata.getExtraMetadata()));
+            case HoodieTimeline.DELTA_COMMIT_ACTION:
+            case HoodieTimeline.COMMIT_ACTION:
+              HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+                  
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
 HoodieCommitMetadata.class);
+              if (commitMetadata.getOperationType() != null

Review comment:
       This is probably for older commits?

##########
File path: 
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestTransactionUtils.java
##########
@@ -62,13 +66,28 @@ public void testCheckpointStateMerge() throws IOException {
     timeline.createNewInstant(commitInstantWithCheckpointState);
 
     HoodieCommitMetadata metadataWithCheckpoint = new HoodieCommitMetadata();
+    metadataWithCheckpoint.setOperationType(WriteOperationType.INSERT);
     String checkpointVal = "00001";
     
metadataWithCheckpoint.addMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY,
 checkpointVal);
     timeline.saveAsComplete(
         commitInstantWithCheckpointState,
         
Option.of(metadataWithCheckpoint.toJsonString().getBytes(StandardCharsets.UTF_8))
     );
 
+    if (testCompaction) {
+      HoodieInstant compactionInstant = new HoodieInstant(
+          true,
+          HoodieTimeline.COMPACTION_ACTION,

Review comment:
       Not sure I follow this test. If you use COMMIT_ACTION above, then it 
must be COW, right? Or conversely, if testCompaction=true, shouldn't we be 
generating a delta commit above?

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -451,6 +458,22 @@ public void refreshTimeline() throws IOException {
     return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
   }
 
+  private Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws 
IOException {

Review comment:
       Unit test this?

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -451,6 +458,22 @@ public void refreshTimeline() throws IOException {
     return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
   }
 
+  private Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws 
IOException {
+    return (Option<String>) timeline.getReverseOrderedInstants().map(instant 
-> {

Review comment:
       I feel this can be more concise.

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -451,6 +458,22 @@ public void refreshTimeline() throws IOException {
     return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
   }
 
+  private Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws 
IOException {
+    return (Option<String>) timeline.getReverseOrderedInstants().map(instant 
-> {

Review comment:
       Write a `filter()`, then a `findFirst()`, and then a `map()`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to