Hi Guozhang,

It does not seem that RocksDB is the bottleneck, because the global state
restore listener logs show that the restore calls to globalConsumer#poll do
not always return max.poll.records.
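For context, the restore listener producing these logs is nothing fancy; it
is roughly the following sketch (the class name and exact log wording are
illustrative, not copied from our code):

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Logs one line per restored batch; numRestored is the number of records
// handed to the store after the last globalConsumer#poll for that partition.
public class LoggingRestoreListener implements StateRestoreListener {
    private static final Logger log = LoggerFactory.getLogger(LoggingRestoreListener.class);

    @Override
    public void onRestoreStart(final TopicPartition partition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        log.info("Restoring {} {} from offset {} to {}",
                 storeName, partition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(final TopicPartition partition, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        log.info("Restored batch of {} records for {} {}", numRestored, storeName, partition);
    }

    @Override
    public void onRestoreEnd(final TopicPartition partition, final String storeName,
                             final long totalRestored) {
        log.info("Finished restoring {} {}, {} records total", storeName, partition, totalRestored);
    }
}

// registered on the KafkaStreams instance before start():
// streams.setGlobalStateRestoreListener(new LoggingRestoreListener());

As far as I can tell, onBatchRestored fires once per poll during global store
restoration, so each logged batch size corresponds to one poll result.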
A typical log looks like this:

21:20:22,530 Restored batch of 2000 records...
21:20:22,533 Restored batch of 819 records...
21:20:22,548 Restored batch of 2000 records...
21:20:22,552 Restored batch of 852 records...
21:20:22,566 Restored batch of 2000 records...
21:20:22,569 Restored batch of 824 records...
21:20:22,588 Restored batch of 0 records...
21:20:22,599 Restored batch of 2000 records...

These logs lead me to think the records/sec bottleneck is somewhere between
the Fetcher and the broker (or beyond), though I'm having a hard time
pinpointing what it actually is. Network metrics show that both the streams
application hosts and the broker hosts are using at most 500 Mbit/s on a
1 Gbit/s network, and our records are on average around 400 bytes. Note that
we have been using the default value of 1 for fetch.min.bytes.
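If raising it turns out to help, my understanding is that the global consumer
can be tuned through StreamsConfig's global consumer prefix, along these
lines (the values below are purely illustrative, nothing we have validated
yet):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class GlobalConsumerFetchTuning {

    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder

        // With fetch.min.bytes = 1 (the default) the broker answers a fetch as
        // soon as any data is available; a larger value lets it wait (up to
        // fetch.max.wait.ms) and return bigger batches per request.
        props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), 1048576);
        props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 500);
        props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10000);
        return props;
    }
}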
Yes, a key space of that size is expected.

Taylor

On Thu, Feb 7, 2019 at 8:14 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Taylor,
>
> Thanks for reporting the issue. One thing I can think of is that in 2.0.1
> RocksDB is already using a batching restorer to avoid restoring one record
> at a time, so with larger batches of records, better throughput can be
> achieved for a given number of IOPS.
>
> On the other hand, you mention compaction is already used but you still
> have 15 million records per partition across 18 partitions. This means
> your key space (assuming compaction already leaves you one record per key)
> is 15 * 18 = 270 million, which is quite a large key space. Is that what
> you'd expect?
>
> Guozhang
>
> On Wed, Feb 6, 2019 at 3:47 PM Taylor P <tdp002...@gmail.com> wrote:
>
> > Hi Patrik,
> >
> > I am not sure that https://issues.apache.org/jira/browse/KAFKA-7380 will
> > resolve this issue, since our application depends on the global store
> > being fully restored before the application can be considered healthy.
> > It does not seem like KAFKA-7380 is aiming to address the fact that
> > global stores restore each partition sequentially - it is aiming to
> > change the blocking nature of #start(). Restoring the global store
> > partitions in parallel would definitely speed things up, though, and
> > admittedly my first thought when debugging this was "why isn't this
> > restoring each partition in parallel?".
> >
> > Changing our streams topology to avoid using a global store for such a
> > large amount of data would be doable, but it does seem like a
> > significant amount of work. I am curious to know whether anyone else is
> > storing large amounts of data in global stores and whether there are any
> > inherent limitations to the size of global stores.
> >
> > Our topic is already using compaction.
> >
> > Taylor
> >
> > On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl <pklei...@gmail.com>
> > wrote:
> >
> > > Hi Taylor
> > >
> > > We are facing the same issue, although on a smaller scale.
> > > The main problem, as you found, is that the restoration runs
> > > sequentially; this should be addressed in
> > > https://issues.apache.org/jira/browse/KAFKA-7380, although there has
> > > been no progress lately.
> > >
> > > On the other hand, you could re-evaluate whether your problem can only
> > > be solved with global state stores. In our case (both in streams as
> > > well as for interactive queries) we could solve it with local state
> > > stores too, although only with more changes and more complexity in the
> > > topology.
> > >
> > > Not sure if it is applicable for your case, but have you looked into
> > > compression for the topics?
> > >
> > > best regards
> > > Patrik
> > >
> > > On Tue, 5 Feb 2019 at 22:37, Taylor P <tdp002...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I am having issues with the global store taking a very long time to
> > > > restore during startup of a Kafka Streams 2.0.1 application. The
> > > > global store is backed by a RocksDB persistent store and is added to
> > > > the Streams topology in the following manner:
> > > > https://pastebin.com/raw/VJutDyYe
> > > > The global store topic has approximately 15 million records per
> > > > partition and 18 partitions. The following global consumer settings
> > > > are specified:
> > > >
> > > > poll.timeout.ms = 10
> > > > max.poll.records = 2000
> > > > max.partition.fetch.bytes = 1048576
> > > > fetch.max.bytes = 52428800
> > > > receive.buffer.bytes = 65536
> > > >
> > > > I have tried tweaking the settings above on the consumer side, such
> > > > as increasing poll.timeout.ms to 2000, max.poll.records to 10000,
> > > > and max.partition.fetch.bytes to 52428800, but it seems that I keep
> > > > hitting a ceiling of restoring approximately 100,000 records per
> > > > second. With 15 million records per partition, it takes
> > > > approximately 150 seconds to restore a single partition, so with 18
> > > > partitions it takes roughly 45 minutes to fully restore the global
> > > > store.
> > > >
> > > > Switching from HDDs to SSDs on the brokers' log directories made
> > > > restoration roughly 25% faster overall, but this still feels slow.
> > > > It seems that I am hitting IOPS limits on the disks and am not even
> > > > close to hitting the throughput limits of the disks on either the
> > > > broker or streams application side.
> > > >
> > > > How can I minimize restoration time of a global store? Are there
> > > > settings that can increase throughput with the same number of IOPS?
> > > > Ideally, restoration of each partition could be done in parallel,
> > > > but I recognize there is only a single global store thread. Bringing
> > > > up a new instance of the Kafka Streams application happens on a
> > > > potentially daily basis, so the restoration time is becoming more
> > > > and more of a hassle.
> > > >
> > > > Thanks.
> > > >
> > > > Taylor
>
>
> --
> -- Guozhang
>
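P.S. For anyone finding this thread later, in case the pastebin link in my
original message goes stale: the global store wiring is roughly of the
following shape. This is a simplified sketch with placeholder store/topic
names, String serdes and a trivial update processor, not the exact code from
the pastebin.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class GlobalStoreTopology {

    // State update processor: mirrors the global topic into the store.
    private static final class GlobalStoreUpdater implements Processor<String, String> {
        private KeyValueStore<String, String> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            store = (KeyValueStore<String, String>) context.getStateStore("global-store");
        }

        @Override
        public void process(final String key, final String value) {
            store.put(key, value);
        }

        @Override
        public void close() { }
    }

    public static StreamsBuilder withGlobalStore(final StreamsBuilder builder) {
        // RocksDB-backed persistent key-value store, restored from the global topic.
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("global-store"),   // placeholder name
                Serdes.String(),
                Serdes.String());

        builder.addGlobalStore(
            storeBuilder,
            "global-topic",                                       // placeholder name
            Consumed.with(Serdes.String(), Serdes.String()),
            GlobalStoreUpdater::new);

        return builder;
    }
}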