bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r493957561



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -554,14 +608,16 @@ protected HoodieBaseFile 
addBootstrapBaseFileIfPresent(HoodieFileGroupId fileGro
       readLock.lock();
       String partition = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partition);
-      return fetchAllStoredFileGroups(partition).map(fileGroup -> {
-        Option<FileSlice> fileSlice = 
fileGroup.getLatestFileSliceBeforeOrOn(maxInstantTime);
-        // if the file-group is under construction, pick the latest before 
compaction instant time.
-        if (fileSlice.isPresent()) {
-          fileSlice = Option.of(fetchMergedFileSlice(fileGroup, 
fileSlice.get()));
-        }
-        return fileSlice;
-      
}).filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
+      return fetchAllStoredFileGroups(partition)
+          .filter(fg -> !isFileGroupReplaced(fg.getFileGroupId()))

Review comment:
       Same case here: we need to use the maxInstantTime passed in here 
instead of the timeline's max instant.
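
       To illustrate the concern, here is a minimal, self-contained sketch 
(not the actual view code). `ReplacedFileGroupFilter`, `markReplaced` and 
`isReplacedBeforeOrOn` are hypothetical names, file group ids are simplified 
to strings, and instant times are assumed to be fixed-width timestamp strings 
that order lexicographically. The point is that a file group replaced *after* 
maxInstantTime should still be visible to a query as of maxInstantTime.

```java
import java.util.HashMap;
import java.util.Map;

public class ReplacedFileGroupFilter {
    // Hypothetical stand-in for the view's replaced-file-group tracking:
    // file group id -> instant time of the replace commit that replaced it.
    private final Map<String, String> replacedAt = new HashMap<>();

    public void markReplaced(String fileGroupId, String replaceInstantTime) {
        replacedAt.put(fileGroupId, replaceInstantTime);
    }

    /**
     * Returns true only if the file group was replaced at or before
     * maxInstantTime. Assumes instant times compare lexicographically.
     */
    public boolean isReplacedBeforeOrOn(String fileGroupId, String maxInstantTime) {
        String replaceInstant = replacedAt.get(fileGroupId);
        return replaceInstant != null && replaceInstant.compareTo(maxInstantTime) <= 0;
    }

    public static void main(String[] args) {
        ReplacedFileGroupFilter filter = new ReplacedFileGroupFilter();
        filter.markReplaced("fg-1", "20200923120000");

        // As of an instant after the replace commit, fg-1 is hidden.
        System.out.println(filter.isReplacedBeforeOrOn("fg-1", "20200923130000"));
        // As of an instant before the replace commit, fg-1 is still visible.
        System.out.println(filter.isReplacedBeforeOrOn("fg-1", "20200923110000"));
    }
}
```

       In the stream above, the filter would then take the caller's 
maxInstantTime as an argument rather than consulting the timeline's latest 
instant.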

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -425,10 +459,14 @@ protected HoodieBaseFile 
addBootstrapBaseFileIfPresent(HoodieFileGroupId fileGro
       readLock.lock();
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
-      return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> 
fileGroup.getAllBaseFiles()
-          .filter(baseFile -> 
HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), 
HoodieTimeline.EQUALS,
-              instantTime)).filter(df -> 
!isBaseFileDueToPendingCompaction(df)).findFirst().orElse(null))
-          .map(df -> addBootstrapBaseFileIfPresent(new 
HoodieFileGroupId(partitionPath, fileId), df));
+      if (isFileGroupReplaced(partitionPath, fileId)) {

Review comment:
       @satishkotha: Don't we need to use the instant time when checking for a 
replaced file group here?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -727,6 +795,26 @@ private String formatPartitionKey(String partitionStr) {
    */
   abstract Stream<HoodieFileGroup> fetchAllStoredFileGroups();
 
+  /**
+   * Track instant time for file groups replaced.
+   */
+  protected abstract void resetReplacedFileGroups(final Map<HoodieFileGroupId, 
HoodieInstant> replacedFileGroups);
+
+  /**
+   * Track instant time for new file groups replaced.
+   */
+  protected abstract void addReplacedFileGroups(final Map<HoodieFileGroupId, 
HoodieInstant> replacedFileGroups);
+
+  /**
+   * Remove file groups that are replaced in any of the specified instants.
+   */
+  protected abstract void removeReplacedFileIds(Set<String> instants);

Review comment:
       Rename to removeReplacedFileIdsAtInstants?

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,24 +602,39 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
       We create a metaclient and load the timeline here, and then again in 
the function called on the next line. Can you make sure the metaclient is 
created only once, without loading the timeline?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class to generate compaction plan from FileGroup/FileSlice 
abstraction.

Review comment:
       The Javadoc needs fixing: it describes compaction plan generation, 
which does not match this class.

##########
File path: hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
##########
@@ -39,6 +39,18 @@
         "name":"version",
         "type":["int", "null"],
         "default": 1
+     },
+     /* overlaps with 'commitsRollback' field. Adding this to track action 
type for all the instants being rolled back. */

Review comment:
       Can we move this to a separate avsc file and reference it here?

##########
File path: hudi-common/src/main/avro/HoodieRestoreMetadata.avsc
##########
@@ -34,6 +34,8 @@
         "name":"version",
         "type":["int", "null"],
         "default": 1
-     }
+     },
+     /* overlaps with 'instantsToRollback' field. Adding this to track action 
type for all the instants being rolled back. */
+     {"name": "restoreInstantInfo", "default": null, "type": {"type": "array", 
"default": null, "items": ["null", "HoodieInstantInfo"]}}

Review comment:
       Can you pretty-print this (one attribute per line), the same way the 
other fields in this file are laid out?
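
       For illustration, the new field spread over multiple lines. The 
content is copied unchanged from the diff line above; only the whitespace 
differs, and the indentation is a guess that should be matched to the rest of 
the file.

```json
{
  "name": "restoreInstantInfo",
  "default": null,
  "type": {
    "type": "array",
    "default": null,
    "items": ["null", "HoodieInstantInfo"]
  }
}
```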

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -91,40 +92,47 @@ public boolean commit(String instantTime, 
JavaRDD<WriteStatus> writeStatuses) {
   }
 
   /**
+   *
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata) {
-    List<HoodieWriteStat> stats = 
writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, stats, extraMetadata);
+                        Option<Map<String, String>> extraMetadata) {
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, extraMetadata, actionType, 
Collections.emptyMap());
   }
 
-  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, 
Option<Map<String, String>> extraMetadata) {
-    LOG.info("Committing " + instantTime);
+  /**
+   * Complete changes performed at the given instantTime marker with specified 
action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
+      Option<Map<String, String>> extraMetadata, String commitActionType, 
Map<String, List<String>> partitionToReplacedFileIds) {
+    List<HoodieWriteStat> writeStats = 
writeStatuses.map(WriteStatus::getStat).collect();
+    return commitStats(instantTime, writeStats, extraMetadata, 
commitActionType, partitionToReplacedFileIds);
+  }
+
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, 
Option<Map<String, String>> extraMetadata,
+                             String commitActionType) {
+    return commitStats(instantTime, stats, extraMetadata, commitActionType, 
Collections.emptyMap());
+  }
+
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, 
Option<Map<String, String>> extraMetadata,
+                             String commitActionType, Map<String, 
List<String>> partitionToReplaceFileIds) {
+    LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieTableMetaClient metaClient = createMetaClient(false);

Review comment:
       Is this still needed? It looks like it can be removed.

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,24 +602,39 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
       My bad. Got confused with the method naming :) 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,24 +602,39 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, metaClient.getCommitActionType(), 
metaClient);
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) 
with specified action.
+   */
+  public void startCommitWithTime(String instantTime, String actionType) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, actionType, metaClient);
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) 
with specified action.
+   */
+  private void startCommitWithTime(String instantTime, String actionType, 
HoodieTableMetaClient metaClient) {
     // NOTE : Need to ensure that rollback is done before a new commit is 
started
     if (rollbackPending) {
       // Only rollback inflight commit/delta-commits. Do not touch compaction 
commits
       rollbackPendingCommits();
     }
-    startCommit(instantTime);
+    startCommit(instantTime, actionType, metaClient);
   }
 
-  private void startCommit(String instantTime) {
-    LOG.info("Generate a new instant time " + instantTime);
-    HoodieTableMetaClient metaClient = createMetaClient(true);
+  private void startCommit(String instantTime, String actionType, 
HoodieTableMetaClient metaClient) {

Review comment:
       Does this have to be non-static? Can it be moved to CommitUtils?





