On May 23, 2016, at 10:59 AM, Jason Gustafson <ja...@confluent.io> wrote:

> 2. Maybe there's a better way to lay out the assignment without needing to
> explicitly repeat the topic? For example, the leader could sort the topics
> for each member and just use an integer to represent the index of each
> topic within the sorted list (note this depends on the subscription
> including the full topic list).
>
> Assignment -> [TopicIndex [Partition]]
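For concreteness, here is a rough sketch of the index-based layout quoted above. This is an editor's illustration in Java with made-up names (IndexedAssignment, encode), not the actual ConsumerProtocol serialization; it assumes leader and members derive the same sorted topic list from the subscription.

    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.Map;

    final class IndexedAssignment {
        // Sketch of Assignment -> [TopicIndex [Partition]]: reference each
        // topic by its index in the sorted subscription list instead of
        // repeating the topic name per assignment entry.
        static ByteBuffer encode(List<String> sortedTopics,
                                 Map<String, List<Integer>> assignment) {
            int size = 4; // number of topic entries
            for (List<Integer> partitions : assignment.values())
                size += 4 + 4 + 4 * partitions.size(); // index + count + partitions
            ByteBuffer buf = ByteBuffer.allocate(size);
            buf.putInt(assignment.size());
            for (Map.Entry<String, List<Integer>> entry : assignment.entrySet()) {
                // 4-byte index instead of the full topic name
                buf.putInt(sortedTopics.indexOf(entry.getKey()));
                buf.putInt(entry.getValue().size());
                for (int partition : entry.getValue())
                    buf.putInt(partition);
            }
            buf.flip();
            return buf;
        }
    }

With topic names averaging 25 characters (per Onur's setup below), each topic reference would shrink from roughly 27 bytes as a length-prefixed string to 4 bytes.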
Jason, doesn't gzip (or other compression) basically do this? If the topic
is a string and the topic is repeated throughout, won't compression
basically replace all repeated instances of it with an index reference to
the full string?

-James

> You could even combine these two options so that you have only 3 integers
> for each topic assignment:
>
> Assignment -> [TopicIndex MinPartition MaxPartition]
>
> There may even be better options with a little more thought. All of this
> is just part of the client-side protocol, so it wouldn't require any
> version bumps on the broker. What do you think?
>
> Thanks,
> Jason

On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang <wangg...@gmail.com> wrote:

The original concern is that regex may not be efficiently supported
across languages, but if there is a neat workaround I would love to learn.

Guozhang

On Mon, May 23, 2016 at 5:31 AM, Ismael Juma <ism...@juma.me.uk> wrote:

+1 to Jun's suggestion.

Having said that, as a general point, I think we should consider supporting
topic patterns in the wire protocol. It requires some thinking for
cross-language support, but it seems surmountable, and it could make
certain operations a lot more efficient (the fact that a basic regex
subscription causes the consumer to request metadata for all topics is not
great).

Ismael

On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang <wangg...@gmail.com> wrote:

I like Jun's suggestion of changing how a single large message is handled
on the consumer side.

As for the case of "a single group subscribing to 3000 topics": with 100
consumers, the 2.5 MB gzipped size is reasonable to me (when storing in
ZK, we also have the znode limit, which is set to 1 MB by default, though
admittedly it is only for one consumer). And if we make the change Jun
suggested, 2.5 MB of memory pressure on the follower is OK, I think.

Guozhang

On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:

Results without compression:
1 consumer    292383 bytes
5 consumers   1079579 bytes * the tipping point
10 consumers  1855018 bytes
20 consumers  2780220 bytes
30 consumers  3705422 bytes
40 consumers  4630624 bytes
50 consumers  5555826 bytes
60 consumers  6480788 bytes
70 consumers  7405750 bytes
80 consumers  8330712 bytes
90 consumers  9255674 bytes
100 consumers 10180636 bytes

So it looks like gzip compression shrinks the message size by about 4x.

On Sat, May 21, 2016 at 9:47 AM, Jun Rao <j...@confluent.io> wrote:

Onur,

Thanks for the investigation.

Another option is to just fix how we deal with the case when a message is
larger than the fetch size. Today, if the fetch size is smaller than the
message size, the consumer will get stuck. Instead, we can simply return
the full message if it's larger than the fetch size, without requiring the
consumer to manually adjust the fetch size. On the broker side, to serve a
fetch request, we already do an index lookup and then scan the log a bit
to find the message with the requested offset. We can just check the size
of that message and return the full message if its size is larger than the
fetch size. This way, the fetch size is really a performance optimization:
in the common case we will not return more bytes than the fetch size, but
if there is a large message, we will return more bytes than the specified
fetch size. In practice, large messages are rare, so this shouldn't
increase the memory consumption on the client too much.

Jun
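A minimal sketch of the broker-side check Jun describes, as an editor's illustration in Java. MessageLog, sizeOfMessageAt, and serveFetch are hypothetical names; the real read path in Log.scala is more involved.

    import java.nio.ByteBuffer;

    interface MessageLog {
        // Size in bytes of the message at this offset (index lookup plus a
        // short scan of the log).
        int sizeOfMessageAt(long offset);
        // Read up to maxBytes starting at offset.
        ByteBuffer read(long offset, int maxBytes);
    }

    final class FetchHandler {
        // Treat the fetch size as a hint: always return at least the first
        // message at the requested offset, even if it exceeds the fetch
        // size, so the consumer cannot get stuck.
        static ByteBuffer serveFetch(MessageLog log, long offset, int fetchSize) {
            int firstMessageSize = log.sizeOfMessageAt(offset);
            // In the common case (small messages) this is just fetchSize.
            int readSize = Math.max(fetchSize, firstMessageSize);
            return log.read(offset, readSize);
        }
    }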
On Sat, May 21, 2016 at 3:34 AM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:

Hey everyone. So I started doing some tests on the new consumer/coordinator
to see if it could handle more strenuous use cases like mirroring clusters
with thousands of topics, and thought I'd share whatever I have so far.

The scalability limit: the amount of group metadata we can fit into one
message.

Some background:
Client-side assignment is implemented in two phases:
1. a PreparingRebalance phase that identifies members of the group and
aggregates member subscriptions.
2. an AwaitingSync phase that waits for the group leader to decide member
assignments based on the member subscriptions across the group.
- The leader announces this decision with a SyncGroupRequest. The
GroupCoordinator handles SyncGroupRequests by appending all group state
into a single message under the __consumer_offsets topic. This message is
keyed on the group id and contains each member's subscription as well as
the decided assignment for each member.

The environment:
- one broker
- one __consumer_offsets partition
- offsets.topic.compression.codec=1 // this is gzip
- broker has my pending KAFKA-3718 patch that actually makes use of
offsets.topic.compression.codec: https://github.com/apache/kafka/pull/1394
- around 3000 topics. This is an actual subset of topics from one of our
clusters.
- topics have 8 partitions
- topics are 25 characters long on average
- one group with a varying number of consumers, each hardcoded with all
the topics just to make the tests more consistent. Wildcarding with .*
should have the same effect once the subscription hits the coordinator,
as the subscription has already been fully expanded out to the list of
topics by the consumers.
- I added some log messages to Log.scala to print out the message sizes
after compression
- there are no producers at all and auto commits are disabled. The only
topic with messages getting added is the __consumer_offsets topic, and
they're only from storing group metadata while processing
SyncGroupRequests.
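For reference, the record appended per SyncGroup looks roughly like the following, in the same arrow notation used earlier in the thread. This is a simplified sketch; the field names approximate the group metadata schema and are not exact.

    GroupMetadataKey   -> GroupId
    GroupMetadataValue -> ProtocolType Generation Protocol Leader
                          [MemberId ClientId ClientHost SessionTimeout
                           Subscription Assignment]

Every member's Subscription embeds the fully expanded topic list, so with ~3000 topics averaging 25 characters, each member contributes on the order of 3000 * ~27 bytes, roughly 80 KB of topic names before compression, plus an Assignment that again spells out topic names alongside 4-byte partition ids. That squares with the roughly linear, ~100 KB-per-consumer uncompressed growth in the numbers above.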
Results:
The results below show that we exceed the 1000012-byte
KafkaConfig.messageMaxBytes limit relatively quickly (between 30-40
consumers):
1 consumer    54739 bytes
5 consumers   261524 bytes
10 consumers  459804 bytes
20 consumers  702499 bytes
30 consumers  930525 bytes
40 consumers  1115657 bytes * the tipping point
50 consumers  1363112 bytes
60 consumers  1598621 bytes
70 consumers  1837359 bytes
80 consumers  2066934 bytes
90 consumers  2310970 bytes
100 consumers 2542735 bytes

Note that the growth itself is pretty gradual. Plotting the points makes
it look roughly linear w.r.t. the number of consumers:

https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735)

Also note that these numbers aren't averages or medians or anything like
that. It's just the byte size from a given run. I did run them a few times
and saw similar results.

Impact:
Even after adding gzip to the __consumer_offsets topic with my pending
KAFKA-3718 patch, the AwaitingSync phase of the group fails with
RecordTooLargeException. This means the combined size of each member's
subscriptions and assignments exceeded the KafkaConfig.messageMaxBytes of
1000012 bytes. The group ends up dying.

Options:
1. Config change: reduce the number of consumers in the group. This isn't
always a realistic answer in more strenuous use cases like MirrorMaker
clusters or for auditing.
2. Config change: split the group into smaller groups which together will
get full coverage of the topics. This gives each group member a smaller
subscription (ex: g1 has topics starting with a-m while g2 has topics
starting with n-z). This would be operationally painful to manage.
3. Config change: split the topics among members of the group. Again, this
gives each group member a smaller subscription. This would also be
operationally painful to manage.
4. Config change: bump up KafkaConfig.messageMaxBytes (a topic-level
config) and KafkaConfig.replicaFetchMaxBytes (a broker-level config).
Applying messageMaxBytes to just the __consumer_offsets topic seems
relatively harmless, but bumping up the broker-level replicaFetchMaxBytes
would probably need more attention.
5. Config change: try different compression codecs. Based on 2 minutes of
googling, it seems like lz4 and snappy are faster than gzip but have worse
compression, so this probably won't help.
6. Implementation change: support sending the regex over the wire instead
of the fully expanded topic subscriptions. I think people said in the past
that different languages have subtle differences in regex, so this doesn't
play nicely with cross-language groups.
7. Implementation change: maybe we can reverse the mapping? Instead of
mapping from member to subscriptions, we can map a subscription to a list
of members (see the sketch after this list).
8. Implementation change: maybe we can try to break apart the subscription
and assignments from the same SyncGroupRequest into multiple records? They
can still go to the same message set and get appended together. This way
the limit becomes the segment size, which shouldn't be a problem. This can
be tricky to get right because we're currently keying these messages on
the group, so I think records from the same rebalance might accidentally
compact one another, but my understanding of compaction isn't that great.
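A quick editor's illustration of the reverse mapping in option 7, in Java with made-up names (SubscriptionInverter, invert), not actual Kafka code:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class SubscriptionInverter {
        // Invert member -> subscribed topics into topic -> subscribing
        // members, so each topic name appears once regardless of how many
        // members subscribe to it.
        static Map<String, List<String>> invert(Map<String, List<String>> memberToTopics) {
            Map<String, List<String>> topicToMembers = new HashMap<>();
            for (Map.Entry<String, List<String>> entry : memberToTopics.entrySet())
                for (String topic : entry.getValue())
                    topicToMembers.computeIfAbsent(topic, t -> new ArrayList<>())
                                  .add(entry.getKey());
            return topicToMembers;
        }
    }

For this workload, where every member subscribes to all ~3000 topics, that trades roughly (members x topics) topic-name copies for (topics) copies plus short member-id references, which is much of the redundancy gzip is currently absorbing.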
Todo:
It would be interesting to rerun the tests with no compression just to see
how much gzip is helping, but it's getting late. Maybe tomorrow?

- Onur