[ 
https://issues.apache.org/jira/browse/FLINK-36817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17902975#comment-17902975
 ] 

david radley commented on FLINK-36817:
--------------------------------------

[~lkokhreidze] I searched the src tree for _new KafkaConsumer_. There are 
other hits in the code where we create Kafka consumers, including 
_KafkaPartitionDiscoverer_ and _KafkaConsumerThread_. I am not sure what 
your custom consumer does, but I suspect you will want it used everywhere 
in place of the default Kafka consumer. WDYT?


> Give users ability to provide their own KafkaConsumer when using 
> flink-connector-kafka
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-36817
>                 URL: https://issues.apache.org/jira/browse/FLINK-36817
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Kafka
>            Reporter: Levani Kokhreidze
>            Priority: Major
>              Labels: pull-request-available
>
> In certain scenarios users of the `KafkaSource` in the flink-connector-kafka 
> might want to provide their own KafkaConsumer. Right now this is not possible, 
> as the consumer is created inside the 
> [KafkaPartitionSplitReader|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L97],
>  which makes customisation impossible.
> The proposal is to let users pass a `KafkaConsumerFactory` when building the 
> KafkaSource.
> {code:java}
> public interface KafkaConsumerFactory {
>   KafkaConsumer<byte[], byte[]> get(Properties properties);
> }{code}
> The builder will have a default implementation that creates the KafkaConsumer 
> the same way it is done now.
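
A minimal sketch of how such a factory could be wired. All names other than the proposed `KafkaConsumerFactory` shape are assumptions, and a stand-in class replaces the real `org.apache.kafka.clients.consumer.KafkaConsumer` so the example is self-contained and runnable without the Kafka client on the classpath:

```java
import java.util.Properties;

public class ConsumerFactorySketch {

    // Stand-in for org.apache.kafka.clients.consumer.KafkaConsumer (assumption,
    // used only so this sketch compiles without the kafka-clients dependency).
    static class FakeConsumer<K, V> {
        final Properties props;
        FakeConsumer(Properties props) { this.props = props; }
    }

    // The proposed factory shape: implementations build a consumer from the
    // properties the source reader would otherwise pass to `new KafkaConsumer`.
    interface ConsumerFactory<K, V> {
        FakeConsumer<K, V> get(Properties properties);
    }

    public static void main(String[] args) {
        // Default factory, mirroring what KafkaPartitionSplitReader does today:
        // construct the consumer directly from the supplied properties.
        ConsumerFactory<byte[], byte[]> defaultFactory = FakeConsumer::new;

        // A user-supplied factory could decorate the properties before
        // construction, e.g. forcing a custom client.id.
        ConsumerFactory<byte[], byte[]> customFactory = properties -> {
            Properties copy = new Properties();
            copy.putAll(properties);
            copy.setProperty("client.id", "my-custom-consumer");
            return new FakeConsumer<>(copy);
        };

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        // The source builder would store whichever factory it was given and
        // invoke it where the reader currently calls `new KafkaConsumer`.
        System.out.println(defaultFactory.get(props).props.getProperty("client.id"));
        System.out.println(customFactory.get(props).props.getProperty("client.id"));
    }
}
```

Because the factory receives the fully assembled `Properties`, the default path stays byte-for-byte compatible while custom factories can wrap, replace, or instrument the consumer.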



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
