Hello Taylor,

Sorry for the late reply! And thanks for the updated information.

I'd recommend overriding some consumer configs via `StreamsConfig` (you
can use StreamsConfig#restoreConsumerPrefix for that) for the following
props:

1) increase RECEIVE_BUFFER_CONFIG (the 64 KB default may cause poll to
return earlier than necessary)
2) increase FETCH_MAX_BYTES_CONFIG if you expect the total size of your
maximum 2000 records to exceed it (the default is 50 MB).
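
For example, a minimal sketch of those overrides (class name and values
are illustrative only; for the global store's consumer specifically,
the analogous StreamsConfig#globalConsumerPrefix helper is also
available since 2.0):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class RestoreTuning {
        public static Properties withRestoreOverrides(final Properties props) {
            // 1) larger socket receive buffer; the 64 KB default can make
            //    poll() return earlier than necessary
            props.put(
                StreamsConfig.restoreConsumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                1024 * 1024); // e.g. 1 MB
            // 2) higher fetch ceiling, in case max.poll.records (2000)
            //    worth of records could exceed the 50 MB default
            props.put(
                StreamsConfig.restoreConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                100 * 1024 * 1024); // e.g. 100 MB
            return props;
        }
    }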


Guozhang



On Fri, Feb 8, 2019 at 12:43 AM Patrik Kleindl <pklei...@gmail.com> wrote:

> Hi Taylor
> You are right, the parallel processing is not mentioned in that issue.
> If I remember correctly, it came up as a possibility in the thread that
> led to it, when the change to the restoration listeners was discussed.
> Best regards
> Patrik
>
> > Am 07.02.2019 um 00:47 schrieb Taylor P <tdp002...@gmail.com>:
> >
> > Hi Patrik,
> >
> > I am not sure that https://issues.apache.org/jira/browse/KAFKA-7380 will
> > resolve this issue since our application is dependent on the global
> > store being fully restored before the application can be considered
> > healthy. It does not seem like KAFKA-7380 is aiming to address the fact
> > that global stores restore each partition sequentially - it is aiming
> > to change the blocking nature of #start(). Restoring the global store
> > partitions in parallel would definitely speed things up, though, and
> > admittedly my first thought when debugging this was "why isn't this
> > restoring each partition in parallel?".
> >
> > Changing our streams topology to avoid using a global store for such a
> > large amount of data would be doable but it does seem like a significant
> > amount of work. I am curious to know if anyone else is storing large
> > amounts of data in global stores and whether there are any inherent
> > limitations to the size of global stores.
> >
> > Our topic is already using compaction.
> >
> > Taylor
> >
> >> On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> >>
> >> Hi Taylor
> >>
> >> We are facing the same issue, although on a smaller scale.
> >> The main problem, as you found, is that the restoration runs
> >> sequentially; this should be addressed by
> >> https://issues.apache.org/jira/browse/KAFKA-7380, although there has
> >> been no progress lately.
> >>
> >> On the other hand, you could re-evaluate whether your problem can only
> >> be solved with global state stores. In our case (both in Streams and
> >> for interactive queries) we could solve it with local state stores
> >> too, although only with more changes and more complexity in the
> >> topology.
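> >>
> >> As a rough sketch of the local-store variant (hypothetical topic and
> >> store names; note that keys must then be co-partitioned and
> >> interactive queries need routing across instances):
> >>
> >>    import org.apache.kafka.common.serialization.Serdes;
> >>    import org.apache.kafka.streams.Consumed;
> >>    import org.apache.kafka.streams.StreamsBuilder;
> >>    import org.apache.kafka.streams.kstream.KTable;
> >>    import org.apache.kafka.streams.kstream.Materialized;
> >>
> >>    StreamsBuilder builder = new StreamsBuilder();
> >>    // materializes the topic into a local (per-instance) RocksDB store
> >>    KTable<String, String> table = builder.table(
> >>        "your-topic",
> >>        Consumed.with(Serdes.String(), Serdes.String()),
> >>        Materialized.as("local-store"));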
> >>
> >> Not sure if it is applicable to your case, but have you looked into
> >> compression for the topics?
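> >>
> >> For example, topic-level compression can be set with the AdminClient
> >> (a sketch only; hypothetical topic name, and the enclosing method is
> >> assumed to declare throws Exception):
> >>
> >>    import java.util.Collections;
> >>    import java.util.Properties;
> >>    import org.apache.kafka.clients.admin.AdminClient;
> >>    import org.apache.kafka.clients.admin.Config;
> >>    import org.apache.kafka.clients.admin.ConfigEntry;
> >>    import org.apache.kafka.common.config.ConfigResource;
> >>
> >>    Properties adminProps = new Properties();
> >>    adminProps.put("bootstrap.servers", "broker:9092");
> >>    try (AdminClient admin = AdminClient.create(adminProps)) {
> >>        ConfigResource topic =
> >>            new ConfigResource(ConfigResource.Type.TOPIC, "your-global-topic");
> >>        Config compression = new Config(Collections.singletonList(
> >>            new ConfigEntry("compression.type", "lz4")));
> >>        // alterConfigs is the 2.0-era API for changing topic configs
> >>        admin.alterConfigs(Collections.singletonMap(topic, compression)).all().get();
> >>    }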
> >>
> >> best regards
> >>
> >> Patrik
> >>
> >>> On Tue, 5 Feb 2019 at 22:37, Taylor P <tdp002...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I am having issues with the global store taking a very long time to
> >>> restore during startup of a Kafka Streams 2.0.1 application. The
> >>> global store is backed by a RocksDB persistent store and is added to
> >>> the Streams topology in the following manner:
> >>> https://pastebin.com/raw/VJutDyYe
> >>> The global store topic has approximately 15 million records per
> >>> partition and 18 partitions.
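> >>>
> >>> (As a rough sketch, the registration has the following shape - the
> >>> store, topic, and serde choices here are placeholders rather than the
> >>> exact code from the pastebin:)
> >>>
> >>>    StoreBuilder<KeyValueStore<String, String>> storeBuilder =
> >>>        Stores.keyValueStoreBuilder(
> >>>            Stores.persistentKeyValueStore("global-store"),
> >>>            Serdes.String(), Serdes.String())
> >>>        .withLoggingDisabled(); // required for global stores
> >>>    builder.addGlobalStore(
> >>>        storeBuilder,
> >>>        "global-store-topic",
> >>>        Consumed.with(Serdes.String(), Serdes.String()),
> >>>        () -> new Processor<String, String>() {
> >>>            private KeyValueStore<String, String> store;
> >>>            @SuppressWarnings("unchecked")
> >>>            @Override
> >>>            public void init(final ProcessorContext context) {
> >>>                store = (KeyValueStore<String, String>)
> >>>                    context.getStateStore("global-store");
> >>>            }
> >>>            @Override
> >>>            public void process(final String key, final String value) {
> >>>                store.put(key, value); // keep store in sync with topic
> >>>            }
> >>>            @Override
> >>>            public void close() {}
> >>>        });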
> >>> The following global consumer settings are specified:
> >>>
> >>>    poll.timeout.ms = 10
> >>>    max.poll.records = 2000
> >>>    max.partition.fetch.bytes = 1048576
> >>>    fetch.max.bytes = 52428800
> >>>    receive.buffer.bytes = 65536
> >>>
> >>> I have tried tweaking the settings above on the consumer side, such
> >>> as increasing poll.timeout.ms to 2000, max.poll.records to 10000, and
> >>> max.partition.fetch.bytes to 52428800, but it seems that I keep
> >>> hitting a ceiling of approximately 100,000 restored records per
> >>> second. With 15 million records per partition, it takes approximately
> >>> 150 seconds to restore a single partition. With 18 partitions, it
> >>> takes roughly 45 minutes to fully restore the global store.
> >>>
> >>> Switching from HDDs to SSDs on the brokers' log directories made
> >>> restoration roughly 25% faster overall, but this still feels slow. It
> >>> seems that I am hitting IOPS limits on the disks and am not even
> >>> close to hitting the throughput limits of the disks on either the
> >>> broker or streams application side.
> >>>
> >>> How can I minimize restoration time of a global store? Are there
> >>> settings that can increase throughput with the same number of IOPS?
> >>> Ideally, restoration of each partition could be done in parallel, but
> >>> I recognize there is only a single global store thread. Bringing up a
> >>> new instance of the Kafka Streams application occurs on a potentially
> >>> daily basis, so the restoration time is becoming more and more of a
> >>> hassle.
> >>>
> >>> Thanks.
> >>>
> >>> Taylor
> >>>
> >>
>


-- 
-- Guozhang
