Ricardo,

Thanks for your response. I've been looking over your advice and examining the Kafka client code and our logs. I still think the issue is simply that the broker can't keep up with the offset commit rate. My reasons are:
* The first message in any sequence is "org.apache.kafka.common.errors.TimeoutException: The request timed out."
* The subsequent "Caused by: org.apache.kafka.common.errors.DisconnectException" messages occur because the consumer has disconnected itself due to the above timeout (this is evident in the Kafka client code), not because the server side has failed (other than not responding in time to the earlier commit). We get one of these errors for every commitAsync that was queued up behind the one that timed out.
* The "Group coordinator b-2.redacted.amazonaws.com:9094..." remains available to other consumers.
* We do not get any heartbeat timeouts or rebalances.
* The consumer continues to process incoming messages in a timely fashion (this can be observed by comparing the record create timestamp to the consumer log timestamp).
* There are no errors or dropped packets in the network monitoring.

I'm trying to get access to the various commit-* metrics from https://kafka.apache.org/documentation/#consumer_group_monitoring to confirm my hypothesis. But at this stage I still think my original questions are the relevant ones.

So I think it is likely that we are expecting too much by calling commitAsync after every poll, particularly considering that this consumer is committing 10 partitions each time when usually only 1 has changed. I had a look at the server-side code that I think is relevant (https://github.com/apache/kafka/tree/trunk/core/src/main/scala/kafka/coordinator/group) and don't see it doing anything smart to avoid work where the offset has not changed (but I'm not very Scala literate, so I'm happy to be told I missed something).

If I'm right about this then our options are either to make our manual commit smarter or to use auto-commit if it is already smart and suitable.

* Does auto-commit only commit changed offsets?
* Does auto-commit only commit the offsets from the prior poll during the current poll (assuming auto.commit.interval.ms has been met)? This is not described in the standard docs but I think I did read it somewhere. We would not want the auto-commit to ever commit the offsets for the record(s) being processed during the current poll.
* Does auto-commit include a commitSync during a clean KafkaConsumer.close? If not we'll probably implement that ourselves and also incorporate an equivalent to auto.commit.interval.ms.

Regards,
James.

On 20/06/2020, at 02:15, Ricardo Ferreira <rifer...@riferrei.com> wrote:

James,

If I were you I would start investigating what is causing these network drops between your cluster and your consumers. The following messages are some indications of this:

* "Offset commit failed on partition MyTopic-53 at offset 957: The request timed out."
* "Caused by: org.apache.kafka.common.errors.DisconnectException"
* "Group coordinator b-2.redacted.amazonaws.com:9094 (id: redacted rack: null) is unavailable or invalid, will attempt rediscovery"

In Kafka, the group coordinator is one of the brokers that receives heartbeats and pull requests from consumers. Heartbeats are used to detect when a consumer is no longer available, whereas pull requests are literally the pull requests sent by consumers. Regardless, when no heartbeats are detected for a given period, the group coordinator considers the consumer dead and triggers a rebalance in which the partitions will be reassigned. If the group coordinator is no longer available (as described in one of the error messages) then this whole process becomes stale.

Moreover, `commitAsync()` calls, as the name implies, are asynchronous and don't block the consumer thread until a response is sent from the cluster.
However, if this response never comes then it will count towards the amount of time specified in the property `max.poll.interval.ms`, which if maxed out will trigger the consumer to leave the consumer group. Again, it all boils down to how fast the network is enabling all of this without taking too much time.

Since you are using AWS MSK you can use AWS native tools (such as CloudWatch, VPC logs, and the AWSSupport-SetupIPMonitoringFromVPC) to better troubleshoot these networking issues. I would also file a support ticket against the MSK service since some of these networking issues have to do with one of the brokers being unavailable -- something that is not supposed to happen.

Thanks,
-- Ricardo

On 6/18/20 9:18 PM, James Olsen wrote:

We are using AWS MSK with Kafka 2.4.1 (and same client version), 3 Brokers. We are seeing fairly frequent consumer offset commit failures, as shown in the example logs below. Things continue working as the failures are all retriable, however I would like to improve this situation.

The issue occurs most often on the Consumer processing our busiest partition (MyTopic-50 in the case below).

We are using KafkaConsumer::commitAsync to manage the offsets and calling it after processing all the records in a given poll - probably mostly one message per poll and around 10 messages per second. That doesn't seem like a heavy load, and the consumer itself is keeping up fine.

The consumer is processing 10 Partitions on the Topic, most of which have not changed, e.g. in the logs below the first message refers to MyTopic-53 at offset 957, which actually hadn't changed for several minutes.

I note that the standard auto-commit-offsets functionality throttles the commit to once every 5 seconds by default.

Are we expecting too much to do commitAsync each time as we do? We could build in throttling like auto-commit does.

Is it possible that the unchanged partition offsets that commitAsync sends are creating unnecessary load?
We could use the version of commitAsync that takes the map of offsets and only commit the ones we know have changed.

Does auto-commit already optimise to send only changed offsets? If so we could consider switching to auto-commit.

Any advice or thoughts on the best option would be appreciated.

Example logs...

2020-06-18 23:53:01,225 WARN [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, groupId=MyTopicService-group] Offset commit failed on partition MyTopic-53 at offset 957: The request timed out.
2020-06-18 23:53:01,225 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, groupId=MyTopicService-group] Group coordinator b-2.redacted.amazonaws.com:9094 (id: redacted rack: null) is unavailable or invalid, will attempt rediscovery
2020-06-18 23:53:01,225 ERROR [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, groupId=MyTopicService-group] Offset commit with offsets {MyTopic-48=OffsetAndMetadata{offset=615, leaderEpoch=1, metadata=''}, MyTopic-50=OffsetAndMetadata{offset=131419049, leaderEpoch=1, metadata=''}, MyTopic-49=OffsetAndMetadata{offset=937, leaderEpoch=2, metadata=''}, MyTopic-52=OffsetAndMetadata{offset=934, leaderEpoch=2, metadata=''}, MyTopic-51=OffsetAndMetadata{offset=969, leaderEpoch=1, metadata=''}, MyTopic-54=OffsetAndMetadata{offset=779, leaderEpoch=1, metadata=''}, MyTopic-53=OffsetAndMetadata{offset=957, leaderEpoch=1, metadata=''}, MyTopic-55=OffsetAndMetadata{offset=514, leaderEpoch=2, metadata=''}} failed: org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.TimeoutException: The request timed out.
2020-06-18 23:53:01,353 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, groupId=MyTopicService-group] Discovered group coordinator b-2.redacted.amazonaws.com:9094 (id: redacted rack: null)
2020-06-18 23:53:01,355 ERROR [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, groupId=MyTopicService-group] Offset commit with offsets {MyTopic-48=OffsetAndMetadata{offset=615, leaderEpoch=1, metadata=''}, MyTopic-50=OffsetAndMetadata{offset=131419050, leaderEpoch=1, metadata=''}, MyTopic-49=OffsetAndMetadata{offset=937, leaderEpoch=2, metadata=''}, MyTopic-52=OffsetAndMetadata{offset=934, leaderEpoch=2, metadata=''}, MyTopic-51=OffsetAndMetadata{offset=969, leaderEpoch=1, metadata=''}, MyTopic-54=OffsetAndMetadata{offset=779, leaderEpoch=1, metadata=''}, MyTopic-53=OffsetAndMetadata{offset=957, leaderEpoch=1, metadata=''}, MyTopic-55=OffsetAndMetadata{offset=514, leaderEpoch=2, metadata=''}} failed: org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException
2020-06-18 23:53:01,355 ERROR [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, groupId=MyTopicService-group] Offset commit with offsets {MyTopic-48=OffsetAndMetadata{offset=615, leaderEpoch=1, metadata=''}, MyTopic-50=OffsetAndMetadata{offset=131419051, leaderEpoch=1, metadata=''}, MyTopic-49=OffsetAndMetadata{offset=937, leaderEpoch=2, metadata=''}, MyTopic-52=OffsetAndMetadata{offset=934, leaderEpoch=2, metadata=''}, MyTopic-51=OffsetAndMetadata{offset=969, leaderEpoch=1, metadata=''}, MyTopic-54=OffsetAndMetadata{offset=779, leaderEpoch=1, metadata=''}, MyTopic-53=OffsetAndMetadata{offset=957, leaderEpoch=1, metadata=''}, MyTopic-55=OffsetAndMetadata{offset=514, leaderEpoch=2, metadata=''}} failed: org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException

... plus many more with increasing 50=OffsetAndMetadata{offset=xxx ...} (the busy Partition).

Thanks,
James.
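
For illustration, the "make our manual commit smarter" option discussed in this thread (commit only the offsets that changed, and no more often than some interval) might look roughly like the sketch below. It is a minimal sketch under stated assumptions, not what the Kafka client or auto-commit actually does: the class `ChangedOffsetTracker`, its method names, and the use of plain String partition names and long offsets are all hypothetical stand-ins. In a real consumer the map would presumably be keyed by TopicPartition with OffsetAndMetadata values and handed to the commitAsync overload that takes a map of offsets and a callback.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper, not part of the Kafka client. It remembers the next
 * offset to commit per partition and hands back only the entries that differ
 * from what was last handed out, at most once per minimum interval.
 */
final class ChangedOffsetTracker {
    private final long minIntervalMs;
    private final Map<String, Long> pending = new HashMap<>();   // processed, not yet handed out
    private final Map<String, Long> committed = new HashMap<>(); // last offsets handed out
    private long lastCommitMs;

    /** startMs seeds the throttle, so the first commit is due one interval after start. */
    ChangedOffsetTracker(long minIntervalMs, long startMs) {
        this.minIntervalMs = minIntervalMs;
        this.lastCommitMs = startMs;
    }

    /** Call once a record is fully processed; nextOffset is the record's offset + 1. */
    void recordProcessed(String partition, long nextOffset) {
        pending.put(partition, nextOffset);
    }

    /** Returns the changed offsets if the interval has elapsed, otherwise an empty map. */
    Map<String, Long> drainIfDue(long nowMs) {
        if (nowMs - lastCommitMs < minIntervalMs) {
            return Collections.emptyMap(); // throttled: pending offsets are kept for later
        }
        return drain(nowMs);
    }

    /** Returns the changed offsets regardless of the interval, e.g. before closing. */
    Map<String, Long> drain(long nowMs) {
        Map<String, Long> changed = new HashMap<>();
        for (Map.Entry<String, Long> e : pending.entrySet()) {
            if (!e.getValue().equals(committed.get(e.getKey()))) {
                changed.put(e.getKey(), e.getValue());
            }
        }
        pending.clear();
        if (!changed.isEmpty()) {
            committed.putAll(changed);
            lastCommitMs = nowMs;
        }
        return changed;
    }

    /** Call from the commit callback on failure so the offsets are offered again. */
    void commitFailed(Map<String, Long> failed) {
        for (Map.Entry<String, Long> e : failed.entrySet()) {
            committed.remove(e.getKey(), e.getValue());   // forget only if still the latest
            pending.putIfAbsent(e.getKey(), e.getValue()); // keep any newer pending offset
        }
    }

    public static void main(String[] args) {
        ChangedOffsetTracker tracker = new ChangedOffsetTracker(5000, 0);
        tracker.recordProcessed("MyTopic-50", 131419050L);
        System.out.println(tracker.drainIfDue(1000)); // interval not yet elapsed
        tracker.recordProcessed("MyTopic-50", 131419051L);
        System.out.println(tracker.drainIfDue(5000)); // due: only the changed partition
    }
}
```

In the poll loop this would be used as: call recordProcessed for each record handled, then pass a non-empty result of drainIfDue to commitAsync, and pass the result of drain to commitSync before KafkaConsumer.close. Time is passed in explicitly rather than read from the system clock so the throttle can be tested deterministically.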