To get a better sense of the limit and what we should be optimizing for, it helps to look at the message format:

private val MEMBER_METADATA_V0 = new Schema(
  new Field("member_id", STRING),
  new Field("client_id", STRING),
  new Field("client_host", STRING),
  new Field("session_timeout", INT32),
  new Field("subscription", BYTES),
  new Field("assignment", BYTES))

private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
  new Field("protocol_type", STRING),
  new Field("generation", INT32),
  new Field("protocol", STRING),
  new Field("leader", STRING),
  new Field("members", new ArrayOf(MEMBER_METADATA_V0)))
Subscription => Version Topics UserData
  Version => int16
  Topics => [String]
  UserData => Bytes

Assignment => Version TopicPartitions
  Version => int16
  TopicPartitions => [Topic Partitions]
    Topic => String
    Partitions => [int32]

The UserData field isn't used by the range and roundrobin strategies.

message size ~ sum(s_i + a_i for c_i in C)

C = the consumer group
c_i = the ith consumer in C
s_i = the subscription size for c_i
a_i = the assignment size for c_i

With today's implementation, trying out different assignment strategies wouldn't make any difference, since they all share the same subscription and assignment serialization defined in ConsumerProtocol. As proof, I hit the same limit with both range and roundrobin. The results shown in my original post were using roundrobin.

There are basically two pieces we can optimize:
1. the subscriptions
2. the assignments

Subscriptions can be optimized in a couple of ways:
1. Allowing regex. This only helps consumers with pattern-based subscriptions.
2. Option 7 from my original post: broker-side deduplication of subscriptions by mapping each distinct subscription to a list of members.

Assignments can also be optimized with tricks like the ones Jason mentioned, but I think these end up being specific to the assignment strategy, making it hard to keep a generic ConsumerProtocol.

I think it's worth thinking through our options some more.

On Mon, May 23, 2016 at 11:10 AM, Ismael Juma <ism...@juma.me.uk> wrote:

> Hi Jason,
>
> It would definitely be interesting to try a few of these optimisations on a real world example to quantify the impact.
>
> Ismael
>
> On Mon, May 23, 2016 at 6:59 PM, Jason Gustafson <ja...@confluent.io> wrote:
>
> > Hey Onur,
> >
> > Thanks for the investigation.
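[Editor's note: as a rough sanity check of the size formula above, here is a back-of-the-envelope sketch using the test parameters from the original post further down (3000 topics, ~25-character names, 8 partitions each, an even partition split). The byte overheads in it — a 2-byte string length prefix, 4-byte ints — are illustrative guesses, not the actual ConsumerProtocol serialization.]

```java
// Back-of-the-envelope check of: message size ~ sum(s_i + a_i for c_i in C).
// Overheads are guesses for illustration, not the real wire format.
public class SizeEstimate {
    static final int TOPICS = 3000;          // from the test setup in the original post
    static final int NAME_BYTES = 25 + 2;    // ~25-char topic names plus a length prefix
    static final int PARTITIONS_PER_TOPIC = 8;

    // s_i: every member's subscription lists every topic name.
    static long subscriptionBytes() {
        return (long) TOPICS * NAME_BYTES;
    }

    // a_i: partitions are split evenly, so each member's assignment carries
    // topics/n topic names plus a 4-byte int per partition.
    static long assignmentBytes(int n) {
        long topicsPerMember = TOPICS / n;
        return topicsPerMember * NAME_BYTES
             + topicsPerMember * PARTITIONS_PER_TOPIC * 4;
    }

    static long messageBytes(int n) {
        return n * (subscriptionBytes() + assignmentBytes(n));
    }

    public static void main(String[] args) {
        System.out.println(messageBytes(1));    // ~258 KB vs. the measured 292383 bytes
        System.out.println(messageBytes(100));  // ~8.3 MB vs. the measured ~10 MB uncompressed
    }
}
```

With 100 consumers this lands in the same ballpark as the ~10 MB measured without compression, and the repeated full-topic-list subscriptions dominate the total, which is why deduplicating subscriptions (option 7) looks promising.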
> > I agree with Ismael that pushing regex or some kind of patterns into the protocol would help for communicating subscriptions and for avoiding unnecessary overhead when fetching topic metadata, but it doesn't seem like it would address the main issue here since the leader would still have to split the regex into the individual topics in order to do the partition assignment. It feels like we may need a different approach for representing assignments at this scale.
> >
> > One minor question I had is which assignment strategy you used in these tests? Do you see any difference in overhead overall if you change between "range" and "round-robin"? One thought that occurred to me is that you could reduce the redundancy in the leader sync group by trying to limit the number of consumers that received partitions from each topic. Ideally, each topic would be given to exactly one member so that its name was repeated only once in the leader's sync group. I guess the compression before writing the group metadata would end up removing this redundancy anyway, but still might be worth investigating.
> >
> > It actually seems to me that there's quite a bit of room for cleverness here without changing the protocol. Here are a couple ideas:
> >
> > 1. Currently we list all of the assigned partitions explicitly in the assignment:
> >
> > Assignment -> [Topic [Partition]]
> >
> > Alternatively, you could let the assignment contain just a minimum and maximum partition:
> >
> > Assignment -> [Topic MinPartition MaxPartition]
> >
> > Obviously this would only fit range-based assignment approaches, but if you have a lot of partitions per topic, this could be a big win.
> >
> > 2. Maybe there's a better way to lay out the assignment without needing to explicitly repeat the topic?
> > For example, the leader could sort the topics for each member and just use an integer to represent the index of each topic within the sorted list (note this depends on the subscription including the full topic list).
> >
> > Assignment -> [TopicIndex [Partition]]
> >
> > You could even combine these two options so that you have only 3 integers for each topic assignment:
> >
> > Assignment -> [TopicIndex MinPartition MaxPartition]
> >
> > There may even be better options with a little more thought. All of this is just part of the client-side protocol, so it wouldn't require any version bumps on the broker. What do you think?
> >
> > Thanks,
> > Jason
> >
> > On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > The original concern is that regex may not be efficiently supported across languages, but if there is a neat workaround I would love to learn.
> > >
> > > Guozhang
> > >
> > > On Mon, May 23, 2016 at 5:31 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> > >
> > > > +1 to Jun's suggestion.
> > > >
> > > > Having said that, as a general point, I think we should consider supporting topic patterns in the wire protocol. It requires some thinking for cross-language support, but it seems surmountable and it could make certain operations a lot more efficient (the fact that a basic regex subscription causes the consumer to request metadata for all topics is not great).
> > > >
> > > > Ismael
> > > >
> > > > On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > I like Jun's suggestion in changing the handling logic of a single large message on the consumer side.
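[Editor's note: to put rough numbers on the compact layouts Jason sketches above, here is an illustrative per-topic comparison. The field widths (2-byte string length prefix, 4-byte ints) are assumptions, not the actual ConsumerProtocol serialization.]

```java
// Rough per-assigned-topic cost of the current layout vs. the compact
// [TopicIndex MinPartition MaxPartition] idea. Field widths are assumed.
public class AssignmentLayout {
    static final int NAME_BYTES = 25 + 2;       // avg topic name plus a length prefix
    static final int PARTITIONS_PER_TOPIC = 8;  // from the test setup

    // Current: Assignment -> [Topic [Partition]]
    static int currentPerTopic() {
        return NAME_BYTES + 4 + PARTITIONS_PER_TOPIC * 4;  // name + array size + partition ints
    }

    // Compact: Assignment -> [TopicIndex MinPartition MaxPartition]
    static int compactPerTopic() {
        return 3 * 4;  // three 4-byte ints
    }

    public static void main(String[] args) {
        System.out.println("current: " + currentPerTopic() + " bytes/topic");  // 63
        System.out.println("compact: " + compactPerTopic() + " bytes/topic");  // 12
    }
}
```

With 25-character names and 8 partitions per topic, that works out to roughly a 5x reduction per assigned topic, before compression.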
> > > > > As for the case of "a single group subscribing to 3000 topics", with 100 consumers the 2.5Mb gzip size is reasonable to me (when storing in ZK, we also have the znode limit which is set to 1Mb by default, though admittedly it is only for one consumer). And if we do the change as Jun suggested, 2.5Mb of memory pressure on the follower is OK I think.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
> > > > >
> > > > > > Results without compression:
> > > > > > 1 consumer    292383 bytes
> > > > > > 5 consumers   1079579 bytes * the tipping point
> > > > > > 10 consumers  1855018 bytes
> > > > > > 20 consumers  2780220 bytes
> > > > > > 30 consumers  3705422 bytes
> > > > > > 40 consumers  4630624 bytes
> > > > > > 50 consumers  5555826 bytes
> > > > > > 60 consumers  6480788 bytes
> > > > > > 70 consumers  7405750 bytes
> > > > > > 80 consumers  8330712 bytes
> > > > > > 90 consumers  9255674 bytes
> > > > > > 100 consumers 10180636 bytes
> > > > > >
> > > > > > So it looks like gzip compression shrinks the message size by about 4x.
> > > > > >
> > > > > > On Sat, May 21, 2016 at 9:47 AM, Jun Rao <j...@confluent.io> wrote:
> > > > > >
> > > > > > > Onur,
> > > > > > >
> > > > > > > Thanks for the investigation.
> > > > > > >
> > > > > > > Another option is to just fix how we deal with the case when a message is larger than the fetch size. Today, if the message is larger than the fetch size, the consumer will get stuck. Instead, we can simply return the full message if it's larger than the fetch size w/o requiring the consumer to manually adjust the fetch size.
> > > > > > > On the broker side, to serve a fetch request, we already do an index lookup and then scan the log a bit to find the message with the requested offset. We can just check the size of that message and return the full message if its size is larger than the fetch size. This way, fetch size is really for performance optimization, i.e. in the common case, we will not return more bytes than fetch size, but if there is a large message, we will return more bytes than the specified fetch size. In practice, large messages are rare. So, it shouldn't increase the memory consumption on the client too much.
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Sat, May 21, 2016 at 3:34 AM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hey everyone. So I started doing some tests on the new consumer/coordinator to see if it could handle more strenuous use cases like mirroring clusters with thousands of topics and thought I'd share whatever I have so far.
> > > > > > > >
> > > > > > > > The scalability limit: the amount of group metadata we can fit into one message
> > > > > > > >
> > > > > > > > Some background:
> > > > > > > > Client-side assignment is implemented in two phases:
> > > > > > > > 1. a PreparingRebalance phase that identifies members of the group and aggregates member subscriptions.
> > > > > > > > 2. an AwaitingSync phase that waits for the group leader to decide member assignments based on the member subscriptions across the group.
> > > > > > > > - The leader announces this decision with a SyncGroupRequest. The GroupCoordinator handles SyncGroupRequests by appending all group state into a single message under the __consumer_offsets topic. This message is keyed on the group id and contains each member subscription as well as the decided assignment for each member.
> > > > > > > >
> > > > > > > > The environment:
> > > > > > > > - one broker
> > > > > > > > - one __consumer_offsets partition
> > > > > > > > - offsets.topic.compression.codec=1 // this is gzip
> > > > > > > > - broker has my pending KAFKA-3718 patch that actually makes use of offsets.topic.compression.codec: https://github.com/apache/kafka/pull/1394
> > > > > > > > - around 3000 topics. This is an actual subset of topics from one of our clusters.
> > > > > > > > - topics have 8 partitions
> > > > > > > > - topics are 25 characters long on average
> > > > > > > > - one group with a varying number of consumers, each hardcoded with all the topics just to make the tests more consistent. Wildcarding with .* should have the same effect once the subscription hits the coordinator, as the subscription has already been fully expanded out to the list of topics by the consumers.
> > > > > > > > - I added some log messages to Log.scala to print out the message sizes after compression
> > > > > > > > - there are no producers at all and auto commits are disabled.
> > > > > > > > The only topic with messages getting added is the __consumer_offsets topic, and they're only from storing group metadata while processing SyncGroupRequests.
> > > > > > > >
> > > > > > > > Results:
> > > > > > > > The results below show that we exceed the 1000012-byte KafkaConfig.messageMaxBytes limit relatively quickly (between 30-40 consumers):
> > > > > > > > 1 consumer    54739 bytes
> > > > > > > > 5 consumers   261524 bytes
> > > > > > > > 10 consumers  459804 bytes
> > > > > > > > 20 consumers  702499 bytes
> > > > > > > > 30 consumers  930525 bytes
> > > > > > > > 40 consumers  1115657 bytes * the tipping point
> > > > > > > > 50 consumers  1363112 bytes
> > > > > > > > 60 consumers  1598621 bytes
> > > > > > > > 70 consumers  1837359 bytes
> > > > > > > > 80 consumers  2066934 bytes
> > > > > > > > 90 consumers  2310970 bytes
> > > > > > > > 100 consumers 2542735 bytes
> > > > > > > >
> > > > > > > > Note that the growth itself is pretty gradual. Plotting the points makes it look roughly linear w.r.t the number of consumers:
> > > > > > > > https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735)
> > > > > > > >
> > > > > > > > Also note that these numbers aren't averages or medians or anything like that. It's just the byte size from a given run. I did run them a few times and saw similar results.
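[Editor's note: the near-linear growth can be sanity-checked by computing the incremental cost per added consumer from the gzip-compressed points reported above.]

```java
// Incremental bytes per added consumer between consecutive reported runs.
public class GrowthCheck {
    static final int[][] POINTS = {
        {1, 54739}, {5, 261524}, {10, 459804}, {20, 702499},
        {30, 930525}, {40, 1115657}, {50, 1363112}, {60, 1598621},
        {70, 1837359}, {80, 2066934}, {90, 2310970}, {100, 2542735}};

    // slope between consecutive points: delta bytes / delta consumers
    static double[] slopes() {
        double[] out = new double[POINTS.length - 1];
        for (int i = 1; i < POINTS.length; i++) {
            out[i - 1] = (double) (POINTS[i][1] - POINTS[i - 1][1])
                       / (POINTS[i][0] - POINTS[i - 1][0]);
        }
        return out;
    }

    public static void main(String[] args) {
        // After the first few points the cost settles at roughly 18-25 KB
        // per added consumer, consistent with near-linear growth.
        for (double s : slopes())
            System.out.printf("%.1f%n", s);
    }
}
```

The first couple of gaps are noisier (the per-consumer delta shrinks as fixed costs are amortized), but past ~10 consumers each additional consumer adds roughly 18-25 KB compressed.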
> > > > > > > > Impact:
> > > > > > > > Even after adding gzip to the __consumer_offsets topic with my pending KAFKA-3718 patch, the AwaitingSync phase of the group fails with RecordTooLargeException. This means the combined size of each member's subscriptions and assignments exceeded the KafkaConfig.messageMaxBytes of 1000012 bytes. The group ends up dying.
> > > > > > > >
> > > > > > > > Options:
> > > > > > > > 1. Config change: reduce the number of consumers in the group. This isn't always a realistic answer in more strenuous use cases like MirrorMaker clusters or for auditing.
> > > > > > > > 2. Config change: split the group into smaller groups which together will get full coverage of the topics. This gives each group member a smaller subscription. (ex: g1 has topics starting with a-m while g2 has topics starting with n-z) This would be operationally painful to manage.
> > > > > > > > 3. Config change: split the topics among members of the group. Again this gives each group member a smaller subscription. This would also be operationally painful to manage.
> > > > > > > > 4. Config change: bump up KafkaConfig.messageMaxBytes (a topic-level config) and KafkaConfig.replicaFetchMaxBytes (a broker-level config). Applying messageMaxBytes to just the __consumer_offsets topic seems relatively harmless, but bumping up the broker-level replicaFetchMaxBytes would probably need more attention.
> > > > > > > > 5. Config change: try different compression codecs.
> > > > > > > > Based on 2 minutes of googling, it seems like lz4 and snappy are faster than gzip but have worse compression, so this probably won't help.
> > > > > > > > 6. Implementation change: support sending the regex over the wire instead of the fully expanded topic subscriptions. I think people said in the past that different languages have subtle differences in regex, so this doesn't play nicely with cross-language groups.
> > > > > > > > 7. Implementation change: maybe we can reverse the mapping? Instead of mapping from member to subscriptions, we can map a subscription to a list of members.
> > > > > > > > 8. Implementation change: maybe we can try to break apart the subscription and assignments from the same SyncGroupRequest into multiple records? They can still go to the same message set and get appended together. This way the limit becomes the segment size, which shouldn't be a problem. This can be tricky to get right because we're currently keying these messages on the group, so I think records from the same rebalance might accidentally compact one another, but my understanding of compaction isn't that great.
> > > > > > > >
> > > > > > > > Todo:
> > > > > > > > It would be interesting to rerun the tests with no compression just to see how much gzip is helping, but it's getting late. Maybe tomorrow?
> > > > > > > > - Onur
> > > > >
> > > > > --
> > > > > -- Guozhang
> > >
> > > --
> > > -- Guozhang