KAFKA-6721 <https://issues.apache.org/jira/browse/KAFKA-6721> should not need a KIP, since it only covers refactoring the internal classes for cleaner code. However, KAFKA-8023, which aims at multi-threaded restoration support, would not be covered by 6721.
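To illustrate the idea behind 8023, here is a rough sketch of restoring each global store partition on its own thread with a plain consumer. This is not the Streams-internal code; the store-writer callback is just a placeholder and would have to be thread-safe:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.BiConsumer;

    public class ParallelGlobalRestoreSketch {

        // Restore every partition of the global store's source topic concurrently:
        // one dedicated consumer per partition, each on its own thread, handing
        // records to a store writer (e.g. a RocksDB put).
        public static void restore(final String bootstrapServers,
                                   final List<TopicPartition> partitions,
                                   final BiConsumer<byte[], byte[]> storePut) {
            final ExecutorService pool = Executors.newFixedThreadPool(partitions.size());
            for (final TopicPartition tp : partitions) {
                pool.submit(() -> {
                    final Properties props = new Properties();
                    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
                    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
                    // no group.id: the partition is assigned manually and offsets are never committed
                    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                        consumer.assign(Collections.singletonList(tp));
                        final long end = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
                        consumer.seekToBeginning(Collections.singletonList(tp));
                        while (consumer.position(tp) < end) {
                            for (final ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(100))) {
                                storePut.accept(record.key(), record.value());
                            }
                        }
                    }
                });
            }
            pool.shutdown();
        }
    }

With a setup like the 18-partition topic discussed below, this bounds the restore time by the slowest partition instead of the sum over all partitions, at the cost of one temporary consumer per partition.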
Guozhang

On Fri, Mar 1, 2019 at 12:15 AM Patrik Kleindl <pklei...@gmail.com> wrote:

Hi Guozhang

I have created https://issues.apache.org/jira/browse/KAFKA-8023 and by accident found https://issues.apache.org/jira/browse/KAFKA-6721, which was what I was looking for at the beginning.
Does this need a KIP?
I can maybe help with the writeup but I am not sure I should help with the code ;-)

6721 might indirectly cover point 1) from above, as currently (if I understand Taylor correctly) it seems a bit inconsistent that the normal KTables have split config for processing and restore while in the global case both are shared, although that is understandably just a result of using only one consumer for the global state stores.

best regards
Patrik

On Thu, 28 Feb 2019 at 23:46, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Taylor,

1) Yes, we do allow users to have separate config values for global consumers / restore consumers via StreamsConfig#restoreConsumerPrefix and StreamsConfig#globalConsumerPrefix, as Patrik pointed out.

2) I think I agree with you that for the global consumer it is worthwhile to allow more than one update thread (for the restore consumer we use the same stream thread by design, so that is much harder to re-architect). Would you mind creating a JIRA ticket for it so we do not forget about this potential improvement?

Guozhang

On Wed, Feb 27, 2019 at 2:02 PM Taylor P <tdp002...@gmail.com> wrote:

Hi Guozhang, Patrik,

Yes, the global consumer setting is what needs to be changed for these settings. The restore consumer configs aren't used at all, since a separate restore consumer is not initialized for global state store restoration - the global consumer is used. I think it would be an improvement to allow different configs for the global consumer between restoration and regular processing.

I previously tried tweaking fetch.max.bytes and receive.buffer.bytes, but if I recall correctly I was still capped around 100K records/sec. I will try tweaking them again when I get time.

Is there anything major that would prevent parallelizing the restoration of each partition of the global state store? It looks like that would be a decent chunk of work to refactor, but I think it would have the biggest impact in reducing global state restoration times when the keyspace of the global state store is very large.

Taylor

On Thu, Feb 21, 2019 at 6:31 AM Patrik Kleindl <pklei...@gmail.com> wrote:

Hello Guozhang,

thanks, that might help us too.
Just to confirm, this depends on KTable/GlobalKTable usage, right?
I did a test with

    streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG), 65536);
    streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);
    streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG), 65536);
    streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);

and only if I change the globalConsumerPrefix values do I see a change for the GlobalKTable restoration.

br, Patrik

PS: Logs from the test, seems to work fine and gets faster/slower depending on the change:

Default Values:

    743 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
    936 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-0 to offset 2480157.
    13378 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-5 to offset 2311478.
    26459 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-10 to offset 2430617.
    38178 [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer, groupId=] Resetting offset for partition topic-8 to offset 2295704.

Default * 4:

    714 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
    862 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-0 to offset 2480157.
    10465 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-5 to offset 2311478.
    20014 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-10 to offset 2430617.
    29570 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer, groupId=] Resetting offset for partition topic-8 to offset 2295704.
    40066 [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer,

Default / 4:

    679 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl - global-stream-thread [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] Restoring state for global store topic-STATE-STORE-0000000000
    725 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
    848 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer, groupId=] Resetting offset for partition topic-0 to offset 2480157.
    29283 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer, groupId=] Resetting offset for partition topic-5 to offset 2311478.
    56349 [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer, groupId=] Resetting offset for partition topic-10 to offset 2430617.

On Wed, 20 Feb 2019 at 19:16, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Taylor,

Sorry for the late reply! And thanks for the updated information.

I'd recommend overriding some consumer configs via `StreamsConfig` (you can use StreamsConfig#restoreConsumerPrefix for that) for the following props:

1) increase RECEIVE_BUFFER_CONFIG (64K may cause poll to return earlier than necessary)
2) increase FETCH_MAX_BYTES_CONFIG if you'd expect the total size of your maximum 2000 records to possibly exceed it (the default is 50MB).
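For example (the values below are only illustrative; the prefixed calls simply expand to keys such as "restore.consumer.receive.buffer.bytes"):

    // larger socket receive buffer for the restore consumer
    streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 1024 * 1024);
    // allow larger fetch responses during restoration
    streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 100 * 1024 * 1024);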
Guozhang

On Fri, Feb 8, 2019 at 12:43 AM Patrik Kleindl <pklei...@gmail.com> wrote:

Hi Taylor

You are right, the parallel processing is not mentioned in this issue; if I remember correctly it was mentioned in the thread that led to it, as a possibility when changing to the restoration listeners.
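For reference, hooking into those listeners looks roughly like this (just a sketch: `topology` and `streamsConfiguration` are assumed to already exist, the logging is only illustrative, and whether global stores report their progress through this listener may depend on the version):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.processor.StateRestoreListener;

    // Log restore progress per source partition.
    final StateRestoreListener progressLogger = new StateRestoreListener() {
        @Override
        public void onRestoreStart(final TopicPartition tp, final String store, final long startOffset, final long endOffset) {
            System.out.printf("restore of %s %s started, offsets %d to %d%n", store, tp, startOffset, endOffset);
        }

        @Override
        public void onBatchRestored(final TopicPartition tp, final String store, final long batchEndOffset, final long numRestored) {
            System.out.printf("restored %d records of %s %s up to offset %d%n", numRestored, store, tp, batchEndOffset);
        }

        @Override
        public void onRestoreEnd(final TopicPartition tp, final String store, final long totalRestored) {
            System.out.printf("restore of %s %s finished, %d records%n", store, tp, totalRestored);
        }
    };

    final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
    streams.setGlobalStateRestoreListener(progressLogger); // must be set before start()
    streams.start();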
Best regards
Patrik

On 07.02.2019 at 00:47, Taylor P <tdp002...@gmail.com> wrote:

Hi Patrik,

I am not sure that https://issues.apache.org/jira/browse/KAFKA-7380 will resolve this issue, since our application depends on the global store being fully restored before the application can be considered healthy. It does not seem like KAFKA-7380 is aiming to address the fact that global stores restore each partition sequentially - it is aiming to change the blocking nature of #start(). Restoring the global store partitions in parallel would definitely speed things up, though, and admittedly my first thought when debugging this was "why isn't this restoring each partition in parallel?".

Changing our streams topology to avoid using a global store for such a large amount of data would be doable, but it does seem like a significant amount of work. I am curious to know if anyone else is storing large amounts of data in global stores and whether there are any inherent limitations to the size of global stores.

Our topic is already using compaction.

Taylor

On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl <pklei...@gmail.com> wrote:

Hi Taylor

We are facing the same issue, although on a smaller scale.
The main problem, as you found, is that the restoration runs sequentially; this should be addressed in https://issues.apache.org/jira/browse/KAFKA-7380, although there has been no progress lately.

On the other hand you could re-evaluate whether your problem can only be solved with global state stores. In our case (both in streams as well as for interactive queries) we could solve it with local state stores too, although only with more changes and more complexity in the topology.

Not sure if it is applicable for your case, but have you looked into compression for the topics?

best regards

Patrik

On Tue, 5 Feb 2019 at 22:37, Taylor P <tdp002...@gmail.com> wrote:

Hi,

I am having issues with the global store taking a very long time to restore during startup of a Kafka Streams 2.0.1 application. The global store is backed by a RocksDB persistent store and is added to the Streams topology in the following manner: https://pastebin.com/raw/VJutDyYe
The global store topic has approximately 15 million records per partition and 18 partitions.

The following global consumer settings are specified:

    poll.timeout.ms = 10
    max.poll.records = 2000
    max.partition.fetch.bytes = 1048576
    fetch.max.bytes = 52428800
    receive.buffer.bytes = 65536

I have tried tweaking the settings above on the consumer side, such as increasing poll.timeout.ms to 2000, max.poll.records to 10000, and max.partition.fetch.bytes to 52428800, but it seems that I keep hitting a ceiling of restoring approximately 100,000 records per second. With 15 million records per partition, it takes approximately 150 seconds to restore a single partition. With 18 partitions, it takes roughly 45 minutes to fully restore the global store.

Switching from HDDs to SSDs on the brokers' log directories made restoration roughly 25% faster overall, but this still feels slow. It seems that I am hitting IOPS limits on the disks and am not even close to hitting the throughput limits of the disks on either the broker or the streams application side.

How can I minimize restoration time of a global store? Are there settings that can increase throughput with the same number of IOPS? Ideally restoration of each partition could be done in parallel, but I recognize there is only a single global store thread. Bringing up a new instance of the Kafka Streams application happens on a potentially daily basis, so the restoration time is becoming more and more of a hassle.

Thanks.

Taylor
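For context, a generic sketch of wiring up a RocksDB-backed global store follows (illustrative only - this is not the code behind the pastebin link above; the store name, topic, and serdes are made up):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    final StreamsBuilder builder = new StreamsBuilder();

    final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("global-store"),   // RocksDB-backed
                Serdes.String(),
                Serdes.String())
            .withLoggingDisabled();                               // global stores use the source topic, not a changelog

    builder.addGlobalStore(
        storeBuilder,
        "global-topic",
        Consumed.with(Serdes.String(), Serdes.String()),
        () -> new AbstractProcessor<String, String>() {
            private KeyValueStore<String, String> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                super.init(context);
                store = (KeyValueStore<String, String>) context.getStateStore("global-store");
            }

            @Override
            public void process(final String key, final String value) {
                store.put(key, value);   // keep the local store in sync with the topic
            }
        });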