Resolved by downgrading the Client to 2.2.2 and implementing an application-level
heartbeat on every Producer to avoid the UNKNOWN_PRODUCER_ID issue.
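The thread doesn't show the heartbeat code, so what follows is only a minimal Java sketch of one way to do it, under stated assumptions. The class name IdleHeartbeat, the idle threshold and the sendHeartbeat callback are all hypothetical. In a real application the callback would wrap a KafkaProducer.send(...) of some record that consumers know to ignore. The intent is simply that no Producer stays silent for long on a very quiet Topic. Presumably this keeps the broker from discarding that producer's state, but the thread does not confirm the mechanism.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of the Kafka client API): sends a heartbeat
// through a producer that has been idle for at least idleThresholdMs.
final class IdleHeartbeat {
    private final long idleThresholdMs;
    private final Runnable sendHeartbeat; // assumed to wrap producer.send(...) of a no-op record
    private final AtomicLong lastSendMs;

    IdleHeartbeat(long idleThresholdMs, long nowMs, Runnable sendHeartbeat) {
        this.idleThresholdMs = idleThresholdMs;
        this.sendHeartbeat = sendHeartbeat;
        this.lastSendMs = new AtomicLong(nowMs);
    }

    // Call after every real application send.
    void recordSend(long nowMs) {
        lastSendMs.set(nowMs);
    }

    // Sends a heartbeat only if nothing was sent for idleThresholdMs; returns true if it did.
    // A concurrent recordSend() can race with this check; at worst one extra heartbeat is sent.
    boolean tick(long nowMs) {
        if (nowMs - lastSendMs.get() < idleThresholdMs) {
            return false;
        }
        sendHeartbeat.run();
        lastSendMs.set(nowMs);
        return true;
    }

    // Runs tick() every checkPeriodMs on a background thread using the wall clock.
    // The caller must shut the returned executor down when the producer is closed.
    ScheduledExecutorService start(long checkPeriodMs) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleAtFixedRate(() -> tick(System.currentTimeMillis()),
                checkPeriodMs, checkPeriodMs, TimeUnit.MILLISECONDS);
        return ses;
    }
}
```

Passing the current time into tick() keeps the idle check deterministic and easy to test. Only start() reads the real clock.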

> On 9/03/2020, at 16:08, James Olsen <ja...@inaseq.com> wrote:
> 
> P.S. I guess the big question is: what is the best way to handle or avoid
> UNKNOWN_PRODUCER_ID when running versions that don’t include KAFKA-7190 /
> KAFKA-8710?
> 
> We are using non-transactional idempotent Producers.
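For reference, a non-transactional idempotent producer is one with enable.idempotence=true and no transactional.id. Below is a minimal sketch of such a configuration. The config keys are standard Kafka producer settings. The IdempotentProducerConfig class and the bootstrap address are placeholders. The resulting Properties would be passed to a KafkaProducer constructor.

```java
import java.util.Properties;

// Hypothetical helper: settings for an idempotent, non-transactional producer.
final class IdempotentProducerConfig {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // placeholder address
        props.setProperty("enable.idempotence", "true");
        // Idempotence requires acks=all; set explicitly for clarity.
        props.setProperty("acks", "all");
        // Deliberately no "transactional.id": idempotent but not transactional.
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```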
> 
>> On 9/03/2020, at 12:59 PM, James Olsen <ja...@inaseq.com> wrote:
>> 
>> For completeness I have also tested a 2.4.0 Broker with the 2.4.0 Client, and
>> everything works correctly. Unfortunately, as we are on AWS MSK, we don’t have
>> the option to use 2.4.0 for the Brokers.
>> 
>> So now the question becomes: which combination is best for us, and will it
>> avoid UNKNOWN_PRODUCER_ID problems?
>> 
>> We can choose 2.2.1 or 2.3.1 for the Broker (AWS recommends 2.2.1, although
>> they don’t state why). Based on the experiences below, I would then go with
>> the corresponding 2.2.2 or 2.3.1 Client version.
>> 
>> Which combo would people recommend?
>> 
>>> On 9/03/2020, at 12:03 PM, James Olsen <ja...@inaseq.com> wrote:
>>> 
>>> Jamie,
>>> 
>>> I’ve just tested with a 2.3.1 Broker and a 2.3.1 Client and it works
>>> correctly: with that setup the Consumer delivers the batch as soon as any
>>> partition has data, which is what we would expect from the Kafka docs.
>>> 
>>> So it looks like an issue with the 2.4.0 Client. This is concerning, as I
>>> wanted the fix for https://issues.apache.org/jira/browse/KAFKA-7190 because we
>>> may have some very quiet Topics. 2.3.x does have some handling for this, as
>>> implied by https://issues.apache.org/jira/browse/KAFKA-8483, but I’m not sure
>>> it is as complete.
>>> 
>>> Regards, James.
>>> 
>>> On 9/03/2020, at 11:54 AM, Jamie <jamied...@aol.co.uk> wrote:
>>> 
>>> Hi James,
>>> 
>>> My understanding is that a consumer will only ever have one in-flight fetch
>>> request to each broker that leads partitions of the topics it is subscribed
>>> to. The fetch request asks for records from all of the consumer's partitions
>>> led by that broker, so if the consumer is consuming from more than one
>>> partition on a broker, they are batched into one request. I assume this
>>> means that if some partitions have no data available, the request will wait
>>> for fetch.max.wait.ms even if other partitions have more than
>>> fetch.min.bytes of data available to be read instantly?
>>> 
>>> Thanks,
>>> 
>>> Jamie
>>> 
>>> On Sunday, 8 March 2020, James Olsen <ja...@inaseq.com> wrote:
>>> 
>>> Using 2.3.1 Brokers makes things worse. There are now two fetch.max.wait.ms
>>> delays before messages are delivered, even though they were available from
>>> the beginning.
>>> 
>>> 2020-03-09 11:40:23,878 DEBUG 
>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 
>>> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
>>> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Fetch 
>>> READ_UNCOMMITTED at offset 28 for partition Ledger-1 returned fetch data 
>>> (error=NONE, highWaterMark=29, lastStableOffset = 29, logStartOffset = 0, 
>>> preferredReadReplica = absent, abortedTransactions = null, 
>>> recordsSizeInBytes=280)
>>> 2020-03-09 11:40:23,878 DEBUG 
>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 
>>> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
>>> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] 
>>> Ignoring fetched records for partition Ledger-1 since it no longer has 
>>> valid position
>>> 2020-03-09 11:40:23,878 DEBUG 
>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 
>>> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
>>> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
>>> READ_UNCOMMITTED fetch request for partition Ledger-0 at position 
>>> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
>>> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
>>> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:23,878 DEBUG 
>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 
>>> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
>>> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] 
>>> Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), 
>>> toForget=(Ledger-1), implied=(Ledger-0)) to broker localhost:9093 (id: 1001 
>>> rack: null)
>>> 2020-03-09 11:40:24,382 DEBUG 
>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 
>>> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
>>> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
>>> READ_UNCOMMITTED fetch request for partition Ledger-0 at position 
>>> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
>>> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
>>> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:24,382 DEBUG 
>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 
>>> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
>>> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] 
>>> Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), 
>>> implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:24,382 DEBUG 
>>> [org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient] 
>>> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
>>> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] 
>>> Handling OffsetsForLeaderEpoch response for Ledger-1. Got offset 29 for 
>>> epoch 0
>>> 2020-03-09 11:40:24,885 DEBUG 
>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 
>>> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
>>> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
>>> READ_UNCOMMITTED fetch request for partition Ledger-0 at position 
>>> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
>>> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
>>> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:24,885 DEBUG 
>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 
>>> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
>>> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
>>> READ_UNCOMMITTED fetch request for partition Ledger-1 at position 
>>> FetchPosition{offset=28, offsetEpoch=Optional[0], 
>>> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
>>> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:24,885 DEBUG 
>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 
>>> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
>>> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] 
>>> Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(Ledger-1), 
>>> toForget=(), implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: 
>>> null)
>>> 2020-03-09 11:40:24,887 DEBUG 
>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 
>>> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
>>> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Fetch 
>>> READ_UNCOMMITTED at offset 28 for partition Ledger-1 returned fetch data 
>>> (error=NONE, highWaterMark=29, lastStableOffset = 29, logStartOffset = 0, 
>>> preferredReadReplica = absent, abortedTransactions = null, 
>>> recordsSizeInBytes=280)
>>> 2020-03-09 11:40:24,889 DEBUG 
>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 
>>> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
>>> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
>>> READ_UNCOMMITTED fetch request for partition Ledger-0 at position 
>>> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
>>> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
>>> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:24,889 DEBUG 
>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 
>>> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
>>> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
>>> READ_UNCOMMITTED fetch request for partition Ledger-1 at position 
>>> FetchPosition{offset=29, offsetEpoch=Optional[0], 
>>> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
>>> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:24,889 DEBUG 
>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 
>>> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
>>> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] 
>>> Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(Ledger-1), 
>>> toForget=(), implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: 
>>> null)
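To put numbers on the two delays, the small Java helper below computes the gaps between timestamps taken from the log above. It is a hypothetical, pure-arithmetic helper and not part of any Kafka tooling. The timestamps are: the fetch sent with only Ledger-0 (11:40:23,878), the next two fetch cycles (11:40:24,382 and 11:40:24,885), and the returned Ledger-1 data (11:40:24,887).

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: gaps between log timestamps in the "HH:mm:ss,SSS" form used above.
final class LogGaps {
    private static final DateTimeFormatter TIME = DateTimeFormatter.ofPattern("HH:mm:ss,SSS");

    // Milliseconds between consecutive timestamps (assumed to fall on the same day).
    static List<Long> gapsMs(List<String> times) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < times.size(); i++) {
            LocalTime prev = LocalTime.parse(times.get(i - 1), TIME);
            LocalTime next = LocalTime.parse(times.get(i), TIME);
            gaps.add(Duration.between(prev, next).toMillis());
        }
        return gaps;
    }
}
```

For those four timestamps the gaps are 504 ms, 503 ms and 2 ms. That is roughly two 500 ms fetch.max.wait.ms waits before the data arrives.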
>>> 
>>> 
>>>> On 9/03/2020, at 10:48 AM, James Olsen <ja...@inaseq.com> wrote:
>>>> 
>>>> Thanks for your response. Yes, the second issue can be mitigated by
>>>> reducing fetch.max.wait.ms, although reducing it too far creates excessive
>>>> CPU load on the Brokers. However, I've done some further testing and found
>>>> what looks like the underlying cause.
>>>> 
>>>> In the scenario below the Consumer is consuming from 2 Partitions
>>>> (MyTopic-0 and MyTopic-1). There is a cycle of messages being fetched and
>>>> ignored. In each cycle, the subsequent fetch to get them again does not
>>>> occur until a complete fetch.max.wait.ms has expired. I suspect this is
>>>> due initially to the fact that MyTopic-0 has never had any messages, and
>>>> hence has no epoch, and is subsequently being fetched on its own; being
>>>> empty, that fetch waits out the delay. Someone who knows more about the
>>>> meaning of "toSend=(), toForget=(MyTopic-1), implied=(MyTopic-0)" might be
>>>> able to shed more light on this.
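The toSend / toForget / implied lists come from incremental fetch sessions (KIP-227). Below is a rough sketch of the idea, not the client's actual FetchSessionHandler code:

- Partitions that are new to the session, or whose fetch position changed, are sent.
- Partitions that were in the session but are not wanted in this request are forgotten.
- Unchanged partitions are implied: the broker keeps fetching them without the client listing them again.

The FetchSessionDiff class is hypothetical and compares only the fetch offset.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of how an incremental fetch request (KIP-227) classifies partitions.
// Hypothetical class: the real client also compares other per-partition fetch fields,
// whereas this sketch compares only the fetch offset.
final class FetchSessionDiff {
    final Set<String> toSend = new LinkedHashSet<>();   // new to the session, or changed
    final Set<String> toForget = new LinkedHashSet<>(); // in the session, not wanted now
    final Set<String> implied = new LinkedHashSet<>();  // unchanged, so not listed in the request

    // previous and next map each partition name to its fetch offset.
    static FetchSessionDiff of(Map<String, Long> previous, Map<String, Long> next) {
        FetchSessionDiff diff = new FetchSessionDiff();
        for (Map.Entry<String, Long> e : next.entrySet()) {
            Long before = previous.get(e.getKey());
            if (before == null || !before.equals(e.getValue())) {
                diff.toSend.add(e.getKey());
            } else {
                diff.implied.add(e.getKey());
            }
        }
        for (String partition : previous.keySet()) {
            if (!next.containsKey(partition)) {
                diff.toForget.add(partition);
            }
        }
        return diff;
    }
}
```

Suppose MyTopic-1 is dropped from the next fetch, as happens after the "no longer has valid position" line. The diff is then toSend=(), toForget=(MyTopic-1), implied=(MyTopic-0), which matches the log. The request then covers only the empty MyTopic-0, which would be consistent with the observed wait.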
>>>> 
>>>> I can post a more complete log of this if anyone wants to take a look.
>>>> 
>>>> I’m going to try Kafka 2.3 Brokers to see if the "Skipping validation …"
>>>> bit has any impact.
>>>> 
>>>> 2020-03-09 09:46:43,093 DEBUG 
>>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>>>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Fetch 
>>>> READ_UNCOMMITTED at offset 40 for partition MyTopic-1 returned fetch data 
>>>> (error=NONE, highWaterMark=41, lastStableOffset = 41, logStartOffset = 0, 
>>>> preferredReadReplica = absent, abortedTransactions = null, 
>>>> recordsSizeInBytes=573)
>>>> 
>>>> 2020-03-09 09:46:43,093 DEBUG 
>>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>>>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Ignoring 
>>>> fetched records for partition MyTopic-1 since it no longer has valid 
>>>> position
>>>> 
>>>> 2020-03-09 09:46:43,093 DEBUG 
>>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>>>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added 
>>>> READ_UNCOMMITTED fetch request for partition MyTopic-0 at position 
>>>> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
>>>> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
>>>> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>>> 
>>>> 2020-03-09 09:46:43,093 DEBUG 
>>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>>>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Sending 
>>>> READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(MyTopic-1), 
>>>> implied=(MyTopic-0)) to broker localhost:9093 (id: 1001 rack: null)
>>>> 
>>>> 2020-03-09 09:46:43,095 DEBUG 
>>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>>>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Skipping 
>>>> validation of fetch offsets for partitions [MyTopic-1] since the broker 
>>>> does not support the required protocol version (introduced in Kafka 2.3)
>>>> 
>>>> 2020-03-09 09:46:43,597 DEBUG 
>>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>>>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added 
>>>> READ_UNCOMMITTED fetch request for partition MyTopic-0 at position 
>>>> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
>>>> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
>>>> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>>> 
>>>> 2020-03-09 09:46:43,597 DEBUG 
>>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>>>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added 
>>>> READ_UNCOMMITTED fetch request for partition MyTopic-1 at position 
>>>> FetchPosition{offset=40, offsetEpoch=Optional[0], 
>>>> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
>>>> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>>> 
>>>> 2020-03-09 09:46:43,597 DEBUG 
>>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>>>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Sending 
>>>> READ_UNCOMMITTED IncrementalFetchRequest(toSend=(MyTopic-1), toForget=(), 
>>>> implied=(MyTopic-0)) to broker localhost:9093 (id: 1001 rack: null)
>>>> 
>>>> 2020-03-09 09:46:43,599 DEBUG 
>>>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>>>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Fetch 
>>>> READ_UNCOMMITTED at offset 40 for partition MyTopic-1 returned fetch data 
>>>> (error=NONE, highWaterMark=41, lastStableOffset = 41, logStartOffset = 0, 
>>>> preferredReadReplica = absent, abortedTransactions = null, 
>>>> recordsSizeInBytes=573)
>>>> 
>>>> 
>>>> On 5/03/2020, at 11:45 PM, M. Manna <manme...@gmail.com> wrote:
>>>> 
>>>> Hi James,
>>>> 
>>>> Three Consumers in a group means you have 20 partitions per consumer (given
>>>> your setup of 60 partitions and one consumer group); five means 12. There's
>>>> nothing special about these numbers, as you also noticed.
>>>> Have you tried setting fetch.max.wait.ms = 0 to see whether that makes
>>>> a difference for you?
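The per-consumer counts above come from splitting the partitions as evenly as possible. Assume the default RangeAssignor with a single topic. Each consumer then gets partitions / consumers partitions, and the first partitions % consumers consumers get one extra. Below is a small Java sketch of that arithmetic; the RangeSplit class is hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper: partition counts per consumer for ONE topic,
// assuming a range-style split as done by the default RangeAssignor.
final class RangeSplit {
    static int[] partitionsPerConsumer(int partitions, int consumers) {
        int base = partitions / consumers;
        int extra = partitions % consumers; // the first `extra` consumers get one more
        int[] counts = new int[consumers];
        for (int i = 0; i < consumers; i++) {
            counts[i] = base + (i < extra ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        for (int c : new int[] {3, 5, 6}) {
            System.out.println(c + " consumers -> " + Arrays.toString(partitionsPerConsumer(60, c)));
        }
    }
}
```

For 60 partitions this gives 20 each with 3 consumers, 12 each with 5, and 10 each with 6. The counts say nothing about which of those partitions actually receive data.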
>>>> 
>>>> Thanks,
>>>> 
>>>> 
>>>> On Thu, 5 Mar 2020 at 03:43, James Olsen <ja...@inaseq.com> wrote:
>>>> 
>>>> I’m seeing behaviour that I don’t understand when I have Consumers
>>>> fetching from multiple Partitions from the same Topic.  There are two
>>>> different conditions arising:
>>>> 
>>>> 1. A subset of the Partitions allocated to a given Consumer is not being
>>>> consumed at all. The Consumer appears healthy: the Thread is running,
>>>> logging activity and successfully processing records from some of the
>>>> Partitions it has been assigned. I don’t think this is due to the first
>>>> Partition fetched filling a Batch (KIP-387). The problem does not occur with
>>>> a particular number of Consumers (3 in this case), but it has occurred with
>>>> a range of other, larger values. I don’t think there is anything special
>>>> about 3; it just happens to work with that value, although it is the same
>>>> as the Broker and Replica count. When we tried 6 Consumers, 5 were fine but
>>>> 1 exhibited this issue.
>>>> 
>>>> 2. A delay of up to half a second between the Producer sending and the
>>>> Consumer receiving a message. This looks suspiciously like
>>>> fetch.max.wait.ms=500, but we also have fetch.min.bytes=1, so we should get
>>>> messages as soon as something is available. The only explanation I can
>>>> think of is that fetch.max.wait.ms is applied in full to the first
>>>> Partition checked while it remains empty for the duration, and only then
>>>> does the fetch move on to a subsequent non-empty Partition and deliver
>>>> messages from there.
>>>> 
>>>> Our environment is AWS MSK (Kafka 2.2.1) with the Kafka Java client 2.4.0.
>>>> 
>>>> All environments appear healthy and under light load, e.g. clients operating
>>>> at only 1-2% CPU and the Brokers (3) at 5-10% CPU. No swap, no crashes, no
>>>> dead threads etc.
>>>> 
>>>> A typical scenario is a Topic with 60 Partitions, 3 Replicas and a single
>>>> ConsumerGroup with 5 Consumers. The partitioning is for semantic purposes;
>>>> the intention is to add more Consumers as the business grows and load
>>>> increases. Some of the Partitions are always empty because we use short
>>>> string keys with the default Partitioner; we will probably implement a
>>>> custom Partitioner to achieve better distribution in the near future.
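The empty-partition effect follows from a simple counting argument. With only k distinct keys, any key-based partitioner can put data into at most k partitions, and hash collisions can make it fewer. The Java sketch below illustrates this under two assumptions. First, hashPartitioner uses String.hashCode as a stand-in; it is not Kafka's default partitioner, which hashes the serialized key bytes with murmur2. Second, explicitPartitioner is only a hypothetical shape for a custom Partitioner: it maps a known list of keys to distinct partitions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.ToIntFunction;

// Hypothetical helpers for reasoning about key distribution; not Kafka APIs.
final class KeySpread {
    // Number of distinct partitions that receive at least one of the given keys.
    static int nonEmptyPartitions(List<String> keys, ToIntFunction<String> partitioner) {
        Set<Integer> used = new HashSet<>();
        for (String k : keys) {
            used.add(partitioner.applyAsInt(k));
        }
        return used.size();
    }

    // Stand-in hash partitioner: String.hashCode made non-negative, modulo the
    // partition count. (Kafka's default partitioner uses murmur2 instead.)
    static ToIntFunction<String> hashPartitioner(int numPartitions) {
        return key -> (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Hypothetical custom partitioner: known keys are assigned round-robin in list
    // order; unknown keys fall back to the hash partitioner.
    static ToIntFunction<String> explicitPartitioner(List<String> knownKeys, int numPartitions) {
        Map<String, Integer> table = new HashMap<>();
        for (int i = 0; i < knownKeys.size(); i++) {
            table.put(knownKeys.get(i), i % numPartitions);
        }
        ToIntFunction<String> fallback = hashPartitioner(numPartitions);
        return key -> {
            Integer p = table.get(key);
            return p != null ? p : fallback.applyAsInt(key);
        };
    }
}
```

With ten distinct keys and 60 partitions, nonEmptyPartitions can never exceed 10 under either function. The explicit table reaches exactly 10 as long as the known keys are distinct and no more numerous than the partitions.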
>>>> 
>>>> I don’t have access to the detailed JMX metrics yet, but am working on that
>>>> in the hope that they will help diagnose the problem.
>>>> 
>>>> Thoughts and advice appreciated!
>>>> 
>>> 
>>> 
>> 
> 
