Hello Taylor, Sorry for the late reply! And thanks for the updated information.
I'd recommend overriding some consumer configs via `StreamsConfig` (you can
use `StreamsConfig#restoreConsumerPrefix` for that) for the following props:

1) Increase RECEIVE_BUFFER_CONFIG (64K may cause poll to return earlier than
   necessary).
2) Increase FETCH_MAX_BYTES_CONFIG if you expect the total size of your
   maximum 2000 records to possibly exceed it (default is 50 MB).

Guozhang

On Fri, Feb 8, 2019 at 12:43 AM Patrik Kleindl <pklei...@gmail.com> wrote:

> Hi Taylor
> You are right, the parallel processing is not mentioned in this issue. If
> I remember correctly, it came up in the thread that led to it, as a
> possibility when changing to the restoration listeners.
> Best regards
> Patrik
>
> > On 07.02.2019 at 00:47, Taylor P <tdp002...@gmail.com> wrote:
> >
> > Hi Patrik,
> >
> > I am not sure that https://issues.apache.org/jira/browse/KAFKA-7380 will
> > resolve this issue since our application is dependent on the global store
> > being fully restored before the application can be considered healthy. It
> > does not seem like KAFKA-7380 is aiming to address the nature of global
> > stores restoring each partition sequentially - it is aiming to change the
> > blocking nature of #start(). Restoring the global store partitions in
> > parallel would definitely speed things up, though, and admittedly my first
> > thought when debugging this was "why isn't this restoring each partition
> > in parallel?".
> >
> > Changing our streams topology to avoid using a global store for such a
> > large amount of data would be doable but it does seem like a significant
> > amount of work. I am curious to know if anyone else is storing large
> > amounts of data in global stores and whether there are any inherent
> > limitations to the size of global stores.
> >
> > Our topic is already using compaction.
> >
> > Taylor
> >
> >> On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> >>
> >> Hi Taylor
> >>
> >> We are facing the same issue, although on a smaller scale.
> >> The main problem, as you found, is that the restoration runs
> >> sequentially. This should be addressed in
> >> https://issues.apache.org/jira/browse/KAFKA-7380, although there has been
> >> no progress lately.
> >>
> >> On the other hand, you could try re-evaluating whether your problem can
> >> only be solved with global state stores. In our case (both in streams and
> >> for interactive queries) we could solve it with local state stores too,
> >> although only with more changes and more complexity in the topology.
> >>
> >> Not sure if it is applicable to your case, but have you looked into
> >> compression for the topics?
> >>
> >> best regards
> >>
> >> Patrik
> >>
> >>> On Tue, 5 Feb 2019 at 22:37, Taylor P <tdp002...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I am having issues with the global store taking a very long time to
> >>> restore during startup of a Kafka Streams 2.0.1 application. The global
> >>> store is backed by a RocksDB persistent store and is added to the
> >>> Streams topology in the following manner:
> >>> https://pastebin.com/raw/VJutDyYe
> >>> The global store topic has approximately 15 million records per
> >>> partition and 18 partitions.
> >>> The following global consumer settings are specified:
> >>>
> >>> poll.timeout.ms = 10
> >>> max.poll.records = 2000
> >>> max.partition.fetch.bytes = 1048576
> >>> fetch.max.bytes = 52428800
> >>> receive.buffer.bytes = 65536
> >>>
> >>> I have tried tweaking the settings above on the consumer side, such as
> >>> increasing poll.timeout.ms to 2000, max.poll.records to 10000, and
> >>> max.partition.fetch.bytes to 52428800, but it seems that I keep hitting
> >>> a ceiling of restoring approximately 100,000 records per second.
> >>> With 15 million records per partition, it takes approximately 150
> >>> seconds to restore a single partition. With 18 partitions, it takes
> >>> roughly 45 minutes to fully restore the global store.
> >>>
> >>> Switching from HDDs to SSDs on the brokers' log directories made
> >>> restoration roughly 25% faster overall, but this still feels slow. It
> >>> seems that I am hitting IOPS limits on the disks and am not even close
> >>> to hitting the throughput limits of the disks on either the broker or
> >>> streams application side.
> >>>
> >>> How can I minimize restoration time of a global store? Are there
> >>> settings that can increase throughput with the same number of IOPS?
> >>> Ideally restoration of each partition could be done in parallel but I
> >>> recognize there is only a single global store thread. Bringing up a new
> >>> instance of the Kafka Streams application occurs on a potentially daily
> >>> basis, so the restoration time is becoming more and more of a hassle.
> >>>
> >>> Thanks.
> >>>
> >>> Taylor
> >>>
> >>
>

--
-- Guozhang
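
For reference, a minimal sketch of what the two overrides suggested at the top of this thread might look like as plain properties. It uses only `java.util.Properties` so it runs without the Kafka jars; the key strings are what `StreamsConfig#restoreConsumerPrefix` yields when applied to `receive.buffer.bytes` and `fetch.max.bytes`. Two things here are assumptions and not part of the thread: the chosen sizes (1 MiB and 100 MiB) are purely illustrative, and the same keys are also set under the `global.consumer.` prefix (`StreamsConfig#globalConsumerPrefix`), on the assumption that the global consumer is the client reading the global store topic. The class and method names are hypothetical.

```java
import java.util.Properties;

public class RestoreConsumerOverrides {
    // Prefix strings that StreamsConfig#restoreConsumerPrefix and
    // StreamsConfig#globalConsumerPrefix prepend to a consumer config name.
    static final String RESTORE_PREFIX = "restore.consumer.";
    static final String GLOBAL_PREFIX = "global.consumer."; // assumption: relevant for global stores

    // Builds only the override entries; in a real application they would be
    // merged into the Properties handed to KafkaStreams, next to
    // application.id and bootstrap.servers.
    static Properties buildOverrides(int receiveBufferBytes, int fetchMaxBytes) {
        Properties props = new Properties();
        for (String prefix : new String[] {RESTORE_PREFIX, GLOBAL_PREFIX}) {
            // Socket receive buffer: the thread's current value is 65536 (64K).
            props.setProperty(prefix + "receive.buffer.bytes",
                    Integer.toString(receiveBufferBytes));
            // Upper bound on one fetch response: the thread's current value is 52428800.
            props.setProperty(prefix + "fetch.max.bytes",
                    Integer.toString(fetchMaxBytes));
        }
        return props;
    }

    public static void main(String[] args) {
        // Illustrative sizes only, not tuned values.
        Properties props = buildOverrides(1024 * 1024, 100 * 1024 * 1024);
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Whether larger buffers actually lift the ceiling of roughly 100,000 records per second would still need to be measured against the IOPS limits described in the original message.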