nsivabalan commented on code in PR #13472:
URL: https://github.com/apache/hudi/pull/13472#discussion_r2162778221
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java:
##########
@@ -250,9 +251,9 @@ public <I, O> O aggregate(HoodieData<I> data, O zeroValue, Functions.Function2<O
   }

   @Override
-  public ReaderContextFactory<?> getReaderContextFactory(HoodieTableMetaClient metaClient) {
+  public ReaderContextFactory<?> getReaderContextFactory(HoodieTableMetaClient metaClient, HoodieRecord.HoodieRecordType recordType) {
Review Comment:
Do we have UTs covering this?
##########
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java:
##########
@@ -304,15 +304,16 @@ protected HoodieMergeHandle getUpdateHandle(String instantTime, String partition
       }
     }
     return HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-        dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+        dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt,
+        Option.of(context.getReaderContextFactory(metaClient, config.getRecordMerger().getRecordType())));
   }
   @Override
   public Iterator<List<WriteStatus>> handleInsert(
       String instantTime, String partitionPath, String fileId, Map<String, HoodieRecord<?>> recordMap) {
     HoodieCreateHandle<?, ?, ?, ?> createHandle =
-        new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier);
+        new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier,
+            Option.of(context.getReaderContextFactory(metaClient, config.getRecordMerger().getRecordType())));
Review Comment:
I see we are passing in Option.empty for Flink, since we don't support SI there.
Then why are we passing the reader context factory for Java?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -280,7 +280,7 @@ private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsR
   private HoodieData<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
     List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
     int readParallelism = Math.min(writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());
-    ReaderContextFactory<T> readerContextFactory = getEngineContext().getReaderContextFactory(getHoodieTable().getMetaClient());
+    ReaderContextFactory<T> readerContextFactory = getEngineContext().getReaderContextFactory(getHoodieTable().getMetaClient(), recordType);
Review Comment:
Why not hard-code it to SPARK, as you have done below?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]