> > Assignments also can be optimized with some tricks like the ones Jason > mentioned, but I think these end up being specific to the assignment > strategy, making it hard to keep a generic ConsumerProtocol.
Leaving the protocol generic would be ideal since tools (such as consumer-groups.sh) depend on it. That said, we specifically designed it to allow assignors to use their own schemas, so I don't see a major problem taking advantage of that and we could hide the detail from tools easily enough. Changing the message format used by the coordinator for group metadata seems doable as well as long as it only affects the value schema (and it sounds like it would). I'm a little less optimistic about reversing the member -> subscription mapping as a general solution, however, since many assignors will probably have consumer-specific data (e.g. in sticky or rack-based assignment), but I'm definitely in favor of laying out all the options. Thanks, Jason On Mon, May 23, 2016 at 1:37 PM, Guozhang Wang <wangg...@gmail.com> wrote: > Discussed with Jason about several optimization proposals, and summarize > them here: > > ----------- > Today the offset topic message value format is: > > [member subscription assignment] > > where subscription and assignment are just bytes to the brokers, and > consumers know the schema to interpret them; usually subscriptions contains > the topic list > > [Topic-name] > > , and assignment contains the topic-partitions list > > [Topic-name [Partition-Id]]. > > > Now assuming we have 100 consumer subscribing to 3000 topics with each 25 > bytes on average, and assuming each consumer gets two partitions of each of > the subscribed topics (I know the second assumption maybe off, but just > keep it as for illustration purposes), the subscription will take about > 3000 * 25 * 100 bytes in total, and the assignment will take about 3000 * > (25 + 8 * 2) * 100 bytes, before compressed. > > > One proposal from Jason is that we can change the assignment bytes to > [Index [Partition-Id]], where Index is the the index of the topic in the > subscription bytes, that saves about (25 - 8) * 3000 * 100 bytes; > > Another proposal from Onur goes further, that we can change the format of > the message value to: > > [subscription [member assignment]] > > , which, combining with Jason's approach, will further reduce the > subscription size from 3000 * 25 * 100 to 3000 * 25 * 1 in the best case. > > > I think Jason's proposal is a good start as it does not change the protocol > at all (again, to brokers it is just bytes), but only upgrading the format > protocol and the interpretation logic on the consumer-side; and hence a new > (say 0.11 after this optimization) consumer client can even work with an > older broker. And it's saving is already quite beneficial. > > > Guozhang > > > On Mon, May 23, 2016 at 11:10 AM, Ismael Juma <ism...@juma.me.uk> wrote: > > > Hi Jason, > > > > It would definitely be interesting to try a few of these optimisations > on a > > real world example to quantify the impact. > > > > Ismael > > > > On Mon, May 23, 2016 at 6:59 PM, Jason Gustafson <ja...@confluent.io> > > wrote: > > > > > Hey Onur, > > > > > > Thanks for the investigation. I agree with Ismael that pushing regex or > > > some kind of patterns into the protocol would help for communicating > > > subscriptions and for avoiding unnecessary overhead when fetching topic > > > metadata, but it doesn't seem like it would address the main issue here > > > since the leader would still have to split the regex into the > individual > > > topics in order to do the partition assignment. It feels like we may > > need a > > > different approach for representing assignments at this scale. > > > > > > One minor question I had is which assignment strategy you used in these > > > tests? Do you see any difference in overhead overall if you change > > between > > > "range" and "round-robin"? One thought that occurred to me is that you > > > could reduce the redundancy in the leader sync group by trying to limit > > the > > > number of consumers that received partitions from each topic. Ideally, > > each > > > topic would be given to exactly one member so that its name was > repeated > > > only once in the leader's sync group. I guess the compression before > > > writing the group metadata would end up removing this redundancy > anyway, > > > but still might be worth investigating. > > > > > > It actually seems to me that there's quite a bit of room for cleverness > > > here without changing the protocol. Here are a couple ideas: > > > > > > 1. Currently we list all of the assigned partitions explicitly in the > > > assignment: > > > > > > Assignment -> [Topic [Partition]] > > > > > > Alternatively, you could let the assignment contain just a minimum and > > > maximum partition: > > > > > > Assignment -> [Topic MinPartition MaxPartition] > > > > > > Obviously this would only fit range-based assignment approaches, but if > > you > > > have a lot of partitions per topic, this could be a big win. > > > > > > 2. Maybe there's a better way to lay out the assignment without needing > > to > > > explicitly repeat the topic? For example, the leader could sort the > > topics > > > for each member and just use an integer to represent the index of each > > > topic within the sorted list (note this depends on the subscription > > > including the full topic list). > > > > > > Assignment -> [TopicIndex [Partition]] > > > > > > You could even combine these two options so that you have only 3 > integers > > > for each topic assignment: > > > > > > Assignment -> [TopicIndex MinPartition MaxPartition] > > > > > > There may even be better options with a little more thought. All of > this > > is > > > just part of the client-side protocol, so it wouldn't require any > version > > > bumps on the broker. What do you think? > > > > > > Thanks, > > > Jason > > > > > > > > > > > > > > > On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang <wangg...@gmail.com> > > wrote: > > > > > > > The original concern is that regex may not be efficiently supported > > > > across-languages, but if there is a neat workaround I would love to > > > learn. > > > > > > > > Guozhang > > > > > > > > On Mon, May 23, 2016 at 5:31 AM, Ismael Juma <ism...@juma.me.uk> > > wrote: > > > > > > > > > +1 to Jun's suggestion. > > > > > > > > > > Having said that, as a general point, I think we should consider > > > > supporting > > > > > topic patterns in the wire protocol. It requires some thinking for > > > > > cross-language support, but it seems surmountable and it could make > > > > certain > > > > > operations a lot more efficient (the fact that a basic regex > > > subscription > > > > > causes the consumer to request metadata for all topics is not > great). > > > > > > > > > > Ismael > > > > > > > > > > On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang < > wangg...@gmail.com> > > > > > wrote: > > > > > > > > > > > I like Jun's suggestion in changing the handling logics of single > > > large > > > > > > message on the consumer side. > > > > > > > > > > > > As for the case of "a single group subscribing to 3000 topics", > > with > > > > 100 > > > > > > consumers the 2.5Mb Gzip size is reasonable to me (when storing > in > > > ZK, > > > > we > > > > > > also have the znode limit which is set to 1Mb by default, though > > > > > admittedly > > > > > > it is only for one consumer). And if we do the change as Jun > > > suggested, > > > > > > 2.5Mb on follower's memory pressure is OK I think. > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > On Sat, May 21, 2016 at 12:51 PM, Onur Karaman < > > > > > > onurkaraman.apa...@gmail.com > > > > > > > wrote: > > > > > > > > > > > > > Results without compression: > > > > > > > 1 consumer 292383 bytes > > > > > > > 5 consumers 1079579 bytes * the tipping point > > > > > > > 10 consumers 1855018 bytes > > > > > > > 20 consumers 2780220 bytes > > > > > > > 30 consumers 3705422 bytes > > > > > > > 40 consumers 4630624 bytes > > > > > > > 50 consumers 5555826 bytes > > > > > > > 60 consumers 6480788 bytes > > > > > > > 70 consumers 7405750 bytes > > > > > > > 80 consumers 8330712 bytes > > > > > > > 90 consumers 9255674 bytes > > > > > > > 100 consumers 10180636 bytes > > > > > > > > > > > > > > So it looks like gzip compression shrinks the message size by > 4x. > > > > > > > > > > > > > > On Sat, May 21, 2016 at 9:47 AM, Jun Rao <j...@confluent.io> > > wrote: > > > > > > > > > > > > > > > Onur, > > > > > > > > > > > > > > > > Thanks for the investigation. > > > > > > > > > > > > > > > > Another option is to just fix how we deal with the case when > a > > > > > message > > > > > > is > > > > > > > > larger than the fetch size. Today, if the fetch size is > smaller > > > > than > > > > > > the > > > > > > > > fetch size, the consumer will get stuck. Instead, we can > simply > > > > > return > > > > > > > the > > > > > > > > full message if it's larger than the fetch size w/o requiring > > the > > > > > > > consumer > > > > > > > > to manually adjust the fetch size. On the broker side, to > > serve a > > > > > fetch > > > > > > > > request, we already do an index lookup and then scan the log > a > > > bit > > > > to > > > > > > > find > > > > > > > > the message with the requested offset. We can just check the > > size > > > > of > > > > > > that > > > > > > > > message and return the full message if its size is larger > than > > > the > > > > > > fetch > > > > > > > > size. This way, fetch size is really for performance > > > optimization, > > > > > i.e. > > > > > > > in > > > > > > > > the common case, we will not return more bytes than fetch > size, > > > but > > > > > if > > > > > > > > there is a large message, we will return more bytes than the > > > > > specified > > > > > > > > fetch size. In practice, large messages are rare. So, it > > > shouldn't > > > > > > > increase > > > > > > > > the memory consumption on the client too much. > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > On Sat, May 21, 2016 at 3:34 AM, Onur Karaman < > > > > > > > > onurkaraman.apa...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hey everyone. So I started doing some tests on the new > > > > > > > > consumer/coordinator > > > > > > > > > to see if it could handle more strenuous use cases like > > > mirroring > > > > > > > > clusters > > > > > > > > > with thousands of topics and thought I'd share whatever I > > have > > > so > > > > > > far. > > > > > > > > > > > > > > > > > > The scalability limit: the amount of group metadata we can > > fit > > > > into > > > > > > one > > > > > > > > > message > > > > > > > > > > > > > > > > > > Some background: > > > > > > > > > Client-side assignment is implemented in two phases > > > > > > > > > 1. a PreparingRebalance phase that identifies members of > the > > > > group > > > > > > and > > > > > > > > > aggregates member subscriptions. > > > > > > > > > 2. an AwaitingSync phase that waits for the group leader to > > > > decide > > > > > > > member > > > > > > > > > assignments based on the member subscriptions across the > > group. > > > > > > > > > - The leader announces this decision with a > > SyncGroupRequest. > > > > The > > > > > > > > > GroupCoordinator handles SyncGroupRequests by appending all > > > group > > > > > > state > > > > > > > > > into a single message under the __consumer_offsets topic. > > This > > > > > > message > > > > > > > is > > > > > > > > > keyed on the group id and contains each member subscription > > as > > > > well > > > > > > as > > > > > > > > the > > > > > > > > > decided assignment for each member. > > > > > > > > > > > > > > > > > > The environment: > > > > > > > > > - one broker > > > > > > > > > - one __consumer_offsets partition > > > > > > > > > - offsets.topic.compression.codec=1 // this is gzip > > > > > > > > > - broker has my pending KAFKA-3718 patch that actually > makes > > > use > > > > of > > > > > > > > > offsets.topic.compression.codec: > > > > > > > > https://github.com/apache/kafka/pull/1394 > > > > > > > > > - around 3000 topics. This is an actual subset of topics > from > > > one > > > > > of > > > > > > > our > > > > > > > > > clusters. > > > > > > > > > - topics have 8 partitions > > > > > > > > > - topics are 25 characters long on average > > > > > > > > > - one group with a varying number of consumers each > hardcoded > > > > with > > > > > > all > > > > > > > > the > > > > > > > > > topics just to make the tests more consistent. wildcarding > > with > > > > .* > > > > > > > should > > > > > > > > > have the same effect once the subscription hits the > > coordinator > > > > as > > > > > > the > > > > > > > > > subscription has already been fully expanded out to the > list > > of > > > > > > topics > > > > > > > by > > > > > > > > > the consumers. > > > > > > > > > - I added some log messages to Log.scala to print out the > > > message > > > > > > sizes > > > > > > > > > after compression > > > > > > > > > - there are no producers at all and auto commits are > > disabled. > > > > The > > > > > > only > > > > > > > > > topic with messages getting added is the __consumer_offsets > > > topic > > > > > and > > > > > > > > > they're only from storing group metadata while processing > > > > > > > > > SyncGroupRequests. > > > > > > > > > > > > > > > > > > Results: > > > > > > > > > The results below show that we exceed the 1000012 byte > > > > > > > > > KafkaConfig.messageMaxBytes limit relatively quickly > (between > > > > 30-40 > > > > > > > > > consumers): > > > > > > > > > 1 consumer 54739 bytes > > > > > > > > > 5 consumers 261524 bytes > > > > > > > > > 10 consumers 459804 bytes > > > > > > > > > 20 consumers 702499 bytes > > > > > > > > > 30 consumers 930525 bytes > > > > > > > > > 40 consumers 1115657 bytes * the tipping point > > > > > > > > > 50 consumers 1363112 bytes > > > > > > > > > 60 consumers 1598621 bytes > > > > > > > > > 70 consumers 1837359 bytes > > > > > > > > > 80 consumers 2066934 bytes > > > > > > > > > 90 consumers 2310970 bytes > > > > > > > > > 100 consumers 2542735 bytes > > > > > > > > > > > > > > > > > > Note that the growth itself is pretty gradual. Plotting the > > > > points > > > > > > > makes > > > > > > > > it > > > > > > > > > look roughly linear w.r.t the number of consumers: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735) > > > > > > > > > > > > > > > > > > Also note that these numbers aren't averages or medians or > > > > anything > > > > > > > like > > > > > > > > > that. It's just the byte size from a given run. I did run > > them > > > a > > > > > few > > > > > > > > times > > > > > > > > > and saw similar results. > > > > > > > > > > > > > > > > > > Impact: > > > > > > > > > Even after adding gzip to the __consumer_offsets topic with > > my > > > > > > pending > > > > > > > > > KAFKA-3718 patch, the AwaitingSync phase of the group fails > > > with > > > > > > > > > RecordTooLargeException. This means the combined size of > each > > > > > > member's > > > > > > > > > subscriptions and assignments exceeded the > > > > > > KafkaConfig.messageMaxBytes > > > > > > > of > > > > > > > > > 1000012 bytes. The group ends up dying. > > > > > > > > > > > > > > > > > > Options: > > > > > > > > > 1. Config change: reduce the number of consumers in the > > group. > > > > This > > > > > > > isn't > > > > > > > > > always a realistic answer in more strenuous use cases like > > > > > > MirrorMaker > > > > > > > > > clusters or for auditing. > > > > > > > > > 2. Config change: split the group into smaller groups which > > > > > together > > > > > > > will > > > > > > > > > get full coverage of the topics. This gives each group > > member a > > > > > > smaller > > > > > > > > > subscription.(ex: g1 has topics starting with a-m while g2 > > has > > > > > topics > > > > > > > > > starting ith n-z). This would be operationally painful to > > > manage. > > > > > > > > > 3. Config change: split the topics among members of the > > group. > > > > > Again > > > > > > > this > > > > > > > > > gives each group member a smaller subscription. This would > > also > > > > be > > > > > > > > > operationally painful to manage. > > > > > > > > > 4. Config change: bump up KafkaConfig.messageMaxBytes (a > > > > > topic-level > > > > > > > > > config) and KafkaConfig.replicaFetchMaxBytes (a > broker-level > > > > > config). > > > > > > > > > Applying messageMaxBytes to just the __consumer_offsets > topic > > > > seems > > > > > > > > > relatively harmless, but bumping up the broker-level > > > > > > > replicaFetchMaxBytes > > > > > > > > > would probably need more attention. > > > > > > > > > 5. Config change: try different compression codecs. Based > on > > 2 > > > > > > minutes > > > > > > > of > > > > > > > > > googling, it seems like lz4 and snappy are faster than gzip > > but > > > > > have > > > > > > > > worse > > > > > > > > > compression, so this probably won't help. > > > > > > > > > 6. Implementation change: support sending the regex over > the > > > wire > > > > > > > instead > > > > > > > > > of the fully expanded topic subscriptions. I think people > > said > > > in > > > > > the > > > > > > > > past > > > > > > > > > that different languages have subtle differences in regex, > so > > > > this > > > > > > > > doesn't > > > > > > > > > play nicely with cross-language groups. > > > > > > > > > 7. Implementation change: maybe we can reverse the mapping? > > > > Instead > > > > > > of > > > > > > > > > mapping from member to subscriptions, we can map a > > subscription > > > > to > > > > > a > > > > > > > list > > > > > > > > > of members. > > > > > > > > > 8. Implementation change: maybe we can try to break apart > the > > > > > > > > subscription > > > > > > > > > and assignments from the same SyncGroupRequest into > multiple > > > > > records? > > > > > > > > They > > > > > > > > > can still go to the same message set and get appended > > together. > > > > > This > > > > > > > way > > > > > > > > > the limit become the segment size, which shouldn't be a > > > problem. > > > > > This > > > > > > > can > > > > > > > > > be tricky to get right because we're currently keying these > > > > > messages > > > > > > on > > > > > > > > the > > > > > > > > > group, so I think records from the same rebalance might > > > > > accidentally > > > > > > > > > compact one another, but my understanding of compaction > isn't > > > > that > > > > > > > great. > > > > > > > > > > > > > > > > > > Todo: > > > > > > > > > It would be interesting to rerun the tests with no > > compression > > > > just > > > > > > to > > > > > > > > see > > > > > > > > > how much gzip is helping but it's getting late. Maybe > > tomorrow? > > > > > > > > > > > > > > > > > > - Onur > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > -- Guozhang > > > > > > > > > > > > > -- > -- Guozhang >