I think the value of adding a "offsets.replica.fetch.max.bytes" config is
that we don't break/change the meaning of "replica.fetch.max.bytes".

We can also set "offsets.replica.fetch.max.bytes" to be a value safely
larger than what we expect to ever allow the __consumer_offsets topic max
message size to be without doing the larger change of bumping up the global
"replica.fetch.max.bytes".

On Thu, Jun 9, 2016 at 10:40 AM, Becket Qin <becket....@gmail.com> wrote:

> I think taking bigger one of the fetch size and message size limit is
> probably good enough. If we have a separate
> "offset.replica.fetch.max.bytes", I guess the value will always be set to
> max message size of the __consumer_offsets topic, which does not seem to
> have much value.
>
> On Thu, Jun 9, 2016 at 3:15 AM, Onur Karaman <okara...@linkedin.com.invalid
> >
> wrote:
>
> > Maybe another approach can be to add a new
> > "offsets.replica.fetch.max.bytes" config on the brokers.
> >
> > On Thu, Jun 9, 2016 at 3:03 AM, Onur Karaman <okara...@linkedin.com>
> > wrote:
> >
> > > I made a PR with a tweak to Jun's/Becket's proposal:
> > > https://github.com/apache/kafka/pull/1484
> > >
> > > It just tweaks the fetch behavior specifically for replicas fetching
> from
> > > the __consumer_offsets topic when the fetcher's
> "replica.fetch.max.bytes"
> > > is less than the __consumer_offset leader's "message.max.bytes" to take
> > the
> > > max of the two.
> > >
> > > I'm honestly not that happy with this solution, as I'd rather not
> change
> > > the "replica.fetch.max.bytes" config from being a limit to a
> > > recommendation. I'd definitely be happy to hear other alternatives!
> > >
> > > On Sun, May 29, 2016 at 1:57 PM, Onur Karaman <
> > > onurkaraman.apa...@gmail.com> wrote:
> > >
> > >> Sorry I know next to nothing about Kafka Connect. I didn't understand
> > the
> > >> Kafka Connect / MM idea you brought up. Can you go into more detail?
> > >>
> > >> Otherwise I think our remaining options are:
> > >> - Jun's suggestion to bump up the KafkaConfig.messageMaxBytes for
> > >> __consumer_offsets topic and change the fetch behavior when message
> size
> > >> is
> > >> larger than fetch size
> > >> - option 6: support sending the regex over the wire instead of the
> fully
> > >> expanded topic subscriptions. This should cut down the message size
> from
> > >> the subscription side. Again this only helps when pattern-based
> > >> subscriptions are done.
> > >>
> > >> minor correction to an earlier comment I made regarding the message
> > size:
> > >> message size ~ sum(s_i + a_i for i in range [1, |C|])
> > >>
> > >> On Thu, May 26, 2016 at 3:35 PM, Jason Gustafson <ja...@confluent.io>
> > >> wrote:
> > >>
> > >> > Hey Onur,
> > >> >
> > >> > Thanks for the investigation. It seems the conclusion is that the
> > >> compact
> > >> > format helps, but perhaps not enough to justify adding a new
> > assignment
> > >> > schema? I'm not sure there's much more room for savings unless we
> > change
> > >> > something more fundamental in the assignment approach. We spent some
> > >> time
> > >> > thinking before about whether we could let the consumers compute
> their
> > >> > assignment locally from a smaller set of information, but the
> > difficulty
> > >> > (I'm sure you remember) is reaching consensus on topic metadata.
> Kafka
> > >> > Connect has a similar problem where all the workers need to agree on
> > >> > connector configurations. Since all configs are stored in a single
> > topic
> > >> > partition, the approach we take there is to propagate the offset in
> > the
> > >> > assignment protocol. Not sure if we could do something similar for
> > MM...
> > >> > Anyway, it seems like the best workaround at the moment is Jun's
> > initial
> > >> > suggestion. What do you think?
> > >> >
> > >> > -Jason
> > >> >
> > >> > On Wed, May 25, 2016 at 10:47 PM, Onur Karaman <
> > >> > onurkaraman.apa...@gmail.com
> > >> > > wrote:
> > >> >
> > >> > > I gave the topic index assignment trick a try against the same
> > >> > environment.
> > >> > > The implementation just changed the assignment serialization and
> > >> > > deserialization logic. It didn't change SyncGroupResponse, meaning
> > it
> > >> > > continues to exclude the subscription from the SyncGroupResponse
> and
> > >> > > assumes the member has kept track of its last subscription.
> > >> > >
> > >> > > Assignment topic indexing with compression:
> > >> > > 1 consumer 34346 bytes
> > >> > > 5 consumers 177687 bytes
> > >> > > 10 consumers 331897 bytes
> > >> > > 20 consumers 572467 bytes
> > >> > > 30 consumers 811269 bytes
> > >> > > 40 consumers 1047188 bytes * the tipping point
> > >> > > 50 consumers 1290092 bytes
> > >> > > 60 consumers 1527806 bytes
> > >> > > 70 consumers 1769259 bytes
> > >> > > 80 consumers 2000118 bytes
> > >> > > 90 consumers 2244392 bytes
> > >> > > 100 consumers 2482415 bytes
> > >> > >
> > >> > > Assignment topic indexing without compression:
> > >> > > 1 consumer 211904 bytes
> > >> > > 5 consumers 677184 bytes
> > >> > > 10 consumers 1211154 bytes * the tipping point
> > >> > > 20 consumers 2136196 bytes
> > >> > > 30 consumers 3061238 bytes
> > >> > > 40 consumers 3986280 bytes
> > >> > > 50 consumers 4911322 bytes
> > >> > > 60 consumers 5836284 bytes
> > >> > > 70 consumers 6761246 bytes
> > >> > > 80 consumers 7686208 bytes
> > >> > > 90 consumers 8611170 bytes
> > >> > > 100 consumers 9536132 bytes
> > >> > >
> > >> > > Assignment topic indexing seems to reduce the size by 500KB
> without
> > >> > > compression and 80KB with compression. So assignment topic
> indexing
> > >> makes
> > >> > > some difference in both with and without compression but in our
> case
> > >> was
> > >> > > not nearly enough.
> > >> > >
> > >> > > This can be explained by the fact that we aren't actually hitting
> > the
> > >> > worst
> > >> > > case scenario of each consumer being assigned a partition from
> every
> > >> > topic.
> > >> > > The reason is simple: a topic can only fully span all the
> consumers
> > >> if it
> > >> > > has at least as many partitions as there are consumers. Given that
> > >> there
> > >> > > are 8 partitions per topic and we have 100 consumers, it makes
> sense
> > >> that
> > >> > > we aren't close to this worse case scenario where topic indexing
> > would
> > >> > make
> > >> > > a bigger difference.
> > >> > >
> > >> > > I tweaked the group leader's assignment code to print out the
> > >> assignments
> > >> > > and found that each consumer was getting either 238 or 239
> > partitions.
> > >> > Each
> > >> > > of these partitions were from unique topics. So the consumers were
> > >> really
> > >> > > getting partitions from 239 topics instead of the full worst case
> > >> > scenario
> > >> > > of 3000 topics.
> > >> > >
> > >> > > On Wed, May 25, 2016 at 1:42 PM, Jason Gustafson <
> > ja...@confluent.io>
> > >> > > wrote:
> > >> > >
> > >> > > > Gwen, Joel:
> > >> > > >
> > >> > > > That's correct. The protocol does allow us to give an assignor
> its
> > >> own
> > >> > > > assignment schema, but I think this will require a couple
> internal
> > >> > > changes
> > >> > > > to the consumer to make use of the full generality.
> > >> > > >
> > >> > > > One thing I'm a little uncertain about is whether we should use
> a
> > >> > > different
> > >> > > > protocol type. For a little context, the group membership
> protocol
> > >> > allows
> > >> > > > the client to provide a "protocol type" when joining the group
> to
> > >> > ensure
> > >> > > > that all members have some basic semantic compatibility. For
> > >> example,
> > >> > the
> > >> > > > consumer uses "consumer" and Kafka Connect uses "connect."
> > Currently
> > >> > all
> > >> > > > assignors using the "consumer" protocol share a common schema
> for
> > >> > > > representing subscriptions and assignment. This is convenient
> for
> > >> tools
> > >> > > > (like consumer-groups.sh) since they just need to know how to
> > parse
> > >> the
> > >> > > > "consumer" protocol type without knowing anything about the
> > >> assignors.
> > >> > So
> > >> > > > introducing another schema would break that assumption and we'd
> > need
> > >> > > those
> > >> > > > tools to do assignor-specific parsing. Maybe this is OK?
> > >> Alternatively,
> > >> > > we
> > >> > > > could use a separate protocol type (e.g. "compact-consumer"),
> but
> > >> that
> > >> > > > seems less than desirable.
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Jason
> > >> > > >
> > >> > > > On Wed, May 25, 2016 at 11:00 AM, Gwen Shapira <
> g...@confluent.io
> > >
> > >> > > wrote:
> > >> > > >
> > >> > > > > ah, right - we can add as many strategies as we want.
> > >> > > > >
> > >> > > > > On Wed, May 25, 2016 at 10:54 AM, Joel Koshy <
> > jjkosh...@gmail.com
> > >> >
> > >> > > > wrote:
> > >> > > > >
> > >> > > > > > > Yes it would be a protocol bump.
> > >> > > > > > >
> > >> > > > > >
> > >> > > > > > Sorry - I'm officially confused. I think it may not be
> > required
> > >> -
> > >> > > since
> > >> > > > > the
> > >> > > > > > more compact format would be associated with a new
> assignment
> > >> > > strategy
> > >> > > > -
> > >> > > > > > right?
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > > smaller than the plaintext PAL, but the post-compressed
> > binary
> > >> > PAL
> > >> > > is
> > >> > > > > > just
> > >> > > > > > > 25% smaller than the post-compressed plaintext PAL. IOW
> > using
> > >> a
> > >> > > > symbol
> > >> > > > > > > table helps a lot but further compression on that already
> > >> compact
> > >> > > > > format
> > >> > > > > > > would yield only marginal return.
> > >> > > > > > >
> > >> > > > > >
> > >> > > > > > > So basically I feel we could get pretty far with a more
> > >> compact
> > >> > > field
> > >> > > > > > > format for assignment and if we do that then we would
> > >> potentially
> > >> > > not
> > >> > > > > > even
> > >> > > > > > > want to do any compression.
> > >> > > > > > >
> > >> > > > > >
> > >> > > > > > Also just wanted to add that this compression on the binary
> > PAL
> > >> did
> > >> > > > help
> > >> > > > > > but the compression ratio was obviously not as high as
> > plaintext
> > >> > > > > > compression.
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > >
> > >> > > > > > > On Tue, May 24, 2016 at 4:19 PM, Gwen Shapira <
> > >> g...@confluent.io
> > >> > >
> > >> > > > > wrote:
> > >> > > > > > >
> > >> > > > > > >> Regarding the change to the assignment field. It would
> be a
> > >> > > protocol
> > >> > > > > > bump,
> > >> > > > > > >> otherwise consumers will not know how to parse the bytes
> > the
> > >> > > broker
> > >> > > > is
> > >> > > > > > >> returning, right?
> > >> > > > > > >> Or did I misunderstand the suggestion?
> > >> > > > > > >>
> > >> > > > > > >> On Tue, May 24, 2016 at 2:52 PM, Guozhang Wang <
> > >> > > wangg...@gmail.com>
> > >> > > > > > >> wrote:
> > >> > > > > > >>
> > >> > > > > > >> > I think for just solving issue 1), Jun's suggestion is
> > >> > > sufficient
> > >> > > > > and
> > >> > > > > > >> > simple. So I'd prefer that approach.
> > >> > > > > > >> >
> > >> > > > > > >> > In addition, Jason's optimization on the assignment
> field
> > >> > would
> > >> > > be
> > >> > > > > > good
> > >> > > > > > >> for
> > >> > > > > > >> > 2) and 3) as well, and I like that optimization for its
> > >> > > simplicity
> > >> > > > > and
> > >> > > > > > >> no
> > >> > > > > > >> > format change as well. And in the future I'm in favor
> of
> > >> > > > considering
> > >> > > > > > to
> > >> > > > > > >> > change the in-memory cache format as Jiangjie
> suggested.
> > >> > > > > > >> >
> > >> > > > > > >> > Guozhang
> > >> > > > > > >> >
> > >> > > > > > >> >
> > >> > > > > > >> > On Tue, May 24, 2016 at 12:42 PM, Becket Qin <
> > >> > > > becket....@gmail.com>
> > >> > > > > > >> wrote:
> > >> > > > > > >> >
> > >> > > > > > >> > > Hi Jason,
> > >> > > > > > >> > >
> > >> > > > > > >> > > There are a few problems we want to solve here:
> > >> > > > > > >> > > 1. The group metadata is too big to be appended to
> the
> > >> log.
> > >> > > > > > >> > > 2. Reduce the memory footprint on the broker
> > >> > > > > > >> > > 3. Reduce the bytes transferred over the wire.
> > >> > > > > > >> > >
> > >> > > > > > >> > > To solve (1), I like your idea of having separate
> > >> messages
> > >> > per
> > >> > > > > > member.
> > >> > > > > > >> > The
> > >> > > > > > >> > > proposal (Onur's option 8) is to break metadata into
> > >> small
> > >> > > > records
> > >> > > > > > in
> > >> > > > > > >> the
> > >> > > > > > >> > > same uncompressed message set so each record is
> small.
> > I
> > >> > agree
> > >> > > > it
> > >> > > > > > >> would
> > >> > > > > > >> > be
> > >> > > > > > >> > > ideal if we are able to store the metadata separately
> > for
> > >> > each
> > >> > > > > > >> member. I
> > >> > > > > > >> > > was also thinking about storing the metadata into
> > >> multiple
> > >> > > > > messages,
> > >> > > > > > >> too.
> > >> > > > > > >> > > What concerns me was that having multiple messages
> > seems
> > >> > > > breaking
> > >> > > > > > the
> > >> > > > > > >> > > atomicity. I am not sure how we are going to deal
> with
> > >> the
> > >> > > > > potential
> > >> > > > > > >> > > issues. For example, What if group metadata is
> > replicated
> > >> > but
> > >> > > > the
> > >> > > > > > >> member
> > >> > > > > > >> > > metadata is not? It might be fine depending on the
> > >> > > > implementation
> > >> > > > > > >> though,
> > >> > > > > > >> > > but I am not sure.
> > >> > > > > > >> > >
> > >> > > > > > >> > > For (2) we want to store the metadata onto the disk,
> > >> which
> > >> > is
> > >> > > > what
> > >> > > > > > we
> > >> > > > > > >> > have
> > >> > > > > > >> > > to do anyway. The only question is in what format
> > should
> > >> we
> > >> > > > store
> > >> > > > > > >> them.
> > >> > > > > > >> > >
> > >> > > > > > >> > > To address (3) we want to have the metadata to be
> > >> > compressed,
> > >> > > > > which
> > >> > > > > > is
> > >> > > > > > >> > > contradict to the the above solution of (1).
> > >> > > > > > >> > >
> > >> > > > > > >> > > I think Jun's suggestion is probably still the
> > simplest.
> > >> To
> > >> > > > avoid
> > >> > > > > > >> > changing
> > >> > > > > > >> > > the behavior for consumers, maybe we can do that only
> > for
> > >> > > > > > >> offset_topic,
> > >> > > > > > >> > > i.e, if the max fetch bytes of the fetch request is
> > >> smaller
> > >> > > than
> > >> > > > > the
> > >> > > > > > >> > > message size on the offset topic, we always return at
> > >> least
> > >> > > one
> > >> > > > > full
> > >> > > > > > >> > > message. This should avoid the unexpected problem on
> > the
> > >> > > client
> > >> > > > > side
> > >> > > > > > >> > > because supposedly only tools and brokers will fetch
> > from
> > >> > the
> > >> > > > the
> > >> > > > > > >> > internal
> > >> > > > > > >> > > topics,
> > >> > > > > > >> > >
> > >> > > > > > >> > > As a modification to what you suggested, one
> solution I
> > >> was
> > >> > > > > thinking
> > >> > > > > > >> was
> > >> > > > > > >> > to
> > >> > > > > > >> > > have multiple messages in a single compressed
> message.
> > >> That
> > >> > > > means
> > >> > > > > > for
> > >> > > > > > >> > > SyncGroupResponse we still need to read the entire
> > >> > compressed
> > >> > > > > > messages
> > >> > > > > > >> > and
> > >> > > > > > >> > > extract the inner messages, which seems not quite
> > >> different
> > >> > > from
> > >> > > > > > >> having a
> > >> > > > > > >> > > single message containing everything. But let me just
> > >> put it
> > >> > > > here
> > >> > > > > > and
> > >> > > > > > >> see
> > >> > > > > > >> > > if that makes sense.
> > >> > > > > > >> > >
> > >> > > > > > >> > > We can have a map of GroupMetadataKey ->
> > >> > > > GroupMetadataValueOffset.
> > >> > > > > > >> > >
> > >> > > > > > >> > > The GroupMetadataValue is stored in a compressed
> > message.
> > >> > The
> > >> > > > > inner
> > >> > > > > > >> > > messages are the following:
> > >> > > > > > >> > >
> > >> > > > > > >> > > Inner Message 0: Version GroupId Generation
> > >> > > > > > >> > >
> > >> > > > > > >> > > Inner Message 1: MemberId MemberMetadata_1 (we can
> > >> compress
> > >> > > the
> > >> > > > > > bytes
> > >> > > > > > >> > here)
> > >> > > > > > >> > >
> > >> > > > > > >> > > Inner Message 2: MemberId MemberMetadata_2
> > >> > > > > > >> > > ....
> > >> > > > > > >> > > Inner Message N: MemberId MemberMetadata_N
> > >> > > > > > >> > >
> > >> > > > > > >> > > The MemberMetadata format is the following:
> > >> > > > > > >> > >   MemberMetadata => Version Generation ClientId Host
> > >> > > > Subscription
> > >> > > > > > >> > > Assignment
> > >> > > > > > >> > >
> > >> > > > > > >> > > So DescribeGroupResponse will just return the entire
> > >> > > compressed
> > >> > > > > > >> > > GroupMetadataMessage. SyncGroupResponse will return
> the
> > >> > > > > > corresponding
> > >> > > > > > >> > inner
> > >> > > > > > >> > > message.
> > >> > > > > > >> > >
> > >> > > > > > >> > > Thanks,
> > >> > > > > > >> > >
> > >> > > > > > >> > > Jiangjie (Becket) Qin
> > >> > > > > > >> > >
> > >> > > > > > >> > >
> > >> > > > > > >> > >
> > >> > > > > > >> > > On Tue, May 24, 2016 at 9:14 AM, Jason Gustafson <
> > >> > > > > > ja...@confluent.io>
> > >> > > > > > >> > > wrote:
> > >> > > > > > >> > >
> > >> > > > > > >> > > > Hey Becket,
> > >> > > > > > >> > > >
> > >> > > > > > >> > > > I like your idea to store only the offset for the
> > group
> > >> > > > metadata
> > >> > > > > > in
> > >> > > > > > >> > > memory.
> > >> > > > > > >> > > > I think it would be safe to keep it in memory for a
> > >> short
> > >> > > time
> > >> > > > > > after
> > >> > > > > > >> > the
> > >> > > > > > >> > > > rebalance completes, but after that, it's only real
> > >> > purpose
> > >> > > is
> > >> > > > > to
> > >> > > > > > >> > answer
> > >> > > > > > >> > > > DescribeGroup requests, so your proposal makes a
> lot
> > of
> > >> > > sense
> > >> > > > to
> > >> > > > > > me.
> > >> > > > > > >> > > >
> > >> > > > > > >> > > > As for the specific problem with the size of the
> > group
> > >> > > > metadata
> > >> > > > > > >> message
> > >> > > > > > >> > > for
> > >> > > > > > >> > > > the MM case, if we cannot succeed in reducing the
> > size
> > >> of
> > >> > > the
> > >> > > > > > >> > > > subscription/assignment (which I think is still
> > >> probably
> > >> > the
> > >> > > > > best
> > >> > > > > > >> > > > alternative if it can work), then I think there are
> > >> some
> > >> > > > options
> > >> > > > > > for
> > >> > > > > > >> > > > changing the message format (option #8 in Onur's
> > >> initial
> > >> > > > > e-mail).
> > >> > > > > > >> > > > Currently, the key used for storing the group
> > metadata
> > >> is
> > >> > > > this:
> > >> > > > > > >> > > >
> > >> > > > > > >> > > > GroupMetadataKey => Version GroupId
> > >> > > > > > >> > > >
> > >> > > > > > >> > > > And the value is something like this (some details
> > >> > elided):
> > >> > > > > > >> > > >
> > >> > > > > > >> > > > GroupMetadataValue => Version GroupId Generation
> > >> > > > > [MemberMetadata]
> > >> > > > > > >> > > >   MemberMetadata => ClientId Host Subscription
> > >> Assignment
> > >> > > > > > >> > > >
> > >> > > > > > >> > > > I don't think we can change the key without a lot
> of
> > >> pain,
> > >> > > but
> > >> > > > > it
> > >> > > > > > >> seems
> > >> > > > > > >> > > > like we can change the value format. Maybe we can
> > take
> > >> the
> > >> > > > > > >> > > > subscription/assignment payloads out of the value
> and
> > >> > > > introduce
> > >> > > > > a
> > >> > > > > > >> new
> > >> > > > > > >> > > > "MemberMetadata" message for each member in the
> > group.
> > >> For
> > >> > > > > > example:
> > >> > > > > > >> > > >
> > >> > > > > > >> > > > MemberMetadataKey => Version GroupId MemberId
> > >> > > > > > >> > > >
> > >> > > > > > >> > > > MemberMetadataValue => Version Generation ClientId
> > Host
> > >> > > > > > Subscription
> > >> > > > > > >> > > > Assignment
> > >> > > > > > >> > > >
> > >> > > > > > >> > > > When a new generation is created, we would first
> > write
> > >> the
> > >> > > > group
> > >> > > > > > >> > metadata
> > >> > > > > > >> > > > message which includes the generation and all of
> the
> > >> > > > memberIds,
> > >> > > > > > and
> > >> > > > > > >> > then
> > >> > > > > > >> > > > we'd write the member metadata messages. To answer
> > the
> > >> > > > > > DescribeGroup
> > >> > > > > > >> > > > request, we'd read the group metadata at the cached
> > >> offset
> > >> > > > and,
> > >> > > > > > >> > depending
> > >> > > > > > >> > > > on the version, all of the following member
> metadata.
> > >> This
> > >> > > > would
> > >> > > > > > be
> > >> > > > > > >> > more
> > >> > > > > > >> > > > complex to maintain, but it seems doable if it
> comes
> > to
> > >> > it.
> > >> > > > > > >> > > >
> > >> > > > > > >> > > > Thanks,
> > >> > > > > > >> > > > Jason
> > >> > > > > > >> > > >
> > >> > > > > > >> > > > On Mon, May 23, 2016 at 6:15 PM, Becket Qin <
> > >> > > > > becket....@gmail.com
> > >> > > > > > >
> > >> > > > > > >> > > wrote:
> > >> > > > > > >> > > >
> > >> > > > > > >> > > > > It might worth thinking a little further. We have
> > >> > > discussed
> > >> > > > > this
> > >> > > > > > >> > before
> > >> > > > > > >> > > > > that we want to avoid holding all the group
> > metadata
> > >> in
> > >> > > > > memory.
> > >> > > > > > >> > > > >
> > >> > > > > > >> > > > > I am thinking about the following end state:
> > >> > > > > > >> > > > >
> > >> > > > > > >> > > > > 1. Enable compression on the offset topic.
> > >> > > > > > >> > > > > 2. Instead of holding the entire group metadata
> in
> > >> > memory
> > >> > > on
> > >> > > > > the
> > >> > > > > > >> > > brokers,
> > >> > > > > > >> > > > > each broker only keeps a [group -> Offset] map,
> the
> > >> > offset
> > >> > > > > > points
> > >> > > > > > >> to
> > >> > > > > > >> > > the
> > >> > > > > > >> > > > > message in the offset topic which holds the
> latest
> > >> > > metadata
> > >> > > > of
> > >> > > > > > the
> > >> > > > > > >> > > group.
> > >> > > > > > >> > > > > 3. DescribeGroupResponse will read from the
> offset
> > >> topic
> > >> > > > > > directly
> > >> > > > > > >> > like
> > >> > > > > > >> > > a
> > >> > > > > > >> > > > > normal consumption, except that only exactly one
> > >> message
> > >> > > > will
> > >> > > > > be
> > >> > > > > > >> > > > returned.
> > >> > > > > > >> > > > > 4. SyncGroupResponse will read the message,
> extract
> > >> the
> > >> > > > > > assignment
> > >> > > > > > >> > part
> > >> > > > > > >> > > > and
> > >> > > > > > >> > > > > send back the partition assignment. We can
> compress
> > >> the
> > >> > > > > > partition
> > >> > > > > > >> > > > > assignment before sends it out if we want.
> > >> > > > > > >> > > > >
> > >> > > > > > >> > > > > Jiangjie (Becket) Qin
> > >> > > > > > >> > > > >
> > >> > > > > > >> > > > > On Mon, May 23, 2016 at 5:08 PM, Jason Gustafson
> <
> > >> > > > > > >> ja...@confluent.io
> > >> > > > > > >> > >
> > >> > > > > > >> > > > > wrote:
> > >> > > > > > >> > > > >
> > >> > > > > > >> > > > > > >
> > >> > > > > > >> > > > > > > Jason, doesn't gzip (or other compression)
> > >> basically
> > >> > > do
> > >> > > > > > this?
> > >> > > > > > >> If
> > >> > > > > > >> > > the
> > >> > > > > > >> > > > > > topic
> > >> > > > > > >> > > > > > > is a string and the topic is repeated
> > throughout,
> > >> > > won't
> > >> > > > > > >> > compression
> > >> > > > > > >> > > > > > > basically replace all repeated instances of
> it
> > >> with
> > >> > an
> > >> > > > > index
> > >> > > > > > >> > > > reference
> > >> > > > > > >> > > > > to
> > >> > > > > > >> > > > > > > the full string?
> > >> > > > > > >> > > > > >
> > >> > > > > > >> > > > > >
> > >> > > > > > >> > > > > > Hey James, yeah, that's probably true, but keep
> > in
> > >> > mind
> > >> > > > that
> > >> > > > > > the
> > >> > > > > > >> > > > > > compression happens on the broker side. It
> would
> > be
> > >> > nice
> > >> > > > to
> > >> > > > > > >> have a
> > >> > > > > > >> > > more
> > >> > > > > > >> > > > > > compact representation so that get some benefit
> > >> over
> > >> > the
> > >> > > > > wire
> > >> > > > > > as
> > >> > > > > > >> > > well.
> > >> > > > > > >> > > > > This
> > >> > > > > > >> > > > > > seems to be less of a concern here, so the
> bigger
> > >> > gains
> > >> > > > are
> > >> > > > > > >> > probably
> > >> > > > > > >> > > > from
> > >> > > > > > >> > > > > > reducing the number of partitions that need to
> be
> > >> > listed
> > >> > > > > > >> > > individually.
> > >> > > > > > >> > > > > >
> > >> > > > > > >> > > > > > -Jason
> > >> > > > > > >> > > > > >
> > >> > > > > > >> > > > > > On Mon, May 23, 2016 at 4:23 PM, Onur Karaman <
> > >> > > > > > >> > > > > > onurkaraman.apa...@gmail.com>
> > >> > > > > > >> > > > > > wrote:
> > >> > > > > > >> > > > > >
> > >> > > > > > >> > > > > > > When figuring out these optimizations, it's
> > worth
> > >> > > > keeping
> > >> > > > > in
> > >> > > > > > >> mind
> > >> > > > > > >> > > the
> > >> > > > > > >> > > > > > > improvements when the message is uncompressed
> > vs
> > >> > when
> > >> > > > it's
> > >> > > > > > >> > > > compressed.
> > >> > > > > > >> > > > > > >
> > >> > > > > > >> > > > > > > When uncompressed:
> > >> > > > > > >> > > > > > > Fixing the Assignment serialization to
> instead
> > >> be a
> > >> > > > topic
> > >> > > > > > >> index
> > >> > > > > > >> > > into
> > >> > > > > > >> > > > > the
> > >> > > > > > >> > > > > > > corresponding member's subscription list
> would
> > >> > usually
> > >> > > > be
> > >> > > > > a
> > >> > > > > > >> good
> > >> > > > > > >> > > > thing.
> > >> > > > > > >> > > > > > >
> > >> > > > > > >> > > > > > > I think the proposal is only worse when the
> > topic
> > >> > > names
> > >> > > > > are
> > >> > > > > > >> > small.
> > >> > > > > > >> > > > The
> > >> > > > > > >> > > > > > > Type.STRING we use in our protocol for the
> > >> > > assignment's
> > >> > > > > > >> > > > TOPIC_KEY_NAME
> > >> > > > > > >> > > > > is
> > >> > > > > > >> > > > > > > limited in length to Short.MAX_VALUE, so our
> > >> strings
> > >> > > are
> > >> > > > > > first
> > >> > > > > > >> > > > > prepended
> > >> > > > > > >> > > > > > > with 2 bytes to indicate the string size.
> > >> > > > > > >> > > > > > >
> > >> > > > > > >> > > > > > > The new proposal does worse when:
> > >> > > > > > >> > > > > > > 2 + utf_encoded_string_payload_size <
> > >> > index_type_size
> > >> > > > > > >> > > > > > > in other words when:
> > >> > > > > > >> > > > > > > utf_encoded_string_payload_size <
> > >> index_type_size -
> > >> > 2
> > >> > > > > > >> > > > > > >
> > >> > > > > > >> > > > > > > If the index type ends up being Type.INT32,
> > then
> > >> the
> > >> > > > > > proposal
> > >> > > > > > >> is
> > >> > > > > > >> > > > worse
> > >> > > > > > >> > > > > > when
> > >> > > > > > >> > > > > > > the topic is length 1.
> > >> > > > > > >> > > > > > > If the index type ends up being Type.INT64,
> > then
> > >> the
> > >> > > > > > proposal
> > >> > > > > > >> is
> > >> > > > > > >> > > > worse
> > >> > > > > > >> > > > > > when
> > >> > > > > > >> > > > > > > the topic is less than length 6.
> > >> > > > > > >> > > > > > >
> > >> > > > > > >> > > > > > > When compressed:
> > >> > > > > > >> > > > > > > As James Cheng brought up, I'm not sure how
> > >> things
> > >> > > > change
> > >> > > > > > when
> > >> > > > > > >> > > > > > compression
> > >> > > > > > >> > > > > > > comes into the picture. This would be worth
> > >> > > > investigating.
> > >> > > > > > >> > > > > > >
> > >> > > > > > >> > > > > > > On Mon, May 23, 2016 at 4:05 PM, James Cheng
> <
> > >> > > > > > >> > wushuja...@gmail.com
> > >> > > > > > >> > > >
> > >> > > > > > >> > > > > > wrote:
> > >> > > > > > >> > > > > > >
> > >> > > > > > >> > > > > > > >
> > >> > > > > > >> > > > > > > > > On May 23, 2016, at 10:59 AM, Jason
> > >> Gustafson <
> > >> > > > > > >> > > > ja...@confluent.io>
> > >> > > > > > >> > > > > > > > wrote:
> > >> > > > > > >> > > > > > > > >
> > >> > > > > > >> > > > > > > > > 2. Maybe there's a better way to lay out
> > the
> > >> > > > > assignment
> > >> > > > > > >> > without
> > >> > > > > > >> > > > > > needing
> > >> > > > > > >> > > > > > > > to
> > >> > > > > > >> > > > > > > > > explicitly repeat the topic? For example,
> > the
> > >> > > leader
> > >> > > > > > could
> > >> > > > > > >> > sort
> > >> > > > > > >> > > > the
> > >> > > > > > >> > > > > > > > topics
> > >> > > > > > >> > > > > > > > > for each member and just use an integer
> to
> > >> > > represent
> > >> > > > > the
> > >> > > > > > >> > index
> > >> > > > > > >> > > of
> > >> > > > > > >> > > > > > each
> > >> > > > > > >> > > > > > > > > topic within the sorted list (note this
> > >> depends
> > >> > on
> > >> > > > the
> > >> > > > > > >> > > > subscription
> > >> > > > > > >> > > > > > > > > including the full topic list).
> > >> > > > > > >> > > > > > > > >
> > >> > > > > > >> > > > > > > > > Assignment -> [TopicIndex [Partition]]
> > >> > > > > > >> > > > > > > > >
> > >> > > > > > >> > > > > > > >
> > >> > > > > > >> > > > > > > > Jason, doesn't gzip (or other compression)
> > >> > basically
> > >> > > > do
> > >> > > > > > >> this?
> > >> > > > > > >> > If
> > >> > > > > > >> > > > the
> > >> > > > > > >> > > > > > > topic
> > >> > > > > > >> > > > > > > > is a string and the topic is repeated
> > >> throughout,
> > >> > > > won't
> > >> > > > > > >> > > compression
> > >> > > > > > >> > > > > > > > basically replace all repeated instances of
> > it
> > >> > with
> > >> > > an
> > >> > > > > > index
> > >> > > > > > >> > > > > reference
> > >> > > > > > >> > > > > > to
> > >> > > > > > >> > > > > > > > the full string?
> > >> > > > > > >> > > > > > > >
> > >> > > > > > >> > > > > > > > -James
> > >> > > > > > >> > > > > > > >
> > >> > > > > > >> > > > > > > > > You could even combine these two options
> so
> > >> that
> > >> > > you
> > >> > > > > > have
> > >> > > > > > >> > only
> > >> > > > > > >> > > 3
> > >> > > > > > >> > > > > > > integers
> > >> > > > > > >> > > > > > > > > for each topic assignment:
> > >> > > > > > >> > > > > > > > >
> > >> > > > > > >> > > > > > > > > Assignment -> [TopicIndex MinPartition
> > >> > > MaxPartition]
> > >> > > > > > >> > > > > > > > >
> > >> > > > > > >> > > > > > > > > There may even be better options with a
> > >> little
> > >> > > more
> > >> > > > > > >> thought.
> > >> > > > > > >> > > All
> > >> > > > > > >> > > > of
> > >> > > > > > >> > > > > > > this
> > >> > > > > > >> > > > > > > > is
> > >> > > > > > >> > > > > > > > > just part of the client-side protocol, so
> > it
> > >> > > > wouldn't
> > >> > > > > > >> require
> > >> > > > > > >> > > any
> > >> > > > > > >> > > > > > > version
> > >> > > > > > >> > > > > > > > > bumps on the broker. What do you think?
> > >> > > > > > >> > > > > > > > >
> > >> > > > > > >> > > > > > > > > Thanks,
> > >> > > > > > >> > > > > > > > > Jason
> > >> > > > > > >> > > > > > > > >
> > >> > > > > > >> > > > > > > > >
> > >> > > > > > >> > > > > > > > >
> > >> > > > > > >> > > > > > > > >
> > >> > > > > > >> > > > > > > > > On Mon, May 23, 2016 at 9:17 AM, Guozhang
> > >> Wang <
> > >> > > > > > >> > > > wangg...@gmail.com
> > >> > > > > > >> > > > > >
> > >> > > > > > >> > > > > > > > wrote:
> > >> > > > > > >> > > > > > > > >
> > >> > > > > > >> > > > > > > > >> The original concern is that regex may
> not
> > >> be
> > >> > > > > > efficiently
> > >> > > > > > >> > > > > supported
> > >> > > > > > >> > > > > > > > >> across-languages, but if there is a neat
> > >> > > > workaround I
> > >> > > > > > >> would
> > >> > > > > > >> > > love
> > >> > > > > > >> > > > > to
> > >> > > > > > >> > > > > > > > learn.
> > >> > > > > > >> > > > > > > > >>
> > >> > > > > > >> > > > > > > > >> Guozhang
> > >> > > > > > >> > > > > > > > >>
> > >> > > > > > >> > > > > > > > >> On Mon, May 23, 2016 at 5:31 AM, Ismael
> > >> Juma <
> > >> > > > > > >> > > ism...@juma.me.uk
> > >> > > > > > >> > > > >
> > >> > > > > > >> > > > > > > wrote:
> > >> > > > > > >> > > > > > > > >>
> > >> > > > > > >> > > > > > > > >>> +1 to Jun's suggestion.
> > >> > > > > > >> > > > > > > > >>>
> > >> > > > > > >> > > > > > > > >>> Having said that, as a general point, I
> > >> think
> > >> > we
> > >> > > > > > should
> > >> > > > > > >> > > > consider
> > >> > > > > > >> > > > > > > > >> supporting
> > >> > > > > > >> > > > > > > > >>> topic patterns in the wire protocol. It
> > >> > requires
> > >> > > > > some
> > >> > > > > > >> > > thinking
> > >> > > > > > >> > > > > for
> > >> > > > > > >> > > > > > > > >>> cross-language support, but it seems
> > >> > > surmountable
> > >> > > > > and
> > >> > > > > > it
> > >> > > > > > >> > > could
> > >> > > > > > >> > > > > make
> > >> > > > > > >> > > > > > > > >> certain
> > >> > > > > > >> > > > > > > > >>> operations a lot more efficient (the
> fact
> > >> > that a
> > >> > > > > basic
> > >> > > > > > >> > regex
> > >> > > > > > >> > > > > > > > subscription
> > >> > > > > > >> > > > > > > > >>> causes the consumer to request metadata
> > for
> > >> > all
> > >> > > > > topics
> > >> > > > > > >> is
> > >> > > > > > >> > not
> > >> > > > > > >> > > > > > great).
> > >> > > > > > >> > > > > > > > >>>
> > >> > > > > > >> > > > > > > > >>> Ismael
> > >> > > > > > >> > > > > > > > >>>
> > >> > > > > > >> > > > > > > > >>> On Sun, May 22, 2016 at 11:49 PM,
> > Guozhang
> > >> > Wang
> > >> > > <
> > >> > > > > > >> > > > > > wangg...@gmail.com>
> > >> > > > > > >> > > > > > > > >>> wrote:
> > >> > > > > > >> > > > > > > > >>>
> > >> > > > > > >> > > > > > > > >>>> I like Jun's suggestion in changing
> the
> > >> > > handling
> > >> > > > > > >> logics of
> > >> > > > > > >> > > > > single
> > >> > > > > > >> > > > > > > > large
> > >> > > > > > >> > > > > > > > >>>> message on the consumer side.
> > >> > > > > > >> > > > > > > > >>>>
> > >> > > > > > >> > > > > > > > >>>> As for the case of "a single group
> > >> > subscribing
> > >> > > to
> > >> > > > > > 3000
> > >> > > > > > >> > > > topics",
> > >> > > > > > >> > > > > > with
> > >> > > > > > >> > > > > > > > >> 100
> > >> > > > > > >> > > > > > > > >>>> consumers the 2.5Mb Gzip size is
> > >> reasonable
> > >> > to
> > >> > > me
> > >> > > > > > (when
> > >> > > > > > >> > > > storing
> > >> > > > > > >> > > > > in
> > >> > > > > > >> > > > > > > ZK,
> > >> > > > > > >> > > > > > > > >> we
> > >> > > > > > >> > > > > > > > >>>> also have the znode limit which is set
> > to
> > >> 1Mb
> > >> > > by
> > >> > > > > > >> default,
> > >> > > > > > >> > > > though
> > >> > > > > > >> > > > > > > > >>> admittedly
> > >> > > > > > >> > > > > > > > >>>> it is only for one consumer). And if
> we
> > do
> > >> > the
> > >> > > > > change
> > >> > > > > > >> as
> > >> > > > > > >> > Jun
> > >> > > > > > >> > > > > > > > suggested,
> > >> > > > > > >> > > > > > > > >>>> 2.5Mb on follower's memory pressure is
> > OK
> > >> I
> > >> > > > think.
> > >> > > > > > >> > > > > > > > >>>>
> > >> > > > > > >> > > > > > > > >>>>
> > >> > > > > > >> > > > > > > > >>>> Guozhang
> > >> > > > > > >> > > > > > > > >>>>
> > >> > > > > > >> > > > > > > > >>>>
> > >> > > > > > >> > > > > > > > >>>> On Sat, May 21, 2016 at 12:51 PM, Onur
> > >> > Karaman
> > >> > > <
> > >> > > > > > >> > > > > > > > >>>> onurkaraman.apa...@gmail.com
> > >> > > > > > >> > > > > > > > >>>>> wrote:
> > >> > > > > > >> > > > > > > > >>>>
> > >> > > > > > >> > > > > > > > >>>>> Results without compression:
> > >> > > > > > >> > > > > > > > >>>>> 1 consumer 292383 bytes
> > >> > > > > > >> > > > > > > > >>>>> 5 consumers 1079579 bytes * the
> tipping
> > >> > point
> > >> > > > > > >> > > > > > > > >>>>> 10 consumers 1855018 bytes
> > >> > > > > > >> > > > > > > > >>>>> 20 consumers 2780220 bytes
> > >> > > > > > >> > > > > > > > >>>>> 30 consumers 3705422 bytes
> > >> > > > > > >> > > > > > > > >>>>> 40 consumers 4630624 bytes
> > >> > > > > > >> > > > > > > > >>>>> 50 consumers 5555826 bytes
> > >> > > > > > >> > > > > > > > >>>>> 60 consumers 6480788 bytes
> > >> > > > > > >> > > > > > > > >>>>> 70 consumers 7405750 bytes
> > >> > > > > > >> > > > > > > > >>>>> 80 consumers 8330712 bytes
> > >> > > > > > >> > > > > > > > >>>>> 90 consumers 9255674 bytes
> > >> > > > > > >> > > > > > > > >>>>> 100 consumers 10180636 bytes
> > >> > > > > > >> > > > > > > > >>>>>
> > >> > > > > > >> > > > > > > > >>>>> So it looks like gzip compression
> > shrinks
> > >> > the
> > >> > > > > > message
> > >> > > > > > >> > size
> > >> > > > > > >> > > by
> > >> > > > > > >> > > > > 4x.
> > >> > > > > > >> > > > > > > > >>>>>
> > >> > > > > > >> > > > > > > > >>>>> On Sat, May 21, 2016 at 9:47 AM, Jun
> > Rao
> > >> <
> > >> > > > > > >> > j...@confluent.io
> > >> > > > > > >> > > >
> > >> > > > > > >> > > > > > wrote:
> > >> > > > > > >> > > > > > > > >>>>>
> > >> > > > > > >> > > > > > > > >>>>>> Onur,
> > >> > > > > > >> > > > > > > > >>>>>>
> > >> > > > > > >> > > > > > > > >>>>>> Thanks for the investigation.
> > >> > > > > > >> > > > > > > > >>>>>>
> > >> > > > > > >> > > > > > > > >>>>>> Another option is to just fix how we
> > >> deal
> > >> > > with
> > >> > > > > the
> > >> > > > > > >> case
> > >> > > > > > >> > > > when a
> > >> > > > > > >> > > > > > > > >>> message
> > >> > > > > > >> > > > > > > > >>>> is
> > >> > > > > > >> > > > > > > > >>>>>> larger than the fetch size. Today,
> if
> > >> the
> > >> > > fetch
> > >> > > > > > size
> > >> > > > > > >> is
> > >> > > > > > >> > > > > smaller
> > >> > > > > > >> > > > > > > > >> than
> > >> > > > > > >> > > > > > > > >>>> the
> > >> > > > > > >> > > > > > > > >>>>>> fetch size, the consumer will get
> > stuck.
> > >> > > > Instead,
> > >> > > > > > we
> > >> > > > > > >> can
> > >> > > > > > >> > > > > simply
> > >> > > > > > >> > > > > > > > >>> return
> > >> > > > > > >> > > > > > > > >>>>> the
> > >> > > > > > >> > > > > > > > >>>>>> full message if it's larger than the
> > >> fetch
> > >> > > size
> > >> > > > > w/o
> > >> > > > > > >> > > > requiring
> > >> > > > > > >> > > > > > the
> > >> > > > > > >> > > > > > > > >>>>> consumer
> > >> > > > > > >> > > > > > > > >>>>>> to manually adjust the fetch size.
> On
> > >> the
> > >> > > > broker
> > >> > > > > > >> side,
> > >> > > > > > >> > to
> > >> > > > > > >> > > > > serve
> > >> > > > > > >> > > > > > a
> > >> > > > > > >> > > > > > > > >>> fetch
> > >> > > > > > >> > > > > > > > >>>>>> request, we already do an index
> lookup
> > >> and
> > >> > > then
> > >> > > > > > scan
> > >> > > > > > >> the
> > >> > > > > > >> > > > log a
> > >> > > > > > >> > > > > > bit
> > >> > > > > > >> > > > > > > > >> to
> > >> > > > > > >> > > > > > > > >>>>> find
> > >> > > > > > >> > > > > > > > >>>>>> the message with the requested
> offset.
> > >> We
> > >> > can
> > >> > > > > just
> > >> > > > > > >> check
> > >> > > > > > >> > > the
> > >> > > > > > >> > > > > > size
> > >> > > > > > >> > > > > > > > >> of
> > >> > > > > > >> > > > > > > > >>>> that
> > >> > > > > > >> > > > > > > > >>>>>> message and return the full message
> if
> > >> its
> > >> > > size
> > >> > > > > is
> > >> > > > > > >> > larger
> > >> > > > > > >> > > > than
> > >> > > > > > >> > > > > > the
> > >> > > > > > >> > > > > > > > >>>> fetch
> > >> > > > > > >> > > > > > > > >>>>>> size. This way, fetch size is really
> > for
> > >> > > > > > performance
> > >> > > > > > >> > > > > > optimization,
> > >> > > > > > >> > > > > > > > >>> i.e.
> > >> > > > > > >> > > > > > > > >>>>> in
> > >> > > > > > >> > > > > > > > >>>>>> the common case, we will not return
> > more
> > >> > > bytes
> > >> > > > > than
> > >> > > > > > >> > fetch
> > >> > > > > > >> > > > > size,
> > >> > > > > > >> > > > > > > but
> > >> > > > > > >> > > > > > > > >>> if
> > >> > > > > > >> > > > > > > > >>>>>> there is a large message, we will
> > return
> > >> > more
> > >> > > > > bytes
> > >> > > > > > >> than
> > >> > > > > > >> > > the
> > >> > > > > > >> > > > > > > > >>> specified
> > >> > > > > > >> > > > > > > > >>>>>> fetch size. In practice, large
> > messages
> > >> are
> > >> > > > rare.
> > >> > > > > > >> So, it
> > >> > > > > > >> > > > > > shouldn't
> > >> > > > > > >> > > > > > > > >>>>> increase
> > >> > > > > > >> > > > > > > > >>>>>> the memory consumption on the client
> > too
> > >> > > much.
> > >> > > > > > >> > > > > > > > >>>>>>
> > >> > > > > > >> > > > > > > > >>>>>> Jun
> > >> > > > > > >> > > > > > > > >>>>>>
> > >> > > > > > >> > > > > > > > >>>>>> On Sat, May 21, 2016 at 3:34 AM,
> Onur
> > >> > > Karaman <
> > >> > > > > > >> > > > > > > > >>>>>> onurkaraman.apa...@gmail.com>
> > >> > > > > > >> > > > > > > > >>>>>> wrote:
> > >> > > > > > >> > > > > > > > >>>>>>
> > >> > > > > > >> > > > > > > > >>>>>>> Hey everyone. So I started doing
> some
> > >> > tests
> > >> > > on
> > >> > > > > the
> > >> > > > > > >> new
> > >> > > > > > >> > > > > > > > >>>>>> consumer/coordinator
> > >> > > > > > >> > > > > > > > >>>>>>> to see if it could handle more
> > >> strenuous
> > >> > use
> > >> > > > > cases
> > >> > > > > > >> like
> > >> > > > > > >> > > > > > mirroring
> > >> > > > > > >> > > > > > > > >>>>>> clusters
> > >> > > > > > >> > > > > > > > >>>>>>> with thousands of topics and
> thought
> > >> I'd
> > >> > > share
> > >> > > > > > >> > whatever I
> > >> > > > > > >> > > > > have
> > >> > > > > > >> > > > > > so
> > >> > > > > > >> > > > > > > > >>>> far.
> > >> > > > > > >> > > > > > > > >>>>>>>
> > >> > > > > > >> > > > > > > > >>>>>>> The scalability limit: the amount
> of
> > >> group
> > >> > > > > > metadata
> > >> > > > > > >> we
> > >> > > > > > >> > > can
> > >> > > > > > >> > > > > fit
> > >> > > > > > >> > > > > > > > >> into
> > >> > > > > > >> > > > > > > > >>>> one
> > >> > > > > > >> > > > > > > > >>>>>>> message
> > >> > > > > > >> > > > > > > > >>>>>>>
> > >> > > > > > >> > > > > > > > >>>>>>> Some background:
> > >> > > > > > >> > > > > > > > >>>>>>> Client-side assignment is
> implemented
> > >> in
> > >> > two
> > >> > > > > > phases
> > >> > > > > > >> > > > > > > > >>>>>>> 1. a PreparingRebalance phase that
> > >> > > identifies
> > >> > > > > > >> members
> > >> > > > > > >> > of
> > >> > > > > > >> > > > the
> > >> > > > > > >> > > > > > > > >> group
> > >> > > > > > >> > > > > > > > >>>> and
> > >> > > > > > >> > > > > > > > >>>>>>> aggregates member subscriptions.
> > >> > > > > > >> > > > > > > > >>>>>>> 2. an AwaitingSync phase that waits
> > for
> > >> > the
> > >> > > > > group
> > >> > > > > > >> > leader
> > >> > > > > > >> > > to
> > >> > > > > > >> > > > > > > > >> decide
> > >> > > > > > >> > > > > > > > >>>>> member
> > >> > > > > > >> > > > > > > > >>>>>>> assignments based on the member
> > >> > > subscriptions
> > >> > > > > > across
> > >> > > > > > >> > the
> > >> > > > > > >> > > > > group.
> > >> > > > > > >> > > > > > > > >>>>>>>  - The leader announces this
> decision
> > >> > with a
> > >> > > > > > >> > > > > SyncGroupRequest.
> > >> > > > > > >> > > > > > > > >> The
> > >> > > > > > >> > > > > > > > >>>>>>> GroupCoordinator handles
> > >> SyncGroupRequests
> > >> > > by
> > >> > > > > > >> appending
> > >> > > > > > >> > > all
> > >> > > > > > >> > > > > > group
> > >> > > > > > >> > > > > > > > >>>> state
> > >> > > > > > >> > > > > > > > >>>>>>> into a single message under the
> > >> > > > > __consumer_offsets
> > >> > > > > > >> > topic.
> > >> > > > > > >> > > > > This
> > >> > > > > > >> > > > > > > > >>>> message
> > >> > > > > > >> > > > > > > > >>>>> is
> > >> > > > > > >> > > > > > > > >>>>>>> keyed on the group id and contains
> > each
> > >> > > member
> > >> > > > > > >> > > subscription
> > >> > > > > > >> > > > > as
> > >> > > > > > >> > > > > > > > >> well
> > >> > > > > > >> > > > > > > > >>>> as
> > >> > > > > > >> > > > > > > > >>>>>> the
> > >> > > > > > >> > > > > > > > >>>>>>> decided assignment for each member.
> > >> > > > > > >> > > > > > > > >>>>>>>
> > >> > > > > > >> > > > > > > > >>>>>>> The environment:
> > >> > > > > > >> > > > > > > > >>>>>>> - one broker
> > >> > > > > > >> > > > > > > > >>>>>>> - one __consumer_offsets partition
> > >> > > > > > >> > > > > > > > >>>>>>> - offsets.topic.compression.codec=1
> > //
> > >> > this
> > >> > > is
> > >> > > > > > gzip
> > >> > > > > > >> > > > > > > > >>>>>>> - broker has my pending KAFKA-3718
> > >> patch
> > >> > > that
> > >> > > > > > >> actually
> > >> > > > > > >> > > > makes
> > >> > > > > > >> > > > > > use
> > >> > > > > > >> > > > > > > > >> of
> > >> > > > > > >> > > > > > > > >>>>>>> offsets.topic.compression.codec:
> > >> > > > > > >> > > > > > > > >>>>>>
> > >> https://github.com/apache/kafka/pull/1394
> > >> > > > > > >> > > > > > > > >>>>>>> - around 3000 topics. This is an
> > actual
> > >> > > subset
> > >> > > > > of
> > >> > > > > > >> > topics
> > >> > > > > > >> > > > from
> > >> > > > > > >> > > > > > one
> > >> > > > > > >> > > > > > > > >>> of
> > >> > > > > > >> > > > > > > > >>>>> our
> > >> > > > > > >> > > > > > > > >>>>>>> clusters.
> > >> > > > > > >> > > > > > > > >>>>>>> - topics have 8 partitions
> > >> > > > > > >> > > > > > > > >>>>>>> - topics are 25 characters long on
> > >> average
> > >> > > > > > >> > > > > > > > >>>>>>> - one group with a varying number
> of
> > >> > > consumers
> > >> > > > > > each
> > >> > > > > > >> > > > hardcoded
> > >> > > > > > >> > > > > > > > >> with
> > >> > > > > > >> > > > > > > > >>>> all
> > >> > > > > > >> > > > > > > > >>>>>> the
> > >> > > > > > >> > > > > > > > >>>>>>> topics just to make the tests more
> > >> > > consistent.
> > >> > > > > > >> > > wildcarding
> > >> > > > > > >> > > > > with
> > >> > > > > > >> > > > > > > > >> .*
> > >> > > > > > >> > > > > > > > >>>>> should
> > >> > > > > > >> > > > > > > > >>>>>>> have the same effect once the
> > >> subscription
> > >> > > > hits
> > >> > > > > > the
> > >> > > > > > >> > > > > coordinator
> > >> > > > > > >> > > > > > > > >> as
> > >> > > > > > >> > > > > > > > >>>> the
> > >> > > > > > >> > > > > > > > >>>>>>> subscription has already been fully
> > >> > expanded
> > >> > > > out
> > >> > > > > > to
> > >> > > > > > >> the
> > >> > > > > > >> > > > list
> > >> > > > > > >> > > > > of
> > >> > > > > > >> > > > > > > > >>>> topics
> > >> > > > > > >> > > > > > > > >>>>> by
> > >> > > > > > >> > > > > > > > >>>>>>> the consumers.
> > >> > > > > > >> > > > > > > > >>>>>>> - I added some log messages to
> > >> Log.scala
> > >> > to
> > >> > > > > print
> > >> > > > > > >> out
> > >> > > > > > >> > the
> > >> > > > > > >> > > > > > message
> > >> > > > > > >> > > > > > > > >>>> sizes
> > >> > > > > > >> > > > > > > > >>>>>>> after compression
> > >> > > > > > >> > > > > > > > >>>>>>> - there are no producers at all and
> > >> auto
> > >> > > > commits
> > >> > > > > > are
> > >> > > > > > >> > > > > disabled.
> > >> > > > > > >> > > > > > > > >> The
> > >> > > > > > >> > > > > > > > >>>> only
> > >> > > > > > >> > > > > > > > >>>>>>> topic with messages getting added
> is
> > >> the
> > >> > > > > > >> > > __consumer_offsets
> > >> > > > > > >> > > > > > topic
> > >> > > > > > >> > > > > > > > >>> and
> > >> > > > > > >> > > > > > > > >>>>>>> they're only from storing group
> > >> metadata
> > >> > > while
> > >> > > > > > >> > processing
> > >> > > > > > >> > > > > > > > >>>>>>> SyncGroupRequests.
> > >> > > > > > >> > > > > > > > >>>>>>>
> > >> > > > > > >> > > > > > > > >>>>>>> Results:
> > >> > > > > > >> > > > > > > > >>>>>>> The results below show that we
> exceed
> > >> the
> > >> > > > > 1000012
> > >> > > > > > >> byte
> > >> > > > > > >> > > > > > > > >>>>>>> KafkaConfig.messageMaxBytes limit
> > >> > relatively
> > >> > > > > > quickly
> > >> > > > > > >> > > > (between
> > >> > > > > > >> > > > > > > > >> 30-40
> > >> > > > > > >> > > > > > > > >>>>>>> consumers):
> > >> > > > > > >> > > > > > > > >>>>>>> 1 consumer 54739 bytes
> > >> > > > > > >> > > > > > > > >>>>>>> 5 consumers 261524 bytes
> > >> > > > > > >> > > > > > > > >>>>>>> 10 consumers 459804 bytes
> > >> > > > > > >> > > > > > > > >>>>>>> 20 consumers 702499 bytes
> > >> > > > > > >> > > > > > > > >>>>>>> 30 consumers 930525 bytes
> > >> > > > > > >> > > > > > > > >>>>>>> 40 consumers 1115657 bytes * the
> > >> tipping
> > >> > > point
> > >> > > > > > >> > > > > > > > >>>>>>> 50 consumers 1363112 bytes
> > >> > > > > > >> > > > > > > > >>>>>>> 60 consumers 1598621 bytes
> > >> > > > > > >> > > > > > > > >>>>>>> 70 consumers 1837359 bytes
> > >> > > > > > >> > > > > > > > >>>>>>> 80 consumers 2066934 bytes
> > >> > > > > > >> > > > > > > > >>>>>>> 90 consumers 2310970 bytes
> > >> > > > > > >> > > > > > > > >>>>>>> 100 consumers 2542735 bytes
> > >> > > > > > >> > > > > > > > >>>>>>>
> > >> > > > > > >> > > > > > > > >>>>>>> Note that the growth itself is
> pretty
> > >> > > gradual.
> > >> > > > > > >> Plotting
> > >> > > > > > >> > > the
> > >> > > > > > >> > > > > > > > >> points
> > >> > > > > > >> > > > > > > > >>>>> makes
> > >> > > > > > >> > > > > > > > >>>>>> it
> > >> > > > > > >> > > > > > > > >>>>>>> look roughly linear w.r.t the
> number
> > of
> > >> > > > > consumers:
> > >> > > > > > >> > > > > > > > >>>>>>>
> > >> > > > > > >> > > > > > > > >>>>>>>
> > >> > > > > > >> > > > > > > > >>>>>>
> > >> > > > > > >> > > > > > > > >>>>>
> > >> > > > > > >> > > > > > > > >>>>
> > >> > > > > > >> > > > > > > > >>>
> > >> > > > > > >> > > > > > > > >>
> > >> > > > > > >> > > > > > > >
> > >> > > > > > >> > > > > > >
> > >> > > > > > >> > > > > >
> > >> > > > > > >> > > > >
> > >> > > > > > >> > > >
> > >> > > > > > >> > >
> > >> > > > > > >> >
> > >> > > > > > >>
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735)
> > >> > > > > > >> > > > > > > > >>>>>>>
> > >> > > > > > >> > > > > > > > >>>>>>> Also note that these numbers aren't
> > >> > averages
> > >> > > > or
> > >> > > > > > >> medians
> > >> > > > > > >> > > or
> > >> > > > > > >> > > > > > > > >> anything
> > >> > > > > > >> > > > > > > > >>>>> like
> > >> > > > > > >> > > > > > > > >>>>>>> that. It's just the byte size from
> a
> > >> given
> > >> > > > run.
> > >> > > > > I
> > >> > > > > > >> did
> > >> > > > > > >> > run
> > >> > > > > > >> > > > > them
> > >> > > > > > >> > > > > > a
> > >> > > > > > >> > > > > > > > >>> few
> > >> > > > > > >> > > > > > > > >>>>>> times
> > >> > > > > > >> > > > > > > > >>>>>>> and saw similar results.
> > >> > > > > > >> > > > > > > > >>>>>>>
> > >> > > > > > >> > > > > > > > >>>>>>> Impact:
> > >> > > > > > >> > > > > > > > >>>>>>> Even after adding gzip to the
> > >> > > > __consumer_offsets
> > >> > > > > > >> topic
> > >> > > > > > >> > > with
> > >> > > > > > >> > > > > my
> > >> > > > > > >> > > > > > > > >>>> pending
> > >> > > > > > >> > > > > > > > >>>>>>> KAFKA-3718 patch, the AwaitingSync
> > >> phase
> > >> > of
> > >> > > > the
> > >> > > > > > >> group
> > >> > > > > > >> > > fails
> > >> > > > > > >> > > > > > with
> > >> > > > > > >> > > > > > > > >>>>>>> RecordTooLargeException. This means
> > the
> > >> > > > combined
> > >> > > > > > >> size
> > >> > > > > > >> > of
> > >> > > > > > >> > > > each
> > >> > > > > > >> > > > > > > > >>>> member's
> > >> > > > > > >> > > > > > > > >>>>>>> subscriptions and assignments
> > exceeded
> > >> the
> > >> > > > > > >> > > > > > > > >>>> KafkaConfig.messageMaxBytes
> > >> > > > > > >> > > > > > > > >>>>> of
> > >> > > > > > >> > > > > > > > >>>>>>> 1000012 bytes. The group ends up
> > dying.
> > >> > > > > > >> > > > > > > > >>>>>>>
> > >> > > > > > >> > > > > > > > >>>>>>> Options:
> > >> > > > > > >> > > > > > > > >>>>>>> 1. Config change: reduce the number
> > of
> > >> > > > consumers
> > >> > > > > > in
> > >> > > > > > >> the
> > >> > > > > > >> > > > > group.
> > >> > > > > > >> > > > > > > > >> This
> > >> > > > > > >> > > > > > > > >>>>> isn't
> > >> > > > > > >> > > > > > > > >>>>>>> always a realistic answer in more
> > >> > strenuous
> > >> > > > use
> > >> > > > > > >> cases
> > >> > > > > > >> > > like
> > >> > > > > > >> > > > > > > > >>>> MirrorMaker
> > >> > > > > > >> > > > > > > > >>>>>>> clusters or for auditing.
> > >> > > > > > >> > > > > > > > >>>>>>> 2. Config change: split the group
> > into
> > >> > > smaller
> > >> > > > > > >> groups
> > >> > > > > > >> > > which
> > >> > > > > > >> > > > > > > > >>> together
> > >> > > > > > >> > > > > > > > >>>>> will
> > >> > > > > > >> > > > > > > > >>>>>>> get full coverage of the topics.
> This
> > >> > gives
> > >> > > > each
> > >> > > > > > >> group
> > >> > > > > > >> > > > > member a
> > >> > > > > > >> > > > > > > > >>>> smaller
> > >> > > > > > >> > > > > > > > >>>>>>> subscription.(ex: g1 has topics
> > >> starting
> > >> > > with
> > >> > > > > a-m
> > >> > > > > > >> while
> > >> > > > > > >> > > g2
> > >> > > > > > >> > > > > has
> > >> > > > > > >> > > > > > > > >>> topics
> > >> > > > > > >> > > > > > > > >>>>>>> starting ith n-z). This would be
> > >> > > operationally
> > >> > > > > > >> painful
> > >> > > > > > >> > to
> > >> > > > > > >> > > > > > manage.
> > >> > > > > > >> > > > > > > > >>>>>>> 3. Config change: split the topics
> > >> among
> > >> > > > members
> > >> > > > > > of
> > >> > > > > > >> the
> > >> > > > > > >> > > > > group.
> > >> > > > > > >> > > > > > > > >>> Again
> > >> > > > > > >> > > > > > > > >>>>> this
> > >> > > > > > >> > > > > > > > >>>>>>> gives each group member a smaller
> > >> > > > subscription.
> > >> > > > > > This
> > >> > > > > > >> > > would
> > >> > > > > > >> > > > > also
> > >> > > > > > >> > > > > > > > >> be
> > >> > > > > > >> > > > > > > > >>>>>>> operationally painful to manage.
> > >> > > > > > >> > > > > > > > >>>>>>> 4. Config change: bump up
> > >> > > > > > >> KafkaConfig.messageMaxBytes
> > >> > > > > > >> > (a
> > >> > > > > > >> > > > > > > > >>> topic-level
> > >> > > > > > >> > > > > > > > >>>>>>> config) and
> > >> > KafkaConfig.replicaFetchMaxBytes
> > >> > > > (a
> > >> > > > > > >> > > > broker-level
> > >> > > > > > >> > > > > > > > >>> config).
> > >> > > > > > >> > > > > > > > >>>>>>> Applying messageMaxBytes to just
> the
> > >> > > > > > >> __consumer_offsets
> > >> > > > > > >> > > > topic
> > >> > > > > > >> > > > > > > > >> seems
> > >> > > > > > >> > > > > > > > >>>>>>> relatively harmless, but bumping up
> > the
> > >> > > > > > broker-level
> > >> > > > > > >> > > > > > > > >>>>> replicaFetchMaxBytes
> > >> > > > > > >> > > > > > > > >>>>>>> would probably need more attention.
> > >> > > > > > >> > > > > > > > >>>>>>> 5. Config change: try different
> > >> > compression
> > >> > > > > > codecs.
> > >> > > > > > >> > Based
> > >> > > > > > >> > > > on
> > >> > > > > > >> > > > > 2
> > >> > > > > > >> > > > > > > > >>>> minutes
> > >> > > > > > >> > > > > > > > >>>>> of
> > >> > > > > > >> > > > > > > > >>>>>>> googling, it seems like lz4 and
> > snappy
> > >> are
> > >> > > > > faster
> > >> > > > > > >> than
> > >> > > > > > >> > > gzip
> > >> > > > > > >> > > > > but
> > >> > > > > > >> > > > > > > > >>> have
> > >> > > > > > >> > > > > > > > >>>>>> worse
> > >> > > > > > >> > > > > > > > >>>>>>> compression, so this probably won't
> > >> help.
> > >> > > > > > >> > > > > > > > >>>>>>> 6. Implementation change: support
> > >> sending
> > >> > > the
> > >> > > > > > regex
> > >> > > > > > >> > over
> > >> > > > > > >> > > > the
> > >> > > > > > >> > > > > > wire
> > >> > > > > > >> > > > > > > > >>>>> instead
> > >> > > > > > >> > > > > > > > >>>>>>> of the fully expanded topic
> > >> > subscriptions. I
> > >> > > > > think
> > >> > > > > > >> > people
> > >> > > > > > >> > > > > said
> > >> > > > > > >> > > > > > in
> > >> > > > > > >> > > > > > > > >>> the
> > >> > > > > > >> > > > > > > > >>>>>> past
> > >> > > > > > >> > > > > > > > >>>>>>> that different languages have
> subtle
> > >> > > > differences
> > >> > > > > > in
> > >> > > > > > >> > > regex,
> > >> > > > > > >> > > > so
> > >> > > > > > >> > > > > > > > >> this
> > >> > > > > > >> > > > > > > > >>>>>> doesn't
> > >> > > > > > >> > > > > > > > >>>>>>> play nicely with cross-language
> > groups.
> > >> > > > > > >> > > > > > > > >>>>>>> 7. Implementation change: maybe we
> > can
> > >> > > reverse
> > >> > > > > the
> > >> > > > > > >> > > mapping?
> > >> > > > > > >> > > > > > > > >> Instead
> > >> > > > > > >> > > > > > > > >>>> of
> > >> > > > > > >> > > > > > > > >>>>>>> mapping from member to
> subscriptions,
> > >> we
> > >> > can
> > >> > > > > map a
> > >> > > > > > >> > > > > subscription
> > >> > > > > > >> > > > > > > > >> to
> > >> > > > > > >> > > > > > > > >>> a
> > >> > > > > > >> > > > > > > > >>>>> list
> > >> > > > > > >> > > > > > > > >>>>>>> of members.
> > >> > > > > > >> > > > > > > > >>>>>>> 8. Implementation change: maybe we
> > can
> > >> try
> > >> > > to
> > >> > > > > > break
> > >> > > > > > >> > apart
> > >> > > > > > >> > > > the
> > >> > > > > > >> > > > > > > > >>>>>> subscription
> > >> > > > > > >> > > > > > > > >>>>>>> and assignments from the same
> > >> > > SyncGroupRequest
> > >> > > > > > into
> > >> > > > > > >> > > > multiple
> > >> > > > > > >> > > > > > > > >>> records?
> > >> > > > > > >> > > > > > > > >>>>>> They
> > >> > > > > > >> > > > > > > > >>>>>>> can still go to the same message
> set
> > >> and
> > >> > get
> > >> > > > > > >> appended
> > >> > > > > > >> > > > > together.
> > >> > > > > > >> > > > > > > > >>> This
> > >> > > > > > >> > > > > > > > >>>>> way
> > >> > > > > > >> > > > > > > > >>>>>>> the limit become the segment size,
> > >> which
> > >> > > > > shouldn't
> > >> > > > > > >> be a
> > >> > > > > > >> > > > > > problem.
> > >> > > > > > >> > > > > > > > >>> This
> > >> > > > > > >> > > > > > > > >>>>> can
> > >> > > > > > >> > > > > > > > >>>>>>> be tricky to get right because
> we're
> > >> > > currently
> > >> > > > > > >> keying
> > >> > > > > > >> > > these
> > >> > > > > > >> > > > > > > > >>> messages
> > >> > > > > > >> > > > > > > > >>>> on
> > >> > > > > > >> > > > > > > > >>>>>> the
> > >> > > > > > >> > > > > > > > >>>>>>> group, so I think records from the
> > same
> > >> > > > > rebalance
> > >> > > > > > >> might
> > >> > > > > > >> > > > > > > > >>> accidentally
> > >> > > > > > >> > > > > > > > >>>>>>> compact one another, but my
> > >> understanding
> > >> > of
> > >> > > > > > >> compaction
> > >> > > > > > >> > > > isn't
> > >> > > > > > >> > > > > > > > >> that
> > >> > > > > > >> > > > > > > > >>>>> great.
> > >> > > > > > >> > > > > > > > >>>>>>>
> > >> > > > > > >> > > > > > > > >>>>>>> Todo:
> > >> > > > > > >> > > > > > > > >>>>>>> It would be interesting to rerun
> the
> > >> tests
> > >> > > > with
> > >> > > > > no
> > >> > > > > > >> > > > > compression
> > >> > > > > > >> > > > > > > > >> just
> > >> > > > > > >> > > > > > > > >>>> to
> > >> > > > > > >> > > > > > > > >>>>>> see
> > >> > > > > > >> > > > > > > > >>>>>>> how much gzip is helping but it's
> > >> getting
> > >> > > > late.
> > >> > > > > > >> Maybe
> > >> > > > > > >> > > > > tomorrow?
> > >> > > > > > >> > > > > > > > >>>>>>>
> > >> > > > > > >> > > > > > > > >>>>>>> - Onur
> > >> > > > > > >> > > > > > > > >>>>>>>
> > >> > > > > > >> > > > > > > > >>>>>>
> > >> > > > > > >> > > > > > > > >>>>>
> > >> > > > > > >> > > > > > > > >>>>
> > >> > > > > > >> > > > > > > > >>>>
> > >> > > > > > >> > > > > > > > >>>>
> > >> > > > > > >> > > > > > > > >>>> --
> > >> > > > > > >> > > > > > > > >>>> -- Guozhang
> > >> > > > > > >> > > > > > > > >>>>
> > >> > > > > > >> > > > > > > > >>>
> > >> > > > > > >> > > > > > > > >>
> > >> > > > > > >> > > > > > > > >>
> > >> > > > > > >> > > > > > > > >>
> > >> > > > > > >> > > > > > > > >> --
> > >> > > > > > >> > > > > > > > >> -- Guozhang
> > >> > > > > > >> > > > > > > > >>
> > >> > > > > > >> > > > > > > >
> > >> > > > > > >> > > > > > > >
> > >> > > > > > >> > > > > > >
> > >> > > > > > >> > > > > >
> > >> > > > > > >> > > > >
> > >> > > > > > >> > > >
> > >> > > > > > >> > >
> > >> > > > > > >> >
> > >> > > > > > >> >
> > >> > > > > > >> >
> > >> > > > > > >> > --
> > >> > > > > > >> > -- Guozhang
> > >> > > > > > >> >
> > >> > > > > > >>
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>

Reply via email to