the-other-tim-brown commented on code in PR #13444:
URL: https://github.com/apache/hudi/pull/13444#discussion_r2198990306


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java:
##########
@@ -163,34 +161,23 @@ private void doCompaction(String instantTime,
     // schema evolution
     CompactionUtil.setAvroSchema(writeConfig, metaClient);
     List<WriteStatus> writeStatuses = compactor.compact(
-        flinkTable,
         writeConfig,
         compactionOperation,
         instantTime,
         flinkTable.getTaskContextSupplier(),
-        readerContextOpt,
+        readerContext,
         flinkTable);
     compactionMetrics.endCompaction();
     collector.collect(new CompactionCommitEvent(instantTime, 
compactionOperation.getFileId(), writeStatuses, taskID));
   }
 
-  private Option<HoodieReaderContext<?>> 
initReaderContext(HoodieFlinkWriteClient<?> writeClient) {
+  private HoodieReaderContext<?> initReaderContext(HoodieFlinkWriteClient<?> 
writeClient) {
     HoodieTableMetaClient metaClient = flinkTable.getMetaClient();
-    boolean useFileGroupReaderBasedCompaction = !metaClient.isMetadataTable()
-        && 
writeClient.getConfig().getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
-        && writeClient.getConfig().populateMetaFields()                        
                                 // Virtual key support by fg reader is not 
ready
-        && !(metaClient.getTableConfig().isCDCEnabled() && 
writeClient.getConfig().isYieldingPureLogForMor());  // do not support produce 
cdc log during fg reader
-    if (useFileGroupReaderBasedCompaction) {
-      // CAUTION: reuse the meta client so that the timeline is updated
-      Supplier<InternalSchemaManager> internalSchemaManager = () -> 
InternalSchemaManager.get(conf, metaClient);
-      // initialize storage conf lazily.
-      StorageConfiguration<?> readerConf = 
writeClient.getEngineContext().getStorageConf();
-      return Option.of(new FlinkRowDataReaderContext(readerConf, 
internalSchemaManager, Collections.emptyList(), metaClient.getTableConfig(), 
Option.empty()));
-    } else {
-      // always using avro record merger for legacy compaction since log 
scanner do not support rowdata reading yet.
-      
writeClient.getConfig().setRecordMergerClass(HoodieAvroRecordMerger.class.getName());
-    }
-    return Option.empty();
+    // CAUTION: reuse the meta client so that the timeline is updated
+    Supplier<InternalSchemaManager> internalSchemaManager = () -> 
InternalSchemaManager.get(conf, metaClient);
+    // initialize storage conf lazily.

Review Comment:
   We're switching all readers to go through FileGroupReader for consistency, as 
part of the 1.1.0 release goals — which is why I cleaned this up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to