Hi Guozhang,

It does not seem that RocksDB is the bottleneck, because the global state
restore listener logs show that the restore calls to globalConsumer#poll do
not always return a full max.poll.records batch. A typical log excerpt looks
like this:

21:20:22,530 Restored batch of 2000 records...
21:20:22,533 Restored batch of 819 records...
21:20:22,548 Restored batch of 2000 records...
21:20:22,552 Restored batch of 852 records...
21:20:22,566 Restored batch of 2000 records...
21:20:22,569 Restored batch of 824 records...
21:20:22,588 Restored batch of 0 records...
21:20:22,599 Restored batch of 2000 records...
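
For context, these lines come from onBatchRestored in a global state restore
listener roughly like the following simplified sketch (class name and log
format here are illustrative, not our exact code):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.StateRestoreListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Logs the size of each batch handed back by the global restore loop.
    public class LoggingRestoreListener implements StateRestoreListener {
        private static final Logger log =
            LoggerFactory.getLogger(LoggingRestoreListener.class);

        @Override
        public void onRestoreStart(TopicPartition partition, String storeName,
                                   long startingOffset, long endingOffset) {
            log.info("Starting restore of {} for {}", storeName, partition);
        }

        @Override
        public void onBatchRestored(TopicPartition partition, String storeName,
                                    long batchEndOffset, long numRestored) {
            // numRestored is the number of records in the last restored batch,
            // i.e. what the underlying globalConsumer#poll call returned.
            log.info("Restored batch of {} records...", numRestored);
        }

        @Override
        public void onRestoreEnd(TopicPartition partition, String storeName,
                                 long totalRestored) {
            log.info("Finished restore of {} for {} ({} records total)",
                     storeName, partition, totalRestored);
        }
    }

    // Registered before start:
    // kafkaStreams.setGlobalStateRestoreListener(new LoggingRestoreListener());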

These logs lead me to think the records/sec bottleneck is somewhere between
the Fetcher and the broker (or beyond), though I'm having a hard time
pinpointing what the actual bottleneck is. Network metrics show that both the
streams application and broker hosts are using at most 500 Mbit/s on a
1 Gbit/s network. Our records average around 400 bytes. Note that we have
been using the default value of 1 for fetch.min.bytes.
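
If bumping fetch.min.bytes is worth a try, my understanding is that the
override would go through the global consumer prefix, along these lines (a
sketch only; the application id, bootstrap servers, and values are
illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class GlobalConsumerFetchConfig {
        // Builds Streams properties that override fetch settings only for the
        // global consumer used to restore global stores.
        public static Properties streamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");         // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder

            // With fetch.min.bytes = 1 (the default) the broker may answer with
            // small batches; asking it to wait for ~1 MB or up to 500 ms per
            // fetch could yield fuller polls during restoration.
            props.put(StreamsConfig.globalConsumerPrefix(
                ConsumerConfig.FETCH_MIN_BYTES_CONFIG), 1048576);
            props.put(StreamsConfig.globalConsumerPrefix(
                ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 500);
            props.put(StreamsConfig.globalConsumerPrefix(
                ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10000);
            return props;
        }
    }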

Yes, a key space this large is expected.

Taylor

On Thu, Feb 7, 2019 at 8:14 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Taylor,
>
> Thanks for reporting the issue. One thing I can think of is that in 2.0.1
> RocksDB already uses a batching restorer to avoid restoring one record at a
> time, so with larger batches of records, better throughput can be achieved
> for a given number of IOPS.
>
> On the other hand, you mention compaction is already used but you still
> have 15 million records per partition across 18 partitions. This means your
> key space (assuming compaction already leaves you one record per key) is
> roughly 15 * 18 = 270 million keys, which is quite large. Is that what
> you'd expect?
>
> Guozhang
>
> On Wed, Feb 6, 2019 at 3:47 PM Taylor P <tdp002...@gmail.com> wrote:
>
> > Hi Patrik,
> >
> > I am not sure that https://issues.apache.org/jira/browse/KAFKA-7380 will
> > resolve this issue since our application is dependent on the global store
> > being fully restored before the application can be considered healthy. It
> > does not seem like KAFKA-7380 is aiming to address the nature of global
> > stores restoring each partition sequentially - it is aiming to change the
> > blocking nature of #start(). Restoring the global store partitions in
> > parallel would definitely speed things up, though, and admittedly my first
> > thought when debugging this was "why isn't this restoring each partition in
> > parallel?".
> >
> > Changing our streams topology to avoid using a global store for such a
> > large amount of data would be doable but it does seem like a significant
> > amount of work. I am curious to know if anyone else is storing large
> > amounts of data in global stores and whether there are any inherent
> > limitations to the size of global stores.
> >
> > Our topic is already using compaction.
> >
> > Taylor
> >
> > On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> >
> > > Hi Taylor
> > >
> > > We are facing the same issue, although on a smaller scale.
> > > The main problem, as you found, is that the restoration runs
> > > sequentially; this should be addressed in
> > > https://issues.apache.org/jira/browse/KAFKA-7380, although there has been
> > > no progress lately.
> > >
> > > On the other hand, you could try to re-evaluate whether your problem can
> > > only be solved with global state stores. In our case (both in streams and
> > > for interactive queries) we could solve it with local state stores too,
> > > although only with more changes and more complexity in the topology.
> > >
> > > Not sure if it is applicable for your case, but have you looked into
> > > compression for the topics?
> > >
> > > best regards
> > >
> > > Patrik
> > >
> > > On Tue, 5 Feb 2019 at 22:37, Taylor P <tdp002...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I am having issues with the global store taking a very long time to
> > > > restore during startup of a Kafka Streams 2.0.1 application. The global
> > > > store is backed by a RocksDB persistent store and is added to the
> > > > Streams topology in the following manner:
> > > > https://pastebin.com/raw/VJutDyYe (a generic sketch of that wiring is
> > > > included after the settings below). The global store topic has
> > > > approximately 15 million records per partition and 18 partitions. The
> > > > following global consumer settings are specified:
> > > >
> > > >     poll.timeout.ms = 10
> > > >     max.poll.records = 2000
> > > >     max.partition.fetch.bytes = 1048576
> > > >     fetch.max.bytes = 52428800
> > > >     receive.buffer.bytes = 65536
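> > > >
> > > > Roughly speaking, a global store gets wired in along the lines of the
> > > > following generic sketch (placeholder store, topic, and serde names;
> > > > not the exact code from the paste):
> > > >
> > > >     import org.apache.kafka.common.serialization.Serdes;
> > > >     import org.apache.kafka.streams.StreamsBuilder;
> > > >     import org.apache.kafka.streams.Topology;
> > > >     import org.apache.kafka.streams.kstream.Consumed;
> > > >     import org.apache.kafka.streams.processor.AbstractProcessor;
> > > >     import org.apache.kafka.streams.processor.ProcessorContext;
> > > >     import org.apache.kafka.streams.state.KeyValueStore;
> > > >     import org.apache.kafka.streams.state.StoreBuilder;
> > > >     import org.apache.kafka.streams.state.Stores;
> > > >
> > > >     public class GlobalStoreTopology {
> > > >         // Generic sketch: a RocksDB-backed global store fed from "global-topic".
> > > >         public static Topology build() {
> > > >             StreamsBuilder builder = new StreamsBuilder();
> > > >             StoreBuilder<KeyValueStore<String, String>> storeBuilder =
> > > >                 Stores.keyValueStoreBuilder(
> > > >                     Stores.persistentKeyValueStore("global-store"),
> > > >                     Serdes.String(), Serdes.String())
> > > >                 .withLoggingDisabled(); // global stores must not have a changelog
> > > >
> > > >             builder.addGlobalStore(
> > > >                 storeBuilder,
> > > >                 "global-topic",
> > > >                 Consumed.with(Serdes.String(), Serdes.String()),
> > > >                 () -> new AbstractProcessor<String, String>() {
> > > >                     private KeyValueStore<String, String> store;
> > > >
> > > >                     @Override
> > > >                     @SuppressWarnings("unchecked")
> > > >                     public void init(ProcessorContext context) {
> > > >                         super.init(context);
> > > >                         store = (KeyValueStore<String, String>)
> > > >                             context.getStateStore("global-store");
> > > >                     }
> > > >
> > > >                     @Override
> > > >                     public void process(String key, String value) {
> > > >                         // State-update processor: write each record into the store.
> > > >                         store.put(key, value);
> > > >                     }
> > > >                 });
> > > >             return builder.build();
> > > >         }
> > > >     }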
> > > >
> > > > I have tried tweaking the settings above on the consumer side, such as
> > > > increasing poll.timeout.ms to 2000, max.poll.records to 10000, and
> > > > max.partition.fetch.bytes to 52428800, but it seems that I keep hitting
> > > > a ceiling of restoring approximately 100,000 records per second. With
> > > > 15 million records per partition, it takes approximately 150 seconds to
> > > > restore a single partition. With 18 partitions, it takes roughly 45
> > > > minutes to fully restore the global store.
> > > >
> > > > Switching from HDDs to SSDs on the brokers' log directories made
> > > > restoration roughly 25% faster overall, but this still feels slow. It
> > > > seems that I am hitting IOPS limits on the disks and am not even close
> > > > to hitting the throughput limits of the disks on either the broker or
> > > > streams application side.
> > > >
> > > > How can I minimize restoration time of a global store? Are there
> > > > settings that can increase throughput with the same number of IOPS?
> > > > Ideally restoration of each partition could be done in parallel, but I
> > > > recognize there is only a single global store thread. Bringing up a new
> > > > instance of the Kafka Streams application occurs on a potentially daily
> > > > basis, so the restoration time is becoming more and more of a hassle.
> > > >
> > > > Thanks.
> > > >
> > > > Taylor
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>
