fhan688 commented on code in PR #18897:
URL: https://github.com/apache/hudi/pull/18897#discussion_r3360671093


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java:
##########
@@ -140,7 +141,10 @@ public static DataStream<RowData> bulkInsert(Configuration conf, RowType rowType
             "Consistent hashing bucket index does not work with bulk insert using FLINK engine. Use simple bucket index or Spark engine.");
       }
       String indexKeys = OptionsResolver.getIndexKeyField(conf);
-      BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(conf, indexKeys);
+      Partitioner<HoodieKey> partitioner =
+          OptionsResolver.shouldUseBucketRemotePartitioner(conf)

Review Comment:
   > can we introduce a factory class to create these partitioners?
   
   Done. I added `BucketIndexPartitionerFactory` to centralize the local/remote 
simple bucket partitioner selection and updated both bulk insert and streaming 
simple bucket paths to use it. I also added unit coverage for both local and 
remote partitioner creation.
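   
   For readers without the diff at hand, here is a minimal sketch of the factory idea described above. It is not the code from the PR: the class `BucketPartitionerFactorySketch`, the `KeyPartitioner` interface, both partitioner classes, and the two config keys are hypothetical stand-ins chosen for illustration, and the "remote" variant only simulates a remote lookup with an injected function. The `partition(key, numPartitions)` shape is borrowed from Flink's `Partitioner` interface.

```java
import java.util.Map;
import java.util.function.ToIntFunction;

// Illustrative sketch only: every type and config key below is a hypothetical
// stand-in, not the actual Hudi or Flink API.
final class BucketPartitionerFactorySketch {

  // Hypothetical config keys, invented for this example.
  static final String REMOTE_KEY = "sketch.bucket.remote.enabled";
  static final String NUM_BUCKETS_KEY = "sketch.bucket.num";

  /** Minimal partitioner contract: map a key to a partition in [0, numPartitions). */
  interface KeyPartitioner<K> {
    int partition(K key, int numPartitions);
  }

  /** Local variant: derives the bucket from the key's hash inside the task. */
  static final class LocalBucketPartitioner implements KeyPartitioner<String> {
    private final int numBuckets;

    LocalBucketPartitioner(int numBuckets) {
      this.numBuckets = numBuckets;
    }

    @Override
    public int partition(String key, int numPartitions) {
      int bucket = Math.floorMod(key.hashCode(), numBuckets);
      return bucket % numPartitions;
    }
  }

  /** Remote variant: asks an injected lookup for the bucket (placeholder for a remote call). */
  static final class RemoteBucketPartitioner implements KeyPartitioner<String> {
    private final ToIntFunction<String> bucketLookup;

    RemoteBucketPartitioner(ToIntFunction<String> bucketLookup) {
      this.bucketLookup = bucketLookup;
    }

    @Override
    public int partition(String key, int numPartitions) {
      return Math.floorMod(bucketLookup.applyAsInt(key), numPartitions);
    }
  }

  private BucketPartitionerFactorySketch() {
  }

  /** Single place where the local/remote choice is made; callers only see the interface. */
  static KeyPartitioner<String> create(Map<String, String> conf) {
    int numBuckets = Integer.parseInt(conf.getOrDefault(NUM_BUCKETS_KEY, "4"));
    boolean remote = Boolean.parseBoolean(conf.getOrDefault(REMOTE_KEY, "false"));
    if (remote) {
      // Placeholder lookup; a real implementation would contact a remote service.
      return new RemoteBucketPartitioner(key -> Math.floorMod(key.hashCode(), numBuckets));
    }
    return new LocalBucketPartitioner(numBuckets);
  }
}
```

   With a shape like this, the bulk insert and streaming call sites can both call `create(conf)` and depend only on the interface, so the local/remote branch lives in one class and can be unit tested on its own.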



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

