Hello Guozhang, thanks, that might help us too. Just to confirm, this depends on KTable/GlobalKTable usage, right? I did a test with
streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG), 65536);
streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);
streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG), 65536);
streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);

and only if I change the globalConsumerPrefix values do I see a change for the GlobalKTable restoration.
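For context, the test is essentially just a GlobalKTable over the topic with the overrides above, wired up roughly like this (a minimal sketch, not the exact test code; the bootstrap servers and serdes are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;

public class GlobalTableRestoreTest {

    public static void main(final String[] args) {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "globalTableRocksDB");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // restore consumer prefix: applies to restoration of local (KTable) state stores
        streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG), 65536);
        streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);

        // global consumer prefix: applies to restoration of GlobalKTable / global state stores
        streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG), 65536);
        streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);

        final StreamsBuilder builder = new StreamsBuilder();
        // GlobalKTable over "topic", materialized into a RocksDB store by default;
        // its restoration is what the timings below measure
        builder.globalTable("topic", Consumed.with(Serdes.String(), Serdes.String())); // placeholder serdes

        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();
    }
}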
br, Patrik

PS: Logs from the test; it seems to work fine and gets faster/slower depending on the change:

Default Values:

743 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
936 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-0 to offset 2480157.
13378 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-5 to offset 2311478.
26459 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-10 to offset 2430617.
38178 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-8 to offset 2295704.

Default * 4:

714 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
862 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-0 to offset 2480157.
10465 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-5 to offset 2311478.
20014 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-10 to offset 2430617.
29570 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-8 to offset 2295704.
40066 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer,

Default / 4:

679 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl - global-stream-thread [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] Restoring state for global store topic-STATE-STORE-0000000000
725 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
848 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer, groupId=] Resetting offset for partition topic-0 to offset 2480157.
29283 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer, groupId=] Resetting offset for partition topic-5 to offset 2311478.
56349 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer, groupId=] Resetting offset for partition topic-10 to offset 2430617.

On Wed, 20 Feb 2019 at 19:16, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Taylor,
>
> Sorry for the late reply! And thanks for the updated information.
>
> I'd recommend overriding some consumer configs via `StreamsConfig` (you can
> use the StreamsConfig#restoreConsumerPrefix for that) for the following props:
>
> 1) increase RECEIVE_BUFFER_CONFIG (64K may cause poll to return earlier than
> necessary)
> 2) increase FETCH_MAX_BYTES_CONFIG if you'd expect the total size of your
> maximum 2000 records to possibly exceed it (default is 50 MB).
>
>
> Guozhang
>
>
> On Fri, Feb 8, 2019 at 12:43 AM Patrik Kleindl <pklei...@gmail.com> wrote:
>
> > Hi Taylor
> > You are right, the parallel processing is not mentioned in this issue; if
> > I remember correctly, it was in the thread that led to it, as a possibility
> > when changing to the restoration listeners.
> > Best regards
> > Patrik
> >
> > > On 07.02.2019 at 00:47, Taylor P <tdp002...@gmail.com> wrote:
> > >
> > > Hi Patrik,
> > >
> > > I am not sure that https://issues.apache.org/jira/browse/KAFKA-7380 will
> > > resolve this issue since our application is dependent on the global store
> > > being fully restored before the application can be considered healthy. It
> > > does not seem like KAFKA-7380 is aiming to address the nature of global
> > > stores restoring each partition sequentially - it is aiming to change the
> > > blocking nature of #start(). Restoring the global store partitions in
> > > parallel would definitely speed things up, though, and admittedly my first
> > > thought when debugging this was "why isn't this restoring each partition
> > > in parallel?".
> > >
> > > Changing our streams topology to avoid using a global store for such a
> > > large amount of data would be doable, but it does seem like a significant
> > > amount of work.
> > > I am curious to know if anyone else is storing large amounts of data in
> > > global stores and whether there are any inherent limitations to the size
> > > of global stores.
> > >
> > > Our topic is already using compaction.
> > >
> > > Taylor
> > >
> > >> On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> > >>
> > >> Hi Taylor
> > >>
> > >> We are facing the same issue, although on a smaller scale.
> > >> The main problem, as you found, is that the restoration is running
> > >> sequentially; this should be addressed in
> > >> https://issues.apache.org/jira/browse/KAFKA-7380, although there has been
> > >> no progress lately.
> > >>
> > >> On the other hand, you could try to re-evaluate whether your problem can
> > >> only be solved with global state stores; in our case (both in streams as
> > >> well as for interactive queries) we could solve it with local state stores
> > >> too, although only with more changes and more complexity in the topology.
> > >>
> > >> Not sure if it is applicable for your case, but have you looked into
> > >> compression for the topics?
> > >>
> > >> best regards
> > >>
> > >> Patrik
> > >>
> > >>> On Tue, 5 Feb 2019 at 22:37, Taylor P <tdp002...@gmail.com> wrote:
> > >>>
> > >>> Hi,
> > >>>
> > >>> I am having issues with the global store taking a very long time to
> > >>> restore during startup of a Kafka Streams 2.0.1 application. The global
> > >>> store is backed by a RocksDB persistent store and is added to the Streams
> > >>> topology in the following manner: https://pastebin.com/raw/VJutDyYe
> > >>> The global store topic has approximately 15 million records per partition
> > >>> and 18 partitions. The following global consumer settings are specified:
> > >>>
> > >>> poll.timeout.ms = 10
> > >>> max.poll.records = 2000
> > >>> max.partition.fetch.bytes = 1048576
> > >>> fetch.max.bytes = 52428800
> > >>> receive.buffer.bytes = 65536
> > >>>
> > >>> I have tried tweaking the settings above on the consumer side, such as
> > >>> increasing poll.timeout.ms to 2000, max.poll.records to 10000, and
> > >>> max.partition.fetch.bytes to 52428800, but it seems that I keep hitting a
> > >>> ceiling of restoring approximately 100,000 records per second. With 15
> > >>> million records per partition, it takes approximately 150 seconds to
> > >>> restore a single partition. With 18 partitions, it takes roughly 45
> > >>> minutes to fully restore the global store.
> > >>>
> > >>> Switching from HDDs to SSDs on the brokers' log directories made
> > >>> restoration roughly 25% faster overall, but this still feels slow. It
> > >>> seems that I am hitting IOPS limits on the disks and am not even close to
> > >>> hitting the throughput limits of the disks on either the broker or the
> > >>> streams application side.
> > >>>
> > >>> How can I minimize restoration time of a global store? Are there settings
> > >>> that can increase throughput with the same number of IOPS? Ideally,
> > >>> restoration of each partition could be done in parallel, but I recognize
> > >>> there is only a single global store thread. Bringing up a new instance of
> > >>> the Kafka Streams application occurs on a potentially daily basis, so the
> > >>> restoration time is becoming more and more of a hassle.
> > >>>
> > >>> Thanks.
> > >>>
> > >>> Taylor
> > >>>
> > >>
> >
>
> --
> -- Guozhang
>
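For anyone reading along without access to the pastebin: wiring a RocksDB-backed global store into the topology, as Taylor describes, typically looks roughly like this (a sketch only, not his actual code; the store name, topic name, serdes and update processor are made up):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class GlobalStoreTopology {

    public static StreamsBuilder withGlobalStore(final StreamsBuilder builder) {
        // RocksDB-backed key-value store; logging is disabled because for a
        // global store the source topic itself serves as the changelog
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("my-global-store"),
                        Serdes.String(),
                        Serdes.String())
                        .withLoggingDisabled();

        builder.addGlobalStore(
                storeBuilder,
                "topic",
                Consumed.with(Serdes.String(), Serdes.String()),
                () -> new AbstractProcessor<String, String>() {
                    private KeyValueStore<String, String> store;

                    @Override
                    @SuppressWarnings("unchecked")
                    public void init(final ProcessorContext context) {
                        super.init(context);
                        store = (KeyValueStore<String, String>) context.getStateStore("my-global-store");
                    }

                    @Override
                    public void process(final String key, final String value) {
                        // keep the global store in sync with the source topic
                        store.put(key, value);
                    }
                });

        return builder;
    }
}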