codope commented on a change in pull request #4693: URL: https://github.com/apache/hudi/pull/4693#discussion_r835763700
########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java ########## @@ -663,20 +711,82 @@ private MetadataRecordsGenerationParams getRecordsGenerationParams() { /** * Processes commit metadata from data table and commits to metadata table. + * * @param instantTime instant time of interest. * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table. * @param <T> type of commit metadata. * @param canTriggerTableService true if table services can be triggered. false otherwise. */ private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) { - if (enabled && metadata != null) { - Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata(); - commit(instantTime, partitionRecordsMap, canTriggerTableService); + if (!dataWriteConfig.isMetadataTableEnabled()) { + return; + } + Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate(); + partitionsToUpdate.forEach(p -> { + if (enabled && metadata != null) { + Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata(); + commit(instantTime, partitionRecordsMap, canTriggerTableService); + } + }); + } + + private Set<String> getMetadataPartitionsToUpdate() { + // fetch partitions to update from table config + Set<String> partitionsToUpdate = Stream.of(dataMetaClient.getTableConfig().getCompletedMetadataIndexes().split(",")) + .map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet()); + partitionsToUpdate.addAll(Stream.of(dataMetaClient.getTableConfig().getInflightMetadataIndexes().split(",")) + .map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet())); + if (!partitionsToUpdate.isEmpty()) { + return partitionsToUpdate; } + // fallback to update files 
partition only if table config returned no partitions + partitionsToUpdate.add(MetadataPartitionType.FILES.getPartitionPath()); + return partitionsToUpdate; + } + + @Override + public void index(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos) { + if (indexPartitionInfos.isEmpty()) { + LOG.warn("No partition to index in the plan"); + return; + } + String indexUptoInstantTime = indexPartitionInfos.get(0).getIndexUptoInstant(); + indexPartitionInfos.forEach(indexPartitionInfo -> { + String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath(); + LOG.info(String.format("Creating a new metadata index for partition '%s' under path %s upto instant %s", + relativePartitionPath, metadataWriteConfig.getBasePath(), indexUptoInstantTime)); + try { + // filegroup should have already been initialized while scheduling index for this partition + if (!dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), relativePartitionPath))) { + throw new HoodieIndexException(String.format("File group not initialized for metadata partition: %s, indexUptoInstant: %s. 
Looks like index scheduling failed!", + relativePartitionPath, indexUptoInstantTime)); + } + } catch (IOException e) { + throw new HoodieIndexException(String.format("Unable to check whether file group is initialized for metadata partition: %s, indexUptoInstant: %s", + relativePartitionPath, indexUptoInstantTime)); + } + + // return early and populate enabledPartitionTypes correctly (check in initialCommit) + MetadataPartitionType partitionType = MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT)); + if (!enabledPartitionTypes.contains(partitionType)) { + throw new HoodieIndexException(String.format("Indexing for metadata partition: %s is not enabled", partitionType)); + } + }); + // before initial commit update table config + dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_INFLIGHT.key(), indexPartitionInfos.stream() Review comment: Hmm, good point — we should append. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org