And it seems that for some reason after a while caching works again without
a restart of the streams application

[image: Screen Shot 2019-12-08 at 11.59.30 PM.png]

I'll try to enable debug metrics and see if I can find something useful
there.
Any idea is appreciated in the meantime :)

--
Alessandro Tagliapietra

On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> It seems that even with caching enabled, after a while the sent bytes stil
> go up
>
> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
>
> you can see the deploy when I've enabled caching but it looks like it's
> still a temporary solution.
>
> --
> Alessandro Tagliapietra
>
>
> On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
>> Could be, but since we have a limite amount of input keys (~30),
>> windowing generates new keys but old ones are never touched again since the
>> data per key is in order, I assume it shouldn't be a big deal for it to
>> handle 30 keys
>> I'll have a look at cache metrics and see if something pops out
>>
>> Thanks
>>
>> --
>> Alessandro Tagliapietra
>>
>>
>> On Sat, Dec 7, 2019 at 10:02 AM John Roesler <vvcep...@apache.org> wrote:
>>
>>> Hmm, that’s a good question. Now that we’re talking about caching, I
>>> wonder if the cache was just too small. It’s not very big by default.
>>>
>>> On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote:
>>> > Ok I'll check on that!
>>> >
>>> > Now I can see that with caching we went from 3-4MB/s to 400KB/s, that
>>> will
>>> > help with the bill.
>>> >
>>> > Last question, any reason why after a while the regular windowed stream
>>> > starts sending every update instead of caching?
>>> > Could it be because it doesn't have any more memory available? Any
>>> other
>>> > possible reason?
>>> >
>>> > Thank you so much for your help
>>> >
>>> > --
>>> > Alessandro Tagliapietra
>>> >
>>> >
>>> > On Sat, Dec 7, 2019 at 9:14 AM John Roesler <vvcep...@apache.org>
>>> wrote:
>>> >
>>> > > Ah, yes. Glad you figured it out!
>>> > >
>>> > > Caching does not reduce EOS guarantees at all. I highly recommend
>>> using
>>> > > it. You might even want to take a look at the caching metrics to
>>> make sure
>>> > > you have a good hit ratio.
>>> > >
>>> > > -John
>>> > >
>>> > > On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra wrote:
>>> > > > Never mind I've found out I can use `.withCachingEnabled` on the
>>> store
>>> > > > builder to achieve the same thing as the windowing example as
>>> > > > `Materialized.as` turns that on by default.
>>> > > >
>>> > > > Does caching in any way reduces the EOS guarantees?
>>> > > >
>>> > > > --
>>> > > > Alessandro Tagliapietra
>>> > > >
>>> > > >
>>> > > > On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra <
>>> > > > tagliapietra.alessan...@gmail.com> wrote:
>>> > > >
>>> > > > > Seems my journey with this isn't done just yet,
>>> > > > >
>>> > > > > This seems very complicated to me but I'll try to explain it as
>>> best I
>>> > > can.
>>> > > > > To better understand the streams network usage I've used
>>> prometheus
>>> > > with
>>> > > > > the JMX exporter to export kafka metrics.
>>> > > > > To check the amount of data we use I'm looking at the increments
>>> > > > > of kafka_producer_topic_metrics_byte_total and
>>> > > > > kafka_producer_producer_topic_metrics_record_send_total,
>>> > > > >
>>> > > > > Our current (before the change mentioned above) code looks like
>>> this:
>>> > > > >
>>> > > > > // This transformers just pairs a value with the previous one
>>> storing
>>> > > the
>>> > > > > temporary one in a store
>>> > > > > val pairsStream = metricStream
>>> > > > >   .transformValues(ValueTransformerWithKeySupplier {
>>> PairTransformer()
>>> > > },
>>> > > > > "LastValueStore")
>>> > > > >   .filter { _, value: MetricSequence? -> value != null }
>>> > > > >
>>> > > > > // Create a store to store suppressed windows until a new one is
>>> > > received
>>> > > > > val suppressStoreSupplier =
>>> > > > >
>>> > >
>>> Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"),
>>> > > > > ......
>>> > > > >
>>> > > > > // Window and aggregate data in 1 minute intervals
>>> > > > > val aggregatedStream = pairsStream
>>> > > > >   .groupByKey()
>>> > > > >   .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
>>> > > > >   .aggregate(
>>> > > > >           { MetricSequenceList(ArrayList()) },
>>> > > > >           { key, value, aggregate ->
>>> > > > >               aggregate.getRecords().add(value)
>>> > > > >               aggregate
>>> > > > >           },
>>> > > > >           Materialized.`as`<String, MetricSequenceList,
>>> > > WindowStore<Bytes,
>>> > > > >
>>> > >
>>> ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde())
>>> > > > >   )
>>> > > > >   .toStream()
>>> > > > >   .flatTransform(TransformerSupplier {
>>> > > > >       // This transformer basically waits until a new window is
>>> > > received
>>> > > > > to emit the previous one
>>> > > > >   }, "suppress-store")
>>> > > > >   .map { sensorId: String, suppressedOutput: SuppressedOutput ->
>>> > > > >       .... etc ....
>>> > > > >
>>> > > > >
>>> > > > > Basically:
>>> > > > >  - all data goes through LastValueStore store that stores each
>>> message
>>> > > and
>>> > > > > emits a pair with the previous one
>>> > > > >  - the aggregate-store is used to store the per-window list of
>>> > > messages in
>>> > > > > the aggregate method
>>> > > > >  - the suppress store is used to store each received window
>>> which is
>>> > > > > emitted only after a newer one is received
>>> > > > >
>>> > > > > What I'm experiencing is that:
>>> > > > >  - during normal execution, the streams app sends to the
>>> lastvalue
>>> > > store
>>> > > > > changelog topic 5k messages/min, the aggregate and suppress store
>>> > > changelog
>>> > > > > topics only about 100
>>> > > > >  - at some point (after many hours of operation), the streams app
>>> > > starts
>>> > > > > sending to the aggregate and suppress store changelog topic the
>>> same
>>> > > amount
>>> > > > > of messages going through the lastvaluestore
>>> > > > >  - if I restart the streams app it goes back to the initial
>>> behavior
>>> > > > >
>>> > > > > You can see the behavior in this graph https://imgur.com/dJcUNSf
>>> > > > > You can also see that after a restart everything goes back to
>>> normal
>>> > > > > levels.
>>> > > > > Regarding other metrics, process latency increases, poll latency
>>> > > > > decreases, poll rate decreases, commit rate stays the same while
>>> commit
>>> > > > > latency increases.
>>> > > > >
>>> > > > > Now, I've these questions:
>>> > > > >  - why isn't the aggregate/suppress store changelog topic
>>> throughput
>>> > > the
>>> > > > > same as the LastValueStore? Shouldn't every time it aggregates
>>> send a
>>> > > > > record to the changelog?
>>> > > > >  - is the windowing doing some internal caching like not sending
>>> every
>>> > > > > aggregation record until the window time is passed? (if so,
>>> where can I
>>> > > > > find that code since I would like to use that also for our new
>>> > > > > implementation)
>>> > > > >
>>> > > > > Thank you in advance
>>> > > > >
>>> > > > > --
>>> > > > > Alessandro Tagliapietra
>>> > > > >
>>> > > > >
>>> > > > > On Wed, Dec 4, 2019 at 7:57 AM John Roesler <vvcep...@apache.org
>>> >
>>> > > wrote:
>>> > > > >
>>> > > > >> Oh, good!
>>> > > > >>
>>> > > > >> On Tue, Dec 3, 2019, at 23:29, Alessandro Tagliapietra wrote:
>>> > > > >> > Testing on staging shows that a restart on exception is much
>>> faster
>>> > > and
>>> > > > >> the
>>> > > > >> > stream starts right away which I think means we're reading
>>> way less
>>> > > data
>>> > > > >> > than before!
>>> > > > >> >
>>> > > > >> > What I was referring to is that, in Streams, the keys for
>>> window
>>> > > > >> > > aggregation state is actually composed of both the window
>>> itself
>>> > > and
>>> > > > >> the
>>> > > > >> > > key. In the DSL, it looks like "Windowed<K>". That results
>>> in the
>>> > > > >> store
>>> > > > >> > > having a unique key per window for each K, which is why we
>>> need
>>> > > > >> retention
>>> > > > >> > > as well as compaction for our changelogs. But for you, if
>>> you just
>>> > > > >> make the
>>> > > > >> > > key "K", then compaction alone should do the trick.
>>> > > > >> >
>>> > > > >> > Yes we had compact,delete as cleanup policy but probably it
>>> still
>>> > > had a
>>> > > > >> too
>>> > > > >> > long retention value, also the rocksdb store is probably much
>>> > > faster now
>>> > > > >> > having only one key per key instead of one key per window per
>>> key.
>>> > > > >> >
>>> > > > >> > Thanks a lot for helping! I'm now going to setup a
>>> prometheus-jmx
>>> > > > >> > monitoring so we can keep better track of what's going on :)
>>> > > > >> >
>>> > > > >> > --
>>> > > > >> > Alessandro Tagliapietra
>>> > > > >> >
>>> > > > >> >
>>> > > > >> > On Tue, Dec 3, 2019 at 9:12 PM John Roesler <
>>> vvcep...@apache.org>
>>> > > > >> wrote:
>>> > > > >> >
>>> > > > >> > > Oh, yeah, I remember that conversation!
>>> > > > >> > >
>>> > > > >> > > Yes, then, I agree, if you're only storing state of the most
>>> > > recent
>>> > > > >> window
>>> > > > >> > > for each key, and the key you use for that state is
>>> actually the
>>> > > key
>>> > > > >> of the
>>> > > > >> > > records, then an aggressive compaction policy plus your
>>> custom
>>> > > > >> transformer
>>> > > > >> > > seems like a good way forward.
>>> > > > >> > >
>>> > > > >> > > What I was referring to is that, in Streams, the keys for
>>> window
>>> > > > >> > > aggregation state is actually composed of both the window
>>> itself
>>> > > and
>>> > > > >> the
>>> > > > >> > > key. In the DSL, it looks like "Windowed<K>". That results
>>> in the
>>> > > > >> store
>>> > > > >> > > having a unique key per window for each K, which is why we
>>> need
>>> > > > >> retention
>>> > > > >> > > as well as compaction for our changelogs. But for you, if
>>> you just
>>> > > > >> make the
>>> > > > >> > > key "K", then compaction alone should do the trick.
>>> > > > >> > >
>>> > > > >> > > And yes, if you manage the topic yourself, then Streams
>>> won't
>>> > > adjust
>>> > > > >> the
>>> > > > >> > > retention time. I think it might validate that the retention
>>> > > isn't too
>>> > > > >> > > short, but I don't remember offhand.
>>> > > > >> > >
>>> > > > >> > > Cheers, and let me know how it goes!
>>> > > > >> > > -John
>>> > > > >> > >
>>> > > > >> > > On Tue, Dec 3, 2019, at 23:03, Alessandro Tagliapietra
>>> wrote:
>>> > > > >> > > > Hi John,
>>> > > > >> > > >
>>> > > > >> > > > afaik grace period uses stream time
>>> > > > >> > > >
>>> > > > >> > >
>>> > > > >>
>>> > >
>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html
>>> > > > >> > > > which is
>>> > > > >> > > > per partition, unfortunately we process data that's not
>>> in sync
>>> > > > >> between
>>> > > > >> > > > keys so each key needs to be independent and a key can
>>> have much
>>> > > > >> older
>>> > > > >> > > > data
>>> > > > >> > > > than the other.
>>> > > > >> > > >
>>> > > > >> > > > Having a small grace period would probably close old
>>> windows
>>> > > sooner
>>> > > > >> than
>>> > > > >> > > > expected. That's also why in my use case a custom store
>>> that
>>> > > just
>>> > > > >> stores
>>> > > > >> > > > the last window data for each key might work better. I
>>> had the
>>> > > same
>>> > > > >> issue
>>> > > > >> > > > with suppression and it has been reported here
>>> > > > >> > > > https://issues.apache.org/jira/browse/KAFKA-8769
>>> > > > >> > > > Oh I just saw that you're the one that helped me on slack
>>> and
>>> > > > >> created the
>>> > > > >> > > > issue (thanks again for that).
>>> > > > >> > > >
>>> > > > >> > > > The behavior that you mention about streams setting
>>> changelog
>>> > > > >> retention
>>> > > > >> > > > time is something they do on creation of the topic when
>>> the
>>> > > broker
>>> > > > >> has
>>> > > > >> > > auto
>>> > > > >> > > > creation enabled? Because we're using confluent cloud and
>>> I had
>>> > > to
>>> > > > >> create
>>> > > > >> > > > it manually.
>>> > > > >> > > > Regarding the change in the recovery behavior, with
>>> compact
>>> > > cleanup
>>> > > > >> > > policy
>>> > > > >> > > > shouldn't the changelog only keep the last value? That
>>> would
>>> > > make
>>> > > > >> the
>>> > > > >> > > > recovery faster and "cheaper" as it would only need to
>>> read a
>>> > > single
>>> > > > >> > > value
>>> > > > >> > > > per key (if the cleanup just happened) right?
>>> > > > >> > > >
>>> > > > >> > > > --
>>> > > > >> > > > Alessandro Tagliapietra
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > > On Tue, Dec 3, 2019 at 8:51 PM John Roesler <
>>> > > vvcep...@apache.org>
>>> > > > >> wrote:
>>> > > > >> > > >
>>> > > > >> > > > > Hey Alessandro,
>>> > > > >> > > > >
>>> > > > >> > > > > That sounds also like it would work. I'm wondering if
>>> it would
>>> > > > >> actually
>>> > > > >> > > > > change what you observe w.r.t. recovery behavior,
>>> though.
>>> > > Streams
>>> > > > >> > > already
>>> > > > >> > > > > sets the retention time on the changelog to equal the
>>> > > retention
>>> > > > >> time
>>> > > > >> > > of the
>>> > > > >> > > > > windows, for windowed aggregations, so you shouldn't be
>>> > > loading a
>>> > > > >> lot
>>> > > > >> > > of
>>> > > > >> > > > > window data for old windows you no longer care about.
>>> > > > >> > > > >
>>> > > > >> > > > > Have you set the "grace period" on your window
>>> definition? By
>>> > > > >> default,
>>> > > > >> > > it
>>> > > > >> > > > > is set to 24 hours, but you can set it as low as you
>>> like.
>>> > > E.g.,
>>> > > > >> if you
>>> > > > >> > > > > want to commit to having in-order data only, then you
>>> can set
>>> > > the
>>> > > > >> grace
>>> > > > >> > > > > period to zero. This _should_ let the broker clean up
>>> the
>>> > > > >> changelog
>>> > > > >> > > records
>>> > > > >> > > > > as soon as the window ends.
>>> > > > >> > > > >
>>> > > > >> > > > > Of course, the log cleaner doesn't run all the time, so
>>> > > there's
>>> > > > >> some
>>> > > > >> > > extra
>>> > > > >> > > > > delay in which "expired" data would still be visible in
>>> the
>>> > > > >> changelog,
>>> > > > >> > > but
>>> > > > >> > > > > it would actually be just the same as if you manage the
>>> store
>>> > > > >> yourself.
>>> > > > >> > > > >
>>> > > > >> > > > > Hope this helps!
>>> > > > >> > > > > -John
>>> > > > >> > > > >
>>> > > > >> > > > > On Tue, Dec 3, 2019, at 22:22, Alessandro Tagliapietra
>>> wrote:
>>> > > > >> > > > > > Thanks John for the explanation,
>>> > > > >> > > > > >
>>> > > > >> > > > > > I thought that with EOS enabled (which we have) it
>>> would in
>>> > > the
>>> > > > >> worst
>>> > > > >> > > > > case
>>> > > > >> > > > > > find a valid checkpoint and start the restore from
>>> there
>>> > > until
>>> > > > >> it
>>> > > > >> > > reached
>>> > > > >> > > > > > the last committed status, not completely from
>>> scratch. What
>>> > > > >> you say
>>> > > > >> > > > > > definitely makes sense now.
>>> > > > >> > > > > > Since we don't really need old time windows and we
>>> ensure
>>> > > data
>>> > > > >> is
>>> > > > >> > > ordered
>>> > > > >> > > > > > when processed I think I"ll just write a custom
>>> transformer
>>> > > to
>>> > > > >> keep
>>> > > > >> > > only
>>> > > > >> > > > > > the last window, store intermediate aggregation
>>> results in
>>> > > the
>>> > > > >> store
>>> > > > >> > > and
>>> > > > >> > > > > > emit a new value only when we receive data belonging
>>> to a
>>> > > new
>>> > > > >> window.
>>> > > > >> > > > > > That with a compact only changelog topic should keep
>>> the
>>> > > rebuild
>>> > > > >> > > data to
>>> > > > >> > > > > > the minimum as it would have only the last value for
>>> each
>>> > > key.
>>> > > > >> > > > > >
>>> > > > >> > > > > > Hope that makes sense
>>> > > > >> > > > > >
>>> > > > >> > > > > > Thanks again
>>> > > > >> > > > > >
>>> > > > >> > > > > > --
>>> > > > >> > > > > > Alessandro Tagliapietra
>>> > > > >> > > > > >
>>> > > > >> > > > > >
>>> > > > >> > > > > > On Tue, Dec 3, 2019 at 3:04 PM John Roesler <
>>> > > > >> vvcep...@apache.org>
>>> > > > >> > > wrote:
>>> > > > >> > > > > >
>>> > > > >> > > > > > > Hi Alessandro,
>>> > > > >> > > > > > >
>>> > > > >> > > > > > > To take a stab at your question, maybe it first
>>> doesn't
>>> > > find
>>> > > > >> it,
>>> > > > >> > > but
>>> > > > >> > > > > then
>>> > > > >> > > > > > > restores some data, writes the checkpoint, and then
>>> later
>>> > > on,
>>> > > > >> it
>>> > > > >> > > has to
>>> > > > >> > > > > > > re-initialize the task for some reason, and that's
>>> why it
>>> > > does
>>> > > > >> > > find a
>>> > > > >> > > > > > > checkpoint then?
>>> > > > >> > > > > > >
>>> > > > >> > > > > > > More to the heart of the issue, if you have EOS
>>> enabled,
>>> > > > >> Streams
>>> > > > >> > > _only_
>>> > > > >> > > > > > > records the checkpoint when the store is in a
>>> > > known-consistent
>>> > > > >> > > state.
>>> > > > >> > > > > For
>>> > > > >> > > > > > > example, if you have a graceful shutdown, Streams
>>> will
>>> > > flush
>>> > > > >> all
>>> > > > >> > > the
>>> > > > >> > > > > > > stores, commit all the transactions, and then write
>>> the
>>> > > > >> checkpoint
>>> > > > >> > > > > file.
>>> > > > >> > > > > > > Then, on re-start, it will pick up from that
>>> checkpoint.
>>> > > > >> > > > > > >
>>> > > > >> > > > > > > But as soon as it starts processing records, it
>>> removes
>>> > > the
>>> > > > >> > > checkpoint
>>> > > > >> > > > > > > file, so if it crashes while it was processing,
>>> there is
>>> > > no
>>> > > > >> > > checkpoint
>>> > > > >> > > > > file
>>> > > > >> > > > > > > there, and it will have to restore from the
>>> beginning of
>>> > > the
>>> > > > >> > > changelog.
>>> > > > >> > > > > > >
>>> > > > >> > > > > > > This design is there on purpose, because otherwise
>>> we
>>> > > cannot
>>> > > > >> > > actually
>>> > > > >> > > > > > > guarantee correctness... For example, if you are
>>> > > maintaining a
>>> > > > >> > > count
>>> > > > >> > > > > > > operation, and we process an input record i,
>>> increment the
>>> > > > >> count
>>> > > > >> > > and
>>> > > > >> > > > > write
>>> > > > >> > > > > > > it to the state store, and to the changelog topic.
>>> But we
>>> > > > >> crash
>>> > > > >> > > before
>>> > > > >> > > > > we
>>> > > > >> > > > > > > commit that transaction. Then, the write to the
>>> changelog
>>> > > > >> would be
>>> > > > >> > > > > aborted,
>>> > > > >> > > > > > > and we would re-process record i . However, we've
>>> already
>>> > > > >> updated
>>> > > > >> > > the
>>> > > > >> > > > > local
>>> > > > >> > > > > > > state store, so when we increment it again, it
>>> results in
>>> > > > >> > > > > double-counting
>>> > > > >> > > > > > > i. The key point here is that there's no way to do
>>> an
>>> > > atomic
>>> > > > >> > > operation
>>> > > > >> > > > > > > across two different systems (local state store and
>>> the
>>> > > > >> changelog
>>> > > > >> > > > > topic).
>>> > > > >> > > > > > > Since we can't guarantee that we roll back the
>>> incremented
>>> > > > >> count
>>> > > > >> > > when
>>> > > > >> > > > > the
>>> > > > >> > > > > > > changelog transaction is aborted, we can't keep the
>>> local
>>> > > > >> store
>>> > > > >> > > > > consistent
>>> > > > >> > > > > > > with the changelog.
>>> > > > >> > > > > > >
>>> > > > >> > > > > > > After a crash, the only way to ensure the local
>>> store is
>>> > > > >> consistent
>>> > > > >> > > > > with
>>> > > > >> > > > > > > the changelog is to discard the entire thing and
>>> rebuild
>>> > > it.
>>> > > > >> This
>>> > > > >> > > is
>>> > > > >> > > > > why we
>>> > > > >> > > > > > > have an invariant that the checkpoint file only
>>> exists
>>> > > when we
>>> > > > >> > > _know_
>>> > > > >> > > > > that
>>> > > > >> > > > > > > the local store is consistent with the changelog,
>>> and
>>> > > this is
>>> > > > >> why
>>> > > > >> > > > > you're
>>> > > > >> > > > > > > seeing so much bandwidth when re-starting from an
>>> unclean
>>> > > > >> shutdown.
>>> > > > >> > > > > > >
>>> > > > >> > > > > > > Note that it's definitely possible to do better
>>> than this,
>>> > > > >> and we
>>> > > > >> > > would
>>> > > > >> > > > > > > very much like to improve it in the future.
>>> > > > >> > > > > > >
>>> > > > >> > > > > > > Thanks,
>>> > > > >> > > > > > > -John
>>> > > > >> > > > > > >
>>> > > > >> > > > > > > On Tue, Dec 3, 2019, at 16:16, Alessandro
>>> Tagliapietra
>>> > > wrote:
>>> > > > >> > > > > > > > Hi John,
>>> > > > >> > > > > > > >
>>> > > > >> > > > > > > > thanks a lot for helping, regarding your message:
>>> > > > >> > > > > > > >  - no we only have 1 instance of the stream
>>> application,
>>> > > > >> and it
>>> > > > >> > > > > always
>>> > > > >> > > > > > > > re-uses the same state folder
>>> > > > >> > > > > > > >  - yes we're seeing most issues when restarting
>>> not
>>> > > > >> gracefully
>>> > > > >> > > due
>>> > > > >> > > > > > > exception
>>> > > > >> > > > > > > >
>>> > > > >> > > > > > > > I've enabled trace logging and filtering by a
>>> single
>>> > > state
>>> > > > >> store
>>> > > > >> > > the
>>> > > > >> > > > > > > > StoreChangelogReader messages are:
>>> > > > >> > > > > > > >
>>> > > > >> > > > > > > > Added restorer for changelog
>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-0
>>> > > > >> > > > > > > > Added restorer for changelog
>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-1
>>> > > > >> > > > > > > > Added restorer for changelog
>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-2
>>> > > > >> > > > > > > > Did not find checkpoint from changelog
>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-2 for
>>> store
>>> > > > >> > > aggregate-store,
>>> > > > >> > > > > > > > rewinding to beginning.
>>> > > > >> > > > > > > > Did not find checkpoint from changelog
>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-1 for
>>> store
>>> > > > >> > > aggregate-store,
>>> > > > >> > > > > > > > rewinding to beginning.
>>> > > > >> > > > > > > > Did not find checkpoint from changelog
>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-0 for
>>> store
>>> > > > >> > > aggregate-store,
>>> > > > >> > > > > > > > rewinding to beginning.
>>> > > > >> > > > > > > > No checkpoint found for task 0_2 state store
>>> > > aggregate-store
>>> > > > >> > > > > changelog
>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-2 with
>>> EOS
>>> > > turned
>>> > > > >> on.
>>> > > > >> > > > > > > > Reinitializing the task and restore its state
>>> from the
>>> > > > >> beginning.
>>> > > > >> > > > > > > > No checkpoint found for task 0_1 state store
>>> > > aggregate-store
>>> > > > >> > > > > changelog
>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-1 with
>>> EOS
>>> > > turned
>>> > > > >> on.
>>> > > > >> > > > > > > > Reinitializing the task and restore its state
>>> from the
>>> > > > >> beginning.
>>> > > > >> > > > > > > > No checkpoint found for task 0_0 state store
>>> > > aggregate-store
>>> > > > >> > > > > changelog
>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-0 with
>>> EOS
>>> > > turned
>>> > > > >> on.
>>> > > > >> > > > > > > > Reinitializing the task and restore its state
>>> from the
>>> > > > >> beginning.
>>> > > > >> > > > > > > > Found checkpoint 709937 from changelog
>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-2 for
>>> store
>>> > > > >> > > aggregate-store.
>>> > > > >> > > > > > > > Restoring partition
>>> > > > >> sensors-stream-aggregate-store-changelog-2
>>> > > > >> > > from
>>> > > > >> > > > > > > offset
>>> > > > >> > > > > > > > 709937 to endOffset 742799
>>> > > > >> > > > > > > > Found checkpoint 3024234 from changelog
>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-1 for
>>> store
>>> > > > >> > > aggregate-store.
>>> > > > >> > > > > > > > Restoring partition
>>> > > > >> sensors-stream-aggregate-store-changelog-1
>>> > > > >> > > from
>>> > > > >> > > > > > > offset
>>> > > > >> > > > > > > > 3024234 to endOffset 3131513
>>> > > > >> > > > > > > > Found checkpoint 14514072 from changelog
>>> > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-0 for
>>> store
>>> > > > >> > > aggregate-store.
>>> > > > >> > > > > > > > Restoring partition
>>> > > > >> sensors-stream-aggregate-store-changelog-0
>>> > > > >> > > from
>>> > > > >> > > > > > > offset
>>> > > > >> > > > > > > > 14514072 to endOffset 17116574
>>> > > > >> > > > > > > > Restored from
>>> > > sensors-stream-aggregate-store-changelog-2 to
>>> > > > >> > > > > > > aggregate-store
>>> > > > >> > > > > > > > with 966 records, ending offset is 711432, next
>>> starting
>>> > > > >> > > position is
>>> > > > >> > > > > > > 711434
>>> > > > >> > > > > > > > Restored from
>>> > > sensors-stream-aggregate-store-changelog-2 to
>>> > > > >> > > > > > > aggregate-store
>>> > > > >> > > > > > > > with 914 records, ending offset is 712711, next
>>> starting
>>> > > > >> > > position is
>>> > > > >> > > > > > > 712713
>>> > > > >> > > > > > > > Restored from
>>> > > sensors-stream-aggregate-store-changelog-1 to
>>> > > > >> > > > > > > aggregate-store
>>> > > > >> > > > > > > > with 18 records, ending offset is 3024261, next
>>> starting
>>> > > > >> > > position is
>>> > > > >> > > > > > > 3024262
>>> > > > >> > > > > > > >
>>> > > > >> > > > > > > >
>>> > > > >> > > > > > > > why it first says it didn't find the checkpoint
>>> and
>>> > > then it
>>> > > > >> does
>>> > > > >> > > > > find it?
>>> > > > >> > > > > > > > It seems it loaded about  2.7M records (sum of
>>> offset
>>> > > > >> difference
>>> > > > >> > > in
>>> > > > >> > > > > the
>>> > > > >> > > > > > > > "restorting partition ...." messages) right?
>>> > > > >> > > > > > > > Maybe should I try to reduce the checkpoint
>>> interval?
>>> > > > >> > > > > > > >
>>> > > > >> > > > > > > > Regards
>>> > > > >> > > > > > > >
>>> > > > >> > > > > > > > --
>>> > > > >> > > > > > > > Alessandro Tagliapietra
>>> > > > >> > > > > > > >
>>> > > > >> > > > > > > >
>>> > > > >> > > > > > > > On Mon, Dec 2, 2019 at 9:18 AM John Roesler <
>>> > > > >> vvcep...@apache.org
>>> > > > >> > > >
>>> > > > >> > > > > wrote:
>>> > > > >> > > > > > > >
>>> > > > >> > > > > > > > > Hi Alessandro,
>>> > > > >> > > > > > > > >
>>> > > > >> > > > > > > > > I'm sorry to hear that.
>>> > > > >> > > > > > > > >
>>> > > > >> > > > > > > > > The restore process only takes one factor into
>>> > > account:
>>> > > > >> the
>>> > > > >> > > current
>>> > > > >> > > > > > > offset
>>> > > > >> > > > > > > > > position of the changelog topic is stored in a
>>> local
>>> > > file
>>> > > > >> > > > > alongside the
>>> > > > >> > > > > > > > > state stores. On startup, the app checks if the
>>> > > recorded
>>> > > > >> > > position
>>> > > > >> > > > > lags
>>> > > > >> > > > > > > the
>>> > > > >> > > > > > > > > latest offset in the changelog. If so, then it
>>> reads
>>> > > the
>>> > > > >> > > missing
>>> > > > >> > > > > > > changelog
>>> > > > >> > > > > > > > > records before starting processing.
>>> > > > >> > > > > > > > >
>>> > > > >> > > > > > > > > Thus, it would not restore any old window data.
>>> > > > >> > > > > > > > >
>>> > > > >> > > > > > > > > There might be a few different things going on
>>> to
>>> > > explain
>>> > > > >> your
>>> > > > >> > > > > > > observation:
>>> > > > >> > > > > > > > > * if there is more than one instance in your
>>> Streams
>>> > > > >> cluster,
>>> > > > >> > > > > maybe the
>>> > > > >> > > > > > > > > task is "flopping" between instances, so each
>>> instance
>>> > > > >> still
>>> > > > >> > > has to
>>> > > > >> > > > > > > recover
>>> > > > >> > > > > > > > > state, since it wasn't the last one actively
>>> > > processing
>>> > > > >> it.
>>> > > > >> > > > > > > > > * if the application isn't stopped gracefully,
>>> it
>>> > > might
>>> > > > >> not
>>> > > > >> > > get a
>>> > > > >> > > > > > > chance
>>> > > > >> > > > > > > > > to record its offset in that local file, so on
>>> > > restart it
>>> > > > >> has
>>> > > > >> > > to
>>> > > > >> > > > > > > restore
>>> > > > >> > > > > > > > > some or all of the state store from changelog.
>>> > > > >> > > > > > > > >
>>> > > > >> > > > > > > > > Or it could be something else; that's just what
>>> comes
>>> > > to
>>> > > > >> mind.
>>> > > > >> > > > > > > > >
>>> > > > >> > > > > > > > > If you want to get to the bottom of it, you can
>>> take a
>>> > > > >> look at
>>> > > > >> > > the
>>> > > > >> > > > > > > logs,
>>> > > > >> > > > > > > > > paying close attention to which tasks are
>>> assigned to
>>> > > > >> which
>>> > > > >> > > > > instances
>>> > > > >> > > > > > > after
>>> > > > >> > > > > > > > > each restart. You can also look into the logs
>>> from
>>> > > > >> > > > > > > > >
>>> > > > >> > >
>>> > > `org.apache.kafka.streams.processor.internals.StoreChangelogReader`
>>> > > > >> > > > > > > (might
>>> > > > >> > > > > > > > > want to set it to DEBUG or TRACE level to
>>> really see
>>> > > > >> what's
>>> > > > >> > > > > happening).
>>> > > > >> > > > > > > > >
>>> > > > >> > > > > > > > > I hope this helps!
>>> > > > >> > > > > > > > > -John
>>> > > > >> > > > > > > > >
>>> > > > >> > > > > > > > > On Sun, Dec 1, 2019, at 21:25, Alessandro
>>> Tagliapietra
>>> > > > >> wrote:
>>> > > > >> > > > > > > > > > Hello everyone,
>>> > > > >> > > > > > > > > >
>>> > > > >> > > > > > > > > > we're having a problem with bandwidth usage on
>>> > > streams
>>> > > > >> > > > > application
>>> > > > >> > > > > > > > > startup,
>>> > > > >> > > > > > > > > > our current setup does this:
>>> > > > >> > > > > > > > > >
>>> > > > >> > > > > > > > > > ...
>>> > > > >> > > > > > > > > > .groupByKey()
>>> > > > >> > > > > > > > > >
>>> > > > >> > >
>>> .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
>>> > > > >> > > > > > > > > > .aggregate(
>>> > > > >> > > > > > > > > >         { MetricSequenceList(ArrayList()) },
>>> > > > >> > > > > > > > > >         { key, value, aggregate ->
>>> > > > >> > > > > > > > > >             aggregate.getRecords().add(value)
>>> > > > >> > > > > > > > > >             aggregate
>>> > > > >> > > > > > > > > >         },
>>> > > > >> > > > > > > > > >         Materialized.`as`<String,
>>> > > MetricSequenceList,
>>> > > > >> > > > > > > WindowStore<Bytes,
>>> > > > >> > > > > > > > > >
>>> > > > >> > > > > > > > >
>>> > > > >> > > > > > >
>>> > > > >> > > > >
>>> > > > >> > >
>>> > > > >>
>>> > >
>>> ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde())
>>> > > > >> > > > > > > > > > )
>>> > > > >> > > > > > > > > > .toStream()
>>> > > > >> > > > > > > > > > .flatTransform(TransformerSupplier {
>>> > > > >> > > > > > > > > > ...
>>> > > > >> > > > > > > > > >
>>> > > > >> > > > > > > > > > basically in each window we append the new
>>> values
>>> > > and
>>> > > > >> then do
>>> > > > >> > > > > some
>>> > > > >> > > > > > > other
>>> > > > >> > > > > > > > > > logic with the array of windowed values.
>>> > > > >> > > > > > > > > > The aggregate-store changelog topic
>>> configuration
>>> > > uses
>>> > > > >> > > > > > > compact,delete as
>>> > > > >> > > > > > > > > > cleanup policy and has 12 hours of retention.
>>> > > > >> > > > > > > > > >
>>> > > > >> > > > > > > > > > What we've seen is that on application
>>> startup it
>>> > > takes
>>> > > > >> a
>>> > > > >> > > couple
>>> > > > >> > > > > > > minutes
>>> > > > >> > > > > > > > > to
>>> > > > >> > > > > > > > > > rebuild the state store, even if the state
>>> store
>>> > > > >> directory is
>>> > > > >> > > > > > > persisted
>>> > > > >> > > > > > > > > > across restarts. That along with an exception
>>> that
>>> > > > >> caused the
>>> > > > >> > > > > docker
>>> > > > >> > > > > > > > > > container to be restarted a couple hundreds
>>> times
>>> > > > >> caused a
>>> > > > >> > > big
>>> > > > >> > > > > > > confluent
>>> > > > >> > > > > > > > > > cloud bill compared to what we usually spend
>>> (1/4
>>> > > of a
>>> > > > >> full
>>> > > > >> > > > > month in
>>> > > > >> > > > > > > 1
>>> > > > >> > > > > > > > > day).
>>> > > > >> > > > > > > > > >
>>> > > > >> > > > > > > > > > What I think is happening is that the topic is
>>> > > keeping
>>> > > > >> all
>>> > > > >> > > the
>>> > > > >> > > > > > > previous
>>> > > > >> > > > > > > > > > windows even with the compacting policy
>>> because each
>>> > > > >> key is
>>> > > > >> > > the
>>> > > > >> > > > > > > original
>>> > > > >> > > > > > > > > > key + the timestamp not just the key. Since
>>> we don't
>>> > > > >> care
>>> > > > >> > > about
>>> > > > >> > > > > > > previous
>>> > > > >> > > > > > > > > > windows as the flatTransform after the
>>> toStream()
>>> > > makes
>>> > > > >> sure
>>> > > > >> > > > > that we
>>> > > > >> > > > > > > > > don't
>>> > > > >> > > > > > > > > > process old windows (a custom suppressor
>>> basically)
>>> > > is
>>> > > > >> there
>>> > > > >> > > a
>>> > > > >> > > > > way to
>>> > > > >> > > > > > > > > only
>>> > > > >> > > > > > > > > > keep the last window so that the store
>>> rebuilding
>>> > > goes
>>> > > > >> > > faster and
>>> > > > >> > > > > > > without
>>> > > > >> > > > > > > > > > rebuilding old windows too? Or should I
>>> create a
>>> > > custom
>>> > > > >> > > window
>>> > > > >> > > > > using
>>> > > > >> > > > > > > the
>>> > > > >> > > > > > > > > > original key as key so that the compaction
>>> keeps
>>> > > only
>>> > > > >> the
>>> > > > >> > > last
>>> > > > >> > > > > window
>>> > > > >> > > > > > > > > data?
>>> > > > >> > > > > > > > > >
>>> > > > >> > > > > > > > > > Thank you
>>> > > > >> > > > > > > > > >
>>> > > > >> > > > > > > > > > --
>>> > > > >> > > > > > > > > > Alessandro Tagliapietra
>>> > > > >> > > > > > > > > >
>>> > > > >> > > > > > > > >
>>> > > > >> > > > > > > >
>>> > > > >> > > > > > >
>>> > > > >> > > > > >
>>> > > > >> > > > >
>>> > > > >> > > >
>>> > > > >> > >
>>> > > > >> >
>>> > > > >>
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>

Reply via email to