The reason two Flink jobs using a Kafka consumer with the same consumer group are seeing the same events is that Flink's FlinkKafkaConsumer does not participate in Kafka's consumer group management. Instead Flink manually assigns all partitions to the source operators (on a per job basis). The consumer group will only be used to commit the current offset to the Kafka brokers.
Cheers, Till On Wed, Sep 2, 2020 at 9:42 AM op <520075...@qq.com> wrote: > hi, > i am confused about consumer group of FlinkKafkaConsumer, > i have two applications,with the same code like this: > //--------------------------- > > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment > Env.setRestartStrategy(RestartStrategies.noRestart()) > val consumerProps = new Properties() > consumerProps.put("bootstrap.servers", brokers) > consumerProps.put("group.id", "test1234") > > val consumer = new FlinkKafkaConsumer[String](topic,new > KafkaStringSchema,consumerProps).setStartFromLatest() > Env.addSource(consumer).print() > Env.execute() > > //----------------------------------- > > then i launch both,they have the same topic and group.id,and when i send > some message to the topic, > > i find both application consume all the data ,which does‘t behave as kafka > consumer group, > > can someone tell me why? > >