The reason two Flink jobs using a Kafka consumer with the same consumer
group are seeing the same events is that Flink's FlinkKafkaConsumer does
not participate in Kafka's consumer group management. Instead, Flink
manually assigns all of the topic's partitions to the source operators
(on a per-job basis), so every job reads every partition. The consumer
group is only used to commit the current offsets back to the Kafka brokers.
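As a minimal sketch of what that means in practice (assuming brokers and
topic are defined as in your snippet, and substituting Flink's built-in
SimpleStringSchema for your custom KafkaStringSchema), the group.id only
controls where offsets are committed, e.g. when a checkpoint completes:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val env = StreamExecutionEnvironment.getExecutionEnvironment
// offsets are committed back to Kafka when a checkpoint completes
env.enableCheckpointing(10000L)

val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", brokers)
// used only for committing offsets, not for partition assignment
consumerProps.put("group.id", "test1234")

val consumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), consumerProps)
consumer.setStartFromLatest()
// commit the checkpointed offsets to Kafka under the group.id above;
// partition assignment is still done by Flink itself
consumer.setCommitOffsetsOnCheckpoints(true)

env.addSource(consumer).print()
env.execute()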

Cheers,
Till

On Wed, Sep 2, 2020 at 9:42 AM op <520075...@qq.com> wrote:

>     hi,
>     i am confused about consumer group of FlinkKafkaConsumer,
>     i have two applications,with the same code like this:
> //---------------------------
>
>  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>  Env.setRestartStrategy(RestartStrategies.noRestart())
>  val consumerProps = new Properties()
>  consumerProps.put("bootstrap.servers", brokers)
>  consumerProps.put("group.id", "test1234")
>
>  val consumer = new FlinkKafkaConsumer[String](topic,new 
> KafkaStringSchema,consumerProps).setStartFromLatest()
>  Env.addSource(consumer).print()
>  Env.execute()
>
> //-----------------------------------
>
> then i launch both,they have the same topic and  group.id,and when i send 
> some message to the topic,
>
> i find both application consume all the data ,which does‘t behave as kafka 
> consumer group,
>
> can someone tell me why?
>
>
