This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch HUDI-9437
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c18fed477804ff9e28a6944c81143e583d6c05ff
Author: YueZhang <[email protected]>
AuthorDate: Thu May 22 21:04:33 2025 +0800

    HUDI-9437
---
 .../org/apache/hudi/sink/clustering/ClusteringOperator.java    | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 4ac9c47e14c..6d2e77891f1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -42,7 +42,6 @@ import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.IOUtils;
-import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieIOFactory;
@@ -54,6 +53,7 @@ import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.format.HoodieRowDataParquetReader;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.DataTypeUtils;
@@ -328,11 +328,11 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
    */
   private Iterator<RowData> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
     List<Iterator<RowData>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> {
-      Iterable<IndexedRecord> indexedRecords = () -> {
+      Iterable<RowData> indexedRecords = () -> {
         try {
           HoodieFileReaderFactory fileReaderFactory = HoodieIOFactory.getIOFactory(table.getStorage())
-              .getReaderFactory(writeConfig.getRecordMerger().getRecordType());
-          HoodieAvroFileReader fileReader = (HoodieAvroFileReader) fileReaderFactory.getFileReader(
+              .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK);
+          HoodieRowDataParquetReader fileReader = (HoodieRowDataParquetReader) fileReaderFactory.getFileReader(
               table.getConfig(), new StoragePath(clusteringOp.getDataFilePath()));
 
           return new CloseableMappingIterator<>(fileReader.getRecordIterator(readerSchema), HoodieRecord::getData);
@@ -342,7 +342,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
         }
       };
 
-      return StreamSupport.stream(indexedRecords.spliterator(), false).map(this::transform).iterator();
+      return StreamSupport.stream(indexedRecords.spliterator(), false).iterator();
     }).collect(Collectors.toList());
 
     return new ConcatenatingIterator<>(iteratorsForPartition);

Reply via email to