Resolved by downgrading the Client to 2.2.2 and implementing an application-level heartbeat on every Producer to avoid the UNKNOWN_PRODUCER_ID issue.
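For anyone else hitting this, below is a minimal sketch of the kind of application-level heartbeat meant here. It is illustrative only, not the actual implementation: the broker list, topic name, heartbeat key and five-minute schedule are placeholder assumptions. The idea is simply that each idempotent Producer keeps writing a small record periodically, so its most recent records never age out of a quiet Topic; on broker versions without KAFKA-7190 that ageing-out is what drops the producer state and leads to UNKNOWN_PRODUCER_ID.

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HeartbeatingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker list
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Non-transactional idempotent producer, as used in this thread.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Periodically send a tiny record so this producer always has recent data
        // on otherwise quiet topics. Topic name, key and interval are illustrative.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                () -> producer.send(new ProducerRecord<>("MyQuietTopic", "heartbeat", "")),
                0, 5, TimeUnit.MINUTES);

        // Normal business sends continue to use the same producer instance.
    }
}

Consumers of course have to recognise and skip the heartbeat records.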
> On 9/03/2020, at 16:08, James Olsen <ja...@inaseq.com> wrote:
>
> P.S. I guess the big question is: what is the best way to handle or avoid UNKNOWN_PRODUCER_ID when running versions that don't include KAFKA-7190 / KAFKA-8710?
>
> We are using non-transactional idempotent Producers.
>
>> On 9/03/2020, at 12:59 PM, James Olsen <ja...@inaseq.com> wrote:
>>
>> For completeness I have also tested 2.4.0 Broker with 2.4.0 Client. All works correctly. Unfortunately, as we are on AWS MSK, we don't have the option to use 2.4.0 for the Brokers.
>>
>> So now I guess the question changes to: what combo is best for us, and will it avoid UNKNOWN_PRODUCER_ID problems?
>>
>> We can choose 2.2.1 or 2.3.1 for the Broker (AWS recommend 2.2.1 although they don't state why). Based on the experiences below, I would then go with the corresponding 2.2.2 or 2.3.1 Client version.
>>
>> Which combo would people recommend?
>>
>>> On 9/03/2020, at 12:03 PM, James Olsen <ja...@inaseq.com> wrote:
>>>
>>> Jamie,
>>>
>>> I've just tested with 2.3.1 Broker and 2.3.1 Client and it works correctly. So with that setup it does deliver the batch as soon as any partition has data. This is what we would expect from the Kafka docs.
>>>
>>> So it looks like an issue with the 2.4.0 Client. This is concerning as I wanted the fix for https://issues.apache.org/jira/browse/KAFKA-7190 because we may have some very quiet Topics. 2.3.x does have some handling for this, as implied by https://issues.apache.org/jira/browse/KAFKA-8483, but I'm not sure it is as complete.
>>>
>>> Regards, James.
>>>
>>> On 9/03/2020, at 11:54 AM, Jamie <jamied...@aol.co.uk> wrote:
>>>
>>> Hi James,
>>>
>>> My understanding is that consumers will only ever have 1 in-flight request to each broker that has leader partitions of topics they are subscribed to. The fetch requests will ask for records for all leader partitions on the broker, so if the consumer is consuming from more than one partition on a broker then they will be batched into one request. I assume this means that if there are some partitions with no data available then the request will wait for fetch.max.wait.ms even if some of the partitions have more than fetch.min.bytes of data available to be read instantly?
>>>
>>> Thanks,
>>>
>>> Jamie
>>>
>>> On Sunday, 8 March 2020, James Olsen <ja...@inaseq.com> wrote:
>>>
>>> Using 2.3.1 Brokers makes things worse. There are now two fetch.max.wait.ms delays before messages are delivered, even though they were available at the beginning.
>>>
>>> 2020-03-09 11:40:23,878 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Fetch READ_UNCOMMITTED at offset 28 for partition Ledger-1 returned fetch data (error=NONE, highWaterMark=29, lastStableOffset = 29, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=280)
>>> 2020-03-09 11:40:23,878 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Ignoring fetched records for partition Ledger-1 since it no longer has valid position
>>> 2020-03-09 11:40:23,878 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added READ_UNCOMMITTED fetch request for partition Ledger-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:23,878 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(Ledger-1), implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:24,382 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added READ_UNCOMMITTED fetch request for partition Ledger-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:24,382 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:24,382 DEBUG [org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Handling OffsetsForLeaderEpoch response for Ledger-1. Got offset 29 for epoch 0
>>> 2020-03-09 11:40:24,885 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added READ_UNCOMMITTED fetch request for partition Ledger-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:24,885 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added READ_UNCOMMITTED fetch request for partition Ledger-1 at position FetchPosition{offset=28, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:24,885 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(Ledger-1), toForget=(), implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:24,887 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Fetch READ_UNCOMMITTED at offset 28 for partition Ledger-1 returned fetch data (error=NONE, highWaterMark=29, lastStableOffset = 29, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=280)
>>> 2020-03-09 11:40:24,889 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added READ_UNCOMMITTED fetch request for partition Ledger-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:24,889 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added READ_UNCOMMITTED fetch request for partition Ledger-1 at position FetchPosition{offset=29, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>> 2020-03-09 11:40:24,889 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(Ledger-1), toForget=(), implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
>>>
>>>> On 9/03/2020, at 10:48 AM, James Olsen <ja...@inaseq.com> wrote:
>>>>
>>>> Thanks for your response. Yes, the second issue can be mitigated by reducing the fetch.max.wait.ms, although reducing it too far creates excessive CPU load on the Brokers.
>>>> However I've done some further testing and found what looks like the underlying cause.
>>>>
>>>> In the scenario below the Consumer is consuming from 2 Partitions (MyTopic-0 and MyTopic-1). There is a cycle of messages being fetched and ignored. In each cycle a subsequent fetch to get them again does not occur until after a complete fetch.max.wait.ms expires. I suspect this is due initially to the fact that MyTopic-0 has never had any messages and hence has no epoch, and subsequently is being fetched on its own - but being empty results in the delay. Someone who knows more about the meaning of "toSend=(), toForget=(MyTopic-1), implied=(MyTopic-0)" might be able to enlighten things further.
>>>>
>>>> I can post a more complete log of this if anyone wants to take a look.
>>>>
>>>> I'm going to try Kafka 2.3 Brokers to see if the "Skipping validation …" bit has any impact.
>>>>
>>>> 2020-03-09 09:46:43,093 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Fetch READ_UNCOMMITTED at offset 40 for partition MyTopic-1 returned fetch data (error=NONE, highWaterMark=41, lastStableOffset = 41, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=573)
>>>> 2020-03-09 09:46:43,093 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Ignoring fetched records for partition MyTopic-1 since it no longer has valid position
>>>> 2020-03-09 09:46:43,093 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added READ_UNCOMMITTED fetch request for partition MyTopic-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>>> 2020-03-09 09:46:43,093 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(MyTopic-1), implied=(MyTopic-0)) to broker localhost:9093 (id: 1001 rack: null)
>>>> 2020-03-09 09:46:43,095 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Skipping validation of fetch offsets for partitions [MyTopic-1] since the broker does not support the required protocol version (introduced in Kafka 2.3)
>>>> 2020-03-09 09:46:43,597 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added READ_UNCOMMITTED fetch request for partition MyTopic-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>>> 2020-03-09 09:46:43,597 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added READ_UNCOMMITTED fetch request for partition MyTopic-1 at position FetchPosition{offset=40, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>>>> 2020-03-09 09:46:43,597 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(MyTopic-1), toForget=(), implied=(MyTopic-0)) to broker localhost:9093 (id: 1001 rack: null)
>>>> 2020-03-09 09:46:43,599 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Fetch READ_UNCOMMITTED at offset 40 for partition MyTopic-1 returned fetch data (error=NONE, highWaterMark=41, lastStableOffset = 41, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=573)
>>>>
>>>> On 5/03/2020, at 11:45 PM, M. Manna <manme...@gmail.com> wrote:
>>>>
>>>> Hi James,
>>>>
>>>> 3 Consumers in a group means you are having 20 partitions per consumer (as per your 60-partition and 1 CGroup setup), 5 means 12. There's nothing special about these numbers, as you also noticed. Have you tried setting fetch.max.wait.ms = 0 and seen whether that makes a difference for you?
>>>>
>>>> Thanks,
>>>>
>>>> On Thu, 5 Mar 2020 at 03:43, James Olsen <ja...@inaseq.com> wrote:
>>>>
>>>> I'm seeing behaviour that I don't understand when I have Consumers fetching from multiple Partitions from the same Topic. There are two different conditions arising:
>>>>
>>>> 1. A subset of the Partitions allocated to a given Consumer is not being consumed at all. The Consumer appears healthy, the Thread is running and logging activity and is successfully processing records from some of the Partitions it has been assigned. I don't think this is due to the first Partition fetched filling a Batch (KIP-387). The problem does not occur if we have a particular number of Consumers (3 in this case) but it has failed with a range of other larger values. I don't think there is anything special about 3 - it just happens to work OK with that value, although it is the same as the Broker and Replica count. When we tried 6, 5 Consumers were fine but 1 exhibited this issue.
>>>>
>>>> 2. Up to a half-second delay between the Producer sending and the Consumer receiving a message. This looks suspiciously like the fetch.max.wait.ms=500, but we also have fetch.min.bytes=1 so we should get messages as soon as something is available. The only explanation I can think of is if the fetch.max.wait.ms is applied in full to the first Partition checked and it remains empty for the duration. Then it moves on to a subsequent non-empty Partition and delivers messages from there.
>>>>
>>>> Our environment is AWS MSK (Kafka 2.2.1) and Kafka Java client 2.4.0.
>>>>
>>>> All environments appear healthy and under light load, e.g. clients only operating at 1-2% CPU, Brokers (3) at 5-10% CPU. No swap, no crashes, no dead threads etc.
>>>>
>>>> Typical scenario is a Topic with 60 Partitions, 3 Replicas and a single ConsumerGroup with 5 Consumers.
>>>> The Partitioning is for semantic purposes with the intention being to add more Consumers as the business grows and load increases. Some of the Partitions are always empty due to using short string keys and the default Partitioner - we will probably implement a custom Partitioner to achieve better distribution in the near future.
>>>>
>>>> I don't have access to the detailed JMX metrics yet but am working on that in the hope it will help diagnose.
>>>>
>>>> Thoughts and advice appreciated!
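A custom Partitioner of the kind mentioned in the original question above could look roughly like the sketch below. The class name and the extra hash mixing are illustrative assumptions only; with a small set of known business keys, a real implementation would more likely map each key to a partition explicitly.

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class SpreadingPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null) {
            // Keyless records: pick a random partition (the records in this thread are keyed).
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // The default partitioner hashes the key with murmur2 and takes it modulo the
        // partition count, which can leave many partitions empty when only a few short
        // string keys exist. The extra mixing here is a placeholder for whatever mapping
        // gives a more even spread.
        int mixed = Utils.murmur2(keyBytes) * 31 + 17;
        return Utils.toPositive(mixed) % numPartitions;
    }

    @Override
    public void close() {
    }
}

It would be wired in on the Producer via partitioner.class set to the fully qualified class name.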