nsivabalan commented on code in PR #13472:
URL: https://github.com/apache/hudi/pull/13472#discussion_r2162778221


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java:
##########
@@ -250,9 +251,9 @@ public <I, O> O aggregate(HoodieData<I> data, O zeroValue, Functions.Function2<O
   }
 
   @Override
-  public ReaderContextFactory<?> getReaderContextFactory(HoodieTableMetaClient metaClient) {
+  public ReaderContextFactory<?> getReaderContextFactory(HoodieTableMetaClient metaClient, HoodieRecord.HoodieRecordType recordType) {

Review Comment:
   do we have UTs for this?
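
   A minimal sketch of what such a UT could assert. All types below (`RecordType`, `ReaderContextFactory`, `EngineContext`) are simplified stand-ins for illustration only, not the real Hudi classes or signatures:

```java
// Simplified stand-ins for illustration only; not the real Hudi API.
enum RecordType { AVRO, SPARK }

interface ReaderContextFactory {
  String describe();
}

class EngineContext {
  // Mirrors the shape of the new overload: factory choice depends on record type.
  ReaderContextFactory getReaderContextFactory(RecordType recordType) {
    if (recordType == RecordType.SPARK) {
      return () -> "spark reader context";
    }
    return () -> "avro reader context";
  }
}

public class ReaderContextFactoryTest {
  public static void main(String[] args) {
    EngineContext ctx = new EngineContext();
    // A UT would assert that each record type yields the matching factory.
    if (!ctx.getReaderContextFactory(RecordType.SPARK).describe().contains("spark")) {
      throw new AssertionError("SPARK should select the Spark factory");
    }
    if (!ctx.getReaderContextFactory(RecordType.AVRO).describe().contains("avro")) {
      throw new AssertionError("AVRO should select the Avro factory");
    }
    System.out.println("ok");
  }
}
```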



##########
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java:
##########
@@ -304,15 +304,16 @@ protected HoodieMergeHandle getUpdateHandle(String instantTime, String partition
       }
     }
     return HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-        dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+        dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt, Option.of(context.getReaderContextFactory(metaClient, config.getRecordMerger().getRecordType())));
   }
 
   @Override
   public Iterator<List<WriteStatus>> handleInsert(
       String instantTime, String partitionPath, String fileId,
       Map<String, HoodieRecord<?>> recordMap) {
     HoodieCreateHandle<?, ?, ?, ?> createHandle =
-        new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier);
+        new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier,
+            Option.of(context.getReaderContextFactory(metaClient, config.getRecordMerger().getRecordType())));

Review Comment:
   I see we are passing Option.empty for Flink, since we don't support SI there.
   Then why are we passing the reader context factory for Java?
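
   For context, the asymmetry in question, sketched with simplified stand-ins (Hudi's `Option` approximated with `java.util.Optional`; not the real signatures):

```java
import java.util.Optional;

// Simplified stand-ins for illustration only; not the real Hudi API.
interface ReaderContextFactory {
  String describe();
}

class HandleWiring {
  // Java/Spark paths in this PR: the factory is wrapped and handed to the handle.
  static Optional<ReaderContextFactory> forJavaOrSpark(ReaderContextFactory factory) {
    return Optional.of(factory);
  }

  // Flink path: empty, since secondary index (SI) is not supported there.
  static Optional<ReaderContextFactory> forFlink() {
    return Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(forJavaOrSpark(() -> "java reader context").isPresent()); // true
    System.out.println(forFlink().isPresent()); // false
  }
}
```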



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -280,7 +280,7 @@ private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsR
   private HoodieData<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
     List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
     int readParallelism = Math.min(writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());
-    ReaderContextFactory<T> readerContextFactory = getEngineContext().getReaderContextFactory(getHoodieTable().getMetaClient());
+    ReaderContextFactory<T> readerContextFactory = getEngineContext().getReaderContextFactory(getHoodieTable().getMetaClient(), recordType);

Review Comment:
   Why not hard-code it to SPARK, as you have done below?
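
   What the suggestion amounts to, sketched with simplified stand-ins (not the real Hudi API): since this strategy only ever runs on the Spark engine, the record type can be a constant rather than a threaded-through field:

```java
// Simplified stand-ins for illustration only; not the real Hudi API.
enum HoodieRecordType { AVRO, SPARK }

class EngineContextStub {
  String getReaderContextFactory(HoodieRecordType recordType) {
    return "factory-for-" + recordType;
  }
}

class SparkOnlyStrategyStub {
  private final EngineContextStub engineContext = new EngineContextStub();

  // The reviewer's suggestion: hard-code SPARK instead of passing a recordType
  // field, since this strategy never runs outside the Spark engine.
  String readerContextFactory() {
    return engineContext.getReaderContextFactory(HoodieRecordType.SPARK);
  }

  public static void main(String[] args) {
    System.out.println(new SparkOnlyStrategyStub().readerContextFactory()); // prints factory-for-SPARK
  }
}
```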



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
