To get a better sense of the limit and what we should be optimizing for, it
helps to look at the message format:
private val MEMBER_METADATA_V0 = new Schema(new Field("member_id", STRING),
  new Field("client_id", STRING),
  new Field("client_host", STRING),
  new Field("session_timeout", INT32),
  new Field("subscription", BYTES),
  new Field("assignment", BYTES))
private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(new
Field("protocol_type", STRING),
  new Field("generation", INT32),
  new Field("protocol", STRING),
  new Field("leader", STRING),
  new Field("members", new ArrayOf(MEMBER_METADATA_V0)))

Subscription => Version Topics UserData
  Version    => Int16
  Topics     => [String]
  UserData   => Bytes
Assignment => Version TopicPartitions
  Version         => int16
  TopicPartitions => [Topic Partitions]
    Topic         => String
    Partitions    => [int32]

The UserData isn't being used for range and roundrobin.

message size ~ sum(c_i*s_i + c_i*a_i for c_i in C)
C = the consumer group
c_i = the ith consumer in C
s_i = the subscription size for c_i
a_i = the assignment size for c_i

With today's implementation, trying out different assignment strategies
wouldn't make any difference since they all share the same subscription and
assignment serialization defined in ConsumerProtocol. As proof, I hit the
same limit with both range and roundrobin. My results shown in the original
post were using roundrobin.

There are basically two pieces we can optimize:
1. the subscriptions
2. the assignments

Subscriptions can be optimized in a couple ways:
1. allowing regex. this only helps consumers with pattern-based
subscriptions.
2. Option 7 that I had mentioned in the original post: broker-side
deduplication of subscriptions by mapping subscriptions to a list of
members.

Assignments also can be optimized with some tricks like the ones Jason
mentioned, but I think these end up being specific to the assignment
strategy, making it hard to keep a generic ConsumerProtocol.

I think it's worth thinking through our options some more.

On Mon, May 23, 2016 at 11:10 AM, Ismael Juma <ism...@juma.me.uk> wrote:

