> On May 23, 2016, at 10:59 AM, Jason Gustafson <ja...@confluent.io> wrote:
> 
> 2. Maybe there's a better way to lay out the assignment without needing to
> explicitly repeat the topic? For example, the leader could sort the topics
> for each member and just use an integer to represent the index of each
> topic within the sorted list (note this depends on the subscription
> including the full topic list).
> 
> Assignment -> [TopicIndex [Partition]]
> 

Jason, doesn't gzip (or other compression) basically do this? If the topic is a 
string and the topic is repeated throughout, won't compression basically 
replace all repeated instances of it with an index reference to the full string?
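
One rough way to check this is to serialize a toy assignment both ways and gzip each. The sketch below is illustrative only: the topic names are synthetic, the 3000 topics / 25 characters / 8 partitions figures are borrowed from Onur's setup, and the length-prefixed layout is a hypothetical stand-in for the comparison, not Kafka's actual consumer protocol encoding.

```python
import gzip
import struct

# Synthetic workload: 3000 topics, 25-character names, 8 partitions each.
topics = sorted(f"topic-{i:05d}".ljust(25, "x") for i in range(3000))
partitions = list(range(8))


def encode_with_names(topics, partitions):
    """Assignment -> [Topic [Partition]] with each topic name spelled out."""
    out = bytearray()
    for name in topics:
        raw = name.encode("utf-8")
        out += struct.pack(">h", len(raw)) + raw   # int16 length + name bytes
        out += struct.pack(">i", len(partitions))  # int32 partition count
        for p in partitions:
            out += struct.pack(">i", p)            # int32 partition id
    return bytes(out)


def encode_with_index(topics, partitions):
    """Assignment -> [TopicIndex [Partition]] using the sorted-list index."""
    out = bytearray()
    for index, _ in enumerate(topics):
        out += struct.pack(">i", index)            # int32 topic index
        out += struct.pack(">i", len(partitions))
        for p in partitions:
            out += struct.pack(">i", p)
    return bytes(out)


def sizes(payload):
    """Return (raw size, gzip-compressed size) in bytes."""
    return len(payload), len(gzip.compress(payload))


named = sizes(encode_with_names(topics, partitions))
indexed = sizes(encode_with_index(topics, partitions))
print("names: raw=%d compressed=%d" % named)
print("index: raw=%d compressed=%d" % indexed)
```

When reading the numbers, keep in mind that DEFLATE back-references reach at most 32 KB, so a repeated topic name that sits further back than that in the payload cannot be replaced by a reference.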

-James

> You could even combine these two options so that you have only 3 integers
> for each topic assignment:
> 
> Assignment -> [TopicIndex MinPartition MaxPartition]
> 
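
For reference, the three-integer layout above could look roughly like the following sketch. It assumes each member's partitions for a topic form one contiguous range (which the layout requires, since three integers cannot describe gaps) and that every member shares the same sorted topic list. The function names and the int32 field widths are hypothetical, not part of any existing Kafka client.

```python
import struct


def encode_ranges(assignment, sorted_topics):
    """Encode {topic: [partitions]} as [TopicIndex MinPartition MaxPartition].

    Raises ValueError if a topic's partitions are not one contiguous range.
    Topics with no partitions are skipped.
    """
    index_of = {name: i for i, name in enumerate(sorted_topics)}
    out = bytearray()
    for topic in sorted(assignment):
        parts = sorted(assignment[topic])
        if not parts:
            continue
        lo, hi = parts[0], parts[-1]
        if parts != list(range(lo, hi + 1)):
            raise ValueError(f"non-contiguous partitions for {topic}")
        out += struct.pack(">iii", index_of[topic], lo, hi)  # 3 x int32
    return bytes(out)


def decode_ranges(payload, sorted_topics):
    """Inverse of encode_ranges: rebuild {topic: [partitions]}."""
    assignment = {}
    for offset in range(0, len(payload), 12):
        index, lo, hi = struct.unpack_from(">iii", payload, offset)
        assignment[sorted_topics[index]] = list(range(lo, hi + 1))
    return assignment


topics = sorted(["clicks", "orders", "payments"])
assignment = {"orders": [0, 1, 2, 3], "clicks": [4, 5]}
payload = encode_ranges(assignment, topics)
print(len(payload), decode_ranges(payload, topics))
```

Each assigned topic costs 12 bytes regardless of how many partitions fall in the range, at the price of rejecting assignments with gaps.
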
> There may even be better options with a little more thought. All of this is
> just part of the client-side protocol, so it wouldn't require any version
> bumps on the broker. What do you think?
> 
> Thanks,
> Jason
> 
> 
> 
> 
> On Mon, May 23, 2016 at 9:17 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> The original concern is that regex may not be efficiently supported
>> across languages, but if there is a neat workaround I would love to learn.
>> 
>> Guozhang
>> 
>> On Mon, May 23, 2016 at 5:31 AM, Ismael Juma <ism...@juma.me.uk> wrote:
>> 
>>> +1 to Jun's suggestion.
>>> 
>>> Having said that, as a general point, I think we should consider
>>> supporting topic patterns in the wire protocol. It requires some
>>> thinking for cross-language support, but it seems surmountable and it
>>> could make certain operations a lot more efficient (the fact that a
>>> basic regex subscription causes the consumer to request metadata for
>>> all topics is not great).
>>> 
>>> Ismael
>>> 
>>> On Sun, May 22, 2016 at 11:49 PM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>> 
>>>> I like Jun's suggestion of changing the handling logic for a single
>>>> large message on the consumer side.
>>>> 
>>>> As for the case of "a single group subscribing to 3000 topics", with
>>>> 100 consumers the 2.5MB gzip size is reasonable to me (when storing in
>>>> ZK, we also have the znode limit, which is set to 1MB by default,
>>>> though admittedly it is only for one consumer). And if we make the
>>>> change Jun suggested, 2.5MB of memory pressure on the follower is OK,
>>>> I think.
>>>> 
>>>> 
>>>> Guozhang
>>>> 
>>>> 
>>>> On Sat, May 21, 2016 at 12:51 PM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
>>>> 
>>>>> Results without compression:
>>>>> 1 consumer 292383 bytes
>>>>> 5 consumers 1079579 bytes * the tipping point
>>>>> 10 consumers 1855018 bytes
>>>>> 20 consumers 2780220 bytes
>>>>> 30 consumers 3705422 bytes
>>>>> 40 consumers 4630624 bytes
>>>>> 50 consumers 5555826 bytes
>>>>> 60 consumers 6480788 bytes
>>>>> 70 consumers 7405750 bytes
>>>>> 80 consumers 8330712 bytes
>>>>> 90 consumers 9255674 bytes
>>>>> 100 consumers 10180636 bytes
>>>>> 
>>>>> So it looks like gzip compression shrinks the message size by 4x.
>>>>> 
>>>>> On Sat, May 21, 2016 at 9:47 AM, Jun Rao <j...@confluent.io> wrote:
>>>>> 
>>>>>> Onur,
>>>>>> 
>>>>>> Thanks for the investigation.
>>>>>> 
>>>>>> Another option is to just fix how we deal with the case when a
>>>>>> message is larger than the fetch size. Today, if the fetch size is
>>>>>> smaller than the message size, the consumer will get stuck. Instead,
>>>>>> we can simply return the full message if it's larger than the fetch
>>>>>> size w/o requiring the consumer to manually adjust the fetch size. On
>>>>>> the broker side, to serve a fetch request, we already do an index
>>>>>> lookup and then scan the log a bit to find the message with the
>>>>>> requested offset. We can just check the size of that message and
>>>>>> return the full message if its size is larger than the fetch size.
>>>>>> This way, fetch size is really for performance optimization, i.e. in
>>>>>> the common case, we will not return more bytes than fetch size, but if
>>>>>> there is a large message, we will return more bytes than the specified
>>>>>> fetch size. In practice, large messages are rare. So, it shouldn't
>>>>>> increase the memory consumption on the client too much.
>>>>>> 
>>>>>> Jun
>>>>>> 
>>>>>> On Sat, May 21, 2016 at 3:34 AM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
>>>>>> 
>>>>>>> Hey everyone. So I started doing some tests on the new
>>>>>>> consumer/coordinator to see if it could handle more strenuous use
>>>>>>> cases like mirroring clusters with thousands of topics and thought
>>>>>>> I'd share whatever I have so far.
>>>>>>> 
>>>>>>> The scalability limit: the amount of group metadata we can fit into
>>>>>>> one message
>>>>>>> 
>>>>>>> Some background:
>>>>>>> Client-side assignment is implemented in two phases:
>>>>>>> 1. a PreparingRebalance phase that identifies members of the group
>>>>>>> and aggregates member subscriptions.
>>>>>>> 2. an AwaitingSync phase that waits for the group leader to decide
>>>>>>> member assignments based on the member subscriptions across the
>>>>>>> group.
>>>>>>>  - The leader announces this decision with a SyncGroupRequest. The
>>>>>>> GroupCoordinator handles SyncGroupRequests by appending all group
>>>>>>> state into a single message under the __consumer_offsets topic. This
>>>>>>> message is keyed on the group id and contains each member
>>>>>>> subscription as well as the decided assignment for each member.
>>>>>>> 
>>>>>>> The environment:
>>>>>>> - one broker
>>>>>>> - one __consumer_offsets partition
>>>>>>> - offsets.topic.compression.codec=1 // this is gzip
>>>>>>> - broker has my pending KAFKA-3718 patch that actually makes use of
>>>>>>> offsets.topic.compression.codec:
>>>>>>> https://github.com/apache/kafka/pull/1394
>>>>>>> - around 3000 topics. This is an actual subset of topics from one of
>>>>>>> our clusters.
>>>>>>> - topics have 8 partitions
>>>>>>> - topic names are 25 characters long on average
>>>>>>> - one group with a varying number of consumers, each hardcoded with
>>>>>>> all the topics just to make the tests more consistent. Wildcarding
>>>>>>> with .* should have the same effect once the subscription hits the
>>>>>>> coordinator, as the subscription has already been fully expanded out
>>>>>>> to the list of topics by the consumers.
>>>>>>> - I added some log messages to Log.scala to print out the message
>>>>>>> sizes after compression
>>>>>>> - there are no producers at all and auto commits are disabled. The
>>>>>>> only topic with messages getting added is the __consumer_offsets
>>>>>>> topic and they're only from storing group metadata while processing
>>>>>>> SyncGroupRequests.
>>>>>>> 
>>>>>>> Results:
>>>>>>> The results below show that we exceed the 1000012 byte
>>>>>>> KafkaConfig.messageMaxBytes limit relatively quickly (between 30 and
>>>>>>> 40 consumers):
>>>>>>> 1 consumer 54739 bytes
>>>>>>> 5 consumers 261524 bytes
>>>>>>> 10 consumers 459804 bytes
>>>>>>> 20 consumers 702499 bytes
>>>>>>> 30 consumers 930525 bytes
>>>>>>> 40 consumers 1115657 bytes * the tipping point
>>>>>>> 50 consumers 1363112 bytes
>>>>>>> 60 consumers 1598621 bytes
>>>>>>> 70 consumers 1837359 bytes
>>>>>>> 80 consumers 2066934 bytes
>>>>>>> 90 consumers 2310970 bytes
>>>>>>> 100 consumers 2542735 bytes
>>>>>>> 
>>>>>>> Note that the growth itself is pretty gradual. Plotting the points
>>>>>>> makes it look roughly linear w.r.t. the number of consumers:
>>>>>>> 
>>>>>>> https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735)
>>>>>>> 
>>>>>>> Also note that these numbers aren't averages or medians or anything
>>>>>>> like that. It's just the byte size from a given run. I did run them
>>>>>>> a few times and saw similar results.
>>>>>>> 
>>>>>>> Impact:
>>>>>>> Even after adding gzip to the __consumer_offsets topic with my
>>>>>>> pending KAFKA-3718 patch, the AwaitingSync phase of the group fails
>>>>>>> with RecordTooLargeException. This means the combined size of each
>>>>>>> member's subscriptions and assignments exceeded the
>>>>>>> KafkaConfig.messageMaxBytes of 1000012 bytes. The group ends up
>>>>>>> dying.
>>>>>>> 
>>>>>>> Options:
>>>>>>> 1. Config change: reduce the number of consumers in the group. This
>>>>>>> isn't always a realistic answer in more strenuous use cases like
>>>>>>> MirrorMaker clusters or for auditing.
>>>>>>> 2. Config change: split the group into smaller groups which together
>>>>>>> will get full coverage of the topics. This gives each group member a
>>>>>>> smaller subscription (ex: g1 has topics starting with a-m while g2
>>>>>>> has topics starting with n-z). This would be operationally painful
>>>>>>> to manage.
>>>>>>> 3. Config change: split the topics among members of the group. Again
>>>>>>> this gives each group member a smaller subscription. This would also
>>>>>>> be operationally painful to manage.
>>>>>>> 4. Config change: bump up KafkaConfig.messageMaxBytes (a topic-level
>>>>>>> config) and KafkaConfig.replicaFetchMaxBytes (a broker-level
>>>>>>> config). Applying messageMaxBytes to just the __consumer_offsets
>>>>>>> topic seems relatively harmless, but bumping up the broker-level
>>>>>>> replicaFetchMaxBytes would probably need more attention.
>>>>>>> 5. Config change: try different compression codecs. Based on 2
>>>>>>> minutes of googling, it seems like lz4 and snappy are faster than
>>>>>>> gzip but have worse compression, so this probably won't help.
>>>>>>> 6. Implementation change: support sending the regex over the wire
>>>>>>> instead of the fully expanded topic subscriptions. I think people
>>>>>>> said in the past that different languages have subtle differences in
>>>>>>> regex, so this doesn't play nicely with cross-language groups.
>>>>>>> 7. Implementation change: maybe we can reverse the mapping? Instead
>>>>>>> of mapping from member to subscriptions, we can map a subscription
>>>>>>> to a list of members.
>>>>>>> 8. Implementation change: maybe we can try to break apart the
>>>>>>> subscription and assignments from the same SyncGroupRequest into
>>>>>>> multiple records? They can still go to the same message set and get
>>>>>>> appended together. This way the limit becomes the segment size,
>>>>>>> which shouldn't be a problem. This can be tricky to get right
>>>>>>> because we're currently keying these messages on the group, so I
>>>>>>> think records from the same rebalance might accidentally compact one
>>>>>>> another, but my understanding of compaction isn't that great.
>>>>>>> 
>>>>>>> Todo:
>>>>>>> It would be interesting to rerun the tests with no compression just
>>>>>>> to see how much gzip is helping but it's getting late. Maybe
>>>>>>> tomorrow?
>>>>>>> 
>>>>>>> - Onur
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> -- Guozhang
>>>> 
>>> 
>> 
>> 
>> 
>> --
>> -- Guozhang
>> 
