nsivabalan commented on code in PR #12321:
URL: https://github.com/apache/hudi/pull/12321#discussion_r1857727831
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize, instantTime);
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ }
+ });
+
+ // there are chances that same record key from data table has 2 entries
(1 delete from older partition and 1 insert to newer partition)
+ // lets do reduce by key to ignore the deleted entry.
+ return reduceByKeys(recordIndexRecords, parallelism);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate RLI records for metadata
table", e);
+ }
+ }
+
+ /**
+ * There are chances that same record key from data table has 2 entries (1
delete from older partition and 1 insert to newer partition)
+ * So, this method performs reduce by key to ignore the deleted entry.
+ * @param recordIndexRecords hoodie records after rli index lookup.
+ * @param parallelism parallelism to use.
+ * @return
+ */
+ private static HoodieData<HoodieRecord>
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+ return recordIndexRecords.mapToPair(
+ (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>)
t -> Pair.of(t.getKey(), t))
+ .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord,
HoodieRecord>) (record1, record2) -> {
+ boolean isRecord1Deleted = record1.getData() instanceof
EmptyHoodieRecordPayload;
+ boolean isRecord2Deleted = record2.getData() instanceof
EmptyHoodieRecordPayload;
+ if (isRecord1Deleted && !isRecord2Deleted) {
+ return record2;
+ } else if (!isRecord1Deleted && isRecord2Deleted) {
+ return record1;
+ } else {
+ throw new HoodieIOException("Two HoodieRecord updates to RLI is
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+ + record1.getData().toString() + ", record 2 : " +
record2.getData().toString());
+ }
+ }, parallelism).values();
+ }
+
+ @VisibleForTesting
+ public static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext
engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName);
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFiles) {
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ return engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat,
storage).iterator();
+ } else {
+ // for logs, every entry is either an update or a delete
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ return getRecordKeys(fullFilePath.toString(),
dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize,
instantTime).iterator();
+ }
+ }).collectAsList();
Review Comment:
Yes. Because following this, we need to look up the secondary index (SI) in the metadata table (MDT) reader.
https://github.com/apache/hudi/blob/397005d6ed141823b08321438231e49ba8f311cd/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java#L341
Hence we had to collect the result as a list.
We might need to revisit the APIs in the metadata reader (BaseTableMetadata) to see whether we can support HoodieData-based APIs instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]