codope commented on a change in pull request #4693:
URL: https://github.com/apache/hudi/pull/4693#discussion_r835763700



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -663,20 +711,82 @@ private MetadataRecordsGenerationParams 
getRecordsGenerationParams() {
 
   /**
    * Processes commit metadata from data table and commits to metadata table.
+   *
    * @param instantTime instant time of interest.
    * @param convertMetadataFunction converter function to convert the 
respective metadata to List of HoodieRecords to be written to metadata table.
    * @param <T> type of commit metadata.
    * @param canTriggerTableService true if table services can be triggered. 
false otherwise.
    */
   private <T> void processAndCommit(String instantTime, 
ConvertMetadataFunction convertMetadataFunction, boolean 
canTriggerTableService) {
-    if (enabled && metadata != null) {
-      Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap 
= convertMetadataFunction.convertMetadata();
-      commit(instantTime, partitionRecordsMap, canTriggerTableService);
+    if (!dataWriteConfig.isMetadataTableEnabled()) {
+      return;
+    }
+    Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionRecordsMap = convertMetadataFunction.convertMetadata();
+        commit(instantTime, partitionRecordsMap, canTriggerTableService);
+      }
+    });
+  }
+
+  private Set<String> getMetadataPartitionsToUpdate() {
+    // fetch partitions to update from table config
+    Set<String> partitionsToUpdate = 
Stream.of(dataMetaClient.getTableConfig().getCompletedMetadataIndexes().split(","))
+        .map(String::trim).filter(s -> 
!s.isEmpty()).collect(Collectors.toSet());
+    
partitionsToUpdate.addAll(Stream.of(dataMetaClient.getTableConfig().getInflightMetadataIndexes().split(","))
+        .map(String::trim).filter(s -> 
!s.isEmpty()).collect(Collectors.toSet()));
+    if (!partitionsToUpdate.isEmpty()) {
+      return partitionsToUpdate;
     }
+    // fallback to update files partition only if table config returned no 
partitions
+    partitionsToUpdate.add(MetadataPartitionType.FILES.getPartitionPath());
+    return partitionsToUpdate;
+  }
+
+  @Override
+  public void index(HoodieEngineContext engineContext, 
List<HoodieIndexPartitionInfo> indexPartitionInfos) {
+    if (indexPartitionInfos.isEmpty()) {
+      LOG.warn("No partition to index in the plan");
+      return;
+    }
+    String indexUptoInstantTime = 
indexPartitionInfos.get(0).getIndexUptoInstant();
+    indexPartitionInfos.forEach(indexPartitionInfo -> {
+      String relativePartitionPath = 
indexPartitionInfo.getMetadataPartitionPath();
+      LOG.info(String.format("Creating a new metadata index for partition '%s' 
under path %s upto instant %s",
+          relativePartitionPath, metadataWriteConfig.getBasePath(), 
indexUptoInstantTime));
+      try {
+        // filegroup should have already been initialized while scheduling 
index for this partition
+        if (!dataMetaClient.getFs().exists(new 
Path(metadataWriteConfig.getBasePath(), relativePartitionPath))) {
+          throw new HoodieIndexException(String.format("File group not 
initialized for metadata partition: %s, indexUptoInstant: %s. Looks like index 
scheduling failed!",
+              relativePartitionPath, indexUptoInstantTime));
+        }
+      } catch (IOException e) {
+        throw new HoodieIndexException(String.format("Unable to check whether 
file group is initialized for metadata partition: %s, indexUptoInstant: %s",
+            relativePartitionPath, indexUptoInstantTime));
+      }
+
+      // return early and populate enabledPartitionTypes correctly (check in 
initialCommit)
+      MetadataPartitionType partitionType = 
MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT));
+      if (!enabledPartitionTypes.contains(partitionType)) {
+        throw new HoodieIndexException(String.format("Indexing for metadata 
partition: %s is not enabled", partitionType));
+      }
+    });
+    // before initial commit update table config
+    
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_INFLIGHT.key(),
 indexPartitionInfos.stream()

Review comment:
      Hmm, good point — we should append.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to