Hi Guozhang, Patrik,

Yes, the global consumer setting is what needs to be changed for these
settings. The restore consumer configs aren't used at all since a separate
restore consumer is not initialized for global state store restoration -
the global consumer is used. I think it would be an improvement to allow
for using different configs for the global consumer between restoration and
regular processing.

I previously tried tweaking fetch.max.bytes and receive.buffer.bytes, but
if I recall correctly, I was still capped around 100K records/sec. I will
try tweaking them again when I get time.

Is there anything major that would prevent parallelizing the restoration of
each partition of the global state store? It looks like that would be a
decent chunk of work to refactor, but I think that would have the biggest
impact in reducing global state restoration times for the scenario where
the keyspace of the global state store is very large.

Taylor


On Thu, Feb 21, 2019 at 6:31 AM Patrik Kleindl <pklei...@gmail.com> wrote:

> Hello Guozhang,
>
> thanks, that might help us too.
> Just to confirm, this depends on KTable/GlobalKTable usage, right?
> I did a test with
>
>
> streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG),
> 65536);
>
> streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
> 52428800);
>
> streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(StreamsConfig.RECEIVE_BUFFER_CONFIG),
> 65536);
>
> streamsConfiguration.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
> 52428800);
>
> and only if I change the globalConsumerPrefix values I see a change for the
> GlobalKTable restoration.
>
> br, Patrik
>
> PS: Logs from the test, seems to work fine and get faster/slower depending
> on the change:
>
> Default Values:
>
> 743
>
> [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread]
> INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
> 936
>
> [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread]
> INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
>
> clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer,
> groupId=] Resetting offset for partition topic-0 to offset 2480157.
> 13378
>
> [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread]
> INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
>
> clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer,
> groupId=] Resetting offset for partition topic-5 to offset 2311478.
> 26459
>
> [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread]
> INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
>
> clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer,
> groupId=] Resetting offset for partition topic-10 to offset 2430617.
> 38178
>
> [globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-GlobalStreamThread]
> INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
>
> clientId=globalTableRocksDB-7c733295-b253-4c0a-a5bd-81e02b2bd4b7-global-consumer,
> groupId=] Resetting offset for partition topic-8 to offset 2295704.
>
> Default * 4:
> 714
>
> [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread]
> INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
> 862
>
> [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread]
> INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
>
> clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer,
> groupId=] Resetting offset for partition topic-0 to offset 2480157.
> 10465
>
> [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread]
> INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
>
> clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer,
> groupId=] Resetting offset for partition topic-5 to offset 2311478.
> 20014
>
> [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread]
> INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
>
> clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer,
> groupId=] Resetting offset for partition topic-10 to offset 2430617.
> 29570
>
> [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread]
> INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
>
> clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer,
> groupId=] Resetting offset for partition topic-8 to offset 2295704.
> 40066
>
> [globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-GlobalStreamThread]
> INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
>
> clientId=globalTableRocksDB-65bf45ed-d044-41fa-9346-90bf64cb0e0d-global-consumer,
>
> Default / 4:
> 679
>
> [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread]
> INFO org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl -
> global-stream-thread
>
> [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread]
> Restoring state for global store topic-STATE-STORE-0000000000
> 725
>
> [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread]
> INFO org.apache.kafka.clients.Metadata - Cluster ID: s2467KdmTlKV5b2YGe831g
> 848
>
> [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread]
> INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
>
> clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer,
> groupId=] Resetting offset for partition topic-0 to offset 2480157.
> 29283
>
> [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread]
> INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
>
> clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer,
> groupId=] Resetting offset for partition topic-5 to offset 2311478.
> 56349
>
> [globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-GlobalStreamThread]
> INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
>
> clientId=globalTableRocksDB-ffdfb1ea-fba8-4034-af8e-159d35231ec6-global-consumer,
> groupId=] Resetting offset for partition topic-10 to offset 2430617.
>
> On Wed, 20 Feb 2019 at 19:16, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Taylor,
> >
> > Sorry for the late reply! And thanks for the updated information.
> >
> > I'd recommend overriding some consumer configs via `StreamsConfig` (you
> can
> > use the StreamsConfig#restoreConsumerPrefix for that) for the following
> > props:
> >
> > 1) increase RECEIVE_BUFFER_CONFIG (64K may cause poll to return early
> than
> > necessary)
> > 2) increase FETCH_MAX_BYTES_CONFIG if you'd expect the total size of your
> > maximum 2000 records to possibly exceed it (default is 50Mb).
> >
> >
> > Guozhang
> >
> >
> >
> > On Fri, Feb 8, 2019 at 12:43 AM Patrik Kleindl <pklei...@gmail.com>
> wrote:
> >
> > > Hi Taylor
> > > You are right, the parallel processing is not mentioned in this issue,
> if
> > > I remember correctly it was in the thread that lead to it as a
> > possibility
> > > when changing to the restoration listeners.
> > > Best regards
> > > Patrik
> > >
> > > > Am 07.02.2019 um 00:47 schrieb Taylor P <tdp002...@gmail.com>:
> > > >
> > > > Hi Patrik,
> > > >
> > > > I am not sure that https://issues.apache.org/jira/browse/KAFKA-7380
> > will
> > > > resolve this issue since our application is dependent on the global
> > store
> > > > being fully restored before the application can be considered
> healthy.
> > It
> > > > does not seem like KAFKA-7380 is aiming to address the nature of
> global
> > > > stores restoring each partition sequentially - it is aiming to change
> > the
> > > > blocking nature of #start(). Restoring the global store partitions in
> > > > parallel would definitely speed things up, though, and admittedly my
> > > first
> > > > thought when debugging this was "why isn't this restoring each
> > partition
> > > in
> > > > parallel?".
> > > >
> > > > Changing our streams topology to avoid using a global store for such
> a
> > > > large amount of data would be doable but it does seem like a
> > significant
> > > > amount of work. I am curious to know if anyone else is storing large
> > > > amounts of data in global stores and whether there are any inherent
> > > > limitations to the size of global stores.
> > > >
> > > > Our topic is already using compaction.
> > > >
> > > > Taylor
> > > >
> > > >> On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl <pklei...@gmail.com>
> > > wrote:
> > > >>
> > > >> Hi Taylor
> > > >>
> > > >> We are facing the same issue, although on a smaller scale.
> > > >> The main problem as you found is that the restoration is running
> > > >> sequentially, this should be addressed in
> > > >> https://issues.apache.org/jira/browse/KAFKA-7380, although there
> has
> > > been
> > > >> no progress lately.
> > > >>
> > > >> On the other hand you could try re-evaluate if your problem can only
> > be
> > > >> solved with global state stores, in our case (both in streams as
> well
> > as
> > > >> for interactive queries) we could solve it with local state stores
> > too,
> > > >> although only with more changes and more complexity in the topology.
> > > >>
> > > >> Not sure if it is applicable for your case, but have you looked into
> > > >> compression for the topics?
> > > >>
> > > >> best regards
> > > >>
> > > >> Patrik
> > > >>
> > > >>> On Tue, 5 Feb 2019 at 22:37, Taylor P <tdp002...@gmail.com> wrote:
> > > >>>
> > > >>> Hi,
> > > >>>
> > > >>> I am having issues with the global store taking a very long time to
> > > >> restore
> > > >>> during startup of a Kafka Streams 2.0.1 application. The global
> store
> > > is
> > > >>> backed by a RocksDB persistent store and is added to the Streams
> > > topology
> > > >>> in the following manner: https://pastebin.com/raw/VJutDyYe The
> > global
> > > >>> store
> > > >>> topic has approximately 15 million records per partition and 18
> > > >> partitions.
> > > >>> The following global consumer settings are specified:
> > > >>>
> > > >>>    poll.timeout.ms = 10
> > > >>>    max.poll.records = 2000
> > > >>>    max.partition.fetch.bytes = 1048576
> > > >>>    fetch.max.bytes = 52428800
> > > >>>    receive.buffer.bytes = 65536
> > > >>>
> > > >>> I have tried tweaking the settings above on the consumer side, such
> > as
> > > >>> increasing poll.timeout.ms to 2000, max.poll.records to 10000, and
> > > >>> max.partition.fetch.bytes to 52428800, but it seems that I keep
> > > hitting a
> > > >>> ceiling of restoring approximately 100,000 records per second. With
> > 15
> > > >>> million records per partition, it takes approximately 150 seconds
> to
> > > >>> restore a single partition. With 18 partitions, it takes roughly 45
> > > >> minutes
> > > >>> to fully restore the global store.
> > > >>>
> > > >>> Switching from HDDs to SSDs on the brokers' log directories made
> > > >>> restoration roughly 25% faster overall, but this still feels slow.
> It
> > > >> seems
> > > >>> that I am hitting IOPS limits on the disks and am not even close to
> > > >> hitting
> > > >>> the throughput limits of the disks on either the broker or streams
> > > >>> application side.
> > > >>>
> > > >>> How can I minimize restoration time of a global store? Are there
> > > settings
> > > >>> that can increase throughput with the same number of IOPS? Ideally
> > > >>> restoration of each partition could be done in parallel but I
> > recognize
> > > >>> there is only a single global store thread. Bringing up a new
> > instance
> > > of
> > > >>> the Kafka Streams application occurs on a potentially daily basis,
> so
> > > the
> > > >>> restoration time is becoming more and more of a hassle.
> > > >>>
> > > >>> Thanks.
> > > >>>
> > > >>> Taylor
> > > >>>
> > > >>
> > >
> >
> >
> > --
> > -- Guozhang
> >
>

Reply via email to