Ricardo,

Thanks for your response.  I've been looking over your advice and examining the 
Kafka client code and our logs.  I still think the issue is simply that the 
broker can't keep up with the offset commit rate.  The reasons for this are:

* The first message in any sequence is 
"org.apache.kafka.common.errors.TimeoutException: The request timed out."
* The subsequent "Caused by: 
org.apache.kafka.common.errors.DisconnectException" messages are because the 
consumer has disconnected itself due to the above timeout (this is evident in 
the Kafka client code).  It is not because the server side has failed (other 
than not responding in time to the earlier commit).  We get one of these errors 
for every commitAsync that was queued up behind the one that timed out.
* The "Group coordinator 
b-2.redacted.amazonaws.com:9094<http://b-2.redacted.amazonaws.com:9094>..." 
remains available to other consumers.
* We do not get any heartbeat timeouts or rebalances.
* The consumer continues to process incoming messages in a timely fashion (this 
can be observed by comparing the record create timestamp to the consumer log 
timestamp).
* There are no errors or dropped packets in the network monitoring.

I'm trying to get access to the various commit-* metrics from 
https://kafka.apache.org/documentation/#consumer_group_monitoring to confirm my 
hypothesis.
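
In case it helps, here's a sketch of how I'm planning to read those metrics 
in-process rather than via JMX (the metric group and name strings are my 
assumption from that docs page, so treat them as unverified):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    // Print commit-latency-avg/max, commit-rate and commit-total for this consumer.
    static void dumpCommitMetrics(KafkaConsumer<?, ?> consumer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            MetricName name = entry.getKey();
            if ("consumer-coordinator-metrics".equals(name.group())
                    && name.name().startsWith("commit-")) {
                System.out.printf("%s = %s%n", name.name(), entry.getValue().metricValue());
            }
        }
    }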

But at this stage I still think my original questions are the relevant ones.

So I think it is likely we are expecting too much to do commitAsync after every 
poll, particularly considering that this consumer is committing 10 partitions 
each time when usually only one has changed.  I had a look at the server-side 
code that I think is relevant 
(https://github.com/apache/kafka/tree/trunk/core/src/main/scala/kafka/coordinator/group)
 and don't see it doing anything smart around avoiding work where the offset 
has not changed (but I'm not very Scala literate, so happy to be told I missed 
something).

If I'm right about this, then our options are either to make our manual commit 
smarter or to use auto-commit, if it is already smart and suitable.

* Does auto-commit only commit changed offsets?

* Does auto-commit only commit the offsets from the prior poll during the 
current poll (assuming auto.commit.interval.ms has been met)?  This is not 
described in the standard docs but I think I did read it somewhere. We would 
not want the auto-commit to ever commit the offsets for the record(s) being 
processed during the current poll.

* Does auto-commit include a commitSync during a clean KafkaConsumer.close?

If not, we'll probably implement that ourselves and also incorporate an 
equivalent to auto.commit.interval.ms; roughly what I have in mind is sketched 
below.
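
The sketch (class and constant names are mine, and it assumes a 
single-threaded consumer, since commit callbacks run on the polling thread):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class SmartCommitter {
        private static final long COMMIT_INTERVAL_MS = 5_000; // mirrors the auto.commit.interval.ms default
        private final Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();
        private long lastCommitMs = System.currentTimeMillis();

        // Call once per processed record; the committed offset is the NEXT one to read, hence +1.
        void record(ConsumerRecord<?, ?> rec) {
            pending.put(new TopicPartition(rec.topic(), rec.partition()),
                        new OffsetAndMetadata(rec.offset() + 1));
        }

        // Call once per poll loop; commits only partitions that advanced, at most once per interval.
        void maybeCommit(KafkaConsumer<?, ?> consumer) {
            long now = System.currentTimeMillis();
            if (pending.isEmpty() || now - lastCommitMs < COMMIT_INTERVAL_MS) return;
            lastCommitMs = now;
            Map<TopicPartition, OffsetAndMetadata> snapshot = new HashMap<>(pending);
            consumer.commitAsync(snapshot, (offsets, ex) -> {
                if (ex == null) {
                    // Drop entries that haven't advanced past what was just committed;
                    // on failure they simply ride along with the next commit.
                    snapshot.forEach((tp, committed) -> pending.remove(tp, committed));
                }
            });
        }

        // Call from the clean shutdown path: synchronously flush anything outstanding.
        void close(KafkaConsumer<?, ?> consumer) {
            if (!pending.isEmpty()) consumer.commitSync(new HashMap<>(pending));
        }
    }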

Regards, James.

On 20/06/2020, at 02:15, Ricardo Ferreira <rifer...@riferrei.com> wrote:


James,

If I were you I would start investigating what is causing these network drops 
between your cluster and your consumers. The following messages are some 
indications of this:

* "Offset commit failed on partition MyTopic-53 at offset 957: The request 
timed out."

* "Caused by: org.apache.kafka.common.errors.DisconnectException"

* "Group coordinator 
b-2.redacted.amazonaws.com:9094<http://b-2.redacted.amazonaws.com:9094><http://2.redacted.amazonaws.com:9094><http://2.redacted.amazonaws.com:9094/>
 (id: redacted rack: null) is unavailable or invalid, will attempt rediscovery"

In Kafka, the group coordinator is one of the brokers that receives heartbeats 
and pull requests from consumers. Heartbeats are used to detect when a consumer 
is no longer available, whereas pull requests are the fetch requests that 
consumers send when they poll. Regardless, when no heartbeats are detected for 
a given period, the group coordinator considers the consumer dead and triggers 
a rebalance in which the partitions will be reassigned. If the group 
coordinator is no longer available (as described in one of the error messages) 
then this whole process stalls.
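
For reference, the client-side settings that drive this cycle are shown below 
(a sketch using the Kafka 2.4 defaults as I understand them, not a tuning 
recommendation):

    import java.util.Properties;

    Properties props = new Properties();
    props.put("session.timeout.ms", "10000");    // no heartbeat for this long => consumer declared dead
    props.put("heartbeat.interval.ms", "3000");  // how often the background thread sends heartbeats
    props.put("max.poll.interval.ms", "300000"); // max gap between poll() calls before leaving the group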

Moreover, `commitAsync()` calls, as the name implies, are asynchronous and 
don't block the consumer thread while waiting for a response from the cluster. 
However, if this response never comes, the elapsed time counts towards the 
limit specified in the property `max.poll.interval.ms`, which if exceeded will 
cause the consumer to leave the consumer group. Again, it all boils down to 
whether the network is fast enough to enable all of this without taking too 
much time.
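
A sketch of how such a failure actually surfaces: the callback passed to 
`commitAsync()` is invoked later, from inside a subsequent `poll()`, and that 
is where exceptions like the ones in your logs would appear:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
    import org.apache.kafka.common.TopicPartition;

    static void commitWithLogging(KafkaConsumer<?, ?> consumer) {
        consumer.commitAsync((Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) -> {
            if (ex instanceof RetriableCommitFailedException) {
                // e.g. caused by TimeoutException or DisconnectException; a later commit
                // with newer offsets supersedes this one, so logging may be enough.
                System.err.println("Async commit failed for " + offsets + ": " + ex);
            }
        });
    }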

Since you are using AWS MSK, you can use AWS-native tools (such as CloudWatch, 
VPC flow logs, and the AWSSupport-SetupIPMonitoringFromVPC runbook) to better 
troubleshoot these networking issues. I would also file a support ticket 
against the MSK service, since some of these networking issues have to do with 
one of the brokers being unavailable -- something that is not supposed to 
happen.

Thanks,

-- Ricardo

On 6/18/20 9:18 PM, James Olsen wrote:

We are using AWS MSK with Kafka 2.4.1 (and the same client version), 3 brokers.  
We are seeing fairly frequent consumer offset commit failures, as shown in the 
example logs below.  Things continue working since the failures are all 
retriable; however, I would like to improve this situation.

The issue occurs most often on the Consumer processing our busiest partition 
(MyTopic-50 in the case below).

We are using KafkaConsumer::commitAsync to manage the offsets, calling it after 
processing all the records in a given poll - mostly one message per poll and 
around 10 messages per second.  That doesn't seem like a heavy load, and the 
consumer itself is keeping up fine.
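
For clarity, the loop is essentially the following (process() and running 
stand in for our application logic):

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // ~10 msgs/sec total across the 10 partitions
        }
        // No-arg commitAsync sends the current position of ALL assigned
        // partitions, changed or not - hence the questions below.
        consumer.commitAsync();
    }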

The consumer is processing 10 Partitions on the Topic, most of which have not 
changed, e.g. in the logs below the first message refers to MyTopic-53 at 
offset 957, which actually hadn't changed for several minutes.

I note that the standard auto-commit-offsets functionality throttles the commit 
to once every 5 seconds by default.

* Are we expecting too much to do commitAsync each time as we do?  We could 
build in a throttle like auto-commit does.
* Is it possible that the unchanged partition offsets that commitAsync sends 
are creating unnecessary load?  We could use the version of commitAsync that 
takes the map of offsets and only commit the ones we know have changed.
* Does auto-commit already optimise to send only changed offsets?  If so we 
could consider switching to auto-commit (a config sketch follows below).
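
If auto-commit does turn out to be smart enough, switching would just be 
config; a sketch with the default interval:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    Properties props = new Properties();
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");      // "enable.auto.commit"
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // "auto.commit.interval.ms"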

Any advice or thoughts on the best option is appreciated.

Example logs...

2020-06-18 23:53:01,225 WARN  
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 
'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, 
groupId=MyTopicService-group] Offset commit failed on partition MyTopic-53 at 
offset 957: The request timed out.

2020-06-18 23:53:01,225 INFO  
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 
'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, 
groupId=MyTopicService-group] Group coordinator 
b-2.redacted.amazonaws.com:9094
 (id: redacted rack: null) is unavailable or invalid, will attempt rediscovery

2020-06-18 23:53:01,225 ERROR 
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 
'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, 
groupId=MyTopicService-group] Offset commit with offsets 
{MyTopic-48=OffsetAndMetadata{offset=615, leaderEpoch=1, metadata=''}, 
MyTopic-50=OffsetAndMetadata{offset=131419049, leaderEpoch=1, metadata=''}, 
MyTopic-49=OffsetAndMetadata{offset=937, leaderEpoch=2, metadata=''}, 
MyTopic-52=OffsetAndMetadata{offset=934, leaderEpoch=2, metadata=''}, 
MyTopic-51=OffsetAndMetadata{offset=969, leaderEpoch=1, metadata=''}, 
MyTopic-54=OffsetAndMetadata{offset=779, leaderEpoch=1, metadata=''}, 
MyTopic-53=OffsetAndMetadata{offset=957, leaderEpoch=1, metadata=''}, 
MyTopic-55=OffsetAndMetadata{offset=514, leaderEpoch=2, metadata=''}} failed: 
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.TimeoutException: The request timed 
out.

2020-06-18 23:53:01,353 INFO  
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 
'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, 
groupId=MyTopicService-group] Discovered group coordinator 
b-2.redacted.amazonaws.com:9094
 (id: redacted rack: null)

2020-06-18 23:53:01,355 ERROR 
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 
'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, 
groupId=MyTopicService-group] Offset commit with offsets 
{MyTopic-48=OffsetAndMetadata{offset=615, leaderEpoch=1, metadata=''}, 
MyTopic-50=OffsetAndMetadata{offset=131419050, leaderEpoch=1, metadata=''}, 
MyTopic-49=OffsetAndMetadata{offset=937, leaderEpoch=2, metadata=''}, 
MyTopic-52=OffsetAndMetadata{offset=934, leaderEpoch=2, metadata=''}, 
MyTopic-51=OffsetAndMetadata{offset=969, leaderEpoch=1, metadata=''}, 
MyTopic-54=OffsetAndMetadata{offset=779, leaderEpoch=1, metadata=''}, 
MyTopic-53=OffsetAndMetadata{offset=957, leaderEpoch=1, metadata=''}, 
MyTopic-55=OffsetAndMetadata{offset=514, leaderEpoch=2, metadata=''}} failed: 
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException

2020-06-18 23:53:01,355 ERROR 
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 
'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, 
groupId=MyTopicService-group] Offset commit with offsets 
{MyTopic-48=OffsetAndMetadata{offset=615, leaderEpoch=1, metadata=''}, 
MyTopic-50=OffsetAndMetadata{offset=131419051, leaderEpoch=1, metadata=''}, 
MyTopic-49=OffsetAndMetadata{offset=937, leaderEpoch=2, metadata=''}, 
MyTopic-52=OffsetAndMetadata{offset=934, leaderEpoch=2, metadata=''}, 
MyTopic-51=OffsetAndMetadata{offset=969, leaderEpoch=1, metadata=''}, 
MyTopic-54=OffsetAndMetadata{offset=779, leaderEpoch=1, metadata=''}, 
MyTopic-53=OffsetAndMetadata{offset=957, leaderEpoch=1, metadata=''}, 
MyTopic-55=OffsetAndMetadata{offset=514, leaderEpoch=2, metadata=''}} failed: 
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException

... plus many more with increasing MyTopic-50=OffsetAndMetadata{offset=xxx ...} 
(the busy partition).

Thanks, James.