> Hi Jason,
>
> It would definitely be interesting to try a few of these optimisations on a
> real world example to quantify the impact.
>
> Ismael
>
> On Mon, May 23, 2016 at 6:59 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hey Onur,
> >
> > Thanks for the investigation. I agree with Ismael that pushing regex or
> > some kind of patterns into the protocol would help for communicating
> > subscriptions and for avoiding unnecessary overhead when fetching topic
> > metadata, but it doesn't seem like it would address the main issue here
> > since the leader would still have to split the regex into the individual
> > topics in order to do the partition assignment. It feels like we may
> need a
> > different approach for representing assignments at this scale.
> >
> > One minor question I had is which assignment strategy you used in these
> > tests? Do you see any difference in overhead overall if you change
> between
> > "range" and "round-robin"? One thought that occurred to me is that you
> > could reduce the redundancy in the leader sync group by trying to limit
> the
> > number of consumers that received partitions from each topic. Ideally,
> each
> > topic would be given to exactly one member so that its name was repeated
> > only once in the leader's sync group. I guess the compression before
> > writing the group metadata would end up removing this redundancy anyway,
> > but still might be worth investigating.
> >
> > It actually seems to me that there's quite a bit of room for cleverness
> > here without changing the protocol. Here are a couple ideas:
> >
> > 1. Currently we list all of the assigned partitions explicitly in the
> > assignment:
> >
> > Assignment -> [Topic [Partition]]
> >
> > Alternatively, you could let the assignment contain just a minimum and
> > maximum partition:
> >
> > Assignment -> [Topic MinPartition MaxPartition]
> >
> > Obviously this would only fit range-based assignment approaches, but if
> you
> > have a lot of partitions per topic, this could be a big win.
> >
> > 2. Maybe there's a better way to lay out the assignment without needing
> to
> > explicitly repeat the topic? For example, the leader could sort the
> topics
> > for each member and just use an integer to represent the index of each
> > topic within the sorted list (note this depends on the subscription
> > including the full topic list).
> >
> > Assignment -> [TopicIndex [Partition]]
> >
> > You could even combine these two options so that you have only 3 integers
> > for each topic assignment:
> >
> > Assignment -> [TopicIndex MinPartition MaxPartition]
> >
> > There may even be better options with a little more thought. All of this
> is
> > just part of the client-side protocol, so it wouldn't require any version
> > bumps on the broker. What do you think?
> >
> > Thanks,
> > Jason
> >
> >
> >
> >
> > On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > The original concern is that regex may not be efficiently supported
> > > across-languages, but if there is a neat workaround I would love to
> > learn.
> > >
> > > Guozhang
> > >
> > > On Mon, May 23, 2016 at 5:31 AM, Ismael Juma <ism...@juma.me.uk>
> wrote:
> > >
> > > > +1 to Jun's suggestion.
> > > >
> > > > Having said that, as a general point, I think we should consider
> > > supporting
> > > > topic patterns in the wire protocol. It requires some thinking for
> > > > cross-language support, but it seems surmountable and it could make
> > > certain
> > > > operations a lot more efficient (the fact that a basic regex
> > subscription
> > > > causes the consumer to request metadata for all topics is not great).
> > > >
> > > > Ismael
> > > >
> > > > On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > > I like Jun's suggestion in changing the handling logics of single
> > large
> > > > > message on the consumer side.
> > > > >
> > > > > As for the case of "a single group subscribing to 3000 topics",
> with
> > > 100
> > > > > consumers the 2.5Mb Gzip size is reasonable to me (when storing in
> > ZK,
> > > we
> > > > > also have the znode limit which is set to 1Mb by default, though
> > > > admittedly
> > > > > it is only for one consumer). And if we do the change as Jun
> > suggested,
> > > > > 2.5Mb on follower's memory pressure is OK I think.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <
> > > > > onurkaraman.apa...@gmail.com
> > > > > > wrote:
> > > > >
> > > > > > Results without compression:
> > > > > > 1 consumer 292383 bytes
> > > > > > 5 consumers 1079579 bytes * the tipping point
> > > > > > 10 consumers 1855018 bytes
> > > > > > 20 consumers 2780220 bytes
> > > > > > 30 consumers 3705422 bytes
> > > > > > 40 consumers 4630624 bytes
> > > > > > 50 consumers 5555826 bytes
> > > > > > 60 consumers 6480788 bytes
> > > > > > 70 consumers 7405750 bytes
> > > > > > 80 consumers 8330712 bytes
> > > > > > 90 consumers 9255674 bytes
> > > > > > 100 consumers 10180636 bytes
> > > > > >
> > > > > > So it looks like gzip compression shrinks the message size by 4x.
> > > > > >
> > > > > > On Sat, May 21, 2016 at 9:47 AM, Jun Rao <j...@confluent.io>
> wrote:
> > > > > >
> > > > > > > Onur,
> > > > > > >
> > > > > > > Thanks for the investigation.
> > > > > > >
> > > > > > > Another option is to just fix how we deal with the case when a
> > > > message
> > > > > is
> > > > > > > larger than the fetch size. Today, if the fetch size is smaller
> > > than
> > > > > the
> > > > > > > fetch size, the consumer will get stuck. Instead, we can simply
> > > > return
> > > > > > the
> > > > > > > full message if it's larger than the fetch size w/o requiring
> the
> > > > > > consumer
> > > > > > > to manually adjust the fetch size. On the broker side, to
> serve a
> > > > fetch
> > > > > > > request, we already do an index lookup and then scan the log a
> > bit
> > > to
> > > > > > find
> > > > > > > the message with the requested offset. We can just check the
> size
> > > of
> > > > > that
> > > > > > > message and return the full message if its size is larger than
> > the
> > > > > fetch
> > > > > > > size. This way, fetch size is really for performance
> > optimization,
> > > > i.e.
> > > > > > in
> > > > > > > the common case, we will not return more bytes than fetch size,
> > but
> > > > if
> > > > > > > there is a large message, we will return more bytes than the
> > > > specified
> > > > > > > fetch size. In practice, large messages are rare. So, it
> > shouldn't
> > > > > > increase
> > > > > > > the memory consumption on the client too much.
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Sat, May 21, 2016 at 3:34 AM, Onur Karaman <
> > > > > > > onurkaraman.apa...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hey everyone. So I started doing some tests on the new
> > > > > > > consumer/coordinator
> > > > > > > > to see if it could handle more strenuous use cases like
> > mirroring
> > > > > > > clusters
> > > > > > > > with thousands of topics and thought I'd share whatever I
> have
> > so
> > > > > far.
> > > > > > > >
> > > > > > > > The scalability limit: the amount of group metadata we can
> fit
> > > into
> > > > > one
> > > > > > > > message
> > > > > > > >
> > > > > > > > Some background:
> > > > > > > > Client-side assignment is implemented in two phases
> > > > > > > > 1. a PreparingRebalance phase that identifies members of the
> > > group
> > > > > and
> > > > > > > > aggregates member subscriptions.
> > > > > > > > 2. an AwaitingSync phase that waits for the group leader to
> > > decide
> > > > > > member
> > > > > > > > assignments based on the member subscriptions across the
> group.
> > > > > > > >   - The leader announces this decision with a
> SyncGroupRequest.
> > > The
> > > > > > > > GroupCoordinator handles SyncGroupRequests by appending all
> > group
> > > > > state
> > > > > > > > into a single message under the __consumer_offsets topic.
> This
> > > > > message
> > > > > > is
> > > > > > > > keyed on the group id and contains each member subscription
> as
> > > well
> > > > > as
> > > > > > > the
> > > > > > > > decided assignment for each member.
> > > > > > > >
> > > > > > > > The environment:
> > > > > > > > - one broker
> > > > > > > > - one __consumer_offsets partition
> > > > > > > > - offsets.topic.compression.codec=1 // this is gzip
> > > > > > > > - broker has my pending KAFKA-3718 patch that actually makes
> > use
> > > of
> > > > > > > > offsets.topic.compression.codec:
> > > > > > > https://github.com/apache/kafka/pull/1394
> > > > > > > > - around 3000 topics. This is an actual subset of topics from
> > one
> > > > of
> > > > > > our
> > > > > > > > clusters.
> > > > > > > > - topics have 8 partitions
> > > > > > > > - topics are 25 characters long on average
> > > > > > > > - one group with a varying number of consumers each hardcoded
> > > with
> > > > > all
> > > > > > > the
> > > > > > > > topics just to make the tests more consistent. wildcarding
> with
> > > .*
> > > > > > should
> > > > > > > > have the same effect once the subscription hits the
> coordinator
> > > as
> > > > > the
> > > > > > > > subscription has already been fully expanded out to the list
> of
> > > > > topics
> > > > > > by
> > > > > > > > the consumers.
> > > > > > > > - I added some log messages to Log.scala to print out the
> > message
> > > > > sizes
> > > > > > > > after compression
> > > > > > > > - there are no producers at all and auto commits are
> disabled.
> > > The
> > > > > only
> > > > > > > > topic with messages getting added is the __consumer_offsets
> > topic
> > > > and
> > > > > > > > they're only from storing group metadata while processing
> > > > > > > > SyncGroupRequests.
> > > > > > > >
> > > > > > > > Results:
> > > > > > > > The results below show that we exceed the 1000012 byte
> > > > > > > > KafkaConfig.messageMaxBytes limit relatively quickly (between
> > > 30-40
> > > > > > > > consumers):
> > > > > > > > 1 consumer 54739 bytes
> > > > > > > > 5 consumers 261524 bytes
> > > > > > > > 10 consumers 459804 bytes
> > > > > > > > 20 consumers 702499 bytes
> > > > > > > > 30 consumers 930525 bytes
> > > > > > > > 40 consumers 1115657 bytes * the tipping point
> > > > > > > > 50 consumers 1363112 bytes
> > > > > > > > 60 consumers 1598621 bytes
> > > > > > > > 70 consumers 1837359 bytes
> > > > > > > > 80 consumers 2066934 bytes
> > > > > > > > 90 consumers 2310970 bytes
> > > > > > > > 100 consumers 2542735 bytes
> > > > > > > >
> > > > > > > > Note that the growth itself is pretty gradual. Plotting the
> > > points
> > > > > > makes
> > > > > > > it
> > > > > > > > look roughly linear w.r.t the number of consumers:
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735)
> > > > > > > >
> > > > > > > > Also note that these numbers aren't averages or medians or
> > > anything
> > > > > > like
> > > > > > > > that. It's just the byte size from a given run. I did run
> them
> > a
> > > > few
> > > > > > > times
> > > > > > > > and saw similar results.
> > > > > > > >
> > > > > > > > Impact:
> > > > > > > > Even after adding gzip to the __consumer_offsets topic with
> my
> > > > > pending
> > > > > > > > KAFKA-3718 patch, the AwaitingSync phase of the group fails
> > with
> > > > > > > > RecordTooLargeException. This means the combined size of each
> > > > > member's
> > > > > > > > subscriptions and assignments exceeded the
> > > > > KafkaConfig.messageMaxBytes
> > > > > > of
> > > > > > > > 1000012 bytes. The group ends up dying.
> > > > > > > >
> > > > > > > > Options:
> > > > > > > > 1. Config change: reduce the number of consumers in the
> group.
> > > This
> > > > > > isn't
> > > > > > > > always a realistic answer in more strenuous use cases like
> > > > > MirrorMaker
> > > > > > > > clusters or for auditing.
> > > > > > > > 2. Config change: split the group into smaller groups which
> > > > together
> > > > > > will
> > > > > > > > get full coverage of the topics. This gives each group
> member a
> > > > > smaller
> > > > > > > > subscription.(ex: g1 has topics starting with a-m while g2
> has
> > > > topics
> > > > > > > > starting ith n-z). This would be operationally painful to
> > manage.
> > > > > > > > 3. Config change: split the topics among members of the
> group.
> > > > Again
> > > > > > this
> > > > > > > > gives each group member a smaller subscription. This would
> also
> > > be
> > > > > > > > operationally painful to manage.
> > > > > > > > 4. Config change: bump up KafkaConfig.messageMaxBytes (a
> > > > topic-level
> > > > > > > > config) and KafkaConfig.replicaFetchMaxBytes (a broker-level
> > > > config).
> > > > > > > > Applying messageMaxBytes to just the __consumer_offsets topic
> > > seems
> > > > > > > > relatively harmless, but bumping up the broker-level
> > > > > > replicaFetchMaxBytes
> > > > > > > > would probably need more attention.
> > > > > > > > 5. Config change: try different compression codecs. Based on
> 2
> > > > > minutes
> > > > > > of
> > > > > > > > googling, it seems like lz4 and snappy are faster than gzip
> but
> > > > have
> > > > > > > worse
> > > > > > > > compression, so this probably won't help.
> > > > > > > > 6. Implementation change: support sending the regex over the
> > wire
> > > > > > instead
> > > > > > > > of the fully expanded topic subscriptions. I think people
> said
> > in
> > > > the
> > > > > > > past
> > > > > > > > that different languages have subtle differences in regex, so
> > > this
> > > > > > > doesn't
> > > > > > > > play nicely with cross-language groups.
> > > > > > > > 7. Implementation change: maybe we can reverse the mapping?
> > > Instead
> > > > > of
> > > > > > > > mapping from member to subscriptions, we can map a
> subscription
> > > to
> > > > a
> > > > > > list
> > > > > > > > of members.
> > > > > > > > 8. Implementation change: maybe we can try to break apart the
> > > > > > > subscription
> > > > > > > > and assignments from the same SyncGroupRequest into multiple
> > > > records?
> > > > > > > They
> > > > > > > > can still go to the same message set and get appended
> together.
> > > > This
> > > > > > way
> > > > > > > > the limit become the segment size, which shouldn't be a
> > problem.
> > > > This
> > > > > > can
> > > > > > > > be tricky to get right because we're currently keying these
> > > > messages
> > > > > on
> > > > > > > the
> > > > > > > > group, so I think records from the same rebalance might
> > > > accidentally
> > > > > > > > compact one another, but my understanding of compaction isn't
> > > that
> > > > > > great.
> > > > > > > >
> > > > > > > > Todo:
> > > > > > > > It would be interesting to rerun the tests with no
> compression
> > > just
> > > > > to
> > > > > > > see
> > > > > > > > how much gzip is helping but it's getting late. Maybe
> tomorrow?
> > > > > > > >
> > > > > > > > - Onur
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>

Reply via email to