You're saying that with a 100ms commit interval, caching won't help because it would still send the compacted changes to the changelog every 100ms?
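For context on the 100 ms figure: with exactly-once enabled the commit interval defaults to 100 ms (instead of the usual 30 s), and the record cache is flushed on every commit, so the cache can only collapse roughly 100 ms worth of updates per key. A minimal sketch of the settings involved (application id, bootstrap server and the values themselves are illustrative, not taken from this deployment):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = Properties().apply {
    put(StreamsConfig.APPLICATION_ID_CONFIG, "sensors-stream")        // illustrative
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")         // illustrative
    put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)
    // Defaults to 100 ms under EOS; a larger value means fewer cache flushes and
    // fewer changelog writes, at the cost of higher end-to-end latency.
    put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000)
    // Total record cache for the instance, shared across threads; default is 10 MB.
    put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 64 * 1024 * 1024L)
}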
Regarding the custom state store, I'll look into that; I didn't go much further than transformers and stores in my Kafka experience, so I'll need to understand better what that implies. Yeah, I only have one window per key in the store. The only thing I don't understand is why the cache works 80% of the time and then the changelog sent bytes suddenly increase 90x. I mean, if the cache wasn't working, why did enabling it in our pipeline decrease the sent bytes from 30-40MB/minute to 400KB/minute? I'll look into the custom state store though. Thanks -- Alessandro Tagliapietra On Mon, Dec 9, 2019 at 7:02 PM Sophie Blee-Goldman <sop...@confluent.io> wrote: > Alright, well I see why you have so much data being sent to the changelog > if each > update involves appending to a list and then writing in the whole list. And > with 340 > records/minute I'm actually not sure how the cache could really help at all > when it's > being flushed every 100ms. > > Here's kind of a wild idea, if you really only need append semantics: what > if you wrote > a custom StateStore that wrapped the normal RocksDBStore (or > RocksDBWindowStore) > and did the append for you under the hood? The changelogging layer sits > between the > layer that you would call #put on in your transformer and the final layer > that actually writes > to the underlying storage engine. If you insert an extra layer and modify > your transformer > to only call put on the new data (rather than the entire list) then only > this new data will get > sent to the changelog. Your custom storage layer will know it's actually > append semantics, > and add the new data to the existing list before sending it on to RocksDB. > > Since you only ever have one window per key in the store (right?) you just > need to make > sure that nothing from the current window gets deleted prematurely. You'd > want to turn off > compaction on the changelog and caching on the store of course, and maybe > give the > changelog some extra retention time to be safe. > > Obviously I haven't thoroughly verified this alternative, but it seems like > this approach (or > something to its effect) could help you cut down on the changelog data. > WDYT? > > On Mon, Dec 9, 2019 at 4:35 PM Alessandro Tagliapietra < > tagliapietra.alessan...@gmail.com> wrote: > > > Hi Sophie, > > > > Just to give a better context, yes we use EOS and the problem happens in > > our aggregation store. > > Basically when windowing data we append each record into a list that's > > stored in the aggregation store. > > We have 2 versions, in production we use the kafka streams windowing API, > > in staging we manually calculate the window end timestamp and aggregate > > using that timestamp. > > > > To give you an example of the staging code, it's a simple transformer > that: > > - if incoming data fits in the same window as the data in store, append > > the data to the existing store list overwriting the same key and nothing > is > > sent downstream > > - if incoming data has a timestamp smaller than the existing store data, > > discard the record > > - if incoming data has a timestamp bigger than the existing store data, > > send the stored list downstream and store the new window data into the > > store > > > > This way we don't use multiple keys (kafka streams instead uses a store > > where each key is stream-key + window key) as we overwrite the store data > > using the same key over and over.
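A minimal sketch of the kind of manual-windowing transformer described above (the record class, store name and window size are placeholders, not the actual staging code, which uses Avro records):

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Hypothetical value types: a single metric record, and the per-key aggregate
// holding the window-end timestamp plus the records collected so far.
data class Metric(val timestamp: Long, val reading: Double)
data class WindowAgg(val windowEnd: Long, val records: MutableList<Metric>)

class ManualWindowTransformer(private val windowSizeMs: Long = 60_000L) :
    Transformer<String, Metric, KeyValue<String, WindowAgg>?> {

    private lateinit var store: KeyValueStore<String, WindowAgg>

    override fun init(context: ProcessorContext) {
        store = context.getStateStore("aggregate-store") as KeyValueStore<String, WindowAgg>
    }

    override fun transform(key: String, value: Metric): KeyValue<String, WindowAgg>? {
        val windowEnd = (value.timestamp / windowSizeMs + 1) * windowSizeMs
        val current = store.get(key)
        if (current == null || windowEnd == current.windowEnd) {
            // Same (or first) window: append in place and emit nothing downstream.
            val agg = current ?: WindowAgg(windowEnd, mutableListOf())
            agg.records.add(value)
            store.put(key, agg)   // rewrites the whole list, hence the changelog volume
            return null
        }
        if (windowEnd < current.windowEnd) {
            return null           // record for an already-closed window: discard it
        }
        // A newer window has started: emit the finished window and store the new one.
        store.put(key, WindowAgg(windowEnd, mutableListOf(value)))
        return KeyValue(key, current)
    }

    override fun close() {}
}

Wiring it in would look roughly like stream.transform(TransformerSupplier { ManualWindowTransformer() }, "aggregate-store"), after registering a matching key-value store builder on the topology.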
> > So what I would expect is that since we're overwriting the same keys > there > > isn't more and more data to be cached as the number of keys are always > the > > same and we don't really need to cache more data over time. > > > > To respond to your questions: > > - yes when I say that cache "stopped/started" working I mean that at > some > > point the store started sending more and more data to che changelog topic > > and then suddenly stopped again even without a restart (a restart always > > fixes the problem). > > - Yes there are no density changes in the input stream, I've checked the > > number of records sent to the stream input topic and there is a variation > > of ~10-20 records per minute on an average of 340 records per minute. > Most > > of the records are also generated by simulators with very predictable > > output rate. > > > > In the meantime I've enabled reporting of debug metrics (so including > cache > > hit ratio) to hopefully get better insights the next time it happens. > > > > Thank you in advance > > > > -- > > Alessandro Tagliapietra > > > > On Mon, Dec 9, 2019 at 3:57 PM Sophie Blee-Goldman <sop...@confluent.io> > > wrote: > > > > > It's an LRU cache, so once it gets full new records will cause older > ones > > > to be evicted (and thus sent > > > downstream). Of course this should only apply to records of a different > > > key, otherwise it will just cause > > > an update of that key in the cache. > > > > > > I missed that you were using EOS, given the short commit interval it's > > hard > > > to see those effects. > > > When you say that it stopped working and then appeared to start working > > > again, are you just > > > referring to the amount of data being sent to the changelog? And you > can > > > definitely rule out differences > > > in the density of updates in the input stream? > > > > > > > > > > > > On Mon, Dec 9, 2019 at 12:26 PM Alessandro Tagliapietra < > > > tagliapietra.alessan...@gmail.com> wrote: > > > > > > > Hi Sophie, > > > > > > > > thanks fo helping. > > > > > > > > By eviction of older records you mean they get flushed to the > changelog > > > > topic? > > > > Or the cache is just full and so all new records go to the changelog > > > topic > > > > until the old ones are evicted? > > > > > > > > Regarding the timing, what timing do you mean? Between when the cache > > > stops > > > > and starts working again? We're using EOS os I believe the commit > > > interval > > > > is every 100ms. > > > > > > > > Regards > > > > > > > > -- > > > > Alessandro Tagliapietra > > > > > > > > > > > > > > > > On Mon, Dec 9, 2019 at 12:15 PM Sophie Blee-Goldman < > > sop...@confluent.io > > > > > > > > wrote: > > > > > > > > > It might be that the cache appears to "stop working" because it > gets > > > > full, > > > > > and each > > > > > new update causes an eviction (of some older record). This would > also > > > > > explain the > > > > > opposite behavior, that it "starts working" again after some time > > > without > > > > > being restarted, > > > > > since the cache is completely flushed on commit. Does the timing > seem > > > to > > > > > align with your > > > > > commit interval (default is 30s)? 
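Purely to illustrate the append-under-the-hood store Sophie suggests above, a rough sketch (all names and types are hypothetical). The crucial part it leaves out is that, for the changelog writes to actually shrink, this layer would have to sit underneath Streams' changelogging layer (wired in via a custom StoreSupplier/StoreBuilder), not simply be wrapped around the store handle a transformer gets back:

import org.apache.kafka.streams.state.KeyValueStore

// Sketch only: V is the element type of the per-key list (e.g. the metric record class).
// All other KeyValueStore/StateStore methods are delegated to the wrapped store.
class AppendingKeyValueStore<V>(
    private val inner: KeyValueStore<String, MutableList<V>>
) : KeyValueStore<String, MutableList<V>> by inner {

    // Turn put() into an append: merge the caller's small new batch into the list the
    // underlying store already holds, so callers never have to write the full list.
    override fun put(key: String, value: MutableList<V>) {
        val existing = inner.get(key) ?: mutableListOf()
        existing.addAll(value)
        inner.put(key, existing)
    }
}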
> > > > > > > > > > On Mon, Dec 9, 2019 at 12:03 AM Alessandro Tagliapietra < > > > > > tagliapietra.alessan...@gmail.com> wrote: > > > > > > > > > > > And it seems that for some reason after a while caching works > again > > > > > > without a restart of the streams application > > > > > > > > > > > > [image: Screen Shot 2019-12-08 at 11.59.30 PM.png] > > > > > > > > > > > > I'll try to enable debug metrics and see if I can find something > > > useful > > > > > > there. > > > > > > Any idea is appreciated in the meantime :) > > > > > > > > > > > > -- > > > > > > Alessandro Tagliapietra > > > > > > > > > > > > On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra < > > > > > > tagliapietra.alessan...@gmail.com> wrote: > > > > > > > > > > > >> It seems that even with caching enabled, after a while the sent > > > bytes > > > > > >> stil go up > > > > > >> > > > > > >> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png] > > > > > >> > > > > > >> you can see the deploy when I've enabled caching but it looks > like > > > > it's > > > > > >> still a temporary solution. > > > > > >> > > > > > >> -- > > > > > >> Alessandro Tagliapietra > > > > > >> > > > > > >> > > > > > >> On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra < > > > > > >> tagliapietra.alessan...@gmail.com> wrote: > > > > > >> > > > > > >>> Could be, but since we have a limite amount of input keys > (~30), > > > > > >>> windowing generates new keys but old ones are never touched > again > > > > > since the > > > > > >>> data per key is in order, I assume it shouldn't be a big deal > for > > > it > > > > to > > > > > >>> handle 30 keys > > > > > >>> I'll have a look at cache metrics and see if something pops out > > > > > >>> > > > > > >>> Thanks > > > > > >>> > > > > > >>> -- > > > > > >>> Alessandro Tagliapietra > > > > > >>> > > > > > >>> > > > > > >>> On Sat, Dec 7, 2019 at 10:02 AM John Roesler < > > vvcep...@apache.org> > > > > > >>> wrote: > > > > > >>> > > > > > >>>> Hmm, that’s a good question. Now that we’re talking about > > > caching, I > > > > > >>>> wonder if the cache was just too small. It’s not very big by > > > > default. > > > > > >>>> > > > > > >>>> On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote: > > > > > >>>> > Ok I'll check on that! > > > > > >>>> > > > > > > >>>> > Now I can see that with caching we went from 3-4MB/s to > > 400KB/s, > > > > > that > > > > > >>>> will > > > > > >>>> > help with the bill. > > > > > >>>> > > > > > > >>>> > Last question, any reason why after a while the regular > > windowed > > > > > >>>> stream > > > > > >>>> > starts sending every update instead of caching? > > > > > >>>> > Could it be because it doesn't have any more memory > available? > > > Any > > > > > >>>> other > > > > > >>>> > possible reason? > > > > > >>>> > > > > > > >>>> > Thank you so much for your help > > > > > >>>> > > > > > > >>>> > -- > > > > > >>>> > Alessandro Tagliapietra > > > > > >>>> > > > > > > >>>> > > > > > > >>>> > On Sat, Dec 7, 2019 at 9:14 AM John Roesler < > > > vvcep...@apache.org> > > > > > >>>> wrote: > > > > > >>>> > > > > > > >>>> > > Ah, yes. Glad you figured it out! > > > > > >>>> > > > > > > > >>>> > > Caching does not reduce EOS guarantees at all. I highly > > > > recommend > > > > > >>>> using > > > > > >>>> > > it. You might even want to take a look at the caching > > metrics > > > to > > > > > >>>> make sure > > > > > >>>> > > you have a good hit ratio. 
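On checking the caching metrics: the cache hit-ratio sensors are DEBUG-level, so they have to be enabled explicitly before they show up over JMX / the Prometheus exporter. A sketch (only the recording-level property matters here; the rest of the config is elided):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = Properties().apply {
    // Cache and state-store sensors (including hit ratio) are only recorded at DEBUG;
    // the default recording level is INFO.
    put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
}

The hit-ratio metrics then appear under the stream-record-cache-metrics group (exact metric names differ slightly between Streams versions).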
> > > > > >>>> > > > > > > > >>>> > > -John > > > > > >>>> > > > > > > > >>>> > > On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra > > wrote: > > > > > >>>> > > > Never mind I've found out I can use > `.withCachingEnabled` > > on > > > > the > > > > > >>>> store > > > > > >>>> > > > builder to achieve the same thing as the windowing > example > > > as > > > > > >>>> > > > `Materialized.as` turns that on by default. > > > > > >>>> > > > > > > > > >>>> > > > Does caching in any way reduces the EOS guarantees? > > > > > >>>> > > > > > > > > >>>> > > > -- > > > > > >>>> > > > Alessandro Tagliapietra > > > > > >>>> > > > > > > > > >>>> > > > > > > > > >>>> > > > On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra < > > > > > >>>> > > > tagliapietra.alessan...@gmail.com> wrote: > > > > > >>>> > > > > > > > > >>>> > > > > Seems my journey with this isn't done just yet, > > > > > >>>> > > > > > > > > > >>>> > > > > This seems very complicated to me but I'll try to > > explain > > > it > > > > > as > > > > > >>>> best I > > > > > >>>> > > can. > > > > > >>>> > > > > To better understand the streams network usage I've > used > > > > > >>>> prometheus > > > > > >>>> > > with > > > > > >>>> > > > > the JMX exporter to export kafka metrics. > > > > > >>>> > > > > To check the amount of data we use I'm looking at the > > > > > increments > > > > > >>>> > > > > of kafka_producer_topic_metrics_byte_total and > > > > > >>>> > > > > > kafka_producer_producer_topic_metrics_record_send_total, > > > > > >>>> > > > > > > > > > >>>> > > > > Our current (before the change mentioned above) code > > looks > > > > > like > > > > > >>>> this: > > > > > >>>> > > > > > > > > > >>>> > > > > // This transformers just pairs a value with the > > previous > > > > one > > > > > >>>> storing > > > > > >>>> > > the > > > > > >>>> > > > > temporary one in a store > > > > > >>>> > > > > val pairsStream = metricStream > > > > > >>>> > > > > .transformValues(ValueTransformerWithKeySupplier { > > > > > >>>> PairTransformer() > > > > > >>>> > > }, > > > > > >>>> > > > > "LastValueStore") > > > > > >>>> > > > > .filter { _, value: MetricSequence? -> value != > null } > > > > > >>>> > > > > > > > > > >>>> > > > > // Create a store to store suppressed windows until a > > new > > > > one > > > > > is > > > > > >>>> > > received > > > > > >>>> > > > > val suppressStoreSupplier = > > > > > >>>> > > > > > > > > > >>>> > > > > > > > >>>> > > > > > > > > > > > > > > > Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"), > > > > > >>>> > > > > ...... 
> > > > > >>>> > > > > > > > > > >>>> > > > > // Window and aggregate data in 1 minute intervals > > > > > >>>> > > > > val aggregatedStream = pairsStream > > > > > >>>> > > > > .groupByKey() > > > > > >>>> > > > > > > > > > .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1))) > > > > > >>>> > > > > .aggregate( > > > > > >>>> > > > > { MetricSequenceList(ArrayList()) }, > > > > > >>>> > > > > { key, value, aggregate -> > > > > > >>>> > > > > aggregate.getRecords().add(value) > > > > > >>>> > > > > aggregate > > > > > >>>> > > > > }, > > > > > >>>> > > > > Materialized.`as`<String, > MetricSequenceList, > > > > > >>>> > > WindowStore<Bytes, > > > > > >>>> > > > > > > > > > >>>> > > > > > > > >>>> > > > > > > > > > > > > > > > ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde()) > > > > > >>>> > > > > ) > > > > > >>>> > > > > .toStream() > > > > > >>>> > > > > .flatTransform(TransformerSupplier { > > > > > >>>> > > > > // This transformer basically waits until a new > > > window > > > > > is > > > > > >>>> > > received > > > > > >>>> > > > > to emit the previous one > > > > > >>>> > > > > }, "suppress-store") > > > > > >>>> > > > > .map { sensorId: String, suppressedOutput: > > > > SuppressedOutput > > > > > -> > > > > > >>>> > > > > .... etc .... > > > > > >>>> > > > > > > > > > >>>> > > > > > > > > > >>>> > > > > Basically: > > > > > >>>> > > > > - all data goes through LastValueStore store that > > stores > > > > each > > > > > >>>> message > > > > > >>>> > > and > > > > > >>>> > > > > emits a pair with the previous one > > > > > >>>> > > > > - the aggregate-store is used to store the per-window > > > list > > > > of > > > > > >>>> > > messages in > > > > > >>>> > > > > the aggregate method > > > > > >>>> > > > > - the suppress store is used to store each received > > > window > > > > > >>>> which is > > > > > >>>> > > > > emitted only after a newer one is received > > > > > >>>> > > > > > > > > > >>>> > > > > What I'm experiencing is that: > > > > > >>>> > > > > - during normal execution, the streams app sends to > the > > > > > >>>> lastvalue > > > > > >>>> > > store > > > > > >>>> > > > > changelog topic 5k messages/min, the aggregate and > > > suppress > > > > > >>>> store > > > > > >>>> > > changelog > > > > > >>>> > > > > topics only about 100 > > > > > >>>> > > > > - at some point (after many hours of operation), the > > > > streams > > > > > >>>> app > > > > > >>>> > > starts > > > > > >>>> > > > > sending to the aggregate and suppress store changelog > > > topic > > > > > the > > > > > >>>> same > > > > > >>>> > > amount > > > > > >>>> > > > > of messages going through the lastvaluestore > > > > > >>>> > > > > - if I restart the streams app it goes back to the > > > initial > > > > > >>>> behavior > > > > > >>>> > > > > > > > > > >>>> > > > > You can see the behavior in this graph > > > > > >>>> https://imgur.com/dJcUNSf > > > > > >>>> > > > > You can also see that after a restart everything goes > > back > > > > to > > > > > >>>> normal > > > > > >>>> > > > > levels. > > > > > >>>> > > > > Regarding other metrics, process latency increases, > poll > > > > > latency > > > > > >>>> > > > > decreases, poll rate decreases, commit rate stays the > > same > > > > > >>>> while commit > > > > > >>>> > > > > latency increases. 
> > > > > >>>> > > > > > > > > > >>>> > > > > Now, I've these questions: > > > > > >>>> > > > > - why isn't the aggregate/suppress store changelog > > topic > > > > > >>>> throughput > > > > > >>>> > > the > > > > > >>>> > > > > same as the LastValueStore? Shouldn't every time it > > > > aggregates > > > > > >>>> send a > > > > > >>>> > > > > record to the changelog? > > > > > >>>> > > > > - is the windowing doing some internal caching like > not > > > > > >>>> sending every > > > > > >>>> > > > > aggregation record until the window time is passed? > (if > > > so, > > > > > >>>> where can I > > > > > >>>> > > > > find that code since I would like to use that also for > > our > > > > new > > > > > >>>> > > > > implementation) > > > > > >>>> > > > > > > > > > >>>> > > > > Thank you in advance > > > > > >>>> > > > > > > > > > >>>> > > > > -- > > > > > >>>> > > > > Alessandro Tagliapietra > > > > > >>>> > > > > > > > > > >>>> > > > > > > > > > >>>> > > > > On Wed, Dec 4, 2019 at 7:57 AM John Roesler < > > > > > >>>> vvcep...@apache.org> > > > > > >>>> > > wrote: > > > > > >>>> > > > > > > > > > >>>> > > > >> Oh, good! > > > > > >>>> > > > >> > > > > > >>>> > > > >> On Tue, Dec 3, 2019, at 23:29, Alessandro > Tagliapietra > > > > wrote: > > > > > >>>> > > > >> > Testing on staging shows that a restart on > exception > > is > > > > > much > > > > > >>>> faster > > > > > >>>> > > and > > > > > >>>> > > > >> the > > > > > >>>> > > > >> > stream starts right away which I think means we're > > > > reading > > > > > >>>> way less > > > > > >>>> > > data > > > > > >>>> > > > >> > than before! > > > > > >>>> > > > >> > > > > > > >>>> > > > >> > What I was referring to is that, in Streams, the > keys > > > for > > > > > >>>> window > > > > > >>>> > > > >> > > aggregation state is actually composed of both > the > > > > window > > > > > >>>> itself > > > > > >>>> > > and > > > > > >>>> > > > >> the > > > > > >>>> > > > >> > > key. In the DSL, it looks like "Windowed<K>". > That > > > > > results > > > > > >>>> in the > > > > > >>>> > > > >> store > > > > > >>>> > > > >> > > having a unique key per window for each K, which > is > > > why > > > > > we > > > > > >>>> need > > > > > >>>> > > > >> retention > > > > > >>>> > > > >> > > as well as compaction for our changelogs. But for > > > you, > > > > if > > > > > >>>> you just > > > > > >>>> > > > >> make the > > > > > >>>> > > > >> > > key "K", then compaction alone should do the > trick. > > > > > >>>> > > > >> > > > > > > >>>> > > > >> > Yes we had compact,delete as cleanup policy but > > > probably > > > > it > > > > > >>>> still > > > > > >>>> > > had a > > > > > >>>> > > > >> too > > > > > >>>> > > > >> > long retention value, also the rocksdb store is > > > probably > > > > > much > > > > > >>>> > > faster now > > > > > >>>> > > > >> > having only one key per key instead of one key per > > > window > > > > > >>>> per key. > > > > > >>>> > > > >> > > > > > > >>>> > > > >> > Thanks a lot for helping! I'm now going to setup a > > > > > >>>> prometheus-jmx > > > > > >>>> > > > >> > monitoring so we can keep better track of what's > > going > > > on > > > > > :) > > > > > >>>> > > > >> > > > > > > >>>> > > > >> > -- > > > > > >>>> > > > >> > Alessandro Tagliapietra > > > > > >>>> > > > >> > > > > > > >>>> > > > >> > > > > > > >>>> > > > >> > On Tue, Dec 3, 2019 at 9:12 PM John Roesler < > > > > > >>>> vvcep...@apache.org> > > > > > >>>> > > > >> wrote: > > > > > >>>> > > > >> > > > > > > >>>> > > > >> > > Oh, yeah, I remember that conversation! 
> > > > > >>>> > > > >> > > > > > > > >>>> > > > >> > > Yes, then, I agree, if you're only storing state > of > > > the > > > > > >>>> most > > > > > >>>> > > recent > > > > > >>>> > > > >> window > > > > > >>>> > > > >> > > for each key, and the key you use for that state > is > > > > > >>>> actually the > > > > > >>>> > > key > > > > > >>>> > > > >> of the > > > > > >>>> > > > >> > > records, then an aggressive compaction policy > plus > > > your > > > > > >>>> custom > > > > > >>>> > > > >> transformer > > > > > >>>> > > > >> > > seems like a good way forward. > > > > > >>>> > > > >> > > > > > > > >>>> > > > >> > > What I was referring to is that, in Streams, the > > keys > > > > for > > > > > >>>> window > > > > > >>>> > > > >> > > aggregation state is actually composed of both > the > > > > window > > > > > >>>> itself > > > > > >>>> > > and > > > > > >>>> > > > >> the > > > > > >>>> > > > >> > > key. In the DSL, it looks like "Windowed<K>". > That > > > > > results > > > > > >>>> in the > > > > > >>>> > > > >> store > > > > > >>>> > > > >> > > having a unique key per window for each K, which > is > > > why > > > > > we > > > > > >>>> need > > > > > >>>> > > > >> retention > > > > > >>>> > > > >> > > as well as compaction for our changelogs. But for > > > you, > > > > if > > > > > >>>> you just > > > > > >>>> > > > >> make the > > > > > >>>> > > > >> > > key "K", then compaction alone should do the > trick. > > > > > >>>> > > > >> > > > > > > > >>>> > > > >> > > And yes, if you manage the topic yourself, then > > > Streams > > > > > >>>> won't > > > > > >>>> > > adjust > > > > > >>>> > > > >> the > > > > > >>>> > > > >> > > retention time. I think it might validate that > the > > > > > >>>> retention > > > > > >>>> > > isn't too > > > > > >>>> > > > >> > > short, but I don't remember offhand. > > > > > >>>> > > > >> > > > > > > > >>>> > > > >> > > Cheers, and let me know how it goes! > > > > > >>>> > > > >> > > -John > > > > > >>>> > > > >> > > > > > > > >>>> > > > >> > > On Tue, Dec 3, 2019, at 23:03, Alessandro > > > Tagliapietra > > > > > >>>> wrote: > > > > > >>>> > > > >> > > > Hi John, > > > > > >>>> > > > >> > > > > > > > > >>>> > > > >> > > > afaik grace period uses stream time > > > > > >>>> > > > >> > > > > > > > > >>>> > > > >> > > > > > > > >>>> > > > >> > > > > > >>>> > > > > > > > >>>> > > > > > > > > > > > > > > > https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html > > > > > >>>> > > > >> > > > which is > > > > > >>>> > > > >> > > > per partition, unfortunately we process data > > that's > > > > not > > > > > >>>> in sync > > > > > >>>> > > > >> between > > > > > >>>> > > > >> > > > keys so each key needs to be independent and a > > key > > > > can > > > > > >>>> have much > > > > > >>>> > > > >> older > > > > > >>>> > > > >> > > > data > > > > > >>>> > > > >> > > > than the other. > > > > > >>>> > > > >> > > > > > > > > >>>> > > > >> > > > Having a small grace period would probably > close > > > old > > > > > >>>> windows > > > > > >>>> > > sooner > > > > > >>>> > > > >> than > > > > > >>>> > > > >> > > > expected. That's also why in my use case a > custom > > > > store > > > > > >>>> that > > > > > >>>> > > just > > > > > >>>> > > > >> stores > > > > > >>>> > > > >> > > > the last window data for each key might work > > > better. 
> > > > I > > > > > >>>> had the > > > > > >>>> > > same > > > > > >>>> > > > >> issue > > > > > >>>> > > > >> > > > with suppression and it has been reported here > > > > > >>>> > > > >> > > > > https://issues.apache.org/jira/browse/KAFKA-8769 > > > > > >>>> > > > >> > > > Oh I just saw that you're the one that helped > me > > on > > > > > >>>> slack and > > > > > >>>> > > > >> created the > > > > > >>>> > > > >> > > > issue (thanks again for that). > > > > > >>>> > > > >> > > > > > > > > >>>> > > > >> > > > The behavior that you mention about streams > > setting > > > > > >>>> changelog > > > > > >>>> > > > >> retention > > > > > >>>> > > > >> > > > time is something they do on creation of the > > topic > > > > when > > > > > >>>> the > > > > > >>>> > > broker > > > > > >>>> > > > >> has > > > > > >>>> > > > >> > > auto > > > > > >>>> > > > >> > > > creation enabled? Because we're using confluent > > > cloud > > > > > >>>> and I had > > > > > >>>> > > to > > > > > >>>> > > > >> create > > > > > >>>> > > > >> > > > it manually. > > > > > >>>> > > > >> > > > Regarding the change in the recovery behavior, > > with > > > > > >>>> compact > > > > > >>>> > > cleanup > > > > > >>>> > > > >> > > policy > > > > > >>>> > > > >> > > > shouldn't the changelog only keep the last > value? > > > > That > > > > > >>>> would > > > > > >>>> > > make > > > > > >>>> > > > >> the > > > > > >>>> > > > >> > > > recovery faster and "cheaper" as it would only > > need > > > > to > > > > > >>>> read a > > > > > >>>> > > single > > > > > >>>> > > > >> > > value > > > > > >>>> > > > >> > > > per key (if the cleanup just happened) right? > > > > > >>>> > > > >> > > > > > > > > >>>> > > > >> > > > -- > > > > > >>>> > > > >> > > > Alessandro Tagliapietra > > > > > >>>> > > > >> > > > > > > > > >>>> > > > >> > > > > > > > > >>>> > > > >> > > > On Tue, Dec 3, 2019 at 8:51 PM John Roesler < > > > > > >>>> > > vvcep...@apache.org> > > > > > >>>> > > > >> wrote: > > > > > >>>> > > > >> > > > > > > > > >>>> > > > >> > > > > Hey Alessandro, > > > > > >>>> > > > >> > > > > > > > > > >>>> > > > >> > > > > That sounds also like it would work. I'm > > > wondering > > > > if > > > > > >>>> it would > > > > > >>>> > > > >> actually > > > > > >>>> > > > >> > > > > change what you observe w.r.t. recovery > > behavior, > > > > > >>>> though. > > > > > >>>> > > Streams > > > > > >>>> > > > >> > > already > > > > > >>>> > > > >> > > > > sets the retention time on the changelog to > > equal > > > > the > > > > > >>>> > > retention > > > > > >>>> > > > >> time > > > > > >>>> > > > >> > > of the > > > > > >>>> > > > >> > > > > windows, for windowed aggregations, so you > > > > shouldn't > > > > > be > > > > > >>>> > > loading a > > > > > >>>> > > > >> lot > > > > > >>>> > > > >> > > of > > > > > >>>> > > > >> > > > > window data for old windows you no longer > care > > > > about. > > > > > >>>> > > > >> > > > > > > > > > >>>> > > > >> > > > > Have you set the "grace period" on your > window > > > > > >>>> definition? By > > > > > >>>> > > > >> default, > > > > > >>>> > > > >> > > it > > > > > >>>> > > > >> > > > > is set to 24 hours, but you can set it as low > > as > > > > you > > > > > >>>> like. > > > > > >>>> > > E.g., > > > > > >>>> > > > >> if you > > > > > >>>> > > > >> > > > > want to commit to having in-order data only, > > then > > > > you > > > > > >>>> can set > > > > > >>>> > > the > > > > > >>>> > > > >> grace > > > > > >>>> > > > >> > > > > period to zero. 
This _should_ let the broker > > > clean > > > > up > > > > > >>>> the > > > > > >>>> > > > >> changelog > > > > > >>>> > > > >> > > records > > > > > >>>> > > > >> > > > > as soon as the window ends. > > > > > >>>> > > > >> > > > > > > > > > >>>> > > > >> > > > > Of course, the log cleaner doesn't run all > the > > > > time, > > > > > so > > > > > >>>> > > there's > > > > > >>>> > > > >> some > > > > > >>>> > > > >> > > extra > > > > > >>>> > > > >> > > > > delay in which "expired" data would still be > > > > visible > > > > > >>>> in the > > > > > >>>> > > > >> changelog, > > > > > >>>> > > > >> > > but > > > > > >>>> > > > >> > > > > it would actually be just the same as if you > > > manage > > > > > >>>> the store > > > > > >>>> > > > >> yourself. > > > > > >>>> > > > >> > > > > > > > > > >>>> > > > >> > > > > Hope this helps! > > > > > >>>> > > > >> > > > > -John > > > > > >>>> > > > >> > > > > > > > > > >>>> > > > >> > > > > On Tue, Dec 3, 2019, at 22:22, Alessandro > > > > > Tagliapietra > > > > > >>>> wrote: > > > > > >>>> > > > >> > > > > > Thanks John for the explanation, > > > > > >>>> > > > >> > > > > > > > > > > >>>> > > > >> > > > > > I thought that with EOS enabled (which we > > have) > > > > it > > > > > >>>> would in > > > > > >>>> > > the > > > > > >>>> > > > >> worst > > > > > >>>> > > > >> > > > > case > > > > > >>>> > > > >> > > > > > find a valid checkpoint and start the > restore > > > > from > > > > > >>>> there > > > > > >>>> > > until > > > > > >>>> > > > >> it > > > > > >>>> > > > >> > > reached > > > > > >>>> > > > >> > > > > > the last committed status, not completely > > from > > > > > >>>> scratch. What > > > > > >>>> > > > >> you say > > > > > >>>> > > > >> > > > > > definitely makes sense now. > > > > > >>>> > > > >> > > > > > Since we don't really need old time windows > > and > > > > we > > > > > >>>> ensure > > > > > >>>> > > data > > > > > >>>> > > > >> is > > > > > >>>> > > > >> > > ordered > > > > > >>>> > > > >> > > > > > when processed I think I"ll just write a > > custom > > > > > >>>> transformer > > > > > >>>> > > to > > > > > >>>> > > > >> keep > > > > > >>>> > > > >> > > only > > > > > >>>> > > > >> > > > > > the last window, store intermediate > > aggregation > > > > > >>>> results in > > > > > >>>> > > the > > > > > >>>> > > > >> store > > > > > >>>> > > > >> > > and > > > > > >>>> > > > >> > > > > > emit a new value only when we receive data > > > > > belonging > > > > > >>>> to a > > > > > >>>> > > new > > > > > >>>> > > > >> window. > > > > > >>>> > > > >> > > > > > That with a compact only changelog topic > > should > > > > > keep > > > > > >>>> the > > > > > >>>> > > rebuild > > > > > >>>> > > > >> > > data to > > > > > >>>> > > > >> > > > > > the minimum as it would have only the last > > > value > > > > > for > > > > > >>>> each > > > > > >>>> > > key. 
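To make the two approaches being weighed here concrete, a rough sketch (store name, serdes, window size and topic config are illustrative):

import java.time.Duration
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.state.Stores

// DSL/windowed variant (John's suggestion): a zero grace period closes each window as
// soon as stream time passes its end. Stream time advances per partition, not per key,
// which is why it doesn't fit the out-of-sync-keys case described above.
val windows = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ZERO)

// Custom-transformer variant: key the store by the plain sensor key and make the
// changelog compact-only, so after cleanup it holds a single (latest-window) value per key.
val aggregateStoreBuilder = Stores
    .keyValueStoreBuilder(
        Stores.persistentKeyValueStore("aggregate-store"),
        Serdes.String(),
        Serdes.ByteArray()          // stand-in for the Avro value serde used in the thread
    )
    .withLoggingEnabled(mapOf("cleanup.policy" to "compact"))
    .withCachingEnabled()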
> > > > > >>>> > > > >> > > > > > > > > > > >>>> > > > >> > > > > > Hope that makes sense > > > > > >>>> > > > >> > > > > > > > > > > >>>> > > > >> > > > > > Thanks again > > > > > >>>> > > > >> > > > > > > > > > > >>>> > > > >> > > > > > -- > > > > > >>>> > > > >> > > > > > Alessandro Tagliapietra > > > > > >>>> > > > >> > > > > > > > > > > >>>> > > > >> > > > > > > > > > > >>>> > > > >> > > > > > On Tue, Dec 3, 2019 at 3:04 PM John > Roesler < > > > > > >>>> > > > >> vvcep...@apache.org> > > > > > >>>> > > > >> > > wrote: > > > > > >>>> > > > >> > > > > > > > > > > >>>> > > > >> > > > > > > Hi Alessandro, > > > > > >>>> > > > >> > > > > > > > > > > > >>>> > > > >> > > > > > > To take a stab at your question, maybe it > > > first > > > > > >>>> doesn't > > > > > >>>> > > find > > > > > >>>> > > > >> it, > > > > > >>>> > > > >> > > but > > > > > >>>> > > > >> > > > > then > > > > > >>>> > > > >> > > > > > > restores some data, writes the > checkpoint, > > > and > > > > > >>>> then later > > > > > >>>> > > on, > > > > > >>>> > > > >> it > > > > > >>>> > > > >> > > has to > > > > > >>>> > > > >> > > > > > > re-initialize the task for some reason, > and > > > > > that's > > > > > >>>> why it > > > > > >>>> > > does > > > > > >>>> > > > >> > > find a > > > > > >>>> > > > >> > > > > > > checkpoint then? > > > > > >>>> > > > >> > > > > > > > > > > > >>>> > > > >> > > > > > > More to the heart of the issue, if you > have > > > EOS > > > > > >>>> enabled, > > > > > >>>> > > > >> Streams > > > > > >>>> > > > >> > > _only_ > > > > > >>>> > > > >> > > > > > > records the checkpoint when the store is > > in a > > > > > >>>> > > known-consistent > > > > > >>>> > > > >> > > state. > > > > > >>>> > > > >> > > > > For > > > > > >>>> > > > >> > > > > > > example, if you have a graceful shutdown, > > > > Streams > > > > > >>>> will > > > > > >>>> > > flush > > > > > >>>> > > > >> all > > > > > >>>> > > > >> > > the > > > > > >>>> > > > >> > > > > > > stores, commit all the transactions, and > > then > > > > > >>>> write the > > > > > >>>> > > > >> checkpoint > > > > > >>>> > > > >> > > > > file. > > > > > >>>> > > > >> > > > > > > Then, on re-start, it will pick up from > > that > > > > > >>>> checkpoint. > > > > > >>>> > > > >> > > > > > > > > > > > >>>> > > > >> > > > > > > But as soon as it starts processing > > records, > > > it > > > > > >>>> removes > > > > > >>>> > > the > > > > > >>>> > > > >> > > checkpoint > > > > > >>>> > > > >> > > > > > > file, so if it crashes while it was > > > processing, > > > > > >>>> there is > > > > > >>>> > > no > > > > > >>>> > > > >> > > checkpoint > > > > > >>>> > > > >> > > > > file > > > > > >>>> > > > >> > > > > > > there, and it will have to restore from > the > > > > > >>>> beginning of > > > > > >>>> > > the > > > > > >>>> > > > >> > > changelog. > > > > > >>>> > > > >> > > > > > > > > > > > >>>> > > > >> > > > > > > This design is there on purpose, because > > > > > otherwise > > > > > >>>> we > > > > > >>>> > > cannot > > > > > >>>> > > > >> > > actually > > > > > >>>> > > > >> > > > > > > guarantee correctness... For example, if > > you > > > > are > > > > > >>>> > > maintaining a > > > > > >>>> > > > >> > > count > > > > > >>>> > > > >> > > > > > > operation, and we process an input record > > i, > > > > > >>>> increment the > > > > > >>>> > > > >> count > > > > > >>>> > > > >> > > and > > > > > >>>> > > > >> > > > > write > > > > > >>>> > > > >> > > > > > > it to the state store, and to the > changelog > > > > > topic. 
> > > > > >>>> But we > > > > > >>>> > > > >> crash > > > > > >>>> > > > >> > > before > > > > > >>>> > > > >> > > > > we > > > > > >>>> > > > >> > > > > > > commit that transaction. Then, the write > to > > > the > > > > > >>>> changelog > > > > > >>>> > > > >> would be > > > > > >>>> > > > >> > > > > aborted, > > > > > >>>> > > > >> > > > > > > and we would re-process record i . > However, > > > > we've > > > > > >>>> already > > > > > >>>> > > > >> updated > > > > > >>>> > > > >> > > the > > > > > >>>> > > > >> > > > > local > > > > > >>>> > > > >> > > > > > > state store, so when we increment it > again, > > > it > > > > > >>>> results in > > > > > >>>> > > > >> > > > > double-counting > > > > > >>>> > > > >> > > > > > > i. The key point here is that there's no > > way > > > to > > > > > do > > > > > >>>> an > > > > > >>>> > > atomic > > > > > >>>> > > > >> > > operation > > > > > >>>> > > > >> > > > > > > across two different systems (local state > > > store > > > > > >>>> and the > > > > > >>>> > > > >> changelog > > > > > >>>> > > > >> > > > > topic). > > > > > >>>> > > > >> > > > > > > Since we can't guarantee that we roll > back > > > the > > > > > >>>> incremented > > > > > >>>> > > > >> count > > > > > >>>> > > > >> > > when > > > > > >>>> > > > >> > > > > the > > > > > >>>> > > > >> > > > > > > changelog transaction is aborted, we > can't > > > keep > > > > > >>>> the local > > > > > >>>> > > > >> store > > > > > >>>> > > > >> > > > > consistent > > > > > >>>> > > > >> > > > > > > with the changelog. > > > > > >>>> > > > >> > > > > > > > > > > > >>>> > > > >> > > > > > > After a crash, the only way to ensure the > > > local > > > > > >>>> store is > > > > > >>>> > > > >> consistent > > > > > >>>> > > > >> > > > > with > > > > > >>>> > > > >> > > > > > > the changelog is to discard the entire > > thing > > > > and > > > > > >>>> rebuild > > > > > >>>> > > it. > > > > > >>>> > > > >> This > > > > > >>>> > > > >> > > is > > > > > >>>> > > > >> > > > > why we > > > > > >>>> > > > >> > > > > > > have an invariant that the checkpoint > file > > > only > > > > > >>>> exists > > > > > >>>> > > when we > > > > > >>>> > > > >> > > _know_ > > > > > >>>> > > > >> > > > > that > > > > > >>>> > > > >> > > > > > > the local store is consistent with the > > > > changelog, > > > > > >>>> and > > > > > >>>> > > this is > > > > > >>>> > > > >> why > > > > > >>>> > > > >> > > > > you're > > > > > >>>> > > > >> > > > > > > seeing so much bandwidth when re-starting > > > from > > > > an > > > > > >>>> unclean > > > > > >>>> > > > >> shutdown. > > > > > >>>> > > > >> > > > > > > > > > > > >>>> > > > >> > > > > > > Note that it's definitely possible to do > > > better > > > > > >>>> than this, > > > > > >>>> > > > >> and we > > > > > >>>> > > > >> > > would > > > > > >>>> > > > >> > > > > > > very much like to improve it in the > future. 
> > > > > >>>> > > > >> > > > > > > > > > > > >>>> > > > >> > > > > > > Thanks, > > > > > >>>> > > > >> > > > > > > -John > > > > > >>>> > > > >> > > > > > > > > > > > >>>> > > > >> > > > > > > On Tue, Dec 3, 2019, at 16:16, Alessandro > > > > > >>>> Tagliapietra > > > > > >>>> > > wrote: > > > > > >>>> > > > >> > > > > > > > Hi John, > > > > > >>>> > > > >> > > > > > > > > > > > > >>>> > > > >> > > > > > > > thanks a lot for helping, regarding > your > > > > > message: > > > > > >>>> > > > >> > > > > > > > - no we only have 1 instance of the > > stream > > > > > >>>> application, > > > > > >>>> > > > >> and it > > > > > >>>> > > > >> > > > > always > > > > > >>>> > > > >> > > > > > > > re-uses the same state folder > > > > > >>>> > > > >> > > > > > > > - yes we're seeing most issues when > > > > restarting > > > > > >>>> not > > > > > >>>> > > > >> gracefully > > > > > >>>> > > > >> > > due > > > > > >>>> > > > >> > > > > > > exception > > > > > >>>> > > > >> > > > > > > > > > > > > >>>> > > > >> > > > > > > > I've enabled trace logging and > filtering > > > by a > > > > > >>>> single > > > > > >>>> > > state > > > > > >>>> > > > >> store > > > > > >>>> > > > >> > > the > > > > > >>>> > > > >> > > > > > > > StoreChangelogReader messages are: > > > > > >>>> > > > >> > > > > > > > > > > > > >>>> > > > >> > > > > > > > Added restorer for changelog > > > > > >>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-0 > > > > > >>>> > > > >> > > > > > > > Added restorer for changelog > > > > > >>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-1 > > > > > >>>> > > > >> > > > > > > > Added restorer for changelog > > > > > >>>> > > > >> > > > > sensors-stream-aggregate-store-changelog-2 > > > > > >>>> > > > >> > > > > > > > Did not find checkpoint from changelog > > > > > >>>> > > > >> > > > > > > > > > sensors-stream-aggregate-store-changelog-2 > > > > for > > > > > >>>> store > > > > > >>>> > > > >> > > aggregate-store, > > > > > >>>> > > > >> > > > > > > > rewinding to beginning. > > > > > >>>> > > > >> > > > > > > > Did not find checkpoint from changelog > > > > > >>>> > > > >> > > > > > > > > > sensors-stream-aggregate-store-changelog-1 > > > > for > > > > > >>>> store > > > > > >>>> > > > >> > > aggregate-store, > > > > > >>>> > > > >> > > > > > > > rewinding to beginning. > > > > > >>>> > > > >> > > > > > > > Did not find checkpoint from changelog > > > > > >>>> > > > >> > > > > > > > > > sensors-stream-aggregate-store-changelog-0 > > > > for > > > > > >>>> store > > > > > >>>> > > > >> > > aggregate-store, > > > > > >>>> > > > >> > > > > > > > rewinding to beginning. > > > > > >>>> > > > >> > > > > > > > No checkpoint found for task 0_2 state > > > store > > > > > >>>> > > aggregate-store > > > > > >>>> > > > >> > > > > changelog > > > > > >>>> > > > >> > > > > > > > > > sensors-stream-aggregate-store-changelog-2 > > > > with > > > > > >>>> EOS > > > > > >>>> > > turned > > > > > >>>> > > > >> on. > > > > > >>>> > > > >> > > > > > > > Reinitializing the task and restore its > > > state > > > > > >>>> from the > > > > > >>>> > > > >> beginning. > > > > > >>>> > > > >> > > > > > > > No checkpoint found for task 0_1 state > > > store > > > > > >>>> > > aggregate-store > > > > > >>>> > > > >> > > > > changelog > > > > > >>>> > > > >> > > > > > > > > > sensors-stream-aggregate-store-changelog-1 > > > > with > > > > > >>>> EOS > > > > > >>>> > > turned > > > > > >>>> > > > >> on. 
> > > > > >>>> > > > >> > > > > > > > Reinitializing the task and restore its > > > state > > > > > >>>> from the > > > > > >>>> > > > >> beginning. > > > > > >>>> > > > >> > > > > > > > No checkpoint found for task 0_0 state > > > store > > > > > >>>> > > aggregate-store > > > > > >>>> > > > >> > > > > changelog > > > > > >>>> > > > >> > > > > > > > > > sensors-stream-aggregate-store-changelog-0 > > > > with > > > > > >>>> EOS > > > > > >>>> > > turned > > > > > >>>> > > > >> on. > > > > > >>>> > > > >> > > > > > > > Reinitializing the task and restore its > > > state > > > > > >>>> from the > > > > > >>>> > > > >> beginning. > > > > > >>>> > > > >> > > > > > > > Found checkpoint 709937 from changelog > > > > > >>>> > > > >> > > > > > > > > > sensors-stream-aggregate-store-changelog-2 > > > > for > > > > > >>>> store > > > > > >>>> > > > >> > > aggregate-store. > > > > > >>>> > > > >> > > > > > > > Restoring partition > > > > > >>>> > > > >> sensors-stream-aggregate-store-changelog-2 > > > > > >>>> > > > >> > > from > > > > > >>>> > > > >> > > > > > > offset > > > > > >>>> > > > >> > > > > > > > 709937 to endOffset 742799 > > > > > >>>> > > > >> > > > > > > > Found checkpoint 3024234 from changelog > > > > > >>>> > > > >> > > > > > > > > > sensors-stream-aggregate-store-changelog-1 > > > > for > > > > > >>>> store > > > > > >>>> > > > >> > > aggregate-store. > > > > > >>>> > > > >> > > > > > > > Restoring partition > > > > > >>>> > > > >> sensors-stream-aggregate-store-changelog-1 > > > > > >>>> > > > >> > > from > > > > > >>>> > > > >> > > > > > > offset > > > > > >>>> > > > >> > > > > > > > 3024234 to endOffset 3131513 > > > > > >>>> > > > >> > > > > > > > Found checkpoint 14514072 from > changelog > > > > > >>>> > > > >> > > > > > > > > > sensors-stream-aggregate-store-changelog-0 > > > > for > > > > > >>>> store > > > > > >>>> > > > >> > > aggregate-store. 
> > > > > >>>> > > > >> > > > > > > > Restoring partition > > > > > >>>> > > > >> sensors-stream-aggregate-store-changelog-0 > > > > > >>>> > > > >> > > from > > > > > >>>> > > > >> > > > > > > offset > > > > > >>>> > > > >> > > > > > > > 14514072 to endOffset 17116574 > > > > > >>>> > > > >> > > > > > > > Restored from > > > > > >>>> > > sensors-stream-aggregate-store-changelog-2 to > > > > > >>>> > > > >> > > > > > > aggregate-store > > > > > >>>> > > > >> > > > > > > > with 966 records, ending offset is > > 711432, > > > > next > > > > > >>>> starting > > > > > >>>> > > > >> > > position is > > > > > >>>> > > > >> > > > > > > 711434 > > > > > >>>> > > > >> > > > > > > > Restored from > > > > > >>>> > > sensors-stream-aggregate-store-changelog-2 to > > > > > >>>> > > > >> > > > > > > aggregate-store > > > > > >>>> > > > >> > > > > > > > with 914 records, ending offset is > > 712711, > > > > next > > > > > >>>> starting > > > > > >>>> > > > >> > > position is > > > > > >>>> > > > >> > > > > > > 712713 > > > > > >>>> > > > >> > > > > > > > Restored from > > > > > >>>> > > sensors-stream-aggregate-store-changelog-1 to > > > > > >>>> > > > >> > > > > > > aggregate-store > > > > > >>>> > > > >> > > > > > > > with 18 records, ending offset is > > 3024261, > > > > next > > > > > >>>> starting > > > > > >>>> > > > >> > > position is > > > > > >>>> > > > >> > > > > > > 3024262 > > > > > >>>> > > > >> > > > > > > > > > > > > >>>> > > > >> > > > > > > > > > > > > >>>> > > > >> > > > > > > > why it first says it didn't find the > > > > checkpoint > > > > > >>>> and > > > > > >>>> > > then it > > > > > >>>> > > > >> does > > > > > >>>> > > > >> > > > > find it? > > > > > >>>> > > > >> > > > > > > > It seems it loaded about 2.7M records > > (sum > > > > of > > > > > >>>> offset > > > > > >>>> > > > >> difference > > > > > >>>> > > > >> > > in > > > > > >>>> > > > >> > > > > the > > > > > >>>> > > > >> > > > > > > > "restorting partition ...." messages) > > > right? > > > > > >>>> > > > >> > > > > > > > Maybe should I try to reduce the > > checkpoint > > > > > >>>> interval? > > > > > >>>> > > > >> > > > > > > > > > > > > >>>> > > > >> > > > > > > > Regards > > > > > >>>> > > > >> > > > > > > > > > > > > >>>> > > > >> > > > > > > > -- > > > > > >>>> > > > >> > > > > > > > Alessandro Tagliapietra > > > > > >>>> > > > >> > > > > > > > > > > > > >>>> > > > >> > > > > > > > > > > > > >>>> > > > >> > > > > > > > On Mon, Dec 2, 2019 at 9:18 AM John > > > Roesler < > > > > > >>>> > > > >> vvcep...@apache.org > > > > > >>>> > > > >> > > > > > > > > >>>> > > > >> > > > > wrote: > > > > > >>>> > > > >> > > > > > > > > > > > > >>>> > > > >> > > > > > > > > Hi Alessandro, > > > > > >>>> > > > >> > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > I'm sorry to hear that. > > > > > >>>> > > > >> > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > The restore process only takes one > > factor > > > > > into > > > > > >>>> > > account: > > > > > >>>> > > > >> the > > > > > >>>> > > > >> > > current > > > > > >>>> > > > >> > > > > > > offset > > > > > >>>> > > > >> > > > > > > > > position of the changelog topic is > > stored > > > > in > > > > > a > > > > > >>>> local > > > > > >>>> > > file > > > > > >>>> > > > >> > > > > alongside the > > > > > >>>> > > > >> > > > > > > > > state stores. 
On startup, the app > > checks > > > if > > > > > the > > > > > >>>> > > recorded > > > > > >>>> > > > >> > > position > > > > > >>>> > > > >> > > > > lags > > > > > >>>> > > > >> > > > > > > the > > > > > >>>> > > > >> > > > > > > > > latest offset in the changelog. If > so, > > > then > > > > > it > > > > > >>>> reads > > > > > >>>> > > the > > > > > >>>> > > > >> > > missing > > > > > >>>> > > > >> > > > > > > changelog > > > > > >>>> > > > >> > > > > > > > > records before starting processing. > > > > > >>>> > > > >> > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > Thus, it would not restore any old > > window > > > > > data. > > > > > >>>> > > > >> > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > There might be a few different things > > > going > > > > > on > > > > > >>>> to > > > > > >>>> > > explain > > > > > >>>> > > > >> your > > > > > >>>> > > > >> > > > > > > observation: > > > > > >>>> > > > >> > > > > > > > > * if there is more than one instance > in > > > > your > > > > > >>>> Streams > > > > > >>>> > > > >> cluster, > > > > > >>>> > > > >> > > > > maybe the > > > > > >>>> > > > >> > > > > > > > > task is "flopping" between instances, > > so > > > > each > > > > > >>>> instance > > > > > >>>> > > > >> still > > > > > >>>> > > > >> > > has to > > > > > >>>> > > > >> > > > > > > recover > > > > > >>>> > > > >> > > > > > > > > state, since it wasn't the last one > > > > actively > > > > > >>>> > > processing > > > > > >>>> > > > >> it. > > > > > >>>> > > > >> > > > > > > > > * if the application isn't stopped > > > > > gracefully, > > > > > >>>> it > > > > > >>>> > > might > > > > > >>>> > > > >> not > > > > > >>>> > > > >> > > get a > > > > > >>>> > > > >> > > > > > > chance > > > > > >>>> > > > >> > > > > > > > > to record its offset in that local > > file, > > > so > > > > > on > > > > > >>>> > > restart it > > > > > >>>> > > > >> has > > > > > >>>> > > > >> > > to > > > > > >>>> > > > >> > > > > > > restore > > > > > >>>> > > > >> > > > > > > > > some or all of the state store from > > > > > changelog. > > > > > >>>> > > > >> > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > Or it could be something else; that's > > > just > > > > > >>>> what comes > > > > > >>>> > > to > > > > > >>>> > > > >> mind. > > > > > >>>> > > > >> > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > If you want to get to the bottom of > it, > > > you > > > > > >>>> can take a > > > > > >>>> > > > >> look at > > > > > >>>> > > > >> > > the > > > > > >>>> > > > >> > > > > > > logs, > > > > > >>>> > > > >> > > > > > > > > paying close attention to which tasks > > are > > > > > >>>> assigned to > > > > > >>>> > > > >> which > > > > > >>>> > > > >> > > > > instances > > > > > >>>> > > > >> > > > > > > after > > > > > >>>> > > > >> > > > > > > > > each restart. You can also look into > > the > > > > logs > > > > > >>>> from > > > > > >>>> > > > >> > > > > > > > > > > > > > >>>> > > > >> > > > > > > > >>>> > > > > > > > `org.apache.kafka.streams.processor.internals.StoreChangelogReader` > > > > > >>>> > > > >> > > > > > > (might > > > > > >>>> > > > >> > > > > > > > > want to set it to DEBUG or TRACE > level > > to > > > > > >>>> really see > > > > > >>>> > > > >> what's > > > > > >>>> > > > >> > > > > happening). > > > > > >>>> > > > >> > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > I hope this helps! 
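For reference, with the stock log4j 1.x setup the Kafka clients ship with, that is a one-line logger override (the properties file name and location depend on how the application is packaged):

log4j.logger.org.apache.kafka.streams.processor.internals.StoreChangelogReader=TRACE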
> > > > > >>>> > > > >> > > > > > > > > -John > > > > > >>>> > > > >> > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > On Sun, Dec 1, 2019, at 21:25, > > Alessandro > > > > > >>>> Tagliapietra > > > > > >>>> > > > >> wrote: > > > > > >>>> > > > >> > > > > > > > > > Hello everyone, > > > > > >>>> > > > >> > > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > > we're having a problem with > bandwidth > > > > usage > > > > > >>>> on > > > > > >>>> > > streams > > > > > >>>> > > > >> > > > > application > > > > > >>>> > > > >> > > > > > > > > startup, > > > > > >>>> > > > >> > > > > > > > > > our current setup does this: > > > > > >>>> > > > >> > > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > > ... > > > > > >>>> > > > >> > > > > > > > > > .groupByKey() > > > > > >>>> > > > >> > > > > > > > > > > > > > > >>>> > > > >> > > > > > > > >>>> .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1))) > > > > > >>>> > > > >> > > > > > > > > > .aggregate( > > > > > >>>> > > > >> > > > > > > > > > { > > > MetricSequenceList(ArrayList()) > > > > > }, > > > > > >>>> > > > >> > > > > > > > > > { key, value, aggregate -> > > > > > >>>> > > > >> > > > > > > > > > > > > > > aggregate.getRecords().add(value) > > > > > >>>> > > > >> > > > > > > > > > aggregate > > > > > >>>> > > > >> > > > > > > > > > }, > > > > > >>>> > > > >> > > > > > > > > > Materialized.`as`<String, > > > > > >>>> > > MetricSequenceList, > > > > > >>>> > > > >> > > > > > > WindowStore<Bytes, > > > > > >>>> > > > >> > > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > > > > >>>> > > > >> > > > > > > > > > >>>> > > > >> > > > > > > > >>>> > > > >> > > > > > >>>> > > > > > > > >>>> > > > > > > > > > > > > > > > ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde()) > > > > > >>>> > > > >> > > > > > > > > > ) > > > > > >>>> > > > >> > > > > > > > > > .toStream() > > > > > >>>> > > > >> > > > > > > > > > .flatTransform(TransformerSupplier > { > > > > > >>>> > > > >> > > > > > > > > > ... > > > > > >>>> > > > >> > > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > > basically in each window we append > > the > > > > new > > > > > >>>> values > > > > > >>>> > > and > > > > > >>>> > > > >> then do > > > > > >>>> > > > >> > > > > some > > > > > >>>> > > > >> > > > > > > other > > > > > >>>> > > > >> > > > > > > > > > logic with the array of windowed > > > values. > > > > > >>>> > > > >> > > > > > > > > > The aggregate-store changelog topic > > > > > >>>> configuration > > > > > >>>> > > uses > > > > > >>>> > > > >> > > > > > > compact,delete as > > > > > >>>> > > > >> > > > > > > > > > cleanup policy and has 12 hours of > > > > > retention. > > > > > >>>> > > > >> > > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > > What we've seen is that on > > application > > > > > >>>> startup it > > > > > >>>> > > takes > > > > > >>>> > > > >> a > > > > > >>>> > > > >> > > couple > > > > > >>>> > > > >> > > > > > > minutes > > > > > >>>> > > > >> > > > > > > > > to > > > > > >>>> > > > >> > > > > > > > > > rebuild the state store, even if > the > > > > state > > > > > >>>> store > > > > > >>>> > > > >> directory is > > > > > >>>> > > > >> > > > > > > persisted > > > > > >>>> > > > >> > > > > > > > > > across restarts. 
That along with an > > > > > >>>> exception that > > > > > >>>> > > > >> caused the > > > > > >>>> > > > >> > > > > docker > > > > > >>>> > > > >> > > > > > > > > > container to be restarted a couple > > > > hundreds > > > > > >>>> times > > > > > >>>> > > > >> caused a > > > > > >>>> > > > >> > > big > > > > > >>>> > > > >> > > > > > > confluent > > > > > >>>> > > > >> > > > > > > > > > cloud bill compared to what we > > usually > > > > > spend > > > > > >>>> (1/4 > > > > > >>>> > > of a > > > > > >>>> > > > >> full > > > > > >>>> > > > >> > > > > month in > > > > > >>>> > > > >> > > > > > > 1 > > > > > >>>> > > > >> > > > > > > > > day). > > > > > >>>> > > > >> > > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > > What I think is happening is that > the > > > > topic > > > > > >>>> is > > > > > >>>> > > keeping > > > > > >>>> > > > >> all > > > > > >>>> > > > >> > > the > > > > > >>>> > > > >> > > > > > > previous > > > > > >>>> > > > >> > > > > > > > > > windows even with the compacting > > policy > > > > > >>>> because each > > > > > >>>> > > > >> key is > > > > > >>>> > > > >> > > the > > > > > >>>> > > > >> > > > > > > original > > > > > >>>> > > > >> > > > > > > > > > key + the timestamp not just the > key. > > > > Since > > > > > >>>> we don't > > > > > >>>> > > > >> care > > > > > >>>> > > > >> > > about > > > > > >>>> > > > >> > > > > > > previous > > > > > >>>> > > > >> > > > > > > > > > windows as the flatTransform after > > the > > > > > >>>> toStream() > > > > > >>>> > > makes > > > > > >>>> > > > >> sure > > > > > >>>> > > > >> > > > > that we > > > > > >>>> > > > >> > > > > > > > > don't > > > > > >>>> > > > >> > > > > > > > > > process old windows (a custom > > > suppressor > > > > > >>>> basically) > > > > > >>>> > > is > > > > > >>>> > > > >> there > > > > > >>>> > > > >> > > a > > > > > >>>> > > > >> > > > > way to > > > > > >>>> > > > >> > > > > > > > > only > > > > > >>>> > > > >> > > > > > > > > > keep the last window so that the > > store > > > > > >>>> rebuilding > > > > > >>>> > > goes > > > > > >>>> > > > >> > > faster and > > > > > >>>> > > > >> > > > > > > without > > > > > >>>> > > > >> > > > > > > > > > rebuilding old windows too? Or > > should I > > > > > >>>> create a > > > > > >>>> > > custom > > > > > >>>> > > > >> > > window > > > > > >>>> > > > >> > > > > using > > > > > >>>> > > > >> > > > > > > the > > > > > >>>> > > > >> > > > > > > > > > original key as key so that the > > > > compaction > > > > > >>>> keeps > > > > > >>>> > > only > > > > > >>>> > > > >> the > > > > > >>>> > > > >> > > last > > > > > >>>> > > > >> > > > > window > > > > > >>>> > > > >> > > > > > > > > data? > > > > > >>>> > > > >> > > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > > Thank you > > > > > >>>> > > > >> > > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > > -- > > > > > >>>> > > > >> > > > > > > > > > Alessandro Tagliapietra > > > > > >>>> > > > >> > > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > > > > > > >>>> > > > >> > > > > > > > > > > > > >>>> > > > >> > > > > > > > > > > > >>>> > > > >> > > > > > > > > > > >>>> > > > >> > > > > > > > > > >>>> > > > >> > > > > > > > > >>>> > > > >> > > > > > > > >>>> > > > >> > > > > > > >>>> > > > >> > > > > > >>>> > > > > > > > > > >>>> > > > > > > > > >>>> > > > > > > > >>>> > > > > > > >>>> > > > > > >>> > > > > > > > > > > > > > > >