You're saying that with a 100ms commit interval, caching won't help because
it would still send the compacted changes to the changelog every 100ms?

Regarding the custom state store I'll look into that because I didn't go
much further than transformers and stores in my kafka experience so I'll
need to understand better what that implies.

Yeah I only have one window per key in the store.

The only thing I don't understand is why cache works 80% of the time and
then suddenly the changelog sent bytes increase 90x.
I mean, if cache wasn't working, why enabling it in our pipeline decreased
the sent bytes from 30-40MB/minute to 400KB/minute?

I'll look into the custom state store tho.

Thanks

--
Alessandro Tagliapietra



On Mon, Dec 9, 2019 at 7:02 PM Sophie Blee-Goldman <sop...@confluent.io>
wrote:

> Alright, well I see why you have so much data being sent to the changelog
> if each
> update involves appending to a list and then writing in the whole list. And
> with 340
> records/minute I'm actually not sure how the cache could really help at all
> when it's
> being flushed every 100ms.
>
> Here's kind of a wild idea, if you really only need append semantics: what
> if you wrote
> a custom StateStore that wrapped the normal RocksDBStore (or
> RocksDBWindowStore)
> and did the append for you under the hood? The changelogging layer sits
> between the
> layer that you would call #put on in your transformer and the final layer
> that actually writes
> to the underlying storage engine. If you insert an extra layer and modify
> your transformer
> to only call put on the new data (rather than the entire list) then only
> this new data will get
> sent to the changelog. Your custom storage layer will know it's actually
> append semantics,
> and add the new data to the existing list before sending it on to RocksDB.
>
> Since you only ever have one window per key in the store (right?) you just
> need to make
> sure that nothing from the current window gets deleted prematurely. You'd
> want to turn off
> compaction on the changelog and caching on the store of course, and maybe
> give the
> changelog some extra retention time to be safe.
>
> Obviously I haven't thoroughly verified this alternative, but it seems like
> this approach (or
> something to its effect) could help you cut down on the changelog data.
> WDYT?
>
> On Mon, Dec 9, 2019 at 4:35 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Hi Sophie,
> >
> > Just to give a better context, yes we use EOS and the problem happens in
> > our aggregation store.
> > Basically when windowing data we append each record into a list that's
> > stored in the aggregation store.
> > We have 2 versions, in production we use the kafka streams windowing API,
> > in staging we manually calculate the window end timestamp and aggregate
> > using that timestamp.
> >
> > To give you an example of the staging code, it's a simple transformer
> that:
> >  - if incoming data fits in the same window as the data in store, append
> > the data to the existing store list overwriting the same key and nothing
> is
> > sent downstream
> >  - if incoming data has a timestamp smaller than the existing store data,
> > discard the record
> >  - if incoming data has a timestamp bigger than the existing store data,
> > send the stored list downstream and store the new window data into the
> > store
> >
> > This way we don't use multiple keys (kafka streams instead uses a store
> > where each key is stream-key + window key) as we overwrite the store data
> > using the same key over and over.
> > So what I would expect is that since we're overwriting the same keys
> there
> > isn't more  and more data to be cached as the number of keys are always
> the
> > same and we don't really need to cache more data over time.
> >
> > To respond to your questions:
> >  - yes when I say that cache "stopped/started" working I mean that at
> some
> > point the store started sending more and more data to che changelog topic
> > and then suddenly stopped again even without a restart (a restart always
> > fixes the problem).
> >  - Yes there are no density changes in the input stream, I've checked the
> > number of records sent to the stream input topic and there is a variation
> > of ~10-20 records per minute on an average of 340 records per minute.
> Most
> > of the records are also generated by simulators with very predictable
> > output rate.
> >
> > In the meantime I've enabled reporting of debug metrics (so including
> cache
> > hit ratio) to hopefully get better insights the next time it happens.
> >
> > Thank you in advance
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Mon, Dec 9, 2019 at 3:57 PM Sophie Blee-Goldman <sop...@confluent.io>
> > wrote:
> >
> > > It's an LRU cache, so once it gets full new records will cause older
> ones
> > > to be evicted (and thus sent
> > > downstream). Of course this should only apply to records of a different
> > > key, otherwise it will just cause
> > > an update of that key in the cache.
> > >
> > > I missed that you were using EOS, given the short commit interval it's
> > hard
> > > to see those effects.
> > > When you say that it stopped working and then appeared to start working
> > > again, are you just
> > > referring to the amount of data being sent to the changelog? And you
> can
> > > definitely rule out differences
> > > in the density of updates in the input stream?
> > >
> > >
> > >
> > > On Mon, Dec 9, 2019 at 12:26 PM Alessandro Tagliapietra <
> > > tagliapietra.alessan...@gmail.com> wrote:
> > >
> > > > Hi Sophie,
> > > >
> > > > thanks fo helping.
> > > >
> > > > By eviction of older records you mean they get flushed to the
> changelog
> > > > topic?
> > > > Or the cache is just full and so all new records go to the changelog
> > > topic
> > > > until the old ones are evicted?
> > > >
> > > > Regarding the timing, what timing do you mean? Between when the cache
> > > stops
> > > > and starts working again? We're using EOS os I believe the commit
> > > interval
> > > > is every 100ms.
> > > >
> > > > Regards
> > > >
> > > > --
> > > > Alessandro Tagliapietra
> > > >
> > > >
> > > >
> > > > On Mon, Dec 9, 2019 at 12:15 PM Sophie Blee-Goldman <
> > sop...@confluent.io
> > > >
> > > > wrote:
> > > >
> > > > > It might be that the cache appears to "stop working" because it
> gets
> > > > full,
> > > > > and each
> > > > > new update causes an eviction (of some older record). This would
> also
> > > > > explain the
> > > > > opposite behavior, that it "starts working" again after some time
> > > without
> > > > > being restarted,
> > > > > since the cache is completely flushed on commit. Does the timing
> seem
> > > to
> > > > > align with your
> > > > > commit interval (default is 30s)?
> > > > >
> > > > > On Mon, Dec 9, 2019 at 12:03 AM Alessandro Tagliapietra <
> > > > > tagliapietra.alessan...@gmail.com> wrote:
> > > > >
> > > > > > And it seems that for some reason after a while caching works
> again
> > > > > > without a restart of the streams application
> > > > > >
> > > > > > [image: Screen Shot 2019-12-08 at 11.59.30 PM.png]
> > > > > >
> > > > > > I'll try to enable debug metrics and see if I can find something
> > > useful
> > > > > > there.
> > > > > > Any idea is appreciated in the meantime :)
> > > > > >
> > > > > > --
> > > > > > Alessandro Tagliapietra
> > > > > >
> > > > > > On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra <
> > > > > > tagliapietra.alessan...@gmail.com> wrote:
> > > > > >
> > > > > >> It seems that even with caching enabled, after a while the sent
> > > bytes
> > > > > >> stil go up
> > > > > >>
> > > > > >> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
> > > > > >>
> > > > > >> you can see the deploy when I've enabled caching but it looks
> like
> > > > it's
> > > > > >> still a temporary solution.
> > > > > >>
> > > > > >> --
> > > > > >> Alessandro Tagliapietra
> > > > > >>
> > > > > >>
> > > > > >> On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra <
> > > > > >> tagliapietra.alessan...@gmail.com> wrote:
> > > > > >>
> > > > > >>> Could be, but since we have a limite amount of input keys
> (~30),
> > > > > >>> windowing generates new keys but old ones are never touched
> again
> > > > > since the
> > > > > >>> data per key is in order, I assume it shouldn't be a big deal
> for
> > > it
> > > > to
> > > > > >>> handle 30 keys
> > > > > >>> I'll have a look at cache metrics and see if something pops out
> > > > > >>>
> > > > > >>> Thanks
> > > > > >>>
> > > > > >>> --
> > > > > >>> Alessandro Tagliapietra
> > > > > >>>
> > > > > >>>
> > > > > >>> On Sat, Dec 7, 2019 at 10:02 AM John Roesler <
> > vvcep...@apache.org>
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>>> Hmm, that’s a good question. Now that we’re talking about
> > > caching, I
> > > > > >>>> wonder if the cache was just too small. It’s not very big by
> > > > default.
> > > > > >>>>
> > > > > >>>> On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote:
> > > > > >>>> > Ok I'll check on that!
> > > > > >>>> >
> > > > > >>>> > Now I can see that with caching we went from 3-4MB/s to
> > 400KB/s,
> > > > > that
> > > > > >>>> will
> > > > > >>>> > help with the bill.
> > > > > >>>> >
> > > > > >>>> > Last question, any reason why after a while the regular
> > windowed
> > > > > >>>> stream
> > > > > >>>> > starts sending every update instead of caching?
> > > > > >>>> > Could it be because it doesn't have any more memory
> available?
> > > Any
> > > > > >>>> other
> > > > > >>>> > possible reason?
> > > > > >>>> >
> > > > > >>>> > Thank you so much for your help
> > > > > >>>> >
> > > > > >>>> > --
> > > > > >>>> > Alessandro Tagliapietra
> > > > > >>>> >
> > > > > >>>> >
> > > > > >>>> > On Sat, Dec 7, 2019 at 9:14 AM John Roesler <
> > > vvcep...@apache.org>
> > > > > >>>> wrote:
> > > > > >>>> >
> > > > > >>>> > > Ah, yes. Glad you figured it out!
> > > > > >>>> > >
> > > > > >>>> > > Caching does not reduce EOS guarantees at all. I highly
> > > > recommend
> > > > > >>>> using
> > > > > >>>> > > it. You might even want to take a look at the caching
> > metrics
> > > to
> > > > > >>>> make sure
> > > > > >>>> > > you have a good hit ratio.
> > > > > >>>> > >
> > > > > >>>> > > -John
> > > > > >>>> > >
> > > > > >>>> > > On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra
> > wrote:
> > > > > >>>> > > > Never mind I've found out I can use
> `.withCachingEnabled`
> > on
> > > > the
> > > > > >>>> store
> > > > > >>>> > > > builder to achieve the same thing as the windowing
> example
> > > as
> > > > > >>>> > > > `Materialized.as` turns that on by default.
> > > > > >>>> > > >
> > > > > >>>> > > > Does caching in any way reduces the EOS guarantees?
> > > > > >>>> > > >
> > > > > >>>> > > > --
> > > > > >>>> > > > Alessandro Tagliapietra
> > > > > >>>> > > >
> > > > > >>>> > > >
> > > > > >>>> > > > On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra <
> > > > > >>>> > > > tagliapietra.alessan...@gmail.com> wrote:
> > > > > >>>> > > >
> > > > > >>>> > > > > Seems my journey with this isn't done just yet,
> > > > > >>>> > > > >
> > > > > >>>> > > > > This seems very complicated to me but I'll try to
> > explain
> > > it
> > > > > as
> > > > > >>>> best I
> > > > > >>>> > > can.
> > > > > >>>> > > > > To better understand the streams network usage I've
> used
> > > > > >>>> prometheus
> > > > > >>>> > > with
> > > > > >>>> > > > > the JMX exporter to export kafka metrics.
> > > > > >>>> > > > > To check the amount of data we use I'm looking at the
> > > > > increments
> > > > > >>>> > > > > of kafka_producer_topic_metrics_byte_total and
> > > > > >>>> > > > >
> kafka_producer_producer_topic_metrics_record_send_total,
> > > > > >>>> > > > >
> > > > > >>>> > > > > Our current (before the change mentioned above) code
> > looks
> > > > > like
> > > > > >>>> this:
> > > > > >>>> > > > >
> > > > > >>>> > > > > // This transformers just pairs a value with the
> > previous
> > > > one
> > > > > >>>> storing
> > > > > >>>> > > the
> > > > > >>>> > > > > temporary one in a store
> > > > > >>>> > > > > val pairsStream = metricStream
> > > > > >>>> > > > >   .transformValues(ValueTransformerWithKeySupplier {
> > > > > >>>> PairTransformer()
> > > > > >>>> > > },
> > > > > >>>> > > > > "LastValueStore")
> > > > > >>>> > > > >   .filter { _, value: MetricSequence? -> value !=
> null }
> > > > > >>>> > > > >
> > > > > >>>> > > > > // Create a store to store suppressed windows until a
> > new
> > > > one
> > > > > is
> > > > > >>>> > > received
> > > > > >>>> > > > > val suppressStoreSupplier =
> > > > > >>>> > > > >
> > > > > >>>> > >
> > > > > >>>>
> > > > >
> > > >
> > >
> >
> Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"),
> > > > > >>>> > > > > ......
> > > > > >>>> > > > >
> > > > > >>>> > > > > // Window and aggregate data in 1 minute intervals
> > > > > >>>> > > > > val aggregatedStream = pairsStream
> > > > > >>>> > > > >   .groupByKey()
> > > > > >>>> > > > >
> > > > >  .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
> > > > > >>>> > > > >   .aggregate(
> > > > > >>>> > > > >           { MetricSequenceList(ArrayList()) },
> > > > > >>>> > > > >           { key, value, aggregate ->
> > > > > >>>> > > > >               aggregate.getRecords().add(value)
> > > > > >>>> > > > >               aggregate
> > > > > >>>> > > > >           },
> > > > > >>>> > > > >           Materialized.`as`<String,
> MetricSequenceList,
> > > > > >>>> > > WindowStore<Bytes,
> > > > > >>>> > > > >
> > > > > >>>> > >
> > > > > >>>>
> > > > >
> > > >
> > >
> >
> ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde())
> > > > > >>>> > > > >   )
> > > > > >>>> > > > >   .toStream()
> > > > > >>>> > > > >   .flatTransform(TransformerSupplier {
> > > > > >>>> > > > >       // This transformer basically waits until a new
> > > window
> > > > > is
> > > > > >>>> > > received
> > > > > >>>> > > > > to emit the previous one
> > > > > >>>> > > > >   }, "suppress-store")
> > > > > >>>> > > > >   .map { sensorId: String, suppressedOutput:
> > > > SuppressedOutput
> > > > > ->
> > > > > >>>> > > > >       .... etc ....
> > > > > >>>> > > > >
> > > > > >>>> > > > >
> > > > > >>>> > > > > Basically:
> > > > > >>>> > > > >  - all data goes through LastValueStore store that
> > stores
> > > > each
> > > > > >>>> message
> > > > > >>>> > > and
> > > > > >>>> > > > > emits a pair with the previous one
> > > > > >>>> > > > >  - the aggregate-store is used to store the per-window
> > > list
> > > > of
> > > > > >>>> > > messages in
> > > > > >>>> > > > > the aggregate method
> > > > > >>>> > > > >  - the suppress store is used to store each received
> > > window
> > > > > >>>> which is
> > > > > >>>> > > > > emitted only after a newer one is received
> > > > > >>>> > > > >
> > > > > >>>> > > > > What I'm experiencing is that:
> > > > > >>>> > > > >  - during normal execution, the streams app sends to
> the
> > > > > >>>> lastvalue
> > > > > >>>> > > store
> > > > > >>>> > > > > changelog topic 5k messages/min, the aggregate and
> > > suppress
> > > > > >>>> store
> > > > > >>>> > > changelog
> > > > > >>>> > > > > topics only about 100
> > > > > >>>> > > > >  - at some point (after many hours of operation), the
> > > > streams
> > > > > >>>> app
> > > > > >>>> > > starts
> > > > > >>>> > > > > sending to the aggregate and suppress store changelog
> > > topic
> > > > > the
> > > > > >>>> same
> > > > > >>>> > > amount
> > > > > >>>> > > > > of messages going through the lastvaluestore
> > > > > >>>> > > > >  - if I restart the streams app it goes back to the
> > > initial
> > > > > >>>> behavior
> > > > > >>>> > > > >
> > > > > >>>> > > > > You can see the behavior in this graph
> > > > > >>>> https://imgur.com/dJcUNSf
> > > > > >>>> > > > > You can also see that after a restart everything goes
> > back
> > > > to
> > > > > >>>> normal
> > > > > >>>> > > > > levels.
> > > > > >>>> > > > > Regarding other metrics, process latency increases,
> poll
> > > > > latency
> > > > > >>>> > > > > decreases, poll rate decreases, commit rate stays the
> > same
> > > > > >>>> while commit
> > > > > >>>> > > > > latency increases.
> > > > > >>>> > > > >
> > > > > >>>> > > > > Now, I've these questions:
> > > > > >>>> > > > >  - why isn't the aggregate/suppress store changelog
> > topic
> > > > > >>>> throughput
> > > > > >>>> > > the
> > > > > >>>> > > > > same as the LastValueStore? Shouldn't every time it
> > > > aggregates
> > > > > >>>> send a
> > > > > >>>> > > > > record to the changelog?
> > > > > >>>> > > > >  - is the windowing doing some internal caching like
> not
> > > > > >>>> sending every
> > > > > >>>> > > > > aggregation record until the window time is passed?
> (if
> > > so,
> > > > > >>>> where can I
> > > > > >>>> > > > > find that code since I would like to use that also for
> > our
> > > > new
> > > > > >>>> > > > > implementation)
> > > > > >>>> > > > >
> > > > > >>>> > > > > Thank you in advance
> > > > > >>>> > > > >
> > > > > >>>> > > > > --
> > > > > >>>> > > > > Alessandro Tagliapietra
> > > > > >>>> > > > >
> > > > > >>>> > > > >
> > > > > >>>> > > > > On Wed, Dec 4, 2019 at 7:57 AM John Roesler <
> > > > > >>>> vvcep...@apache.org>
> > > > > >>>> > > wrote:
> > > > > >>>> > > > >
> > > > > >>>> > > > >> Oh, good!
> > > > > >>>> > > > >>
> > > > > >>>> > > > >> On Tue, Dec 3, 2019, at 23:29, Alessandro
> Tagliapietra
> > > > wrote:
> > > > > >>>> > > > >> > Testing on staging shows that a restart on
> exception
> > is
> > > > > much
> > > > > >>>> faster
> > > > > >>>> > > and
> > > > > >>>> > > > >> the
> > > > > >>>> > > > >> > stream starts right away which I think means we're
> > > > reading
> > > > > >>>> way less
> > > > > >>>> > > data
> > > > > >>>> > > > >> > than before!
> > > > > >>>> > > > >> >
> > > > > >>>> > > > >> > What I was referring to is that, in Streams, the
> keys
> > > for
> > > > > >>>> window
> > > > > >>>> > > > >> > > aggregation state is actually composed of both
> the
> > > > window
> > > > > >>>> itself
> > > > > >>>> > > and
> > > > > >>>> > > > >> the
> > > > > >>>> > > > >> > > key. In the DSL, it looks like "Windowed<K>".
> That
> > > > > results
> > > > > >>>> in the
> > > > > >>>> > > > >> store
> > > > > >>>> > > > >> > > having a unique key per window for each K, which
> is
> > > why
> > > > > we
> > > > > >>>> need
> > > > > >>>> > > > >> retention
> > > > > >>>> > > > >> > > as well as compaction for our changelogs. But for
> > > you,
> > > > if
> > > > > >>>> you just
> > > > > >>>> > > > >> make the
> > > > > >>>> > > > >> > > key "K", then compaction alone should do the
> trick.
> > > > > >>>> > > > >> >
> > > > > >>>> > > > >> > Yes we had compact,delete as cleanup policy but
> > > probably
> > > > it
> > > > > >>>> still
> > > > > >>>> > > had a
> > > > > >>>> > > > >> too
> > > > > >>>> > > > >> > long retention value, also the rocksdb store is
> > > probably
> > > > > much
> > > > > >>>> > > faster now
> > > > > >>>> > > > >> > having only one key per key instead of one key per
> > > window
> > > > > >>>> per key.
> > > > > >>>> > > > >> >
> > > > > >>>> > > > >> > Thanks a lot for helping! I'm now going to setup a
> > > > > >>>> prometheus-jmx
> > > > > >>>> > > > >> > monitoring so we can keep better track of what's
> > going
> > > on
> > > > > :)
> > > > > >>>> > > > >> >
> > > > > >>>> > > > >> > --
> > > > > >>>> > > > >> > Alessandro Tagliapietra
> > > > > >>>> > > > >> >
> > > > > >>>> > > > >> >
> > > > > >>>> > > > >> > On Tue, Dec 3, 2019 at 9:12 PM John Roesler <
> > > > > >>>> vvcep...@apache.org>
> > > > > >>>> > > > >> wrote:
> > > > > >>>> > > > >> >
> > > > > >>>> > > > >> > > Oh, yeah, I remember that conversation!
> > > > > >>>> > > > >> > >
> > > > > >>>> > > > >> > > Yes, then, I agree, if you're only storing state
> of
> > > the
> > > > > >>>> most
> > > > > >>>> > > recent
> > > > > >>>> > > > >> window
> > > > > >>>> > > > >> > > for each key, and the key you use for that state
> is
> > > > > >>>> actually the
> > > > > >>>> > > key
> > > > > >>>> > > > >> of the
> > > > > >>>> > > > >> > > records, then an aggressive compaction policy
> plus
> > > your
> > > > > >>>> custom
> > > > > >>>> > > > >> transformer
> > > > > >>>> > > > >> > > seems like a good way forward.
> > > > > >>>> > > > >> > >
> > > > > >>>> > > > >> > > What I was referring to is that, in Streams, the
> > keys
> > > > for
> > > > > >>>> window
> > > > > >>>> > > > >> > > aggregation state is actually composed of both
> the
> > > > window
> > > > > >>>> itself
> > > > > >>>> > > and
> > > > > >>>> > > > >> the
> > > > > >>>> > > > >> > > key. In the DSL, it looks like "Windowed<K>".
> That
> > > > > results
> > > > > >>>> in the
> > > > > >>>> > > > >> store
> > > > > >>>> > > > >> > > having a unique key per window for each K, which
> is
> > > why
> > > > > we
> > > > > >>>> need
> > > > > >>>> > > > >> retention
> > > > > >>>> > > > >> > > as well as compaction for our changelogs. But for
> > > you,
> > > > if
> > > > > >>>> you just
> > > > > >>>> > > > >> make the
> > > > > >>>> > > > >> > > key "K", then compaction alone should do the
> trick.
> > > > > >>>> > > > >> > >
> > > > > >>>> > > > >> > > And yes, if you manage the topic yourself, then
> > > Streams
> > > > > >>>> won't
> > > > > >>>> > > adjust
> > > > > >>>> > > > >> the
> > > > > >>>> > > > >> > > retention time. I think it might validate that
> the
> > > > > >>>> retention
> > > > > >>>> > > isn't too
> > > > > >>>> > > > >> > > short, but I don't remember offhand.
> > > > > >>>> > > > >> > >
> > > > > >>>> > > > >> > > Cheers, and let me know how it goes!
> > > > > >>>> > > > >> > > -John
> > > > > >>>> > > > >> > >
> > > > > >>>> > > > >> > > On Tue, Dec 3, 2019, at 23:03, Alessandro
> > > Tagliapietra
> > > > > >>>> wrote:
> > > > > >>>> > > > >> > > > Hi John,
> > > > > >>>> > > > >> > > >
> > > > > >>>> > > > >> > > > afaik grace period uses stream time
> > > > > >>>> > > > >> > > >
> > > > > >>>> > > > >> > >
> > > > > >>>> > > > >>
> > > > > >>>> > >
> > > > > >>>>
> > > > >
> > > >
> > >
> >
> https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html
> > > > > >>>> > > > >> > > > which is
> > > > > >>>> > > > >> > > > per partition, unfortunately we process data
> > that's
> > > > not
> > > > > >>>> in sync
> > > > > >>>> > > > >> between
> > > > > >>>> > > > >> > > > keys so each key needs to be independent and a
> > key
> > > > can
> > > > > >>>> have much
> > > > > >>>> > > > >> older
> > > > > >>>> > > > >> > > > data
> > > > > >>>> > > > >> > > > than the other.
> > > > > >>>> > > > >> > > >
> > > > > >>>> > > > >> > > > Having a small grace period would probably
> close
> > > old
> > > > > >>>> windows
> > > > > >>>> > > sooner
> > > > > >>>> > > > >> than
> > > > > >>>> > > > >> > > > expected. That's also why in my use case a
> custom
> > > > store
> > > > > >>>> that
> > > > > >>>> > > just
> > > > > >>>> > > > >> stores
> > > > > >>>> > > > >> > > > the last window data for each key might work
> > > better.
> > > > I
> > > > > >>>> had the
> > > > > >>>> > > same
> > > > > >>>> > > > >> issue
> > > > > >>>> > > > >> > > > with suppression and it has been reported here
> > > > > >>>> > > > >> > > >
> https://issues.apache.org/jira/browse/KAFKA-8769
> > > > > >>>> > > > >> > > > Oh I just saw that you're the one that helped
> me
> > on
> > > > > >>>> slack and
> > > > > >>>> > > > >> created the
> > > > > >>>> > > > >> > > > issue (thanks again for that).
> > > > > >>>> > > > >> > > >
> > > > > >>>> > > > >> > > > The behavior that you mention about streams
> > setting
> > > > > >>>> changelog
> > > > > >>>> > > > >> retention
> > > > > >>>> > > > >> > > > time is something they do on creation of the
> > topic
> > > > when
> > > > > >>>> the
> > > > > >>>> > > broker
> > > > > >>>> > > > >> has
> > > > > >>>> > > > >> > > auto
> > > > > >>>> > > > >> > > > creation enabled? Because we're using confluent
> > > cloud
> > > > > >>>> and I had
> > > > > >>>> > > to
> > > > > >>>> > > > >> create
> > > > > >>>> > > > >> > > > it manually.
> > > > > >>>> > > > >> > > > Regarding the change in the recovery behavior,
> > with
> > > > > >>>> compact
> > > > > >>>> > > cleanup
> > > > > >>>> > > > >> > > policy
> > > > > >>>> > > > >> > > > shouldn't the changelog only keep the last
> value?
> > > > That
> > > > > >>>> would
> > > > > >>>> > > make
> > > > > >>>> > > > >> the
> > > > > >>>> > > > >> > > > recovery faster and "cheaper" as it would only
> > need
> > > > to
> > > > > >>>> read a
> > > > > >>>> > > single
> > > > > >>>> > > > >> > > value
> > > > > >>>> > > > >> > > > per key (if the cleanup just happened) right?
> > > > > >>>> > > > >> > > >
> > > > > >>>> > > > >> > > > --
> > > > > >>>> > > > >> > > > Alessandro Tagliapietra
> > > > > >>>> > > > >> > > >
> > > > > >>>> > > > >> > > >
> > > > > >>>> > > > >> > > > On Tue, Dec 3, 2019 at 8:51 PM John Roesler <
> > > > > >>>> > > vvcep...@apache.org>
> > > > > >>>> > > > >> wrote:
> > > > > >>>> > > > >> > > >
> > > > > >>>> > > > >> > > > > Hey Alessandro,
> > > > > >>>> > > > >> > > > >
> > > > > >>>> > > > >> > > > > That sounds also like it would work. I'm
> > > wondering
> > > > if
> > > > > >>>> it would
> > > > > >>>> > > > >> actually
> > > > > >>>> > > > >> > > > > change what you observe w.r.t. recovery
> > behavior,
> > > > > >>>> though.
> > > > > >>>> > > Streams
> > > > > >>>> > > > >> > > already
> > > > > >>>> > > > >> > > > > sets the retention time on the changelog to
> > equal
> > > > the
> > > > > >>>> > > retention
> > > > > >>>> > > > >> time
> > > > > >>>> > > > >> > > of the
> > > > > >>>> > > > >> > > > > windows, for windowed aggregations, so you
> > > > shouldn't
> > > > > be
> > > > > >>>> > > loading a
> > > > > >>>> > > > >> lot
> > > > > >>>> > > > >> > > of
> > > > > >>>> > > > >> > > > > window data for old windows you no longer
> care
> > > > about.
> > > > > >>>> > > > >> > > > >
> > > > > >>>> > > > >> > > > > Have you set the "grace period" on your
> window
> > > > > >>>> definition? By
> > > > > >>>> > > > >> default,
> > > > > >>>> > > > >> > > it
> > > > > >>>> > > > >> > > > > is set to 24 hours, but you can set it as low
> > as
> > > > you
> > > > > >>>> like.
> > > > > >>>> > > E.g.,
> > > > > >>>> > > > >> if you
> > > > > >>>> > > > >> > > > > want to commit to having in-order data only,
> > then
> > > > you
> > > > > >>>> can set
> > > > > >>>> > > the
> > > > > >>>> > > > >> grace
> > > > > >>>> > > > >> > > > > period to zero. This _should_ let the broker
> > > clean
> > > > up
> > > > > >>>> the
> > > > > >>>> > > > >> changelog
> > > > > >>>> > > > >> > > records
> > > > > >>>> > > > >> > > > > as soon as the window ends.
> > > > > >>>> > > > >> > > > >
> > > > > >>>> > > > >> > > > > Of course, the log cleaner doesn't run all
> the
> > > > time,
> > > > > so
> > > > > >>>> > > there's
> > > > > >>>> > > > >> some
> > > > > >>>> > > > >> > > extra
> > > > > >>>> > > > >> > > > > delay in which "expired" data would still be
> > > > visible
> > > > > >>>> in the
> > > > > >>>> > > > >> changelog,
> > > > > >>>> > > > >> > > but
> > > > > >>>> > > > >> > > > > it would actually be just the same as if you
> > > manage
> > > > > >>>> the store
> > > > > >>>> > > > >> yourself.
> > > > > >>>> > > > >> > > > >
> > > > > >>>> > > > >> > > > > Hope this helps!
> > > > > >>>> > > > >> > > > > -John
> > > > > >>>> > > > >> > > > >
> > > > > >>>> > > > >> > > > > On Tue, Dec 3, 2019, at 22:22, Alessandro
> > > > > Tagliapietra
> > > > > >>>> wrote:
> > > > > >>>> > > > >> > > > > > Thanks John for the explanation,
> > > > > >>>> > > > >> > > > > >
> > > > > >>>> > > > >> > > > > > I thought that with EOS enabled (which we
> > have)
> > > > it
> > > > > >>>> would in
> > > > > >>>> > > the
> > > > > >>>> > > > >> worst
> > > > > >>>> > > > >> > > > > case
> > > > > >>>> > > > >> > > > > > find a valid checkpoint and start the
> restore
> > > > from
> > > > > >>>> there
> > > > > >>>> > > until
> > > > > >>>> > > > >> it
> > > > > >>>> > > > >> > > reached
> > > > > >>>> > > > >> > > > > > the last committed status, not completely
> > from
> > > > > >>>> scratch. What
> > > > > >>>> > > > >> you say
> > > > > >>>> > > > >> > > > > > definitely makes sense now.
> > > > > >>>> > > > >> > > > > > Since we don't really need old time windows
> > and
> > > > we
> > > > > >>>> ensure
> > > > > >>>> > > data
> > > > > >>>> > > > >> is
> > > > > >>>> > > > >> > > ordered
> > > > > >>>> > > > >> > > > > > when processed I think I"ll just write a
> > custom
> > > > > >>>> transformer
> > > > > >>>> > > to
> > > > > >>>> > > > >> keep
> > > > > >>>> > > > >> > > only
> > > > > >>>> > > > >> > > > > > the last window, store intermediate
> > aggregation
> > > > > >>>> results in
> > > > > >>>> > > the
> > > > > >>>> > > > >> store
> > > > > >>>> > > > >> > > and
> > > > > >>>> > > > >> > > > > > emit a new value only when we receive data
> > > > > belonging
> > > > > >>>> to a
> > > > > >>>> > > new
> > > > > >>>> > > > >> window.
> > > > > >>>> > > > >> > > > > > That with a compact only changelog topic
> > should
> > > > > keep
> > > > > >>>> the
> > > > > >>>> > > rebuild
> > > > > >>>> > > > >> > > data to
> > > > > >>>> > > > >> > > > > > the minimum as it would have only the last
> > > value
> > > > > for
> > > > > >>>> each
> > > > > >>>> > > key.
> > > > > >>>> > > > >> > > > > >
> > > > > >>>> > > > >> > > > > > Hope that makes sense
> > > > > >>>> > > > >> > > > > >
> > > > > >>>> > > > >> > > > > > Thanks again
> > > > > >>>> > > > >> > > > > >
> > > > > >>>> > > > >> > > > > > --
> > > > > >>>> > > > >> > > > > > Alessandro Tagliapietra
> > > > > >>>> > > > >> > > > > >
> > > > > >>>> > > > >> > > > > >
> > > > > >>>> > > > >> > > > > > On Tue, Dec 3, 2019 at 3:04 PM John
> Roesler <
> > > > > >>>> > > > >> vvcep...@apache.org>
> > > > > >>>> > > > >> > > wrote:
> > > > > >>>> > > > >> > > > > >
> > > > > >>>> > > > >> > > > > > > Hi Alessandro,
> > > > > >>>> > > > >> > > > > > >
> > > > > >>>> > > > >> > > > > > > To take a stab at your question, maybe it
> > > first
> > > > > >>>> doesn't
> > > > > >>>> > > find
> > > > > >>>> > > > >> it,
> > > > > >>>> > > > >> > > but
> > > > > >>>> > > > >> > > > > then
> > > > > >>>> > > > >> > > > > > > restores some data, writes the
> checkpoint,
> > > and
> > > > > >>>> then later
> > > > > >>>> > > on,
> > > > > >>>> > > > >> it
> > > > > >>>> > > > >> > > has to
> > > > > >>>> > > > >> > > > > > > re-initialize the task for some reason,
> and
> > > > > that's
> > > > > >>>> why it
> > > > > >>>> > > does
> > > > > >>>> > > > >> > > find a
> > > > > >>>> > > > >> > > > > > > checkpoint then?
> > > > > >>>> > > > >> > > > > > >
> > > > > >>>> > > > >> > > > > > > More to the heart of the issue, if you
> have
> > > EOS
> > > > > >>>> enabled,
> > > > > >>>> > > > >> Streams
> > > > > >>>> > > > >> > > _only_
> > > > > >>>> > > > >> > > > > > > records the checkpoint when the store is
> > in a
> > > > > >>>> > > known-consistent
> > > > > >>>> > > > >> > > state.
> > > > > >>>> > > > >> > > > > For
> > > > > >>>> > > > >> > > > > > > example, if you have a graceful shutdown,
> > > > Streams
> > > > > >>>> will
> > > > > >>>> > > flush
> > > > > >>>> > > > >> all
> > > > > >>>> > > > >> > > the
> > > > > >>>> > > > >> > > > > > > stores, commit all the transactions, and
> > then
> > > > > >>>> write the
> > > > > >>>> > > > >> checkpoint
> > > > > >>>> > > > >> > > > > file.
> > > > > >>>> > > > >> > > > > > > Then, on re-start, it will pick up from
> > that
> > > > > >>>> checkpoint.
> > > > > >>>> > > > >> > > > > > >
> > > > > >>>> > > > >> > > > > > > But as soon as it starts processing
> > records,
> > > it
> > > > > >>>> removes
> > > > > >>>> > > the
> > > > > >>>> > > > >> > > checkpoint
> > > > > >>>> > > > >> > > > > > > file, so if it crashes while it was
> > > processing,
> > > > > >>>> there is
> > > > > >>>> > > no
> > > > > >>>> > > > >> > > checkpoint
> > > > > >>>> > > > >> > > > > file
> > > > > >>>> > > > >> > > > > > > there, and it will have to restore from
> the
> > > > > >>>> beginning of
> > > > > >>>> > > the
> > > > > >>>> > > > >> > > changelog.
> > > > > >>>> > > > >> > > > > > >
> > > > > >>>> > > > >> > > > > > > This design is there on purpose, because
> > > > > otherwise
> > > > > >>>> we
> > > > > >>>> > > cannot
> > > > > >>>> > > > >> > > actually
> > > > > >>>> > > > >> > > > > > > guarantee correctness... For example, if
> > you
> > > > are
> > > > > >>>> > > maintaining a
> > > > > >>>> > > > >> > > count
> > > > > >>>> > > > >> > > > > > > operation, and we process an input record
> > i,
> > > > > >>>> increment the
> > > > > >>>> > > > >> count
> > > > > >>>> > > > >> > > and
> > > > > >>>> > > > >> > > > > write
> > > > > >>>> > > > >> > > > > > > it to the state store, and to the
> changelog
> > > > > topic.
> > > > > >>>> But we
> > > > > >>>> > > > >> crash
> > > > > >>>> > > > >> > > before
> > > > > >>>> > > > >> > > > > we
> > > > > >>>> > > > >> > > > > > > commit that transaction. Then, the write
> to
> > > the
> > > > > >>>> changelog
> > > > > >>>> > > > >> would be
> > > > > >>>> > > > >> > > > > aborted,
> > > > > >>>> > > > >> > > > > > > and we would re-process record i .
> However,
> > > > we've
> > > > > >>>> already
> > > > > >>>> > > > >> updated
> > > > > >>>> > > > >> > > the
> > > > > >>>> > > > >> > > > > local
> > > > > >>>> > > > >> > > > > > > state store, so when we increment it
> again,
> > > it
> > > > > >>>> results in
> > > > > >>>> > > > >> > > > > double-counting
> > > > > >>>> > > > >> > > > > > > i. The key point here is that there's no
> > way
> > > to
> > > > > do
> > > > > >>>> an
> > > > > >>>> > > atomic
> > > > > >>>> > > > >> > > operation
> > > > > >>>> > > > >> > > > > > > across two different systems (local state
> > > store
> > > > > >>>> and the
> > > > > >>>> > > > >> changelog
> > > > > >>>> > > > >> > > > > topic).
> > > > > >>>> > > > >> > > > > > > Since we can't guarantee that we roll
> back
> > > the
> > > > > >>>> incremented
> > > > > >>>> > > > >> count
> > > > > >>>> > > > >> > > when
> > > > > >>>> > > > >> > > > > the
> > > > > >>>> > > > >> > > > > > > changelog transaction is aborted, we
> can't
> > > keep
> > > > > >>>> the local
> > > > > >>>> > > > >> store
> > > > > >>>> > > > >> > > > > consistent
> > > > > >>>> > > > >> > > > > > > with the changelog.
> > > > > >>>> > > > >> > > > > > >
> > > > > >>>> > > > >> > > > > > > After a crash, the only way to ensure the
> > > local
> > > > > >>>> store is
> > > > > >>>> > > > >> consistent
> > > > > >>>> > > > >> > > > > with
> > > > > >>>> > > > >> > > > > > > the changelog is to discard the entire
> > thing
> > > > and
> > > > > >>>> rebuild
> > > > > >>>> > > it.
> > > > > >>>> > > > >> This
> > > > > >>>> > > > >> > > is
> > > > > >>>> > > > >> > > > > why we
> > > > > >>>> > > > >> > > > > > > have an invariant that the checkpoint
> file
> > > only
> > > > > >>>> exists
> > > > > >>>> > > when we
> > > > > >>>> > > > >> > > _know_
> > > > > >>>> > > > >> > > > > that
> > > > > >>>> > > > >> > > > > > > the local store is consistent with the
> > > > changelog,
> > > > > >>>> and
> > > > > >>>> > > this is
> > > > > >>>> > > > >> why
> > > > > >>>> > > > >> > > > > you're
> > > > > >>>> > > > >> > > > > > > seeing so much bandwidth when re-starting
> > > from
> > > > an
> > > > > >>>> unclean
> > > > > >>>> > > > >> shutdown.
> > > > > >>>> > > > >> > > > > > >
> > > > > >>>> > > > >> > > > > > > Note that it's definitely possible to do
> > > better
> > > > > >>>> than this,
> > > > > >>>> > > > >> and we
> > > > > >>>> > > > >> > > would
> > > > > >>>> > > > >> > > > > > > very much like to improve it in the
> future.
> > > > > >>>> > > > >> > > > > > >
> > > > > >>>> > > > >> > > > > > > Thanks,
> > > > > >>>> > > > >> > > > > > > -John
> > > > > >>>> > > > >> > > > > > >
> > > > > >>>> > > > >> > > > > > > On Tue, Dec 3, 2019, at 16:16, Alessandro
> > > > > >>>> Tagliapietra
> > > > > >>>> > > wrote:
> > > > > >>>> > > > >> > > > > > > > Hi John,
> > > > > >>>> > > > >> > > > > > > >
> > > > > >>>> > > > >> > > > > > > > thanks a lot for helping, regarding
> your
> > > > > message:
> > > > > >>>> > > > >> > > > > > > >  - no we only have 1 instance of the
> > stream
> > > > > >>>> application,
> > > > > >>>> > > > >> and it
> > > > > >>>> > > > >> > > > > always
> > > > > >>>> > > > >> > > > > > > > re-uses the same state folder
> > > > > >>>> > > > >> > > > > > > >  - yes we're seeing most issues when
> > > > restarting
> > > > > >>>> not
> > > > > >>>> > > > >> gracefully
> > > > > >>>> > > > >> > > due
> > > > > >>>> > > > >> > > > > > > exception
> > > > > >>>> > > > >> > > > > > > >
> > > > > >>>> > > > >> > > > > > > > I've enabled trace logging and
> filtering
> > > by a
> > > > > >>>> single
> > > > > >>>> > > state
> > > > > >>>> > > > >> store
> > > > > >>>> > > > >> > > the
> > > > > >>>> > > > >> > > > > > > > StoreChangelogReader messages are:
> > > > > >>>> > > > >> > > > > > > >
> > > > > >>>> > > > >> > > > > > > > Added restorer for changelog
> > > > > >>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-0
> > > > > >>>> > > > >> > > > > > > > Added restorer for changelog
> > > > > >>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-1
> > > > > >>>> > > > >> > > > > > > > Added restorer for changelog
> > > > > >>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-2
> > > > > >>>> > > > >> > > > > > > > Did not find checkpoint from changelog
> > > > > >>>> > > > >> > > > > > > >
> > sensors-stream-aggregate-store-changelog-2
> > > > for
> > > > > >>>> store
> > > > > >>>> > > > >> > > aggregate-store,
> > > > > >>>> > > > >> > > > > > > > rewinding to beginning.
> > > > > >>>> > > > >> > > > > > > > Did not find checkpoint from changelog
> > > > > >>>> > > > >> > > > > > > >
> > sensors-stream-aggregate-store-changelog-1
> > > > for
> > > > > >>>> store
> > > > > >>>> > > > >> > > aggregate-store,
> > > > > >>>> > > > >> > > > > > > > rewinding to beginning.
> > > > > >>>> > > > >> > > > > > > > Did not find checkpoint from changelog
> > > > > >>>> > > > >> > > > > > > >
> > sensors-stream-aggregate-store-changelog-0
> > > > for
> > > > > >>>> store
> > > > > >>>> > > > >> > > aggregate-store,
> > > > > >>>> > > > >> > > > > > > > rewinding to beginning.
> > > > > >>>> > > > >> > > > > > > > No checkpoint found for task 0_2 state
> > > store
> > > > > >>>> > > aggregate-store
> > > > > >>>> > > > >> > > > > changelog
> > > > > >>>> > > > >> > > > > > > >
> > sensors-stream-aggregate-store-changelog-2
> > > > with
> > > > > >>>> EOS
> > > > > >>>> > > turned
> > > > > >>>> > > > >> on.
> > > > > >>>> > > > >> > > > > > > > Reinitializing the task and restore its
> > > state
> > > > > >>>> from the
> > > > > >>>> > > > >> beginning.
> > > > > >>>> > > > >> > > > > > > > No checkpoint found for task 0_1 state
> > > store
> > > > > >>>> > > aggregate-store
> > > > > >>>> > > > >> > > > > changelog
> > > > > >>>> > > > >> > > > > > > >
> > sensors-stream-aggregate-store-changelog-1
> > > > with
> > > > > >>>> EOS
> > > > > >>>> > > turned
> > > > > >>>> > > > >> on.
> > > > > >>>> > > > >> > > > > > > > Reinitializing the task and restore its
> > > state
> > > > > >>>> from the
> > > > > >>>> > > > >> beginning.
> > > > > >>>> > > > >> > > > > > > > No checkpoint found for task 0_0 state
> > > store
> > > > > >>>> > > aggregate-store
> > > > > >>>> > > > >> > > > > changelog
> > > > > >>>> > > > >> > > > > > > >
> > sensors-stream-aggregate-store-changelog-0
> > > > with
> > > > > >>>> EOS
> > > > > >>>> > > turned
> > > > > >>>> > > > >> on.
> > > > > >>>> > > > >> > > > > > > > Reinitializing the task and restore its
> > > state
> > > > > >>>> from the
> > > > > >>>> > > > >> beginning.
> > > > > >>>> > > > >> > > > > > > > Found checkpoint 709937 from changelog
> > > > > >>>> > > > >> > > > > > > >
> > sensors-stream-aggregate-store-changelog-2
> > > > for
> > > > > >>>> store
> > > > > >>>> > > > >> > > aggregate-store.
> > > > > >>>> > > > >> > > > > > > > Restoring partition
> > > > > >>>> > > > >> sensors-stream-aggregate-store-changelog-2
> > > > > >>>> > > > >> > > from
> > > > > >>>> > > > >> > > > > > > offset
> > > > > >>>> > > > >> > > > > > > > 709937 to endOffset 742799
> > > > > >>>> > > > >> > > > > > > > Found checkpoint 3024234 from changelog
> > > > > >>>> > > > >> > > > > > > >
> > sensors-stream-aggregate-store-changelog-1
> > > > for
> > > > > >>>> store
> > > > > >>>> > > > >> > > aggregate-store.
> > > > > >>>> > > > >> > > > > > > > Restoring partition
> > > > > >>>> > > > >> sensors-stream-aggregate-store-changelog-1
> > > > > >>>> > > > >> > > from
> > > > > >>>> > > > >> > > > > > > offset
> > > > > >>>> > > > >> > > > > > > > 3024234 to endOffset 3131513
> > > > > >>>> > > > >> > > > > > > > Found checkpoint 14514072 from
> changelog
> > > > > >>>> > > > >> > > > > > > >
> > sensors-stream-aggregate-store-changelog-0
> > > > for
> > > > > >>>> store
> > > > > >>>> > > > >> > > aggregate-store.
> > > > > >>>> > > > >> > > > > > > > Restoring partition
> > > > > >>>> > > > >> sensors-stream-aggregate-store-changelog-0
> > > > > >>>> > > > >> > > from
> > > > > >>>> > > > >> > > > > > > offset
> > > > > >>>> > > > >> > > > > > > > 14514072 to endOffset 17116574
> > > > > >>>> > > > >> > > > > > > > Restored from
> > > > > >>>> > > sensors-stream-aggregate-store-changelog-2 to
> > > > > >>>> > > > >> > > > > > > aggregate-store
> > > > > >>>> > > > >> > > > > > > > with 966 records, ending offset is
> > 711432,
> > > > next
> > > > > >>>> starting
> > > > > >>>> > > > >> > > position is
> > > > > >>>> > > > >> > > > > > > 711434
> > > > > >>>> > > > >> > > > > > > > Restored from
> > > > > >>>> > > sensors-stream-aggregate-store-changelog-2 to
> > > > > >>>> > > > >> > > > > > > aggregate-store
> > > > > >>>> > > > >> > > > > > > > with 914 records, ending offset is
> > 712711,
> > > > next
> > > > > >>>> starting
> > > > > >>>> > > > >> > > position is
> > > > > >>>> > > > >> > > > > > > 712713
> > > > > >>>> > > > >> > > > > > > > Restored from
> > > > > >>>> > > sensors-stream-aggregate-store-changelog-1 to
> > > > > >>>> > > > >> > > > > > > aggregate-store
> > > > > >>>> > > > >> > > > > > > > with 18 records, ending offset is
> > 3024261,
> > > > next
> > > > > >>>> starting
> > > > > >>>> > > > >> > > position is
> > > > > >>>> > > > >> > > > > > > 3024262
> > > > > >>>> > > > >> > > > > > > >
> > > > > >>>> > > > >> > > > > > > >
> > > > > >>>> > > > >> > > > > > > > why it first says it didn't find the
> > > > checkpoint
> > > > > >>>> and
> > > > > >>>> > > then it
> > > > > >>>> > > > >> does
> > > > > >>>> > > > >> > > > > find it?
> > > > > >>>> > > > >> > > > > > > > It seems it loaded about  2.7M records
> > (sum
> > > > of
> > > > > >>>> offset
> > > > > >>>> > > > >> difference
> > > > > >>>> > > > >> > > in
> > > > > >>>> > > > >> > > > > the
> > > > > >>>> > > > >> > > > > > > > "restorting partition ...." messages)
> > > right?
> > > > > >>>> > > > >> > > > > > > > Maybe should I try to reduce the
> > checkpoint
> > > > > >>>> interval?
> > > > > >>>> > > > >> > > > > > > >
> > > > > >>>> > > > >> > > > > > > > Regards
> > > > > >>>> > > > >> > > > > > > >
> > > > > >>>> > > > >> > > > > > > > --
> > > > > >>>> > > > >> > > > > > > > Alessandro Tagliapietra
> > > > > >>>> > > > >> > > > > > > >
> > > > > >>>> > > > >> > > > > > > >
> > > > > >>>> > > > >> > > > > > > > On Mon, Dec 2, 2019 at 9:18 AM John
> > > Roesler <
> > > > > >>>> > > > >> vvcep...@apache.org
> > > > > >>>> > > > >> > > >
> > > > > >>>> > > > >> > > > > wrote:
> > > > > >>>> > > > >> > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > Hi Alessandro,
> > > > > >>>> > > > >> > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > I'm sorry to hear that.
> > > > > >>>> > > > >> > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > The restore process only takes one
> > factor
> > > > > into
> > > > > >>>> > > account:
> > > > > >>>> > > > >> the
> > > > > >>>> > > > >> > > current
> > > > > >>>> > > > >> > > > > > > offset
> > > > > >>>> > > > >> > > > > > > > > position of the changelog topic is
> > stored
> > > > in
> > > > > a
> > > > > >>>> local
> > > > > >>>> > > file
> > > > > >>>> > > > >> > > > > alongside the
> > > > > >>>> > > > >> > > > > > > > > state stores. On startup, the app
> > checks
> > > if
> > > > > the
> > > > > >>>> > > recorded
> > > > > >>>> > > > >> > > position
> > > > > >>>> > > > >> > > > > lags
> > > > > >>>> > > > >> > > > > > > the
> > > > > >>>> > > > >> > > > > > > > > latest offset in the changelog. If
> so,
> > > then
> > > > > it
> > > > > >>>> reads
> > > > > >>>> > > the
> > > > > >>>> > > > >> > > missing
> > > > > >>>> > > > >> > > > > > > changelog
> > > > > >>>> > > > >> > > > > > > > > records before starting processing.
> > > > > >>>> > > > >> > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > Thus, it would not restore any old
> > window
> > > > > data.
> > > > > >>>> > > > >> > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > There might be a few different things
> > > going
> > > > > on
> > > > > >>>> to
> > > > > >>>> > > explain
> > > > > >>>> > > > >> your
> > > > > >>>> > > > >> > > > > > > observation:
> > > > > >>>> > > > >> > > > > > > > > * if there is more than one instance
> in
> > > > your
> > > > > >>>> Streams
> > > > > >>>> > > > >> cluster,
> > > > > >>>> > > > >> > > > > maybe the
> > > > > >>>> > > > >> > > > > > > > > task is "flopping" between instances,
> > so
> > > > each
> > > > > >>>> instance
> > > > > >>>> > > > >> still
> > > > > >>>> > > > >> > > has to
> > > > > >>>> > > > >> > > > > > > recover
> > > > > >>>> > > > >> > > > > > > > > state, since it wasn't the last one
> > > > actively
> > > > > >>>> > > processing
> > > > > >>>> > > > >> it.
> > > > > >>>> > > > >> > > > > > > > > * if the application isn't stopped
> > > > > gracefully,
> > > > > >>>> it
> > > > > >>>> > > might
> > > > > >>>> > > > >> not
> > > > > >>>> > > > >> > > get a
> > > > > >>>> > > > >> > > > > > > chance
> > > > > >>>> > > > >> > > > > > > > > to record its offset in that local
> > file,
> > > so
> > > > > on
> > > > > >>>> > > restart it
> > > > > >>>> > > > >> has
> > > > > >>>> > > > >> > > to
> > > > > >>>> > > > >> > > > > > > restore
> > > > > >>>> > > > >> > > > > > > > > some or all of the state store from
> > > > > changelog.
> > > > > >>>> > > > >> > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > Or it could be something else; that's
> > > just
> > > > > >>>> what comes
> > > > > >>>> > > to
> > > > > >>>> > > > >> mind.
> > > > > >>>> > > > >> > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > If you want to get to the bottom of
> it,
> > > you
> > > > > >>>> can take a
> > > > > >>>> > > > >> look at
> > > > > >>>> > > > >> > > the
> > > > > >>>> > > > >> > > > > > > logs,
> > > > > >>>> > > > >> > > > > > > > > paying close attention to which tasks
> > are
> > > > > >>>> assigned to
> > > > > >>>> > > > >> which
> > > > > >>>> > > > >> > > > > instances
> > > > > >>>> > > > >> > > > > > > after
> > > > > >>>> > > > >> > > > > > > > > each restart. You can also look into
> > the
> > > > logs
> > > > > >>>> from
> > > > > >>>> > > > >> > > > > > > > >
> > > > > >>>> > > > >> > >
> > > > > >>>> > >
> > > > > `org.apache.kafka.streams.processor.internals.StoreChangelogReader`
> > > > > >>>> > > > >> > > > > > > (might
> > > > > >>>> > > > >> > > > > > > > > want to set it to DEBUG or TRACE
> level
> > to
> > > > > >>>> really see
> > > > > >>>> > > > >> what's
> > > > > >>>> > > > >> > > > > happening).
> > > > > >>>> > > > >> > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > I hope this helps!
> > > > > >>>> > > > >> > > > > > > > > -John
> > > > > >>>> > > > >> > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > On Sun, Dec 1, 2019, at 21:25,
> > Alessandro
> > > > > >>>> Tagliapietra
> > > > > >>>> > > > >> wrote:
> > > > > >>>> > > > >> > > > > > > > > > Hello everyone,
> > > > > >>>> > > > >> > > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > > we're having a problem with
> bandwidth
> > > > usage
> > > > > >>>> on
> > > > > >>>> > > streams
> > > > > >>>> > > > >> > > > > application
> > > > > >>>> > > > >> > > > > > > > > startup,
> > > > > >>>> > > > >> > > > > > > > > > our current setup does this:
> > > > > >>>> > > > >> > > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > > ...
> > > > > >>>> > > > >> > > > > > > > > > .groupByKey()
> > > > > >>>> > > > >> > > > > > > > > >
> > > > > >>>> > > > >> > >
> > > > > >>>> .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
> > > > > >>>> > > > >> > > > > > > > > > .aggregate(
> > > > > >>>> > > > >> > > > > > > > > >         {
> > > MetricSequenceList(ArrayList())
> > > > > },
> > > > > >>>> > > > >> > > > > > > > > >         { key, value, aggregate ->
> > > > > >>>> > > > >> > > > > > > > > >
> > > > >  aggregate.getRecords().add(value)
> > > > > >>>> > > > >> > > > > > > > > >             aggregate
> > > > > >>>> > > > >> > > > > > > > > >         },
> > > > > >>>> > > > >> > > > > > > > > >         Materialized.`as`<String,
> > > > > >>>> > > MetricSequenceList,
> > > > > >>>> > > > >> > > > > > > WindowStore<Bytes,
> > > > > >>>> > > > >> > > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > >
> > > > > >>>> > > > >> > > > > > >
> > > > > >>>> > > > >> > > > >
> > > > > >>>> > > > >> > >
> > > > > >>>> > > > >>
> > > > > >>>> > >
> > > > > >>>>
> > > > >
> > > >
> > >
> >
> ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde())
> > > > > >>>> > > > >> > > > > > > > > > )
> > > > > >>>> > > > >> > > > > > > > > > .toStream()
> > > > > >>>> > > > >> > > > > > > > > > .flatTransform(TransformerSupplier
> {
> > > > > >>>> > > > >> > > > > > > > > > ...
> > > > > >>>> > > > >> > > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > > basically in each window we append
> > the
> > > > new
> > > > > >>>> values
> > > > > >>>> > > and
> > > > > >>>> > > > >> then do
> > > > > >>>> > > > >> > > > > some
> > > > > >>>> > > > >> > > > > > > other
> > > > > >>>> > > > >> > > > > > > > > > logic with the array of windowed
> > > values.
> > > > > >>>> > > > >> > > > > > > > > > The aggregate-store changelog topic
> > > > > >>>> configuration
> > > > > >>>> > > uses
> > > > > >>>> > > > >> > > > > > > compact,delete as
> > > > > >>>> > > > >> > > > > > > > > > cleanup policy and has 12 hours of
> > > > > retention.
> > > > > >>>> > > > >> > > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > > What we've seen is that on
> > application
> > > > > >>>> startup it
> > > > > >>>> > > takes
> > > > > >>>> > > > >> a
> > > > > >>>> > > > >> > > couple
> > > > > >>>> > > > >> > > > > > > minutes
> > > > > >>>> > > > >> > > > > > > > > to
> > > > > >>>> > > > >> > > > > > > > > > rebuild the state store, even if
> the
> > > > state
> > > > > >>>> store
> > > > > >>>> > > > >> directory is
> > > > > >>>> > > > >> > > > > > > persisted
> > > > > >>>> > > > >> > > > > > > > > > across restarts. That along with an
> > > > > >>>> exception that
> > > > > >>>> > > > >> caused the
> > > > > >>>> > > > >> > > > > docker
> > > > > >>>> > > > >> > > > > > > > > > container to be restarted a couple
> > > > hundreds
> > > > > >>>> times
> > > > > >>>> > > > >> caused a
> > > > > >>>> > > > >> > > big
> > > > > >>>> > > > >> > > > > > > confluent
> > > > > >>>> > > > >> > > > > > > > > > cloud bill compared to what we
> > usually
> > > > > spend
> > > > > >>>> (1/4
> > > > > >>>> > > of a
> > > > > >>>> > > > >> full
> > > > > >>>> > > > >> > > > > month in
> > > > > >>>> > > > >> > > > > > > 1
> > > > > >>>> > > > >> > > > > > > > > day).
> > > > > >>>> > > > >> > > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > > What I think is happening is that
> the
> > > > topic
> > > > > >>>> is
> > > > > >>>> > > keeping
> > > > > >>>> > > > >> all
> > > > > >>>> > > > >> > > the
> > > > > >>>> > > > >> > > > > > > previous
> > > > > >>>> > > > >> > > > > > > > > > windows even with the compacting
> > policy
> > > > > >>>> because each
> > > > > >>>> > > > >> key is
> > > > > >>>> > > > >> > > the
> > > > > >>>> > > > >> > > > > > > original
> > > > > >>>> > > > >> > > > > > > > > > key + the timestamp not just the
> key.
> > > > Since
> > > > > >>>> we don't
> > > > > >>>> > > > >> care
> > > > > >>>> > > > >> > > about
> > > > > >>>> > > > >> > > > > > > previous
> > > > > >>>> > > > >> > > > > > > > > > windows as the flatTransform after
> > the
> > > > > >>>> toStream()
> > > > > >>>> > > makes
> > > > > >>>> > > > >> sure
> > > > > >>>> > > > >> > > > > that we
> > > > > >>>> > > > >> > > > > > > > > don't
> > > > > >>>> > > > >> > > > > > > > > > process old windows (a custom
> > > suppressor
> > > > > >>>> basically)
> > > > > >>>> > > is
> > > > > >>>> > > > >> there
> > > > > >>>> > > > >> > > a
> > > > > >>>> > > > >> > > > > way to
> > > > > >>>> > > > >> > > > > > > > > only
> > > > > >>>> > > > >> > > > > > > > > > keep the last window so that the
> > store
> > > > > >>>> rebuilding
> > > > > >>>> > > goes
> > > > > >>>> > > > >> > > faster and
> > > > > >>>> > > > >> > > > > > > without
> > > > > >>>> > > > >> > > > > > > > > > rebuilding old windows too? Or
> > should I
> > > > > >>>> create a
> > > > > >>>> > > custom
> > > > > >>>> > > > >> > > window
> > > > > >>>> > > > >> > > > > using
> > > > > >>>> > > > >> > > > > > > the
> > > > > >>>> > > > >> > > > > > > > > > original key as key so that the
> > > > compaction
> > > > > >>>> keeps
> > > > > >>>> > > only
> > > > > >>>> > > > >> the
> > > > > >>>> > > > >> > > last
> > > > > >>>> > > > >> > > > > window
> > > > > >>>> > > > >> > > > > > > > > data?
> > > > > >>>> > > > >> > > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > > Thank you
> > > > > >>>> > > > >> > > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > > > --
> > > > > >>>> > > > >> > > > > > > > > > Alessandro Tagliapietra
> > > > > >>>> > > > >> > > > > > > > > >
> > > > > >>>> > > > >> > > > > > > > >
> > > > > >>>> > > > >> > > > > > > >
> > > > > >>>> > > > >> > > > > > >
> > > > > >>>> > > > >> > > > > >
> > > > > >>>> > > > >> > > > >
> > > > > >>>> > > > >> > > >
> > > > > >>>> > > > >> > >
> > > > > >>>> > > > >> >
> > > > > >>>> > > > >>
> > > > > >>>> > > > >
> > > > > >>>> > > >
> > > > > >>>> > >
> > > > > >>>> >
> > > > > >>>>
> > > > > >>>
> > > > >
> > > >
> > >
> >
>

Reply via email to