Hi ChienHsing,

We are actually working on buffering the already fetched data for paused
TopicPartitions, so ideally pausing and resuming should not have any effect
on performance. Associated Jira: https://issues.apache.org/jira/browse/KAFKA-7548
Thanks,
Mayuresh

On Wed, Dec 12, 2018 at 6:01 AM ChienHsing Wu <chien...@opentext.com> wrote:

> Hi Mayuresh,
>
> Thanks for the input!
>
> Pausing and resuming are cumbersome and have some undesirable performance
> impact, since pausing in effect cleans up the completed fetch and
> resuming calls the broker to retrieve the data again.
>
> The way I changed the code was simply to parse the completed fetch earlier
> and to ensure that records are retrieved in the same order as the completed
> fetch queue. I made code changes in the Fetcher class to take the following
> into account:
>
> 1) exception handling
> 2) ensure the parsed partitions are not included in fetchablePartitions
> 3) clear the buffer for partitions that are not among the newly assigned
>    partitions, in clearBufferedDataForUnassignedPartitions
> 4) close them properly in the close method
>
> Though the consumer does not guarantee an explicit order, KIP-41 (link
> below) did intend to ensure fair distribution, hence the round-robin
> algorithm in the code. The change I propose is meant to enhance it.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP-41:KafkaConsumerMaxRecords-EnsuringFairConsumption
>
> As for performance, the change does not add any additional calls to the
> broker, nor does it introduce significant processing logic; it just parses
> the completed fetch earlier and keeps a list to manage them.
>
> CH
>
> -----Original Message-----
> From: Mayuresh Gharat <gharatmayures...@gmail.com>
> Sent: Tuesday, December 11, 2018 6:58 PM
> To: dev@kafka.apache.org
> Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption
> Across Partitions in KafkaConsumer
>
> Hi ChienHsing,
>
> Another way I was thinking this could be done outside of KafkaConsumer is
> by pausing and resuming TopicPartitions (maybe in round-robin fashion).
> There is a gotcha there, though: you might not know whether the consumer
> has already fetched data for the remaining partitions.
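Mayuresh's alternative, pausing and resuming TopicPartitions in round-robin fashion from outside the consumer, can be sketched as a small rotation helper. This is a sketch under assumptions, not code from the thread or from Kafka. `PauseResumeRotator` is a hypothetical class that only decides which assigned partition stays active for the next poll. The real client calls it would drive (`KafkaConsumer.pause`, `resume` and `poll`) appear only in comments. The helper does not address the gotcha raised above. Per ChienHsing's reply, pausing in effect drops the completed fetch, so a resumed partition may have to be fetched again. The buffering work Mayuresh mentions for this is tracked in KAFKA-7548.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical helper (not part of the Kafka client API): rotates which
 * assigned partition is left un-paused, one step per poll cycle.
 *
 * Hypothetical driver loop with a real KafkaConsumer<K, V> named consumer
 * (poll(Duration) needs Kafka 2.0+):
 *   TopicPartition active = rotator.nextActive();
 *   consumer.pause(rotator.othersThan(active));
 *   consumer.resume(Collections.singletonList(active));
 *   ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
 */
final class PauseResumeRotator<P> {
    private final List<P> partitions;
    private int cursor = 0;

    PauseResumeRotator(List<P> assigned) {
        if (assigned.isEmpty()) {
            throw new IllegalArgumentException("no partitions assigned");
        }
        this.partitions = new ArrayList<>(assigned); // defensive copy of the assignment
    }

    /** Returns the partition to leave active for the next poll and advances the cursor. */
    P nextActive() {
        P active = partitions.get(cursor);
        cursor = (cursor + 1) % partitions.size();
        return active;
    }

    /** Every assigned partition except {@code active}; these are the ones to pause. */
    List<P> othersThan(P active) {
        List<P> paused = new ArrayList<>(partitions);
        paused.remove(active); // remove(Object): P is a type variable, so no index overload
        return Collections.unmodifiableList(paused);
    }
}
```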
>
> Also, I am not sure we need a KIP for this, as the KafkaConsumer does
> not guarantee the end user any order, I believe. So if this change goes
> in, I don't think it's changing the underlying behavior.
> It would be good to check whether this change will impact the performance
> of the consumer.
>
> Thanks,
>
> Mayuresh
>
> On Tue, Dec 11, 2018 at 11:03 AM ChienHsing Wu <chien...@opentext.com>
> wrote:
>
> > Hi Mayuresh,
> >
> > To serve one poll call, the logic greedily gets records from one
> > completed fetch before including records from the next completed fetch
> > in the queue, as you described.
> >
> > The algorithm remembers the current completed fetch as the starting one
> > when serving the next poll call. The net effect is that a completed
> > fetch is drained to serve as many poll calls as it can before any
> > records are retrieved from other completed fetches.
> >
> > For example, let's say the consumer has been assigned partitions A, B
> > and C, and max.poll.records is set to 100. Right now we have completed
> > fetches for A and B, each with 300 records. It will take 6 poll calls
> > to retrieve all records, and the sequence of retrieved partitions will
> > be: A, A, A, B, B, B.
> >
> > Ideally, it should alternate between A and B. I was proposing to move
> > to the next completed fetch for the next poll call, based on the order
> > in the completed fetch queue, so the order becomes A, B, A, B, A, B.
> > The implementation parses each completed fetch only once.
> >
> > Thanks, CH
> >
> > -----Original Message-----
> > From: Mayuresh Gharat <gharatmayures...@gmail.com>
> > Sent: Tuesday, December 11, 2018 1:21 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message
> > Consumption Across Partitions in KafkaConsumer
> >
> > Hi ChienHsing,
> >
> > Thanks for the KIP.
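ChienHsing's A/B example can be reproduced with a small simulation. It is a simplified model, not the Fetcher implementation. `DrainSimulation` and its nested `Fetch` are hypothetical names. Each completed fetch is reduced to a partition name and a count of parsed records, and no new fetches arrive while the queue drains. With `rotate=false`, a partially drained fetch stays at the head of the queue for the next poll, which is the behavior described above. With `rotate=true`, it moves to the back after each poll, which is one way to express the proposed ordering.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DrainSimulation {
    /** Hypothetical stand-in for a parsed completed fetch: partition plus records left. */
    private static final class Fetch {
        final String partition;
        int remaining;

        Fetch(String partition, int remaining) {
            this.partition = partition;
            this.remaining = remaining;
        }
    }

    /**
     * Simulates successive poll() calls over a queue of completed fetches and
     * returns, for each poll, the partitions it returned records from.
     * rotate=false: a partially drained head fetch stays at the head across polls.
     * rotate=true: a partially drained head fetch moves to the back after each poll.
     */
    static List<List<String>> simulate(Map<String, Integer> fetches, int maxPollRecords, boolean rotate) {
        Deque<Fetch> queue = new ArrayDeque<>();
        fetches.forEach((p, n) -> queue.addLast(new Fetch(p, n)));
        List<List<String>> polls = new ArrayList<>();
        while (!queue.isEmpty()) {
            List<String> sources = new ArrayList<>();
            int budget = maxPollRecords;
            Fetch partial = null;
            // Greedily fill this poll, moving to the next fetch only when the head is empty.
            while (budget > 0 && !queue.isEmpty()) {
                Fetch head = queue.peekFirst();
                int taken = Math.min(budget, head.remaining);
                head.remaining -= taken;
                budget -= taken;
                sources.add(head.partition);
                if (head.remaining == 0) {
                    queue.pollFirst();   // fully drained: drop it
                } else {
                    partial = head;      // budget used up mid-fetch; head still has records
                }
            }
            if (rotate && partial != null) {
                queue.pollFirst();       // take the partially drained head...
                queue.addLast(partial);  // ...and serve it again only after the others
            }
            polls.add(sources);
        }
        return polls;
    }

    public static void main(String[] args) {
        Map<String, Integer> fetches = new LinkedHashMap<>();
        fetches.put("A", 300);
        fetches.put("B", 300);
        System.out.println(simulate(fetches, 100, false)); // [[A], [A], [A], [B], [B], [B]]
        System.out.println(simulate(fetches, 100, true));  // [[A], [B], [A], [B], [A], [B]]
    }
}
```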
> > It would be great if you could explain with an example what you mean by
> > "Currently the implementation will return available records starting
> > from the last partition the last poll call retrieves records from.
> > This leads to unfair patterns of record consumption from multiple
> > partitions."
> >
> > KafkaConsumer sends fetch requests to multiple brokers, then gets the
> > corresponding responses and puts them into a single queue of
> > CompletedFetches. It then iterates over this completed fetches queue
> > and peels off up to max.poll.records records from each completedFetch
> > for each poll() before moving on to the next completedFetch. Also, it
> > does not send a fetch request for a TopicPartition if it already has
> > buffered data (completedFetch or nextInlineRecord) for that
> > TopicPartition. It also moves the TopicPartition to the end of the
> > assignment queue once it has received data from the broker for that
> > TopicPartition, to maintain a round-robin fetch sequence for fairness.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Tue, Dec 11, 2018 at 9:13 AM ChienHsing Wu <chien...@opentext.com>
> > wrote:
> >
> > > Jason,
> > >
> > > KIP-41 was initiated by you, and this KIP is to change the logic
> > > discussed in the Ensure Fair Consumption section:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP-41:KafkaConsumerMaxRecords-EnsuringFairConsumption
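The fetch-side fairness Mayuresh describes can be modeled in a few lines. It has two rules: skip partitions that still have buffered data, and move a partition to the end of the assignment queue once data arrives for it. This is an assumption-laden sketch, not Kafka's code. `FetchOrder` is a hypothetical class, and partitions are plain strings. A `LinkedHashSet` stands in for the assignment queue because it iterates in insertion order.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of the fetch-side round robin described above (not the Fetcher source). */
final class FetchOrder {
    // Insertion-ordered set standing in for the assignment queue.
    private final LinkedHashSet<String> assignment = new LinkedHashSet<>();

    FetchOrder(List<String> assigned) {
        assignment.addAll(assigned);
    }

    /** Partitions to include in the next fetch requests: no buffered data, in queue order. */
    List<String> fetchable(Set<String> buffered) {
        List<String> result = new ArrayList<>();
        for (String p : assignment) {
            if (!buffered.contains(p)) {
                result.add(p); // nothing buffered for p, so it is worth fetching
            }
        }
        return result;
    }

    /** Once a broker returns data for p, move p to the end of the queue. */
    void onDataReceived(String p) {
        if (assignment.remove(p)) {
            assignment.add(p); // re-insertion places p last
        }
    }

    /** Current queue order, for inspection. */
    List<String> order() {
        return new ArrayList<>(assignment);
    }
}
```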
> > > Your input on KIP-387
> > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-387%3A+Fair+Message+Consumption+Across+Partitions+in+KafkaConsumer)
> > > would be very valuable.
> > >
> > > Thanks, ChienHsing
> > >
> > > -----Original Message-----
> > > From: ChienHsing Wu <chien...@opentext.com>
> > > Sent: Tuesday, December 04, 2018 11:43 AM
> > > To: dev@kafka.apache.org
> > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > > Hi,
> > >
> > > Any comments/updates? I am not sure of the next steps if no one has
> > > any further comments.
> > >
> > > Thanks, CH
> > >
> > > -----Original Message-----
> > > From: ChienHsing Wu <chien...@opentext.com>
> > > Sent: Tuesday, November 20, 2018 2:46 PM
> > > To: dev@kafka.apache.org
> > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > > Hi Matt,
> > >
> > > Thanks for the feedback.
> > >
> > > The issue with the current design is that it stays on the previous
> > > partition even if the last poll call consumed max.poll.records
> > > records; it will consume all records for that partition available on
> > > the consumer side, serving multiple poll calls, before moving to the
> > > next partition.
> > >
> > > Introducing another threshold at the partition level will decrease
> > > the number of records consumed from one partition within one poll
> > > call, but will still use that same partition as the starting one in
> > > the next poll call.
> > >
> > > The same effect can be achieved by setting max.poll.records to 100, I
> > > believe. The main difference is that the client will need to make
> > > more poll calls when that value is set to 100, and because of their
> > > non-blocking nature I believe the cost of the extra poll calls is not
> > > significant.
> > >
> > > Further thoughts?
> > >
> > > Thanks, CH
> > >
> > > -----Original Message-----
> > > From: Matt Farmer <m...@frmr.me>
> > > Sent: Monday, November 19, 2018 9:32 PM
> > > To: dev@kafka.apache.org
> > > Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > > Hi there,
> > >
> > > Thanks for the KIP.
> > >
> > > We’ve run into issues with this at Mailchimp, so something to address
> > > consuming behavior would save us from having to always ensure we’re
> > > running enough consumers that each consumer has only one partition
> > > (which is our usual MO).
> > >
> > > I wonder, though, whether it would be simpler and more powerful to
> > > define the maximum number of records the consumer should pull from
> > > one partition before pulling some records from another.
> > >
> > > So if you set max.poll.records to 500 and then some new setting,
> > > max.poll.records.per.partition, to 100, the consumer would switch
> > > which partition it reads from every 100 records, looping back around
> > > to the first partition that had records if there aren’t 5 or more
> > > partitions with records.
> > >
> > > What do you think?
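Matt's proposed `max.poll.records.per.partition` setting can be sketched as a plan for a single poll. The setting is hypothetical, since it is the proposal under discussion, and `PerPartitionCap.planPoll` is an illustrative name. The method takes at most `perPartition` records from each partition in turn and loops back to the first partition that still has records. It stops once `maxPollRecords` records are planned or every partition is empty. The input map of available record counts stands in for data already buffered on the consumer, and partitions are visited in the map's iteration order.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the proposed max.poll.records.per.partition behavior. */
final class PerPartitionCap {
    /**
     * Plans one poll over partitions with buffered records (in map iteration order).
     * Returns the (partition, count) chunks in the order they would be returned.
     */
    static List<Map.Entry<String, Integer>> planPoll(Map<String, Integer> available,
                                                     int maxPollRecords, int perPartition) {
        Map<String, Integer> left = new LinkedHashMap<>(available); // don't mutate the caller's map
        List<Map.Entry<String, Integer>> chunks = new ArrayList<>();
        int budget = maxPollRecords;
        boolean progress = true;
        // Keep cycling over the partitions until the poll is full or nothing is left.
        while (budget > 0 && progress) {
            progress = false;
            for (Map.Entry<String, Integer> e : left.entrySet()) {
                if (budget == 0) {
                    break;
                }
                int take = Math.min(Math.min(perPartition, e.getValue()), budget);
                if (take == 0) {
                    continue; // this partition has nothing left
                }
                e.setValue(e.getValue() - take);
                budget -= take;
                chunks.add(new AbstractMap.SimpleEntry<>(e.getKey(), take));
                progress = true;
            }
        }
        return chunks;
    }

    public static void main(String[] args) {
        Map<String, Integer> available = new LinkedHashMap<>();
        available.put("A", 300);
        available.put("B", 300);
        System.out.println(planPoll(available, 500, 100)); // [A=100, B=100, A=100, B=100, A=100]
    }
}
```

With `maxPollRecords = 500`, `perPartition = 100`, and two partitions holding 300 records each, the plan alternates A, B, A, B, A in 100-record chunks. As ChienHsing notes in his reply, a per-partition cap changes how records are spread within one poll, but on its own it does not change which partition the next poll starts from.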
> > >
> > > On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu <chien...@opentext.com>
> > > wrote:
> > >
> > > > Hi, could anyone please review this KIP?
> > > >
> > > > Thanks, ChienHsing
> > > >
> > > > From: ChienHsing Wu
> > > > Sent: Friday, November 09, 2018 1:10 PM
> > > > To: dev@kafka.apache.org
> > > > Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption Across
> > > > Partitions in KafkaConsumer
> > > >
> > > > Just to check: will anyone review this? It's been silent for a week...
> > > > Thanks, ChienHsing
> > > >
> > > > From: ChienHsing Wu
> > > > Sent: Monday, November 05, 2018 4:18 PM
> > > > To: 'dev@kafka.apache.org' <dev@kafka.apache.org>
> > > > Subject: [DISCUSS] KIP-387: Fair Message Consumption Across
> > > > Partitions in KafkaConsumer
> > > >
> > > > Hi, I just put together the KIP page as requested. This email is to
> > > > start the discussion thread.
> > > >
> > > > KIP: KIP-387: Fair Message Consumption Across Partitions in
> > > > KafkaConsumer
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-387%3A+Fair+Message+Consumption+Across+Partitions+in+KafkaConsumer
> > > >
> > > > Pull Request: https://github.com/apache/kafka/pull/5838
> > > > Jira: https://issues.apache.org/jira/browse/KAFKA-3932
> > > >
> > > > Thanks, CH
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125