the-other-tim-brown commented on code in PR #13444:
URL: https://github.com/apache/hudi/pull/13444#discussion_r2198990306
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java:
##########
@@ -163,34 +161,23 @@ private void doCompaction(String instantTime,
// schema evolution
CompactionUtil.setAvroSchema(writeConfig, metaClient);
List<WriteStatus> writeStatuses = compactor.compact(
- flinkTable,
writeConfig,
compactionOperation,
instantTime,
flinkTable.getTaskContextSupplier(),
- readerContextOpt,
+ readerContext,
flinkTable);
compactionMetrics.endCompaction();
collector.collect(new CompactionCommitEvent(instantTime,
compactionOperation.getFileId(), writeStatuses, taskID));
}
- private Option<HoodieReaderContext<?>>
initReaderContext(HoodieFlinkWriteClient<?> writeClient) {
+ private HoodieReaderContext<?> initReaderContext(HoodieFlinkWriteClient<?>
writeClient) {
HoodieTableMetaClient metaClient = flinkTable.getMetaClient();
- boolean useFileGroupReaderBasedCompaction = !metaClient.isMetadataTable()
- &&
writeClient.getConfig().getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
- && writeClient.getConfig().populateMetaFields()
// Virtual key support by fg reader is not
ready
- && !(metaClient.getTableConfig().isCDCEnabled() &&
writeClient.getConfig().isYieldingPureLogForMor()); // do not support produce
cdc log during fg reader
- if (useFileGroupReaderBasedCompaction) {
- // CAUTION: reuse the meta client so that the timeline is updated
- Supplier<InternalSchemaManager> internalSchemaManager = () ->
InternalSchemaManager.get(conf, metaClient);
- // initialize storage conf lazily.
- StorageConfiguration<?> readerConf =
writeClient.getEngineContext().getStorageConf();
- return Option.of(new FlinkRowDataReaderContext(readerConf,
internalSchemaManager, Collections.emptyList(), metaClient.getTableConfig(),
Option.empty()));
- } else {
- // always using avro record merger for legacy compaction since log
scanner do not support rowdata reading yet.
-
writeClient.getConfig().setRecordMergerClass(HoodieAvroRecordMerger.class.getName());
- }
- return Option.empty();
+ // CAUTION: reuse the meta client so that the timeline is updated
+ Supplier<InternalSchemaManager> internalSchemaManager = () ->
InternalSchemaManager.get(conf, metaClient);
+ // initialize storage conf lazily.
Review Comment:
We're switching all readers to go through FileGroupReader for consistency as
part of the 1.1.0 release goals, so that is why I cleaned this up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]