Would be interesting to see the size with compression on.

On Mon, May 23, 2016 at 4:23 PM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
> When figuring out these optimizations, it's worth keeping in mind the improvements when the message is uncompressed vs when it's compressed.
>
> When uncompressed:
> Fixing the Assignment serialization to instead be a topic index into the corresponding member's subscription list would usually be a good thing.
>
> I think the proposal is only worse when the topic names are small. The Type.STRING we use in our protocol for the assignment's TOPIC_KEY_NAME is limited in length to Short.MAX_VALUE, so our strings are first prepended with 2 bytes to indicate the string size.
>
> The new proposal does worse when:
> 2 + utf_encoded_string_payload_size < index_type_size
> in other words when:
> utf_encoded_string_payload_size < index_type_size - 2
>
> If the index type ends up being Type.INT32, then the proposal is worse when the topic is length 1.
> If the index type ends up being Type.INT64, then the proposal is worse when the topic is less than length 6.
>
> When compressed:
> As James Cheng brought up, I'm not sure how things change when compression comes into the picture. This would be worth investigating.
>
> On Mon, May 23, 2016 at 4:05 PM, James Cheng <wushuja...@gmail.com> wrote:
>
> > > On May 23, 2016, at 10:59 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > >
> > > 2. Maybe there's a better way to lay out the assignment without needing to explicitly repeat the topic? For example, the leader could sort the topics for each member and just use an integer to represent the index of each topic within the sorted list (note this depends on the subscription including the full topic list).
> > >
> > > Assignment -> [TopicIndex [Partition]]
> >
> > Jason, doesn't gzip (or other compression) basically do this? If the topic is a string and the topic is repeated throughout, won't compression basically replace all repeated instances of it with an index reference to the full string?
> >
> > -James
> >
> > > You could even combine these two options so that you have only 3 integers for each topic assignment:
> > >
> > > Assignment -> [TopicIndex MinPartition MaxPartition]
> > >
> > > There may even be better options with a little more thought. All of this is just part of the client-side protocol, so it wouldn't require any version bumps on the broker. What do you think?
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > >> The original concern is that regex may not be efficiently supported across languages, but if there is a neat workaround I would love to learn.
> > >>
> > >> Guozhang
> > >>
> > >> On Mon, May 23, 2016 at 5:31 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> > >>
> > >>> +1 to Jun's suggestion.
> > >>>
> > >>> Having said that, as a general point, I think we should consider supporting topic patterns in the wire protocol. It requires some thinking for cross-language support, but it seems surmountable and it could make certain operations a lot more efficient (the fact that a basic regex subscription causes the consumer to request metadata for all topics is not great).
> > >>>
> > >>> Ismael
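To make the size math near the top of this message concrete, here is a tiny back-of-the-envelope sketch. It is plain Java with made-up names, not the real ConsumerProtocol code; it only mirrors the arithmetic Onur describes (Type.STRING = 2-byte length prefix + UTF-8 payload vs. a fixed-size index like Jason proposes):

    import java.nio.charset.StandardCharsets;

    // Hypothetical sketch only: mirrors the arithmetic in the discussion above,
    // not the actual assignment serialization code.
    public class AssignmentKeySizeSketch {

        // Current layout: each assigned topic is written as a Type.STRING,
        // i.e. a 2-byte length prefix followed by the UTF-8 encoded name.
        static int stringKeyBytes(String topic) {
            return 2 + topic.getBytes(StandardCharsets.UTF_8).length;
        }

        // Proposed layout: a fixed-size index into the member's sorted subscription
        // list (4 bytes for Type.INT32, 8 bytes for Type.INT64).
        static int indexKeyBytes(int indexTypeSize) {
            return indexTypeSize;
        }

        public static void main(String[] args) {
            String topic = "some.average.25char.topic"; // made-up 25-char name, like the test topics later in the thread
            System.out.println("Type.STRING key: " + stringKeyBytes(topic) + " bytes"); // 2 + 25 = 27
            System.out.println("Type.INT32 index: " + indexKeyBytes(4) + " bytes");
            // The index only loses when utf8(topic) < indexTypeSize - 2:
            // topics of length 1 for INT32, length <= 5 for INT64.
        }
    }

With ~25-character topic names, the per-topic key shrinks from roughly 27 bytes to 4, before compression even enters the picture.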
> > >>> On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>
> > >>>> I like Jun's suggestion of changing the handling logic for a single large message on the consumer side.
> > >>>>
> > >>>> As for the case of "a single group subscribing to 3000 topics", with 100 consumers the 2.5Mb Gzip size is reasonable to me (when storing in ZK, we also have the znode limit which is set to 1Mb by default, though admittedly it is only for one consumer). And if we do the change as Jun suggested, 2.5Mb of memory pressure on the follower is OK I think.
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>> On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
> > >>>>
> > >>>>> Results without compression:
> > >>>>> 1 consumer 292383 bytes
> > >>>>> 5 consumers 1079579 bytes * the tipping point
> > >>>>> 10 consumers 1855018 bytes
> > >>>>> 20 consumers 2780220 bytes
> > >>>>> 30 consumers 3705422 bytes
> > >>>>> 40 consumers 4630624 bytes
> > >>>>> 50 consumers 5555826 bytes
> > >>>>> 60 consumers 6480788 bytes
> > >>>>> 70 consumers 7405750 bytes
> > >>>>> 80 consumers 8330712 bytes
> > >>>>> 90 consumers 9255674 bytes
> > >>>>> 100 consumers 10180636 bytes
> > >>>>>
> > >>>>> So it looks like gzip compression shrinks the message size by 4x.
> > >>>>>
> > >>>>> On Sat, May 21, 2016 at 9:47 AM, Jun Rao <j...@confluent.io> wrote:
> > >>>>>
> > >>>>>> Onur,
> > >>>>>>
> > >>>>>> Thanks for the investigation.
> > >>>>>>
> > >>>>>> Another option is to just fix how we deal with the case when a message is larger than the fetch size. Today, if a message is larger than the fetch size, the consumer will get stuck. Instead, we can simply return the full message if it's larger than the fetch size w/o requiring the consumer to manually adjust the fetch size. On the broker side, to serve a fetch request, we already do an index lookup and then scan the log a bit to find the message with the requested offset. We can just check the size of that message and return the full message if its size is larger than the fetch size. This way, fetch size is really for performance optimization, i.e. in the common case, we will not return more bytes than the fetch size, but if there is a large message, we will return more bytes than the specified fetch size. In practice, large messages are rare. So, it shouldn't increase the memory consumption on the client too much.
> > >>>>>>
> > >>>>>> Jun
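Jun's idea, expressed as a rough pseudo-Java sketch (this is not the actual kafka.log.Log read path, and the helpers are stand-ins), would look something like:

    // Sketch only: illustrates the check Jun describes, not the real broker code.
    class OversizedFetchSketch {

        static byte[] read(long fetchOffset, int fetchSize) {
            // stand-in for the existing index lookup + short scan to the requested offset
            byte[] firstMessage = firstMessageAt(fetchOffset);
            if (firstMessage.length > fetchSize) {
                // Large-message case: return the whole message anyway so the consumer
                // can make progress instead of getting stuck retrying with the same fetch size.
                return firstMessage;
            }
            // Common case: respect the fetch size, as today.
            return readUpTo(fetchOffset, fetchSize);
        }

        // Stand-in helpers so the sketch is self-contained.
        static byte[] firstMessageAt(long offset) { return new byte[2_000_000]; }
        static byte[] readUpTo(long offset, int fetchSize) { return new byte[fetchSize]; }
    }

The fetch size then acts as a best-effort bound rather than a hard cap, which matches Jun's point that large messages are rare in practice.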
> > >>>>>> On Sat, May 21, 2016 at 3:34 AM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Hey everyone. So I started doing some tests on the new consumer/coordinator to see if it could handle more strenuous use cases like mirroring clusters with thousands of topics and thought I'd share whatever I have so far.
> > >>>>>>>
> > >>>>>>> The scalability limit: the amount of group metadata we can fit into one message.
> > >>>>>>>
> > >>>>>>> Some background:
> > >>>>>>> Client-side assignment is implemented in two phases
> > >>>>>>> 1. a PreparingRebalance phase that identifies members of the group and aggregates member subscriptions.
> > >>>>>>> 2. an AwaitingSync phase that waits for the group leader to decide member assignments based on the member subscriptions across the group.
> > >>>>>>>    - The leader announces this decision with a SyncGroupRequest. The GroupCoordinator handles SyncGroupRequests by appending all group state into a single message under the __consumer_offsets topic. This message is keyed on the group id and contains each member subscription as well as the decided assignment for each member.
> > >>>>>>>
> > >>>>>>> The environment:
> > >>>>>>> - one broker
> > >>>>>>> - one __consumer_offsets partition
> > >>>>>>> - offsets.topic.compression.codec=1 // this is gzip
> > >>>>>>> - broker has my pending KAFKA-3718 patch that actually makes use of offsets.topic.compression.codec: https://github.com/apache/kafka/pull/1394
> > >>>>>>> - around 3000 topics. This is an actual subset of topics from one of our clusters.
> > >>>>>>> - topics have 8 partitions
> > >>>>>>> - topics are 25 characters long on average
> > >>>>>>> - one group with a varying number of consumers each hardcoded with all the topics just to make the tests more consistent. Wildcarding with .* should have the same effect once the subscription hits the coordinator, as the subscription has already been fully expanded out to the list of topics by the consumers.
> > >>>>>>> - I added some log messages to Log.scala to print out the message sizes after compression
> > >>>>>>> - there are no producers at all and auto commits are disabled. The only topic with messages getting added is the __consumer_offsets topic, and they're only from storing group metadata while processing SyncGroupRequests.
> > >>>>>>>
> > >>>>>>> Results:
> > >>>>>>> The results below show that we exceed the 1000012 byte KafkaConfig.messageMaxBytes limit relatively quickly (between 30-40 consumers):
> > >>>>>>> 1 consumer 54739 bytes
> > >>>>>>> 5 consumers 261524 bytes
> > >>>>>>> 10 consumers 459804 bytes
> > >>>>>>> 20 consumers 702499 bytes
> > >>>>>>> 30 consumers 930525 bytes
> > >>>>>>> 40 consumers 1115657 bytes * the tipping point
> > >>>>>>> 50 consumers 1363112 bytes
> > >>>>>>> 60 consumers 1598621 bytes
> > >>>>>>> 70 consumers 1837359 bytes
> > >>>>>>> 80 consumers 2066934 bytes
> > >>>>>>> 90 consumers 2310970 bytes
> > >>>>>>> 100 consumers 2542735 bytes
> > >>>>>>>
> > >>>>>>> Note that the growth itself is pretty gradual. Plotting the points makes it look roughly linear w.r.t the number of consumers:
> > >>>>>>> https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735)
> > >>>>>>>
> > >>>>>>> Also note that these numbers aren't averages or medians or anything like that. It's just the byte size from a given run. I did run them a few times and saw similar results.
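A quick sanity check on how linear those compressed numbers are: (2542735 - 54739) / 99 ≈ 25131, i.e. roughly 25 KB of gzipped group metadata per additional consumer. At that rate the 1000012-byte limit is crossed around 1 + (1000012 - 54739) / 25131 ≈ 39 consumers, consistent with the tipping point Onur observed between 30 and 40. The uncompressed numbers earlier in the thread work out to roughly 100 KB per consumer, which also lines up with the ~4x gzip ratio he mentions.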
> > >>>>>>> Impact:
> > >>>>>>> Even after adding gzip to the __consumer_offsets topic with my pending KAFKA-3718 patch, the AwaitingSync phase of the group fails with RecordTooLargeException. This means the combined size of each member's subscriptions and assignments exceeded the KafkaConfig.messageMaxBytes of 1000012 bytes. The group ends up dying.
> > >>>>>>>
> > >>>>>>> Options:
> > >>>>>>> 1. Config change: reduce the number of consumers in the group. This isn't always a realistic answer in more strenuous use cases like MirrorMaker clusters or for auditing.
> > >>>>>>> 2. Config change: split the group into smaller groups which together will get full coverage of the topics. This gives each group member a smaller subscription. (ex: g1 has topics starting with a-m while g2 has topics starting with n-z). This would be operationally painful to manage.
> > >>>>>>> 3. Config change: split the topics among members of the group. Again this gives each group member a smaller subscription. This would also be operationally painful to manage.
> > >>>>>>> 4. Config change: bump up KafkaConfig.messageMaxBytes (a topic-level config) and KafkaConfig.replicaFetchMaxBytes (a broker-level config). Applying messageMaxBytes to just the __consumer_offsets topic seems relatively harmless, but bumping up the broker-level replicaFetchMaxBytes would probably need more attention.
> > >>>>>>> 5. Config change: try different compression codecs. Based on 2 minutes of googling, it seems like lz4 and snappy are faster than gzip but have worse compression, so this probably won't help.
> > >>>>>>> 6. Implementation change: support sending the regex over the wire instead of the fully expanded topic subscriptions. I think people said in the past that different languages have subtle differences in regex, so this doesn't play nicely with cross-language groups.
> > >>>>>>> 7. Implementation change: maybe we can reverse the mapping? Instead of mapping from member to subscriptions, we can map a subscription to a list of members.
> > >>>>>>> 8. Implementation change: maybe we can try to break apart the subscription and assignments from the same SyncGroupRequest into multiple records? They can still go to the same message set and get appended together. This way the limit becomes the segment size, which shouldn't be a problem. This can be tricky to get right because we're currently keying these messages on the group, so I think records from the same rebalance might accidentally compact one another, but my understanding of compaction isn't that great.
> > >>>>>>>
> > >>>>>>> Todo:
> > >>>>>>> It would be interesting to rerun the tests with no compression just to see how much gzip is helping but it's getting late. Maybe tomorrow?
> > >>>>>>>
> > >>>>>>> - Onur
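On option 7, a minimal sketch of the reversed mapping (hypothetical types and names, not the real GroupMetadata code): when every member has the identical 3000-topic subscription, as in the MirrorMaker case, the topic list gets written once instead of once per member.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch only: contrasts the two layouts in option 7 above.
    class ReverseMappingSketch {

        // Roughly today's shape: memberId -> subscribed topics,
        // so an identical topic list is repeated once per member.
        static Map<String, List<String>> memberToTopics(List<String> members, List<String> topics) {
            Map<String, List<String>> out = new HashMap<>();
            for (String member : members) {
                out.put(member, topics);
            }
            return out;
        }

        // Option 7: subscription -> members sharing it, so the topic list appears once.
        static Map<List<String>, List<String>> topicsToMembers(List<String> members, List<String> topics) {
            Map<List<String>, List<String>> out = new HashMap<>();
            out.computeIfAbsent(topics, k -> new ArrayList<>()).addAll(members);
            return out;
        }
    }

On option 8, one note: log compaction keeps only the latest record per key, so several records from the same rebalance sharing the bare group-id key would eventually clobber each other once the cleaner runs; they would need distinct keys (for example, one per member) to survive compaction.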
--
Liquan Pei
Software Engineer, Confluent Inc