This is an automated email from the ASF dual-hosted git repository. zhangyue19921010 pushed a commit to branch HUDI-9437 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c18fed477804ff9e28a6944c81143e583d6c05ff Author: YueZhang <[email protected]> AuthorDate: Thu May 22 21:04:33 2025 +0800 HUDI-9437 --- .../org/apache/hudi/sink/clustering/ClusteringOperator.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java index 4ac9c47e14c..6d2e77891f1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java @@ -42,7 +42,6 @@ import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.IOUtils; -import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.io.storage.HoodieIOFactory; @@ -54,6 +53,7 @@ import org.apache.hudi.sink.bulk.sort.SortOperatorGen; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.format.HoodieRowDataParquetReader; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.AvroToRowDataConverters; import org.apache.hudi.util.DataTypeUtils; @@ -328,11 +328,11 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven */ private Iterator<RowData> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) { List<Iterator<RowData>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> { - Iterable<IndexedRecord> indexedRecords = () -> { + Iterable<RowData> indexedRecords = () -> { try { HoodieFileReaderFactory fileReaderFactory = HoodieIOFactory.getIOFactory(table.getStorage()) - .getReaderFactory(writeConfig.getRecordMerger().getRecordType()); - HoodieAvroFileReader fileReader = (HoodieAvroFileReader) fileReaderFactory.getFileReader( + .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK); + HoodieRowDataParquetReader fileReader = (HoodieRowDataParquetReader) fileReaderFactory.getFileReader( table.getConfig(), new StoragePath(clusteringOp.getDataFilePath())); return new CloseableMappingIterator<>(fileReader.getRecordIterator(readerSchema), HoodieRecord::getData); @@ -342,7 +342,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven } }; - return StreamSupport.stream(indexedRecords.spliterator(), false).map(this::transform).iterator(); + return StreamSupport.stream(indexedRecords.spliterator(), false).iterator(); }).collect(Collectors.toList()); return new ConcatenatingIterator<>(iteratorsForPartition);
