I think the value of adding an "offsets.replica.fetch.max.bytes" config is that we don't break or change the meaning of "replica.fetch.max.bytes". We can also set "offsets.replica.fetch.max.bytes" to a value safely larger than what we expect to ever allow the __consumer_offsets topic max message size to be, without doing the larger change of bumping up the global "replica.fetch.max.bytes".

On Thu, Jun 9, 2016 at 10:40 AM, Becket Qin <becket....@gmail.com> wrote:

I think taking the bigger of the fetch size and the message size limit is probably good enough. If we have a separate "offsets.replica.fetch.max.bytes", I guess the value will always be set to the max message size of the __consumer_offsets topic, which does not seem to have much value.

On Thu, Jun 9, 2016 at 3:15 AM, Onur Karaman <okara...@linkedin.com.invalid> wrote:

Maybe another approach can be to add a new "offsets.replica.fetch.max.bytes" config on the brokers.

On Thu, Jun 9, 2016 at 3:03 AM, Onur Karaman <okara...@linkedin.com> wrote:

I made a PR with a tweak to Jun's/Becket's proposal:
https://github.com/apache/kafka/pull/1484

It just tweaks the fetch behavior specifically for replicas fetching from the __consumer_offsets topic: when the fetcher's "replica.fetch.max.bytes" is less than the __consumer_offsets leader's "message.max.bytes", it takes the max of the two.

I'm honestly not that happy with this solution, as I'd rather not change the "replica.fetch.max.bytes" config from being a limit to a recommendation. I'd definitely be happy to hear other alternatives!
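For illustration, a minimal Scala sketch of the max-of-the-two idea behind the PR. All names here are invented for the sketch; the actual change lives in the broker's replica fetch path:

    // Replicas fetching the internal offsets topic never use a fetch size
    // smaller than that topic's configured max message size, so a large
    // group metadata message cannot stall replication.
    def effectiveFetchSize(topic: String,
                           replicaFetchMaxBytes: Int,
                           offsetsTopicMaxMessageBytes: Int): Int =
      if (topic == "__consumer_offsets")
        math.max(replicaFetchMaxBytes, offsetsTopicMaxMessageBytes)
      else
        replicaFetchMaxBytes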
On Sun, May 29, 2016 at 1:57 PM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:

Sorry, I know next to nothing about Kafka Connect. I didn't understand the Kafka Connect / MM idea you brought up. Can you go into more detail?

Otherwise I think our remaining options are:
- Jun's suggestion to bump up the KafkaConfig.messageMaxBytes for the __consumer_offsets topic and change the fetch behavior when the message size is larger than the fetch size
- option 6: support sending the regex over the wire instead of the fully expanded topic subscriptions. This should cut down the message size from the subscription side. Again, this only helps when pattern-based subscriptions are done.

A minor correction to an earlier comment I made regarding the message size:
message size ~ sum(s_i + a_i for i in range [1, |C|])

On Thu, May 26, 2016 at 3:35 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Onur,

Thanks for the investigation. It seems the conclusion is that the compact format helps, but perhaps not enough to justify adding a new assignment schema? I'm not sure there's much more room for savings unless we change something more fundamental in the assignment approach. We spent some time thinking before about whether we could let the consumers compute their assignment locally from a smaller set of information, but the difficulty (I'm sure you remember) is reaching consensus on topic metadata. Kafka Connect has a similar problem where all the workers need to agree on connector configurations. Since all configs are stored in a single topic partition, the approach we take there is to propagate the offset in the assignment protocol. Not sure if we could do something similar for MM...

Anyway, it seems like the best workaround at the moment is Jun's initial suggestion. What do you think?

-Jason

On Wed, May 25, 2016 at 10:47 PM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:

I gave the topic index assignment trick a try against the same environment. The implementation just changed the assignment serialization and deserialization logic. It didn't change SyncGroupResponse, meaning it continues to exclude the subscription from the SyncGroupResponse and assumes the member has kept track of its last subscription.

Assignment topic indexing with compression:
1 consumer 34346 bytes
5 consumers 177687 bytes
10 consumers 331897 bytes
20 consumers 572467 bytes
30 consumers 811269 bytes
40 consumers 1047188 bytes * the tipping point
50 consumers 1290092 bytes
60 consumers 1527806 bytes
70 consumers 1769259 bytes
80 consumers 2000118 bytes
90 consumers 2244392 bytes
100 consumers 2482415 bytes

Assignment topic indexing without compression:
1 consumer 211904 bytes
5 consumers 677184 bytes
10 consumers 1211154 bytes * the tipping point
20 consumers 2136196 bytes
30 consumers 3061238 bytes
40 consumers 3986280 bytes
50 consumers 4911322 bytes
60 consumers 5836284 bytes
70 consumers 6761246 bytes
80 consumers 7686208 bytes
90 consumers 8611170 bytes
100 consumers 9536132 bytes

Assignment topic indexing seems to reduce the size by 500KB without compression and 80KB with compression. So assignment topic indexing makes some difference both with and without compression, but in our case it was not nearly enough.

This can be explained by the fact that we aren't actually hitting the worst-case scenario of each consumer being assigned a partition from every topic. The reason is simple: a topic can only fully span all the consumers if it has at least as many partitions as there are consumers. Given that there are 8 partitions per topic and we have 100 consumers, it makes sense that we aren't close to the worst-case scenario where topic indexing would make a bigger difference.

I tweaked the group leader's assignment code to print out the assignments and found that each consumer was getting either 238 or 239 partitions, each from a unique topic. So the consumers were really getting partitions from 239 topics instead of the full worst-case scenario of 3000 topics.
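For reference, a rough Scala sketch of what the topic index assignment trick looks like (types invented here, not the actual patch): the assignment refers to each topic by its position in the member's sorted subscription instead of repeating the topic name.

    case class IndexedAssignment(topicIndex: Int, partitions: Seq[Int])

    // Encode: replace each topic name with its index in the sorted subscription.
    def encodeAssignment(subscription: Seq[String],
                         assigned: Map[String, Seq[Int]]): Seq[IndexedAssignment] = {
      val indexOf = subscription.sorted.zipWithIndex.toMap
      assigned.toSeq.map { case (topic, parts) =>
        IndexedAssignment(indexOf(topic), parts)
      }
    }

    // Decode: the member rebuilds the mapping from its own subscription,
    // which is why SyncGroupResponse can keep excluding the subscription.
    def decodeAssignment(subscription: Seq[String],
                         encoded: Seq[IndexedAssignment]): Map[String, Seq[Int]] = {
      val sortedTopics = subscription.sorted
      encoded.map(a => sortedTopics(a.topicIndex) -> a.partitions).toMap
    }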
On Wed, May 25, 2016 at 1:42 PM, Jason Gustafson <ja...@confluent.io> wrote:

Gwen, Joel:

That's correct. The protocol does allow us to give an assignor its own assignment schema, but I think this will require a couple of internal changes to the consumer to make use of the full generality.

One thing I'm a little uncertain about is whether we should use a different protocol type. For a little context, the group membership protocol allows the client to provide a "protocol type" when joining the group to ensure that all members have some basic semantic compatibility. For example, the consumer uses "consumer" and Kafka Connect uses "connect." Currently all assignors using the "consumer" protocol share a common schema for representing subscriptions and assignments. This is convenient for tools (like consumer-groups.sh) since they just need to know how to parse the "consumer" protocol type without knowing anything about the assignors. So introducing another schema would break that assumption and we'd need those tools to do assignor-specific parsing. Maybe this is OK? Alternatively, we could use a separate protocol type (e.g. "compact-consumer"), but that seems less than desirable.

Thanks,
Jason

On Wed, May 25, 2016 at 11:00 AM, Gwen Shapira <g...@confluent.io> wrote:

ah, right - we can add as many strategies as we want.

On Wed, May 25, 2016 at 10:54 AM, Joel Koshy <jjkosh...@gmail.com> wrote:

> Yes it would be a protocol bump.

Sorry - I'm officially confused. I think it may not be required, since the more compact format would be associated with a new assignment strategy - right?

> smaller than the plaintext PAL, but the post-compressed binary PAL is just 25% smaller than the post-compressed plaintext PAL. IOW using a symbol table helps a lot but further compression on that already compact format would yield only marginal return.
>
> So basically I feel we could get pretty far with a more compact field format for assignment and if we do that then we would potentially not even want to do any compression.

Also just wanted to add that this compression on the binary PAL did help, but the compression ratio was obviously not as high as plaintext compression.
On Tue, May 24, 2016 at 4:19 PM, Gwen Shapira <g...@confluent.io> wrote:

Regarding the change to the assignment field: it would be a protocol bump, otherwise consumers will not know how to parse the bytes the broker is returning, right? Or did I misunderstand the suggestion?

On Tue, May 24, 2016 at 2:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:

I think for just solving issue 1), Jun's suggestion is sufficient and simple. So I'd prefer that approach.

In addition, Jason's optimization on the assignment field would be good for 2) and 3) as well, and I like that optimization for its simplicity and lack of format change. And in the future I'm in favor of considering a change to the in-memory cache format as Jiangjie suggested.

Guozhang

On Tue, May 24, 2016 at 12:42 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Jason,

There are a few problems we want to solve here:
1. The group metadata is too big to be appended to the log.
2. Reduce the memory footprint on the broker.
3. Reduce the bytes transferred over the wire.

To solve (1), I like your idea of having separate messages per member. The proposal (Onur's option 8) is to break the metadata into small records in the same uncompressed message set so each record is small. I agree it would be ideal if we were able to store the metadata separately for each member. I was also thinking about storing the metadata in multiple messages. What concerned me was that having multiple messages seems to break atomicity, and I am not sure how we would deal with the potential issues. For example, what if the group metadata is replicated but the member metadata is not? It might be fine depending on the implementation, but I am not sure.

For (2) we want to store the metadata on disk, which is what we have to do anyway. The only question is in what format we should store it.

To address (3) we want the metadata to be compressed, which contradicts the above solution for (1).
I think Jun's suggestion is probably still the simplest. To avoid changing the behavior for consumers, maybe we can do that only for the offsets topic, i.e. if the max fetch bytes of the fetch request is smaller than the message size on the offsets topic, we always return at least one full message. This should avoid unexpected problems on the client side because supposedly only tools and brokers will fetch from the internal topics.

As a modification to what you suggested, one solution I was thinking of was to have multiple messages in a single compressed message. That means for SyncGroupResponse we still need to read the entire compressed message and extract the inner messages, which seems not that different from having a single message containing everything. But let me just put it here and see if it makes sense.

We can have a map of GroupMetadataKey -> GroupMetadataValueOffset.

The GroupMetadataValue is stored in a compressed message. The inner messages are the following:

Inner Message 0: Version GroupId Generation
Inner Message 1: MemberId MemberMetadata_1 (we can compress the bytes here)
Inner Message 2: MemberId MemberMetadata_2
...
Inner Message N: MemberId MemberMetadata_N

The MemberMetadata format is the following:
MemberMetadata => Version Generation ClientId Host Subscription Assignment

So DescribeGroupResponse will just return the entire compressed GroupMetadataMessage, and SyncGroupResponse will return the corresponding inner message.

Thanks,

Jiangjie (Becket) Qin
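To make the proposed layout concrete, a small Scala model of the inner messages (types invented here; this is a sketch of the idea, not an implementation):

    // One outer compressed message; inner record 0 carries the group header,
    // records 1..N carry one member each, so SyncGroupResponse can hand back
    // a single inner record instead of the whole blob.
    case class GroupHeader(version: Short, groupId: String, generation: Int)
    case class MemberMetadata(version: Short, generation: Int, clientId: String,
                              host: String, subscription: Array[Byte],
                              assignment: Array[Byte])

    sealed trait InnerMessage
    case class HeaderRecord(header: GroupHeader) extends InnerMessage      // inner message 0
    case class MemberRecord(memberId: String,
                            metadata: MemberMetadata) extends InnerMessage // inner messages 1..N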
On Tue, May 24, 2016 at 9:14 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Becket,

I like your idea to store only the offset for the group metadata in memory. I think it would be safe to keep it in memory for a short time after the rebalance completes, but after that its only real purpose is to answer DescribeGroup requests, so your proposal makes a lot of sense to me.

As for the specific problem with the size of the group metadata message in the MM case: if we cannot succeed in reducing the size of the subscription/assignment (which I think is still probably the best alternative if it can work), then I think there are some options for changing the message format (option #8 in Onur's initial e-mail). Currently, the key used for storing the group metadata is this:

GroupMetadataKey => Version GroupId

And the value is something like this (some details elided):

GroupMetadataValue => Version GroupId Generation [MemberMetadata]
MemberMetadata => ClientId Host Subscription Assignment

I don't think we can change the key without a lot of pain, but it seems like we can change the value format. Maybe we can take the subscription/assignment payloads out of the value and introduce a new "MemberMetadata" message for each member in the group. For example:

MemberMetadataKey => Version GroupId MemberId

MemberMetadataValue => Version Generation ClientId Host Subscription Assignment

When a new generation is created, we would first write the group metadata message, which includes the generation and all of the memberIds, and then we'd write the member metadata messages. To answer the DescribeGroup request, we'd read the group metadata at the cached offset and, depending on the version, all of the following member metadata. This would be more complex to maintain, but it seems doable if it comes to it.

Thanks,
Jason
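A minimal sketch of the write order in this proposal (all names and the append signature are invented for illustration):

    case class GroupMetadataKey(version: Short, groupId: String)
    case class GroupMetadataValue(version: Short, generation: Int,
                                  memberIds: Seq[String])
    case class MemberMetadataKey(version: Short, groupId: String, memberId: String)
    case class MemberMetadataValue(version: Short, generation: Int,
                                   clientId: String, host: String,
                                   subscription: Array[Byte], assignment: Array[Byte])

    // First the group metadata message naming the generation's members,
    // then one member metadata message per member.
    def writeGeneration(append: (Any, Any) => Unit, // stand-in for a log append
                        groupId: String, generation: Int,
                        members: Map[String, MemberMetadataValue]): Unit = {
      append(GroupMetadataKey(0, groupId),
             GroupMetadataValue(0, generation, members.keys.toSeq))
      members.foreach { case (memberId, value) =>
        append(MemberMetadataKey(0, groupId, memberId), value)
      }
    }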
On Mon, May 23, 2016 at 6:15 PM, Becket Qin <becket....@gmail.com> wrote:

It might be worth thinking a little further. We have discussed before that we want to avoid holding all the group metadata in memory.

I am thinking about the following end state:

1. Enable compression on the offset topic.
2. Instead of holding the entire group metadata in memory on the brokers, each broker only keeps a [group -> Offset] map; the offset points to the message in the offset topic which holds the latest metadata of the group.
3. DescribeGroupResponse will read from the offset topic directly like a normal consumption, except that exactly one message will be returned.
4. SyncGroupResponse will read the message, extract the assignment part and send back the partition assignment. We can compress the partition assignment before sending it out if we want.

Jiangjie (Becket) Qin
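A minimal sketch of end-state point 2 (invented types; real code would also have to handle log truncation and compaction):

    import scala.collection.concurrent.TrieMap

    // The broker keeps only group -> offset; DescribeGroup rereads the
    // offsets topic at that offset instead of serving metadata from memory.
    class GroupMetadataCache(readMessageAt: Long => Array[Byte]) {
      private val latestOffset = TrieMap.empty[String, Long]

      def onGroupMetadataAppended(group: String, offset: Long): Unit =
        latestOffset.put(group, offset)

      def describeGroup(group: String): Option[Array[Byte]] =
        latestOffset.get(group).map(readMessageAt)
    }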
On Mon, May 23, 2016 at 5:08 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Jason, doesn't gzip (or other compression) basically do this? If the topic is a string and the topic is repeated throughout, won't compression basically replace all repeated instances of it with an index reference to the full string?

Hey James, yeah, that's probably true, but keep in mind that the compression happens on the broker side. It would be nice to have a more compact representation so that we get some benefit over the wire as well. This seems to be less of a concern here, though, so the bigger gains are probably from reducing the number of partitions that need to be listed individually.

-Jason

On Mon, May 23, 2016 at 4:23 PM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:

When figuring out these optimizations, it's worth keeping in mind the improvements when the message is uncompressed vs when it's compressed.

When uncompressed:
Fixing the Assignment serialization to instead be a topic index into the corresponding member's subscription list would usually be a good thing.

I think the proposal is only worse when the topic names are small. The Type.STRING we use in our protocol for the assignment's TOPIC_KEY_NAME is limited in length to Short.MAX_VALUE, so our strings are first prepended with 2 bytes to indicate the string size.

The new proposal does worse when:
2 + utf_encoded_string_payload_size < index_type_size
in other words, when:
utf_encoded_string_payload_size < index_type_size - 2

If the index type ends up being Type.INT32, then the proposal is worse when the topic is of length 1.
If the index type ends up being Type.INT64, then the proposal is worse when the topic is shorter than length 6.

When compressed:
As James Cheng brought up, I'm not sure how things change when compression comes into the picture. This would be worth investigating.
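A quick check of the break-even arithmetic in Scala (standalone sketch, not Kafka code):

    // Type.STRING costs a 2-byte length prefix plus the UTF-8 payload.
    def stringFieldSize(topic: String): Int = 2 + topic.getBytes("UTF-8").length

    val int32IndexSize = 4
    val int64IndexSize = 8

    assert(stringFieldSize("t") < int32IndexSize)     // 3 < 4: only a 1-char topic favors the string over INT32
    assert(stringFieldSize("topic") < int64IndexSize) // 7 < 8: topics under 6 chars favor the string over INT64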
On Mon, May 23, 2016 at 4:05 PM, James Cheng <wushuja...@gmail.com> wrote:

> On May 23, 2016, at 10:59 AM, Jason Gustafson <ja...@confluent.io> wrote:
>
> 2. Maybe there's a better way to lay out the assignment without needing to explicitly repeat the topic? For example, the leader could sort the topics for each member and just use an integer to represent the index of each topic within the sorted list (note this depends on the subscription including the full topic list).
>
> Assignment -> [TopicIndex [Partition]]

Jason, doesn't gzip (or other compression) basically do this? If the topic is a string and the topic is repeated throughout, won't compression basically replace all repeated instances of it with an index reference to the full string?

-James

> You could even combine these two options so that you have only 3 integers for each topic assignment:
>
> Assignment -> [TopicIndex MinPartition MaxPartition]
>
> There may even be better options with a little more thought. All of this is just part of the client-side protocol, so it wouldn't require any version bumps on the broker. What do you think?
>
> Thanks,
> Jason
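A rough sketch of decoding the quoted 3-integer layout (invented types; a real assignment might need several ranges per topic when its partitions aren't contiguous):

    case class TopicRange(topicIndex: Int, minPartition: Int, maxPartition: Int)

    // Expand [TopicIndex MinPartition MaxPartition] back into per-topic
    // partition lists using the member's sorted subscription.
    def expand(sortedSubscription: IndexedSeq[String],
               ranges: Seq[TopicRange]): Map[String, Seq[Int]] =
      ranges.map { r =>
        sortedSubscription(r.topicIndex) -> (r.minPartition to r.maxPartition)
      }.toMap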
On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang <wangg...@gmail.com> wrote:

The original concern is that regex may not be efficiently supported across languages, but if there is a neat workaround I would love to learn.

Guozhang

On Mon, May 23, 2016 at 5:31 AM, Ismael Juma <ism...@juma.me.uk> wrote:

+1 to Jun's suggestion.

Having said that, as a general point, I think we should consider supporting topic patterns in the wire protocol. It requires some thinking for cross-language support, but it seems surmountable, and it could make certain operations a lot more efficient (the fact that a basic regex subscription causes the consumer to request metadata for all topics is not great).

Ismael
On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang <wangg...@gmail.com> wrote:

I like Jun's suggestion of changing the handling logic for a single large message on the consumer side.

As for the case of "a single group subscribing to 3000 topics": with 100 consumers the 2.5MB gzip size is reasonable to me (when storing in ZK, we also have the znode limit, which is set to 1MB by default, though admittedly it is only for one consumer). And if we do the change as Jun suggested, 2.5MB of memory pressure on the follower is OK, I think.

Guozhang

On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:

Results without compression:
1 consumer 292383 bytes
5 consumers 1079579 bytes * the tipping point
10 consumers 1855018 bytes
20 consumers 2780220 bytes
30 consumers 3705422 bytes
40 consumers 4630624 bytes
50 consumers 5555826 bytes
60 consumers 6480788 bytes
70 consumers 7405750 bytes
80 consumers 8330712 bytes
90 consumers 9255674 bytes
100 consumers 10180636 bytes

So it looks like gzip compression shrinks the message size by 4x.
On Sat, May 21, 2016 at 9:47 AM, Jun Rao <j...@confluent.io> wrote:

Onur,

Thanks for the investigation.

Another option is to just fix how we deal with the case when a message is larger than the fetch size. Today, if the message size is larger than the fetch size, the consumer will get stuck. Instead, we can simply return the full message if it's larger than the fetch size, without requiring the consumer to manually adjust the fetch size. On the broker side, to serve a fetch request, we already do an index lookup and then scan the log a bit to find the message with the requested offset. We can just check the size of that message and return the full message if its size is larger than the fetch size. This way, fetch size is really a performance optimization: in the common case, we will not return more bytes than the fetch size, but if there is a large message, we will return more bytes than the specified fetch size. In practice, large messages are rare, so this shouldn't increase the memory consumption on the client too much.

Jun
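A minimal sketch of the suggested behavior (invented names, not the actual broker code):

    case class LogMessage(offset: Long, sizeInBytes: Int)

    // The response is normally capped at the fetch size, but grows to fit
    // the first message so one oversized message can't wedge the consumer.
    def maxBytesToReturn(firstMessage: LogMessage, fetchSize: Int): Int =
      math.max(fetchSize, firstMessage.sizeInBytes)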
On Sat, May 21, 2016 at 3:34 AM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:

Hey everyone. So I started doing some tests on the new consumer/coordinator to see if it could handle more strenuous use cases like mirroring clusters with thousands of topics, and I thought I'd share whatever I have so far.

The scalability limit: the amount of group metadata we can fit into one message.

Some background:
Client-side assignment is implemented in two phases
1. a PreparingRebalance phase that identifies members of the group and aggregates member subscriptions.
2. an AwaitingSync phase that waits for the group leader to decide member assignments based on the member subscriptions across the group.
- The leader announces this decision with a SyncGroupRequest. The GroupCoordinator handles SyncGroupRequests by appending all group state into a single message under the __consumer_offsets topic. This message is keyed on the group id and contains each member's subscription as well as the decided assignment for each member.
The environment:
- one broker
- one __consumer_offsets partition
- offsets.topic.compression.codec=1 // this is gzip
- broker has my pending KAFKA-3718 patch that actually makes use of offsets.topic.compression.codec: https://github.com/apache/kafka/pull/1394
- around 3000 topics. This is an actual subset of topics from one of our clusters.
- topics have 8 partitions
- topics are 25 characters long on average
- one group with a varying number of consumers, each hardcoded with all the topics just to make the tests more consistent. Wildcarding with .* should have the same effect once the subscription hits the coordinator, as the subscription has already been fully expanded out to the list of topics by the consumers.
- I added some log messages to Log.scala to print out the message sizes after compression
- there are no producers at all and auto commits are disabled. The only topic with messages getting added is the __consumer_offsets topic, and they're only from storing group metadata while processing SyncGroupRequests.
Results:
The results below show that we exceed the 1000012-byte KafkaConfig.messageMaxBytes limit relatively quickly (between 30-40 consumers):
1 consumer 54739 bytes
5 consumers 261524 bytes
10 consumers 459804 bytes
20 consumers 702499 bytes
30 consumers 930525 bytes
40 consumers 1115657 bytes * the tipping point
50 consumers 1363112 bytes
60 consumers 1598621 bytes
70 consumers 1837359 bytes
80 consumers 2066934 bytes
90 consumers 2310970 bytes
100 consumers 2542735 bytes

Note that the growth itself is pretty gradual. Plotting the points makes it look roughly linear w.r.t. the number of consumers (roughly 25KB per additional consumer):
https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735)

Also note that these numbers aren't averages or medians or anything like that. It's just the byte size from a given run. I did run them a few times and saw similar results.
Impact:
Even after adding gzip to the __consumer_offsets topic with my pending KAFKA-3718 patch, the AwaitingSync phase of the group fails with RecordTooLargeException. This means the combined size of each member's subscriptions and assignments exceeded the KafkaConfig.messageMaxBytes of 1000012 bytes. The group ends up dying.

Options:
1. Config change: reduce the number of consumers in the group. This isn't always a realistic answer in more strenuous use cases like MirrorMaker clusters or for auditing.
2. Config change: split the group into smaller groups which together will get full coverage of the topics. This gives each group member a smaller subscription. (ex: g1 has topics starting with a-m while g2 has topics starting with n-z). This would be operationally painful to manage.
3. Config change: split the topics among members of the group. Again this gives each group member a smaller subscription. This would also be operationally painful to manage.
4. Config change: bump up KafkaConfig.messageMaxBytes (a topic-level config) and KafkaConfig.replicaFetchMaxBytes (a broker-level config). Applying messageMaxBytes to just the __consumer_offsets topic seems relatively harmless, but bumping up the broker-level replicaFetchMaxBytes would probably need more attention.
5. Config change: try different compression codecs. Based on 2 minutes of googling, it seems like lz4 and snappy are faster than gzip but have worse compression, so this probably won't help.
6. Implementation change: support sending the regex over the wire instead of the fully expanded topic subscriptions. I think people have said in the past that different languages have subtle differences in regex, so this doesn't play nicely with cross-language groups.
7. Implementation change: maybe we can reverse the mapping? Instead of mapping from member to subscriptions, we can map a subscription to a list of members.
8. Implementation change: maybe we can try to break apart the subscription and assignments from the same SyncGroupRequest into multiple records? They can still go to the same message set and get appended together. This way the limit becomes the segment size, which shouldn't be a problem. This can be tricky to get right because we're currently keying these messages on the group, so I think records from the same rebalance might accidentally compact one another, but my understanding of compaction isn't that great.
Todo:
It would be interesting to rerun the tests with no compression just to see how much gzip is helping, but it's getting late. Maybe tomorrow?

- Onur