Hi Taylor,

1) Yes, we do allow users to set separate config values for the global consumer and the restore consumer via StreamsConfig#globalConsumerPrefix and StreamsConfig#restoreConsumerPrefix, as Patrik pointed out.
2) I think I agree with you that for the global consumer it is worthwhile to allow more than one update thread (for the restore consumer, though, we use the same stream thread by design, so that is much harder to re-architect). Would you mind creating a JIRA ticket for it so we do not forget about this potential improvement?

Guozhang

On Wed, Feb 27, 2019 at 2:02 PM Taylor P <tdp002...@gmail.com> wrote:

> Hi Guozhang, Patrik,
>
> Yes, the global consumer setting is what needs to be changed for these
> settings. The restore consumer configs aren't used at all since a separate
> restore consumer is not initialized for global state store restoration -
> the global consumer is used. I think it would be an improvement to allow
> for using different configs for the global consumer between restoration and
> regular processing.
>
> I previously tried tweaking fetch.max.bytes and receive.buffer.bytes, but
> if I recall correctly, I was still capped around 100K records/sec. I will
> try tweaking them again when I get time.
>
> Is there anything major that would prevent parallelizing the restoration of
> each partition of the global state store? It looks like that would be a
> decent chunk of work to refactor, but I think that would have the biggest
> impact in reducing global state restoration times for the scenario where
> the keyspace of the global state store is very large.
>
> Taylor
>
> On Thu, Feb 21, 2019 at 6:31 AM Patrik Kleindl <pklei...@gmail.com> wrote:
>
> > Hello Guozhang,
> >
> > thanks, that might help us too.
> > Just to confirm, this depends on KTable/GlobalKTable usage, right?
> > I did a test with
> >
> > streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG), 65536);
> > streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);
> > streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG), 65536);
> > streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);
> >
> > and only if I change the globalConsumerPrefix values I see a change for the
> > GlobalKTable restoration.
> >
> > br, Patrik
> >
> > PS: Logs from the test, seems to work fine and get faster/slower depending
> > on the change:
> >
> > Default Values:
> >
> > 743 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
> > 936 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-0 to offset 2480157.
> > 13378 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-5 to offset 2311478.
> > 26459 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-10 to offset 2430617.
> > 38178 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-8 to offset 2295704.
> >
> > Default * 4:
> >
> > 714 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
> > 862 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-0 to offset 2480157.
> > 10465 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-5 to offset 2311478.
> > 20014 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-10 to offset 2430617.
> > 29570 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-8 to offset 2295704.
> > 40066 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer,
> >
> > Default / 4:
> >
> > 679 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl - global-stream-thread [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] Restoring state for global store topic-STATE-STORE-0000000000
> > 725 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
> > 848 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer, groupId=] Resetting offset for partition topic-0 to offset 2480157.
> > 29283 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer, groupId=] Resetting offset for partition topic-5 to offset 2311478.
> > 56349 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer, groupId=] Resetting offset for partition topic-10 to offset 2430617.
> >
> > On Wed, 20 Feb 2019 at 19:16, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Taylor,
> > >
> > > Sorry for the late reply! And thanks for the updated information.
> > >
> > > I'd recommend overriding some consumer configs via `StreamsConfig` (you
> > > can use the StreamsConfig#restoreConsumerPrefix for that) for the
> > > following props:
> > >
> > > 1) increase RECEIVE_BUFFER_CONFIG (64K may cause poll to return earlier
> > > than necessary)
> > > 2) increase FETCH_MAX_BYTES_CONFIG if you'd expect the total size of your
> > > maximum 2000 records to possibly exceed it (default is 50 MB).
> > >
> > > Guozhang
> > >
> > > On Fri, Feb 8, 2019 at 12:43 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> > >
> > > > Hi Taylor
> > > > You are right, the parallel processing is not mentioned in this issue;
> > > > if I remember correctly, it was in the thread that led to it as a
> > > > possibility when changing to the restoration listeners.
> > > > Best regards
> > > > Patrik
> > > >
> > > > > On 07.02.2019 at 00:47, Taylor P <tdp002...@gmail.com> wrote:
> > > > >
> > > > > Hi Patrik,
> > > > >
> > > > > I am not sure that https://issues.apache.org/jira/browse/KAFKA-7380
> > > > > will resolve this issue since our application is dependent on the
> > > > > global store being fully restored before the application can be
> > > > > considered healthy. It does not seem like KAFKA-7380 is aiming to
> > > > > address the nature of global stores restoring each partition
> > > > > sequentially - it is aiming to change the blocking nature of
> > > > > #start(). Restoring the global store partitions in parallel would
> > > > > definitely speed things up, though, and admittedly my first thought
> > > > > when debugging this was "why isn't this restoring each partition in
> > > > > parallel?".
> > > > >
> > > > > Changing our streams topology to avoid using a global store for such
> > > > > a large amount of data would be doable but it does seem like a
> > > > > significant amount of work.
> > > > > I am curious to know if anyone else is storing large amounts of data
> > > > > in global stores and whether there are any inherent limitations to
> > > > > the size of global stores.
> > > > >
> > > > > Our topic is already using compaction.
> > > > >
> > > > > Taylor
> > > > >
> > > > >> On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> > > > >>
> > > > >> Hi Taylor
> > > > >>
> > > > >> We are facing the same issue, although on a smaller scale.
> > > > >> The main problem, as you found, is that the restoration is running
> > > > >> sequentially; this should be addressed in
> > > > >> https://issues.apache.org/jira/browse/KAFKA-7380, although there has
> > > > >> been no progress lately.
> > > > >>
> > > > >> On the other hand, you could try to re-evaluate whether your problem
> > > > >> can only be solved with global state stores. In our case (both in
> > > > >> streams as well as for interactive queries) we could solve it with
> > > > >> local state stores too, although only with more changes and more
> > > > >> complexity in the topology.
> > > > >>
> > > > >> Not sure if it is applicable for your case, but have you looked into
> > > > >> compression for the topics?
> > > > >>
> > > > >> best regards
> > > > >>
> > > > >> Patrik
> > > > >>
> > > > >>> On Tue, 5 Feb 2019 at 22:37, Taylor P <tdp002...@gmail.com> wrote:
> > > > >>>
> > > > >>> Hi,
> > > > >>>
> > > > >>> I am having issues with the global store taking a very long time to
> > > > >>> restore during startup of a Kafka Streams 2.0.1 application. The
> > > > >>> global store is backed by a RocksDB persistent store and is added
> > > > >>> to the Streams topology in the following manner:
> > > > >>> https://pastebin.com/raw/VJutDyYe The global store topic has
> > > > >>> approximately 15 million records per partition and 18 partitions.
> > > > >>> The following global consumer settings are specified:
> > > > >>>
> > > > >>> poll.timeout.ms = 10
> > > > >>> max.poll.records = 2000
> > > > >>> max.partition.fetch.bytes = 1048576
> > > > >>> fetch.max.bytes = 52428800
> > > > >>> receive.buffer.bytes = 65536
> > > > >>>
> > > > >>> I have tried tweaking the settings above on the consumer side, such
> > > > >>> as increasing poll.timeout.ms to 2000, max.poll.records to 10000,
> > > > >>> and max.partition.fetch.bytes to 52428800, but it seems that I keep
> > > > >>> hitting a ceiling of restoring approximately 100,000 records per
> > > > >>> second. With 15 million records per partition, it takes
> > > > >>> approximately 150 seconds to restore a single partition. With 18
> > > > >>> partitions, it takes roughly 45 minutes to fully restore the global
> > > > >>> store.
> > > > >>>
> > > > >>> Switching from HDDs to SSDs on the brokers' log directories made
> > > > >>> restoration roughly 25% faster overall, but this still feels slow.
> > > > >>> It seems that I am hitting IOPS limits on the disks and am not even
> > > > >>> close to hitting the throughput limits of the disks on either the
> > > > >>> broker or streams application side.
> > > > >>>
> > > > >>> How can I minimize restoration time of a global store? Are there
> > > > >>> settings that can increase throughput with the same number of IOPS?
> > > > >>> Ideally restoration of each partition could be done in parallel but
> > > > >>> I recognize there is only a single global store thread. Bringing up
> > > > >>> a new instance of the Kafka Streams application occurs on a
> > > > >>> potentially daily basis, so the restoration time is becoming more
> > > > >>> and more of a hassle.
> > > > >>>
> > > > >>> Thanks.
> > > > >>>
> > > > >>> Taylor
> > >
> > > --
> > > -- Guozhang

--
-- Guozhang
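
A minimal sketch of the global-consumer overrides discussed in this thread, written with plain string keys so it runs without Kafka on the classpath. It assumes that StreamsConfig#globalConsumerPrefix prepends "global.consumer." to the consumer config name. The class name, the helper names, and the factor of 4 are hypothetical; the factor is only a guess at what the "Default * 4" test run used, applied to the default values quoted above (65536 and 52428800).

```java
import java.util.Properties;

// Sketch only: plain string keys stand in for the StreamsConfig/ConsumerConfig constants.
final class GlobalConsumerOverrides {

    // Assumed to match what StreamsConfig.globalConsumerPrefix(...) prepends.
    static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";

    // Hypothetical helper: builds the prefixed key for a consumer config name.
    static String globalConsumerKey(String consumerConfigName) {
        return GLOBAL_CONSUMER_PREFIX + consumerConfigName;
    }

    // Builds overrides that target only the global consumer.
    // The factor scales the default values quoted in the thread;
    // keep it small, since both configs are int-valued.
    static Properties overrides(int factor) {
        Properties props = new Properties();
        props.put(globalConsumerKey("receive.buffer.bytes"), 65536 * factor);
        props.put(globalConsumerKey("fetch.max.bytes"), 52428800 * factor);
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting keys and values (iteration order is not guaranteed).
        overrides(4).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In a real application these entries would be added to the same Properties object that is passed to KafkaStreams. As Patrik's test in this thread showed, only the global-consumer-prefixed values changed GlobalKTable restoration speed; the restore-consumer-prefixed ones had no visible effect there.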