Hi Hatem, Could it be that you don't have checkpointing enabled? Flink only commits its offset when a checkpoint has been completed successfully, as explained on https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#consumer-offset-committing
Best regards, Martijn On Tue, May 23, 2023 at 6:43 PM Hatem Mostafa <m...@hatem.co> wrote: > Hello, > > I have two questions that are related to each other: > > *First question:* > > I have been trying to set `client.id` to set a kafka client quota > <https://kafka.apache.org/documentation.html#design_quotas> for > consumer_byte_rate since whenever our kafka job gets redeployed it reads a > lot of data from our kafka cluster causing a denial of service for our > kafka cluster. However `client.id` gets overridden by flink source here > <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L87>. > How would I enforce quotas for flink kafka source? > > *Second question:* > > Also something I didn't quite understand when describing our consumer > group in kafka why I don't see the metadata for the consumer group > information (consumer id, client id & host) and I get that the consumer > group has no active members but it's actually active and consuming. > > *Example describing a flink consumer group* > >> ./kafka-consumer-groups.sh --bootstrap-server kafka-server-address:9092 >> --describe --group flink-consumer-group >> Consumer group 'flink-consumer-group' has no active members. >> GROUP TOPIC PARTITION CURRENT-OFFSET >> LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID >> flink_consumer_group topic_name 1 514588965 514689721 >> 100756 - >> - - > > > > *Example describing a normal consumer group written using a confluent > kafka python library.* > >> ./kafka-consumer-groups.sh ---bootstrap-server kafka-server-address:9092 >> --describe --group python_confluent_kafka_consumer >> GROUP TOPIC >> PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID >> >> HOST CLIENT-ID >> python_confluent_kafka_consumer topic_name 1 >> 17279532 17279908 376 >> python_confluent_kafka_consumer-345fa1d1-1f76-4e38-9aad-dcc120c5a52e >> /<HOST-IP> python_confluent_kafka_consumer_client_id > > > > I am using flink version 1.15. > > Thanks, > Hatem > > > >