jonvex commented on code in PR #12001:
URL: https://github.com/apache/hudi/pull/12001#discussion_r1777307915
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -179,40 +176,27 @@ protected HoodieData<HoodieRecord>
getFunctionalIndexRecords(List<Pair<String, F
String columnToIndex = indexDefinition.getSourceFields().get(0);
SQLContext sqlContext = sparkEngineContext.getSqlContext();
String basePath = metaClient.getBasePath().toString();
- for (Pair<String, FileSlice> pair : partitionFileSlicePairs) {
- String partition = pair.getKey();
- FileSlice fileSlice = pair.getValue();
+
+ // Group FileSlices by partition
+ Map<String, List<FileSlice>> partitionToFileSlicesMap =
partitionFileSlicePairs.stream()
+ .collect(Collectors.groupingBy(Pair::getKey,
Collectors.mapping(Pair::getValue, Collectors.toList())));
+ List<HoodieRecord> allRecords = new ArrayList<>();
+ for (Map.Entry<String, List<FileSlice>> entry :
partitionToFileSlicesMap.entrySet()) {
+ String partition = entry.getKey();
+ List<FileSlice> fileSlices = entry.getValue();
+ List<HoodieRecord> recordsForPartition = Collections.emptyList();
// For functional index using column_stats
if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
- return getFunctionalIndexRecordsUsingColumnStats(
- metaClient,
- parallelism,
- readerSchema,
- fileSlice,
- basePath,
- partition,
- functionalIndex,
- columnToIndex,
- sqlContext,
- sparkEngineContext);
+ recordsForPartition =
getFunctionalIndexRecordsUsingColumnStats(metaClient, readerSchema, fileSlices,
basePath, partition, functionalIndex, columnToIndex, sqlContext);
}
// For functional index using bloom_filters
if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
Review Comment:
Change this to an `else if`.
The two conditions cannot both be true at the same time, and since we overwrite
recordsForPartition in both branches, it makes even more sense not to execute
both of them.
```
indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)
indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)
```
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -70,118 +68,88 @@ public class SparkMetadataWriterUtils {
private static final String READ_PATHS_CONFIG =
"hoodie.datasource.read.paths";
private static final String GLOB_PATHS_CONFIG = "glob.paths";
- public static HoodieJavaRDD<HoodieRecord>
getFunctionalIndexRecordsUsingColumnStats(
- HoodieTableMetaClient metaClient,
- int parallelism,
- Schema readerSchema,
- FileSlice fileSlice,
- String basePath,
- String partition,
- HoodieFunctionalIndex<Column, Column> functionalIndex,
- String columnToIndex,
- SQLContext sqlContext,
- HoodieSparkEngineContext sparkEngineContext) {
+ public static List<HoodieRecord>
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
+
Schema readerSchema,
+
List<FileSlice> fileSlices,
+
String basePath,
+
String partition,
+
HoodieFunctionalIndex<Column, Column> functionalIndex,
+
String columnToIndex,
+
SQLContext sqlContext) {
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new
ArrayList<>();
- if (fileSlice.getBaseFile().isPresent()) {
- HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
- String filename = baseFile.getFileName();
- long fileSize = baseFile.getFileSize();
- Path baseFilePath = filePath(basePath, partition, filename);
- buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+ for (FileSlice fileSlice : fileSlices) {
+ if (fileSlice.getBaseFile().isPresent()) {
+ HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+ String filename = baseFile.getFileName();
+ long fileSize = baseFile.getFileSize();
+ Path baseFilePath = filePath(basePath, partition, filename);
+ buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+ }
+ // Handle log files
+ fileSlice.getLogFiles().forEach(logFile -> {
+ String fileName = logFile.getFileName();
+ Path logFilePath = filePath(basePath, partition, fileName);
+ long fileSize = logFile.getFileSize();
+ buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
+ });
}
- // Handle log files
- fileSlice.getLogFiles().forEach(logFile -> {
- String fileName = logFile.getFileName();
- Path logFilePath = filePath(basePath, partition, fileName);
- long fileSize = logFile.getFileSize();
- buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
- });
- return HoodieJavaRDD.of(createColumnStatsRecords(partition,
columnRangeMetadataList, false).collect(Collectors.toList()),
sparkEngineContext, parallelism);
+ return createColumnStatsRecords(partition, columnRangeMetadataList,
false).collect(Collectors.toList());
}
- public static HoodieJavaRDD<HoodieRecord>
getFunctionalIndexRecordsUsingBloomFilter(
- HoodieTableMetaClient metaClient,
- int parallelism,
- Schema readerSchema,
- FileSlice fileSlice,
- String basePath,
- String partition,
- HoodieFunctionalIndex<Column, Column> functionalIndex,
- String columnToIndex,
- SQLContext sqlContext,
- HoodieSparkEngineContext sparkEngineContext,
- HoodieWriteConfig metadataWriteConfig) {
+ public static List<HoodieRecord>
getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
+
Schema readerSchema,
+
List<FileSlice> fileSlices,
+
String basePath,
+
String partition,
+
HoodieFunctionalIndex<Column, Column> functionalIndex,
+
String columnToIndex,
+
SQLContext sqlContext,
+
HoodieWriteConfig metadataWriteConfig) {
List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
- if (fileSlice.getBaseFile().isPresent()) {
- HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
- String filename = baseFile.getFileName();
- Path baseFilePath = filePath(basePath, partition, filename);
- buildBloomFilterMetadata(
- metaClient,
- readerSchema,
- functionalIndex,
- columnToIndex,
- sqlContext,
- bloomFilterMetadataList,
- baseFilePath,
- metadataWriteConfig,
- partition,
- baseFile.getCommitTime());
+ for (FileSlice fileSlice : fileSlices) {
+ if (fileSlice.getBaseFile().isPresent()) {
+ HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+ String filename = baseFile.getFileName();
+ Path baseFilePath = filePath(basePath, partition, filename);
+ buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, bloomFilterMetadataList, baseFilePath,
metadataWriteConfig, partition, baseFile.getCommitTime());
+ }
+ // Handle log files
+ fileSlice.getLogFiles().forEach(logFile -> {
+ String fileName = logFile.getFileName();
+ Path logFilePath = filePath(basePath, partition, fileName);
+ buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, bloomFilterMetadataList, logFilePath,
metadataWriteConfig, partition,
+ logFile.getDeltaCommitTime());
+ });
}
- // Handle log files
- fileSlice.getLogFiles().forEach(logFile -> {
- String fileName = logFile.getFileName();
- Path logFilePath = filePath(basePath, partition, fileName);
- buildBloomFilterMetadata(
- metaClient,
- readerSchema,
- functionalIndex,
- columnToIndex,
- sqlContext,
- bloomFilterMetadataList,
- logFilePath,
- metadataWriteConfig,
- partition,
- logFile.getDeltaCommitTime());
- });
- return HoodieJavaRDD.of(bloomFilterMetadataList, sparkEngineContext,
parallelism);
+ return bloomFilterMetadataList;
}
- private static void buildColumnRangeMetadata(
- HoodieTableMetaClient metaClient,
- Schema readerSchema,
- HoodieFunctionalIndex<Column, Column> functionalIndex,
- String columnToIndex,
- SQLContext sqlContext,
- List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
- long fileSize,
- Path filePath) {
- Dataset<Row> fileDf = readRecordsAsRow(
- new StoragePath[] {convertToStoragePath(filePath)},
- sqlContext,
- metaClient,
- readerSchema);
+ private static void buildColumnRangeMetadata(HoodieTableMetaClient
metaClient,
+ Schema readerSchema,
+ HoodieFunctionalIndex<Column,
Column> functionalIndex,
+ String columnToIndex,
+ SQLContext sqlContext,
+
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
+ long fileSize,
+ Path filePath) {
Review Comment:
Should we just change this to `StoragePath`?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -70,118 +68,88 @@ public class SparkMetadataWriterUtils {
private static final String READ_PATHS_CONFIG =
"hoodie.datasource.read.paths";
private static final String GLOB_PATHS_CONFIG = "glob.paths";
- public static HoodieJavaRDD<HoodieRecord>
getFunctionalIndexRecordsUsingColumnStats(
- HoodieTableMetaClient metaClient,
- int parallelism,
- Schema readerSchema,
- FileSlice fileSlice,
- String basePath,
- String partition,
- HoodieFunctionalIndex<Column, Column> functionalIndex,
- String columnToIndex,
- SQLContext sqlContext,
- HoodieSparkEngineContext sparkEngineContext) {
+ public static List<HoodieRecord>
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
Review Comment:
This and getFunctionalIndexRecordsUsingBloomFilter look very similar. Is
there a clean way to reuse code between them? There might not be.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -70,118 +68,88 @@ public class SparkMetadataWriterUtils {
private static final String READ_PATHS_CONFIG =
"hoodie.datasource.read.paths";
private static final String GLOB_PATHS_CONFIG = "glob.paths";
- public static HoodieJavaRDD<HoodieRecord>
getFunctionalIndexRecordsUsingColumnStats(
- HoodieTableMetaClient metaClient,
- int parallelism,
- Schema readerSchema,
- FileSlice fileSlice,
- String basePath,
- String partition,
- HoodieFunctionalIndex<Column, Column> functionalIndex,
- String columnToIndex,
- SQLContext sqlContext,
- HoodieSparkEngineContext sparkEngineContext) {
+ public static List<HoodieRecord>
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
+
Schema readerSchema,
+
List<FileSlice> fileSlices,
+
String basePath,
+
String partition,
+
HoodieFunctionalIndex<Column, Column> functionalIndex,
+
String columnToIndex,
+
SQLContext sqlContext) {
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new
ArrayList<>();
- if (fileSlice.getBaseFile().isPresent()) {
- HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
- String filename = baseFile.getFileName();
- long fileSize = baseFile.getFileSize();
- Path baseFilePath = filePath(basePath, partition, filename);
- buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+ for (FileSlice fileSlice : fileSlices) {
+ if (fileSlice.getBaseFile().isPresent()) {
+ HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+ String filename = baseFile.getFileName();
+ long fileSize = baseFile.getFileSize();
+ Path baseFilePath = filePath(basePath, partition, filename);
+ buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+ }
+ // Handle log files
+ fileSlice.getLogFiles().forEach(logFile -> {
+ String fileName = logFile.getFileName();
+ Path logFilePath = filePath(basePath, partition, fileName);
+ long fileSize = logFile.getFileSize();
+ buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
+ });
}
- // Handle log files
- fileSlice.getLogFiles().forEach(logFile -> {
- String fileName = logFile.getFileName();
- Path logFilePath = filePath(basePath, partition, fileName);
- long fileSize = logFile.getFileSize();
- buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
- });
- return HoodieJavaRDD.of(createColumnStatsRecords(partition,
columnRangeMetadataList, false).collect(Collectors.toList()),
sparkEngineContext, parallelism);
+ return createColumnStatsRecords(partition, columnRangeMetadataList,
false).collect(Collectors.toList());
}
- public static HoodieJavaRDD<HoodieRecord>
getFunctionalIndexRecordsUsingBloomFilter(
- HoodieTableMetaClient metaClient,
- int parallelism,
- Schema readerSchema,
- FileSlice fileSlice,
- String basePath,
- String partition,
- HoodieFunctionalIndex<Column, Column> functionalIndex,
- String columnToIndex,
- SQLContext sqlContext,
- HoodieSparkEngineContext sparkEngineContext,
- HoodieWriteConfig metadataWriteConfig) {
+ public static List<HoodieRecord>
getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
+
Schema readerSchema,
+
List<FileSlice> fileSlices,
+
String basePath,
+
String partition,
+
HoodieFunctionalIndex<Column, Column> functionalIndex,
+
String columnToIndex,
+
SQLContext sqlContext,
+
HoodieWriteConfig metadataWriteConfig) {
List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
- if (fileSlice.getBaseFile().isPresent()) {
- HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
- String filename = baseFile.getFileName();
- Path baseFilePath = filePath(basePath, partition, filename);
- buildBloomFilterMetadata(
- metaClient,
- readerSchema,
- functionalIndex,
- columnToIndex,
- sqlContext,
- bloomFilterMetadataList,
- baseFilePath,
- metadataWriteConfig,
- partition,
- baseFile.getCommitTime());
+ for (FileSlice fileSlice : fileSlices) {
+ if (fileSlice.getBaseFile().isPresent()) {
Review Comment:
No bootstrap base file handling? Do we need to create a follow-up ticket for that?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -70,118 +68,88 @@ public class SparkMetadataWriterUtils {
private static final String READ_PATHS_CONFIG =
"hoodie.datasource.read.paths";
private static final String GLOB_PATHS_CONFIG = "glob.paths";
- public static HoodieJavaRDD<HoodieRecord>
getFunctionalIndexRecordsUsingColumnStats(
- HoodieTableMetaClient metaClient,
- int parallelism,
- Schema readerSchema,
- FileSlice fileSlice,
- String basePath,
- String partition,
- HoodieFunctionalIndex<Column, Column> functionalIndex,
- String columnToIndex,
- SQLContext sqlContext,
- HoodieSparkEngineContext sparkEngineContext) {
+ public static List<HoodieRecord>
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
+
Schema readerSchema,
+
List<FileSlice> fileSlices,
+
String basePath,
+
String partition,
+
HoodieFunctionalIndex<Column, Column> functionalIndex,
+
String columnToIndex,
+
SQLContext sqlContext) {
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new
ArrayList<>();
- if (fileSlice.getBaseFile().isPresent()) {
- HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
- String filename = baseFile.getFileName();
- long fileSize = baseFile.getFileSize();
- Path baseFilePath = filePath(basePath, partition, filename);
- buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+ for (FileSlice fileSlice : fileSlices) {
+ if (fileSlice.getBaseFile().isPresent()) {
+ HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+ String filename = baseFile.getFileName();
+ long fileSize = baseFile.getFileSize();
+ Path baseFilePath = filePath(basePath, partition, filename);
+ buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFilePath);
+ }
+ // Handle log files
+ fileSlice.getLogFiles().forEach(logFile -> {
+ String fileName = logFile.getFileName();
+ Path logFilePath = filePath(basePath, partition, fileName);
+ long fileSize = logFile.getFileSize();
+ buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
+ });
}
- // Handle log files
- fileSlice.getLogFiles().forEach(logFile -> {
- String fileName = logFile.getFileName();
- Path logFilePath = filePath(basePath, partition, fileName);
- long fileSize = logFile.getFileSize();
- buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFilePath);
- });
- return HoodieJavaRDD.of(createColumnStatsRecords(partition,
columnRangeMetadataList, false).collect(Collectors.toList()),
sparkEngineContext, parallelism);
+ return createColumnStatsRecords(partition, columnRangeMetadataList,
false).collect(Collectors.toList());
}
- public static HoodieJavaRDD<HoodieRecord>
getFunctionalIndexRecordsUsingBloomFilter(
- HoodieTableMetaClient metaClient,
- int parallelism,
- Schema readerSchema,
- FileSlice fileSlice,
- String basePath,
- String partition,
- HoodieFunctionalIndex<Column, Column> functionalIndex,
- String columnToIndex,
- SQLContext sqlContext,
- HoodieSparkEngineContext sparkEngineContext,
- HoodieWriteConfig metadataWriteConfig) {
+ public static List<HoodieRecord>
getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
+
Schema readerSchema,
+
List<FileSlice> fileSlices,
+
String basePath,
+
String partition,
+
HoodieFunctionalIndex<Column, Column> functionalIndex,
+
String columnToIndex,
+
SQLContext sqlContext,
+
HoodieWriteConfig metadataWriteConfig) {
List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
- if (fileSlice.getBaseFile().isPresent()) {
- HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
- String filename = baseFile.getFileName();
- Path baseFilePath = filePath(basePath, partition, filename);
- buildBloomFilterMetadata(
- metaClient,
- readerSchema,
- functionalIndex,
- columnToIndex,
- sqlContext,
- bloomFilterMetadataList,
- baseFilePath,
- metadataWriteConfig,
- partition,
- baseFile.getCommitTime());
+ for (FileSlice fileSlice : fileSlices) {
+ if (fileSlice.getBaseFile().isPresent()) {
+ HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+ String filename = baseFile.getFileName();
+ Path baseFilePath = filePath(basePath, partition, filename);
+ buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex,
columnToIndex, sqlContext, bloomFilterMetadataList, baseFilePath,
metadataWriteConfig, partition, baseFile.getCommitTime());
+ }
+ // Handle log files
+ fileSlice.getLogFiles().forEach(logFile -> {
+ String fileName = logFile.getFileName();
+ Path logFilePath = filePath(basePath, partition, fileName);
Review Comment:
Shouldn't the log file already have the `StoragePath`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]