fhan688 commented on code in PR #18897:
URL: https://github.com/apache/hudi/pull/18897#discussion_r3360671093
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java:
##########
@@ -140,7 +141,10 @@ public static DataStream<RowData> bulkInsert(Configuration conf, RowType rowType
"Consistent hashing bucket index does not work with bulk insert using FLINK engine. Use simple bucket index or Spark engine.");
}
String indexKeys = OptionsResolver.getIndexKeyField(conf);
- BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(conf, indexKeys);
+ Partitioner<HoodieKey> partitioner =
+ OptionsResolver.shouldUseBucketRemotePartitioner(conf)
Review Comment:
> can we introduce a factory class to create these partitioners?
Done. I added `BucketIndexPartitionerFactory` to centralize the choice between the local and remote simple bucket partitioners, and updated both the bulk insert and the streaming simple bucket write paths to use it. I also added unit tests covering creation of both the local and the remote partitioner.
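For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of what such a factory might look like. Only the name `BucketIndexPartitionerFactory` comes from the comment; everything else is an assumption for illustration: `KeyPartitioner` stands in for Flink's `Partitioner<K>` contract, `LocalBucketPartitioner`/`RemoteBucketPartitioner` and the config keys `use.remote.partitioner`/`bucket.num` are hypothetical, and the remote assignment is modeled as a plain function rather than whatever service the real remote partitioner talks to. It does not reproduce Hudi's actual classes or the `OptionsResolver.shouldUseBucketRemotePartitioner` check.

```java
import java.util.Map;
import java.util.function.ToIntFunction;

// Hypothetical stand-in for Flink's Partitioner<K> contract: key -> downstream channel.
interface KeyPartitioner<K> {
  int partition(K key, int numPartitions);
}

// Hypothetical "local" strategy: bucket id derived from the key hash alone.
final class LocalBucketPartitioner implements KeyPartitioner<String> {
  private final int bucketNum;

  LocalBucketPartitioner(int bucketNum) {
    this.bucketNum = bucketNum;
  }

  @Override
  public int partition(String key, int numPartitions) {
    int bucketId = Math.floorMod(key.hashCode(), bucketNum); // always in [0, bucketNum)
    return bucketId % numPartitions;                          // map bucket to a channel
  }
}

// Hypothetical "remote" strategy: an external assigner decides which task owns the key.
final class RemoteBucketPartitioner implements KeyPartitioner<String> {
  private final ToIntFunction<String> remoteAssigner;

  RemoteBucketPartitioner(ToIntFunction<String> remoteAssigner) {
    this.remoteAssigner = remoteAssigner;
  }

  @Override
  public int partition(String key, int numPartitions) {
    return Math.floorMod(remoteAssigner.applyAsInt(key), numPartitions);
  }
}

// Factory that centralizes the local/remote choice so write paths never branch themselves.
final class BucketIndexPartitionerFactory {
  private BucketIndexPartitionerFactory() {}

  static KeyPartitioner<String> create(Map<String, String> conf, ToIntFunction<String> remoteAssigner) {
    // Assumed boolean flag; the real code consults OptionsResolver instead.
    boolean useRemote = Boolean.parseBoolean(conf.getOrDefault("use.remote.partitioner", "false"));
    if (useRemote) {
      return new RemoteBucketPartitioner(remoteAssigner);
    }
    int bucketNum = Integer.parseInt(conf.getOrDefault("bucket.num", "4"));
    return new LocalBucketPartitioner(bucketNum);
  }
}
```

With this shape, the bulk insert and streaming paths each make a single `create(...)` call, and a unit test only needs to assert which implementation comes back for each configuration, which mirrors the local/remote coverage described in the reply.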