Hello Guozhang,

Thanks, that might help us too.
Just to confirm: which prefix applies depends on whether a KTable or a
GlobalKTable is used, right?
I ran a test with

streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG), 65536);
streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);
streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG), 65536);
streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52428800);

and I only see a change in the GlobalKTable restoration when I change the
globalConsumerPrefix values.
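
For reference, a minimal sketch of the property keys those globalConsumerPrefix calls resolve to, written with plain string keys so it runs without the Kafka jars. The class name and the scaledOverrides helper are hypothetical, and scaling both values by the same factor is an assumption about how a "Default * 4" style run could be configured, not a record of the exact test setup.

```java
import java.util.Properties;

final class GlobalConsumerOverrides {
    // Literal prefix that StreamsConfig.globalConsumerPrefix(...) prepends to a consumer config name.
    static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";

    // Values used in the test above.
    static final int DEFAULT_RECEIVE_BUFFER_BYTES = 65536;   // receive.buffer.bytes
    static final int DEFAULT_FETCH_MAX_BYTES = 52428800;     // fetch.max.bytes

    // Hypothetical helper: scales both values by the same factor (assumed test setup).
    static Properties scaledOverrides(int factor) {
        Properties props = new Properties();
        props.put(GLOBAL_CONSUMER_PREFIX + "receive.buffer.bytes", DEFAULT_RECEIVE_BUFFER_BYTES * factor);
        props.put(GLOBAL_CONSUMER_PREFIX + "fetch.max.bytes", DEFAULT_FETCH_MAX_BYTES * factor);
        return props;
    }

    public static void main(String[] args) {
        // Print the resolved keys and values for a factor-of-four run.
        scaledOverrides(4).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties can be merged into the Streams configuration. Only the global.consumer.* keys are set here, since those were the ones that changed the GlobalKTable restoration in the test.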

br, Patrik

PS: Logs from the test. It seems to work fine, and restoration gets faster or
slower depending on the change:

Default Values:

743
[globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread]
INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
936
[globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread]
INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer,
groupId=] Resetting offset for partition topic-0 to offset 2480157.
13378
[globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread]
INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer,
groupId=] Resetting offset for partition topic-5 to offset 2311478.
26459
[globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread]
INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer,
groupId=] Resetting offset for partition topic-10 to offset 2430617.
38178
[globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread]
INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer,
groupId=] Resetting offset for partition topic-8 to offset 2295704.

Default * 4:
714
[globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread]
INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
862
[globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread]
INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer,
groupId=] Resetting offset for partition topic-0 to offset 2480157.
10465
[globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread]
INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer,
groupId=] Resetting offset for partition topic-5 to offset 2311478.
20014
[globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread]
INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer,
groupId=] Resetting offset for partition topic-10 to offset 2430617.
29570
[globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread]
INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer,
groupId=] Resetting offset for partition topic-8 to offset 2295704.
40066
[globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread]
INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer,

Default / 4:
679
[globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread]
INFO org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl -
global-stream-thread
[globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread]
Restoring state for global store topic-STATE-STORE-0000000000
725
[globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread]
INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
848
[globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread]
INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer,
groupId=] Resetting offset for partition topic-0 to offset 2480157.
29283
[globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread]
INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer,
groupId=] Resetting offset for partition topic-5 to offset 2311478.
56349
[globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread]
INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer,
groupId=] Resetting offset for partition topic-10 to offset 2430617.
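
A rough way to compare the three runs is to take the gaps between consecutive "Resetting offset" entries. The sketch below assumes the leading number on each log entry is milliseconds since application start and that each reset marks the start of the next partition's restoration; both are assumptions about the log layout. The class and method names are hypothetical, and the truncated 40066 entry of the "Default * 4" run is left out.

```java
import java.util.Locale;

final class RestoreTimings {
    // Gaps between consecutive timestamps, in milliseconds.
    static long[] deltas(long[] timestampsMs) {
        long[] result = new long[timestampsMs.length - 1];
        for (int i = 1; i < timestampsMs.length; i++) {
            result[i - 1] = timestampsMs[i] - timestampsMs[i - 1];
        }
        return result;
    }

    // Average gap in seconds: total span divided by the number of gaps.
    static double averageSeconds(long[] timestampsMs) {
        long spanMs = timestampsMs[timestampsMs.length - 1] - timestampsMs[0];
        return spanMs / 1000.0 / (timestampsMs.length - 1);
    }

    public static void main(String[] args) {
        // Timestamps of the "Resetting offset" entries copied from the logs above.
        long[] defaults = {936, 13378, 26459, 38178};
        long[] timesFour = {862, 10465, 20014, 29570};
        long[] quarter = {848, 29283, 56349};

        System.out.println(String.format(Locale.ROOT, "default:     %.1f s per partition", averageSeconds(defaults)));
        System.out.println(String.format(Locale.ROOT, "default * 4: %.1f s per partition", averageSeconds(timesFour)));
        System.out.println(String.format(Locale.ROOT, "default / 4: %.1f s per partition", averageSeconds(quarter)));
    }
}
```

Under those assumptions the averages come out at roughly 12.4 s per partition with the defaults, 9.6 s with the values multiplied by four, and 27.8 s with the values divided by four.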

On Wed, 20 Feb 2019 at 19:16, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Taylor,
>
> Sorry for the late reply! And thanks for the updated information.
>
> I'd recommend overriding some consumer configs via `StreamsConfig` (you can
> use the StreamsConfig#restoreConsumerPrefix for that) for the following
> props:
>
> 1) increase RECEIVE_BUFFER_CONFIG (64K may cause poll to return earlier than
> necessary)
> 2) increase FETCH_MAX_BYTES_CONFIG if you'd expect the total size of your
> maximum 2000 records to possibly exceed it (default is 50 MB).
>
>
> Guozhang
>
>
>
> On Fri, Feb 8, 2019 at 12:43 AM Patrik Kleindl <pklei...@gmail.com> wrote:
>
> > Hi Taylor
> > You are right, the parallel processing is not mentioned in this issue. If
> > I remember correctly, it came up in the thread that led to it, as a
> > possibility when changing to the restoration listeners.
> > Best regards
> > Patrik
> >
> > > Am 07.02.2019 um 00:47 schrieb Taylor P <tdp002...@gmail.com>:
> > >
> > > Hi Patrik,
> > >
> > > I am not sure that https://issues.apache.org/jira/browse/KAFKA-7380 will
> > > resolve this issue since our application is dependent on the global store
> > > being fully restored before the application can be considered healthy. It
> > > does not seem like KAFKA-7380 is aiming to address the nature of global
> > > stores restoring each partition sequentially - it is aiming to change the
> > > blocking nature of #start(). Restoring the global store partitions in
> > > parallel would definitely speed things up, though, and admittedly my first
> > > thought when debugging this was "why isn't this restoring each partition
> > > in parallel?".
> > >
> > > Changing our streams topology to avoid using a global store for such a
> > > large amount of data would be doable but it does seem like a significant
> > > amount of work. I am curious to know if anyone else is storing large
> > > amounts of data in global stores and whether there are any inherent
> > > limitations to the size of global stores.
> > >
> > > Our topic is already using compaction.
> > >
> > > Taylor
> > >
> > >> On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> > >>
> > >> Hi Taylor
> > >>
> > >> We are facing the same issue, although on a smaller scale.
> > >> The main problem, as you found, is that the restoration runs
> > >> sequentially. This should be addressed in
> > >> https://issues.apache.org/jira/browse/KAFKA-7380, although there has been
> > >> no progress lately.
> > >>
> > >> On the other hand, you could try to re-evaluate whether your problem can
> > >> only be solved with global state stores. In our case (both in streams as
> > >> well as for interactive queries) we could solve it with local state stores
> > >> too, although only with more changes and more complexity in the topology.
> > >>
> > >> Not sure if it is applicable for your case, but have you looked into
> > >> compression for the topics?
> > >>
> > >> best regards
> > >>
> > >> Patrik
> > >>
> > >>> On Tue, 5 Feb 2019 at 22:37, Taylor P <tdp002...@gmail.com> wrote:
> > >>>
> > >>> Hi,
> > >>>
> > >>> I am having issues with the global store taking a very long time to
> > >>> restore during startup of a Kafka Streams 2.0.1 application. The global
> > >>> store is backed by a RocksDB persistent store and is added to the Streams
> > >>> topology in the following manner: https://pastebin.com/raw/VJutDyYe
> > >>> The global store topic has approximately 15 million records per partition
> > >>> and 18 partitions. The following global consumer settings are specified:
> > >>>
> > >>>    poll.timeout.ms = 10
> > >>>    max.poll.records = 2000
> > >>>    max.partition.fetch.bytes = 1048576
> > >>>    fetch.max.bytes = 52428800
> > >>>    receive.buffer.bytes = 65536
> > >>>
> > >>> I have tried tweaking the settings above on the consumer side, such as
> > >>> increasing poll.timeout.ms to 2000, max.poll.records to 10000, and
> > >>> max.partition.fetch.bytes to 52428800, but it seems that I keep hitting a
> > >>> ceiling of restoring approximately 100,000 records per second. With 15
> > >>> million records per partition, it takes approximately 150 seconds to
> > >>> restore a single partition. With 18 partitions, it takes roughly 45
> > >>> minutes to fully restore the global store.
> > >>>
> > >>> Switching from HDDs to SSDs on the brokers' log directories made
> > >>> restoration roughly 25% faster overall, but this still feels slow. It
> > >>> seems that I am hitting IOPS limits on the disks and am not even close to
> > >>> hitting the throughput limits of the disks on either the broker or
> > >>> streams application side.
> > >>>
> > >>> How can I minimize restoration time of a global store? Are there settings
> > >>> that can increase throughput with the same number of IOPS? Ideally
> > >>> restoration of each partition could be done in parallel but I recognize
> > >>> there is only a single global store thread. Bringing up a new instance of
> > >>> the Kafka Streams application occurs on a potentially daily basis, so the
> > >>> restoration time is becoming more and more of a hassle.
> > >>>
> > >>> Thanks.
> > >>>
> > >>> Taylor
> > >>>
> > >>
> >
>
>
> --
> -- Guozhang
>
