Hi,

I think the preceding message, saying that the consumer is not a member of
the group, suggests that there is some connectivity issue. Perhaps heartbeats
are timing out, in which case you might want to increase session.timeout.ms
[1] and heartbeat.interval.ms.
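For example, something along these lines (the values are only illustrative;
heartbeat.interval.ms should stay well below session.timeout.ms):

Properties kafkaProps = new Properties();
// Give the consumer more time before the coordinator considers it dead.
kafkaProps.setProperty("session.timeout.ms", "60000");
// Send heartbeats several times within each session timeout window.
kafkaProps.setProperty("heartbeat.interval.ms", "20000");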
[1] https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms

Regards,
Roman

On Fri, Aug 27, 2021 at 11:43 AM Qingsheng Ren <renqs...@gmail.com> wrote:
>
> Hi Hemant,
>
> One possible reason is that another Kafka consumer is using the same
> consumer group id as the one in FlinkKafkaConsumer. You can try to use
> another group.id in FlinkKafkaConsumer to validate this.
>
> If it's not the group id's problem, there are some Kafka consumer metrics
> [1] that might be helpful for debugging this, such as
> "time-between-poll-avg", "heartbeat-rate" and so forth, to check whether
> it's the poll interval's problem as suggested by Kafka's exception. All
> Kafka consumer metrics are registered under the metric group
> "KafkaConsumer" in Flink's metric system.
>
> Besides, it might be helpful to set the logging level of
> org.apache.kafka.clients.consumer to DEBUG or TRACE, which can provide
> more information about why the offset commit failed.
>
> Hope this can help you~
>
> [1] https://kafka.apache.org/documentation/#consumer_monitoring
>
> --
> Best Regards,
>
> Qingsheng Ren
> Email: renqs...@gmail.com
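(With Flink's default Log4j 2 setup, the logging change Qingsheng suggests
would be a couple of lines in conf/log4j.properties; the logger id
"kafka_consumer" below is arbitrary:)

# Log the Kafka consumer internals, including offset commit handling.
logger.kafka_consumer.name = org.apache.kafka.clients.consumer
logger.kafka_consumer.level = DEBUG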
> On Aug 26, 2021, 10:25 PM +0800, bat man <tintin0...@gmail.com>, wrote:
> > Hi,
> >
> > I am using Flink 1.12.1 to consume data from Kafka in a streaming job,
> > with flink-connector-kafka_2.12:1.12.1. The Kafka broker version is
> > 2.2.1. In the logs I see warnings like this -
> >
> > 2021-08-26 13:36:49,903 WARN
> > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher [] -
> > Committing offsets to Kafka failed. This does not compromise Flink's
> > checkpoints.
> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> > be completed since the group has already rebalanced and assigned the
> > partitions to another member. This means that the time between
> > subsequent calls to poll() was longer than the configured
> > max.poll.interval.ms, which typically implies that the poll loop is
> > spending too much time message processing. You can address this either
> > by increasing max.poll.interval.ms or by reducing the maximum size of
> > batches returned in poll() with max.poll.records.
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:840)
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:790)
> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:910)
> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:890)
> > at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> > at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> > at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135)
> > at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:258)
> >
> > I understand that this might not cause an issue, since checkpointing is
> > not impacted; however, metrics monitoring might be affected, as I am
> > using Burrow to monitor group offsets. I have already tried changing
> > the properties below in the Kafka consumer configs -
> >
> > kafkaProps.setProperty("max.poll.interval.ms", "900000");
> > kafkaProps.setProperty("max.poll.records", "200");
> > kafkaProps.setProperty("heartbeat.interval.ms", "1000");
> > kafkaProps.setProperty("request.timeout.ms", "40000");
> > kafkaProps.setProperty("session.timeout.ms", "10000");
> >
> > But the warnings are still present in the logs.
> >
> > In addition, I see this error just before the warning -
> >
> > ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator []
> > - [Consumer clientId=consumer-3, groupId=xxx] Offset commit failed on
> > partition xxx-1 at offset 33651: The coordinator is not aware of this
> > member.
> >
> > The code uses a WatermarkStrategy to extract timestamps and emit
> > watermarks (roughly like the sketch after this message).
> >
> > Any clue is much appreciated.
> >
> > Thanks,
> > Hemant
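(For context, a WatermarkStrategy of the kind Hemant describes would
typically look something like this on Flink 1.12; the event type MyEvent,
its getTimestamp() accessor, the kafkaConsumer instance, and the 5-second
bound are placeholders:)

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<MyEvent> strategy =
        WatermarkStrategy
                // Tolerate events arriving up to 5 seconds out of order.
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // Read the event time from the record itself.
                .withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp());

// Applied on the consumer so watermarks are generated per Kafka partition:
kafkaConsumer.assignTimestampsAndWatermarks(strategy);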