Hmm, that’s a good question. Now that we’re talking about caching, I wonder if the cache was just too small. It’s not very big by default: cache.max.bytes.buffering defaults to 10 MB, shared across all the stream threads of one instance.
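For reference, the cache is also flushed on every commit, so the commit interval matters as much as the size. A minimal Kotlin sketch of raising both; the 64 MB and 30 second values below are purely illustrative, not a recommendation:

    import org.apache.kafka.streams.StreamsConfig
    import java.util.Properties

    val props = Properties()
    // ... application.id, bootstrap.servers, processing.guarantee, etc. as in the existing app ...

    // Total record cache shared by all stream threads of this instance.
    props[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 64 * 1024 * 1024L

    // The cache is flushed on every commit; with exactly-once enabled the commit
    // interval defaults to 100 ms, so a bigger cache alone may not change much.
    props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 30_000L

The trade-off is latency: a larger cache and a longer commit interval mean fewer, larger writes to the changelog and downstream topics, but updates are forwarded later.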
On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote: > Ok I'll check on that! > > Now I can see that with caching we went from 3-4MB/s to 400KB/s, that will > help with the bill. > > Last question, any reason why after a while the regular windowed stream > starts sending every update instead of caching? > Could it be because it doesn't have any more memory available? Any other > possible reason? > > Thank you so much for your help > > -- > Alessandro Tagliapietra > > > On Sat, Dec 7, 2019 at 9:14 AM John Roesler <vvcep...@apache.org> wrote: > > > Ah, yes. Glad you figured it out! > > > > Caching does not reduce EOS guarantees at all. I highly recommend using > > it. You might even want to take a look at the caching metrics to make sure > > you have a good hit ratio. > > > > -John > > > > On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra wrote: > > > Never mind I've found out I can use `.withCachingEnabled` on the store > > > builder to achieve the same thing as the windowing example as > > > `Materialized.as` turns that on by default. > > > > > > Does caching in any way reduces the EOS guarantees? > > > > > > -- > > > Alessandro Tagliapietra > > > > > > > > > On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra < > > > tagliapietra.alessan...@gmail.com> wrote: > > > > > > > Seems my journey with this isn't done just yet, > > > > > > > > This seems very complicated to me but I'll try to explain it as best I > > can. > > > > To better understand the streams network usage I've used prometheus > > with > > > > the JMX exporter to export kafka metrics. > > > > To check the amount of data we use I'm looking at the increments > > > > of kafka_producer_topic_metrics_byte_total and > > > > kafka_producer_producer_topic_metrics_record_send_total, > > > > > > > > Our current (before the change mentioned above) code looks like this: > > > > > > > > // This transformers just pairs a value with the previous one storing > > the > > > > temporary one in a store > > > > val pairsStream = metricStream > > > > .transformValues(ValueTransformerWithKeySupplier { PairTransformer() > > }, > > > > "LastValueStore") > > > > .filter { _, value: MetricSequence? -> value != null } > > > > > > > > // Create a store to store suppressed windows until a new one is > > received > > > > val suppressStoreSupplier = > > > > > > Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"), > > > > ...... > > > > > > > > // Window and aggregate data in 1 minute intervals > > > > val aggregatedStream = pairsStream > > > > .groupByKey() > > > > .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1))) > > > > .aggregate( > > > > { MetricSequenceList(ArrayList()) }, > > > > { key, value, aggregate -> > > > > aggregate.getRecords().add(value) > > > > aggregate > > > > }, > > > > Materialized.`as`<String, MetricSequenceList, > > WindowStore<Bytes, > > > > > > ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde()) > > > > ) > > > > .toStream() > > > > .flatTransform(TransformerSupplier { > > > > // This transformer basically waits until a new window is > > received > > > > to emit the previous one > > > > }, "suppress-store") > > > > .map { sensorId: String, suppressedOutput: SuppressedOutput -> > > > > .... etc .... 
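As an aside on the .withCachingEnabled change discussed a few messages up, this is roughly what it looks like on a hand-built store. The store name mirrors the quoted code, and the value serde is left as a parameter because the actual Avro serde type is not shown here:

    import org.apache.kafka.common.serialization.Serde
    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.state.KeyValueStore
    import org.apache.kafka.streams.state.StoreBuilder
    import org.apache.kafka.streams.state.Stores

    // Caching on a manually registered store, which Materialized.as(...) already
    // enables by default for the windowed aggregation above.
    fun <V> cachedSuppressStore(valueSerde: Serde<V>): StoreBuilder<KeyValueStore<String, V>> =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("suppress-store"),
            Serdes.String(),
            valueSerde
        )
        .withCachingEnabled() // batches writes to the store and its changelog topic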
> > > > > > > > > > > > Basically: > > > > - all data goes through LastValueStore store that stores each message > > and > > > > emits a pair with the previous one > > > > - the aggregate-store is used to store the per-window list of > > messages in > > > > the aggregate method > > > > - the suppress store is used to store each received window which is > > > > emitted only after a newer one is received > > > > > > > > What I'm experiencing is that: > > > > - during normal execution, the streams app sends to the lastvalue > > store > > > > changelog topic 5k messages/min, the aggregate and suppress store > > changelog > > > > topics only about 100 > > > > - at some point (after many hours of operation), the streams app > > starts > > > > sending to the aggregate and suppress store changelog topic the same > > amount > > > > of messages going through the lastvaluestore > > > > - if I restart the streams app it goes back to the initial behavior > > > > > > > > You can see the behavior in this graph https://imgur.com/dJcUNSf > > > > You can also see that after a restart everything goes back to normal > > > > levels. > > > > Regarding other metrics, process latency increases, poll latency > > > > decreases, poll rate decreases, commit rate stays the same while commit > > > > latency increases. > > > > > > > > Now, I've these questions: > > > > - why isn't the aggregate/suppress store changelog topic throughput > > the > > > > same as the LastValueStore? Shouldn't every time it aggregates send a > > > > record to the changelog? > > > > - is the windowing doing some internal caching like not sending every > > > > aggregation record until the window time is passed? (if so, where can I > > > > find that code since I would like to use that also for our new > > > > implementation) > > > > > > > > Thank you in advance > > > > > > > > -- > > > > Alessandro Tagliapietra > > > > > > > > > > > > On Wed, Dec 4, 2019 at 7:57 AM John Roesler <vvcep...@apache.org> > > wrote: > > > > > > > >> Oh, good! > > > >> > > > >> On Tue, Dec 3, 2019, at 23:29, Alessandro Tagliapietra wrote: > > > >> > Testing on staging shows that a restart on exception is much faster > > and > > > >> the > > > >> > stream starts right away which I think means we're reading way less > > data > > > >> > than before! > > > >> > > > > >> > What I was referring to is that, in Streams, the keys for window > > > >> > > aggregation state is actually composed of both the window itself > > and > > > >> the > > > >> > > key. In the DSL, it looks like "Windowed<K>". That results in the > > > >> store > > > >> > > having a unique key per window for each K, which is why we need > > > >> retention > > > >> > > as well as compaction for our changelogs. But for you, if you just > > > >> make the > > > >> > > key "K", then compaction alone should do the trick. > > > >> > > > > >> > Yes we had compact,delete as cleanup policy but probably it still > > had a > > > >> too > > > >> > long retention value, also the rocksdb store is probably much > > faster now > > > >> > having only one key per key instead of one key per window per key. > > > >> > > > > >> > Thanks a lot for helping! I'm now going to setup a prometheus-jmx > > > >> > monitoring so we can keep better track of what's going on :) > > > >> > > > > >> > -- > > > >> > Alessandro Tagliapietra > > > >> > > > > >> > > > > >> > On Tue, Dec 3, 2019 at 9:12 PM John Roesler <vvcep...@apache.org> > > > >> wrote: > > > >> > > > > >> > > Oh, yeah, I remember that conversation! 
> > > >> > > > > > >> > > Yes, then, I agree, if you're only storing state of the most > > recent > > > >> window > > > >> > > for each key, and the key you use for that state is actually the > > key > > > >> of the > > > >> > > records, then an aggressive compaction policy plus your custom > > > >> transformer > > > >> > > seems like a good way forward. > > > >> > > > > > >> > > What I was referring to is that, in Streams, the keys for window > > > >> > > aggregation state is actually composed of both the window itself > > and > > > >> the > > > >> > > key. In the DSL, it looks like "Windowed<K>". That results in the > > > >> store > > > >> > > having a unique key per window for each K, which is why we need > > > >> retention > > > >> > > as well as compaction for our changelogs. But for you, if you just > > > >> make the > > > >> > > key "K", then compaction alone should do the trick. > > > >> > > > > > >> > > And yes, if you manage the topic yourself, then Streams won't > > adjust > > > >> the > > > >> > > retention time. I think it might validate that the retention > > isn't too > > > >> > > short, but I don't remember offhand. > > > >> > > > > > >> > > Cheers, and let me know how it goes! > > > >> > > -John > > > >> > > > > > >> > > On Tue, Dec 3, 2019, at 23:03, Alessandro Tagliapietra wrote: > > > >> > > > Hi John, > > > >> > > > > > > >> > > > afaik grace period uses stream time > > > >> > > > > > > >> > > > > > >> > > https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html > > > >> > > > which is > > > >> > > > per partition, unfortunately we process data that's not in sync > > > >> between > > > >> > > > keys so each key needs to be independent and a key can have much > > > >> older > > > >> > > > data > > > >> > > > than the other. > > > >> > > > > > > >> > > > Having a small grace period would probably close old windows > > sooner > > > >> than > > > >> > > > expected. That's also why in my use case a custom store that > > just > > > >> stores > > > >> > > > the last window data for each key might work better. I had the > > same > > > >> issue > > > >> > > > with suppression and it has been reported here > > > >> > > > https://issues.apache.org/jira/browse/KAFKA-8769 > > > >> > > > Oh I just saw that you're the one that helped me on slack and > > > >> created the > > > >> > > > issue (thanks again for that). > > > >> > > > > > > >> > > > The behavior that you mention about streams setting changelog > > > >> retention > > > >> > > > time is something they do on creation of the topic when the > > broker > > > >> has > > > >> > > auto > > > >> > > > creation enabled? Because we're using confluent cloud and I had > > to > > > >> create > > > >> > > > it manually. > > > >> > > > Regarding the change in the recovery behavior, with compact > > cleanup > > > >> > > policy > > > >> > > > shouldn't the changelog only keep the last value? That would > > make > > > >> the > > > >> > > > recovery faster and "cheaper" as it would only need to read a > > single > > > >> > > value > > > >> > > > per key (if the cleanup just happened) right? > > > >> > > > > > > >> > > > -- > > > >> > > > Alessandro Tagliapietra > > > >> > > > > > > >> > > > > > > >> > > > On Tue, Dec 3, 2019 at 8:51 PM John Roesler < > > vvcep...@apache.org> > > > >> wrote: > > > >> > > > > > > >> > > > > Hey Alessandro, > > > >> > > > > > > > >> > > > > That sounds also like it would work. I'm wondering if it would > > > >> actually > > > >> > > > > change what you observe w.r.t. 
recovery behavior, though. > > Streams > > > >> > > already > > > >> > > > > sets the retention time on the changelog to equal the > > retention > > > >> time > > > >> > > of the > > > >> > > > > windows, for windowed aggregations, so you shouldn't be > > loading a > > > >> lot > > > >> > > of > > > >> > > > > window data for old windows you no longer care about. > > > >> > > > > > > > >> > > > > Have you set the "grace period" on your window definition? By > > > >> default, > > > >> > > it > > > >> > > > > is set to 24 hours, but you can set it as low as you like. > > E.g., > > > >> if you > > > >> > > > > want to commit to having in-order data only, then you can set > > the > > > >> grace > > > >> > > > > period to zero. This _should_ let the broker clean up the > > > >> changelog > > > >> > > records > > > >> > > > > as soon as the window ends. > > > >> > > > > > > > >> > > > > Of course, the log cleaner doesn't run all the time, so > > there's > > > >> some > > > >> > > extra > > > >> > > > > delay in which "expired" data would still be visible in the > > > >> changelog, > > > >> > > but > > > >> > > > > it would actually be just the same as if you manage the store > > > >> yourself. > > > >> > > > > > > > >> > > > > Hope this helps! > > > >> > > > > -John > > > >> > > > > > > > >> > > > > On Tue, Dec 3, 2019, at 22:22, Alessandro Tagliapietra wrote: > > > >> > > > > > Thanks John for the explanation, > > > >> > > > > > > > > >> > > > > > I thought that with EOS enabled (which we have) it would in > > the > > > >> worst > > > >> > > > > case > > > >> > > > > > find a valid checkpoint and start the restore from there > > until > > > >> it > > > >> > > reached > > > >> > > > > > the last committed status, not completely from scratch. What > > > >> you say > > > >> > > > > > definitely makes sense now. > > > >> > > > > > Since we don't really need old time windows and we ensure > > data > > > >> is > > > >> > > ordered > > > >> > > > > > when processed I think I"ll just write a custom transformer > > to > > > >> keep > > > >> > > only > > > >> > > > > > the last window, store intermediate aggregation results in > > the > > > >> store > > > >> > > and > > > >> > > > > > emit a new value only when we receive data belonging to a > > new > > > >> window. > > > >> > > > > > That with a compact only changelog topic should keep the > > rebuild > > > >> > > data to > > > >> > > > > > the minimum as it would have only the last value for each > > key. > > > >> > > > > > > > > >> > > > > > Hope that makes sense > > > >> > > > > > > > > >> > > > > > Thanks again > > > >> > > > > > > > > >> > > > > > -- > > > >> > > > > > Alessandro Tagliapietra > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > On Tue, Dec 3, 2019 at 3:04 PM John Roesler < > > > >> vvcep...@apache.org> > > > >> > > wrote: > > > >> > > > > > > > > >> > > > > > > Hi Alessandro, > > > >> > > > > > > > > > >> > > > > > > To take a stab at your question, maybe it first doesn't > > find > > > >> it, > > > >> > > but > > > >> > > > > then > > > >> > > > > > > restores some data, writes the checkpoint, and then later > > on, > > > >> it > > > >> > > has to > > > >> > > > > > > re-initialize the task for some reason, and that's why it > > does > > > >> > > find a > > > >> > > > > > > checkpoint then? 
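For concreteness, the grace period mentioned above is set on the window definition itself. A one-line sketch against the 1-minute windows used in this thread; zero grace assumes records arrive in order per key, which, as discussed earlier, does not hold for this workload, so treat the value as illustrative:

    import org.apache.kafka.streams.kstream.TimeWindows
    import java.time.Duration

    // Default grace is 24 hours; zero grace closes each window as soon as stream
    // time passes its end, so the broker can expire its changelog records sooner.
    val windows = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ZERO)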
> > > >> > > > > > > > > > >> > > > > > > More to the heart of the issue, if you have EOS enabled, > > > >> Streams > > > >> > > _only_ > > > >> > > > > > > records the checkpoint when the store is in a > > known-consistent > > > >> > > state. > > > >> > > > > For > > > >> > > > > > > example, if you have a graceful shutdown, Streams will > > flush > > > >> all > > > >> > > the > > > >> > > > > > > stores, commit all the transactions, and then write the > > > >> checkpoint > > > >> > > > > file. > > > >> > > > > > > Then, on re-start, it will pick up from that checkpoint. > > > >> > > > > > > > > > >> > > > > > > But as soon as it starts processing records, it removes > > the > > > >> > > checkpoint > > > >> > > > > > > file, so if it crashes while it was processing, there is > > no > > > >> > > checkpoint > > > >> > > > > file > > > >> > > > > > > there, and it will have to restore from the beginning of > > the > > > >> > > changelog. > > > >> > > > > > > > > > >> > > > > > > This design is there on purpose, because otherwise we > > cannot > > > >> > > actually > > > >> > > > > > > guarantee correctness... For example, if you are > > maintaining a > > > >> > > count > > > >> > > > > > > operation, and we process an input record i, increment the > > > >> count > > > >> > > and > > > >> > > > > write > > > >> > > > > > > it to the state store, and to the changelog topic. But we > > > >> crash > > > >> > > before > > > >> > > > > we > > > >> > > > > > > commit that transaction. Then, the write to the changelog > > > >> would be > > > >> > > > > aborted, > > > >> > > > > > > and we would re-process record i . However, we've already > > > >> updated > > > >> > > the > > > >> > > > > local > > > >> > > > > > > state store, so when we increment it again, it results in > > > >> > > > > double-counting > > > >> > > > > > > i. The key point here is that there's no way to do an > > atomic > > > >> > > operation > > > >> > > > > > > across two different systems (local state store and the > > > >> changelog > > > >> > > > > topic). > > > >> > > > > > > Since we can't guarantee that we roll back the incremented > > > >> count > > > >> > > when > > > >> > > > > the > > > >> > > > > > > changelog transaction is aborted, we can't keep the local > > > >> store > > > >> > > > > consistent > > > >> > > > > > > with the changelog. > > > >> > > > > > > > > > >> > > > > > > After a crash, the only way to ensure the local store is > > > >> consistent > > > >> > > > > with > > > >> > > > > > > the changelog is to discard the entire thing and rebuild > > it. > > > >> This > > > >> > > is > > > >> > > > > why we > > > >> > > > > > > have an invariant that the checkpoint file only exists > > when we > > > >> > > _know_ > > > >> > > > > that > > > >> > > > > > > the local store is consistent with the changelog, and > > this is > > > >> why > > > >> > > > > you're > > > >> > > > > > > seeing so much bandwidth when re-starting from an unclean > > > >> shutdown. > > > >> > > > > > > > > > >> > > > > > > Note that it's definitely possible to do better than this, > > > >> and we > > > >> > > would > > > >> > > > > > > very much like to improve it in the future. 
> > > >> > > > > > > > > > >> > > > > > > Thanks, > > > >> > > > > > > -John > > > >> > > > > > > > > > >> > > > > > > On Tue, Dec 3, 2019, at 16:16, Alessandro Tagliapietra > > wrote: > > > >> > > > > > > > Hi John, > > > >> > > > > > > > > > > >> > > > > > > > thanks a lot for helping, regarding your message: > > > >> > > > > > > > - no we only have 1 instance of the stream application, > > > >> and it > > > >> > > > > always > > > >> > > > > > > > re-uses the same state folder > > > >> > > > > > > > - yes we're seeing most issues when restarting not > > > >> gracefully > > > >> > > due > > > >> > > > > > > exception > > > >> > > > > > > > > > > >> > > > > > > > I've enabled trace logging and filtering by a single > > state > > > >> store > > > >> > > the > > > >> > > > > > > > StoreChangelogReader messages are: > > > >> > > > > > > > > > > >> > > > > > > > Added restorer for changelog > > > >> > > > > sensors-stream-aggregate-store-changelog-0 > > > >> > > > > > > > Added restorer for changelog > > > >> > > > > sensors-stream-aggregate-store-changelog-1 > > > >> > > > > > > > Added restorer for changelog > > > >> > > > > sensors-stream-aggregate-store-changelog-2 > > > >> > > > > > > > Did not find checkpoint from changelog > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-2 for store > > > >> > > aggregate-store, > > > >> > > > > > > > rewinding to beginning. > > > >> > > > > > > > Did not find checkpoint from changelog > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-1 for store > > > >> > > aggregate-store, > > > >> > > > > > > > rewinding to beginning. > > > >> > > > > > > > Did not find checkpoint from changelog > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-0 for store > > > >> > > aggregate-store, > > > >> > > > > > > > rewinding to beginning. > > > >> > > > > > > > No checkpoint found for task 0_2 state store > > aggregate-store > > > >> > > > > changelog > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-2 with EOS > > turned > > > >> on. > > > >> > > > > > > > Reinitializing the task and restore its state from the > > > >> beginning. > > > >> > > > > > > > No checkpoint found for task 0_1 state store > > aggregate-store > > > >> > > > > changelog > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-1 with EOS > > turned > > > >> on. > > > >> > > > > > > > Reinitializing the task and restore its state from the > > > >> beginning. > > > >> > > > > > > > No checkpoint found for task 0_0 state store > > aggregate-store > > > >> > > > > changelog > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-0 with EOS > > turned > > > >> on. > > > >> > > > > > > > Reinitializing the task and restore its state from the > > > >> beginning. > > > >> > > > > > > > Found checkpoint 709937 from changelog > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-2 for store > > > >> > > aggregate-store. > > > >> > > > > > > > Restoring partition > > > >> sensors-stream-aggregate-store-changelog-2 > > > >> > > from > > > >> > > > > > > offset > > > >> > > > > > > > 709937 to endOffset 742799 > > > >> > > > > > > > Found checkpoint 3024234 from changelog > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-1 for store > > > >> > > aggregate-store. 
> > > >> > > > > > > > Restoring partition > > > >> sensors-stream-aggregate-store-changelog-1 > > > >> > > from > > > >> > > > > > > offset > > > >> > > > > > > > 3024234 to endOffset 3131513 > > > >> > > > > > > > Found checkpoint 14514072 from changelog > > > >> > > > > > > > sensors-stream-aggregate-store-changelog-0 for store > > > >> > > aggregate-store. > > > >> > > > > > > > Restoring partition > > > >> sensors-stream-aggregate-store-changelog-0 > > > >> > > from > > > >> > > > > > > offset > > > >> > > > > > > > 14514072 to endOffset 17116574 > > > >> > > > > > > > Restored from > > sensors-stream-aggregate-store-changelog-2 to > > > >> > > > > > > aggregate-store > > > >> > > > > > > > with 966 records, ending offset is 711432, next starting > > > >> > > position is > > > >> > > > > > > 711434 > > > >> > > > > > > > Restored from > > sensors-stream-aggregate-store-changelog-2 to > > > >> > > > > > > aggregate-store > > > >> > > > > > > > with 914 records, ending offset is 712711, next starting > > > >> > > position is > > > >> > > > > > > 712713 > > > >> > > > > > > > Restored from > > sensors-stream-aggregate-store-changelog-1 to > > > >> > > > > > > aggregate-store > > > >> > > > > > > > with 18 records, ending offset is 3024261, next starting > > > >> > > position is > > > >> > > > > > > 3024262 > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > why it first says it didn't find the checkpoint and > > then it > > > >> does > > > >> > > > > find it? > > > >> > > > > > > > It seems it loaded about 2.7M records (sum of offset > > > >> difference > > > >> > > in > > > >> > > > > the > > > >> > > > > > > > "restorting partition ...." messages) right? > > > >> > > > > > > > Maybe should I try to reduce the checkpoint interval? > > > >> > > > > > > > > > > >> > > > > > > > Regards > > > >> > > > > > > > > > > >> > > > > > > > -- > > > >> > > > > > > > Alessandro Tagliapietra > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > On Mon, Dec 2, 2019 at 9:18 AM John Roesler < > > > >> vvcep...@apache.org > > > >> > > > > > > >> > > > > wrote: > > > >> > > > > > > > > > > >> > > > > > > > > Hi Alessandro, > > > >> > > > > > > > > > > > >> > > > > > > > > I'm sorry to hear that. > > > >> > > > > > > > > > > > >> > > > > > > > > The restore process only takes one factor into > > account: > > > >> the > > > >> > > current > > > >> > > > > > > offset > > > >> > > > > > > > > position of the changelog topic is stored in a local > > file > > > >> > > > > alongside the > > > >> > > > > > > > > state stores. On startup, the app checks if the > > recorded > > > >> > > position > > > >> > > > > lags > > > >> > > > > > > the > > > >> > > > > > > > > latest offset in the changelog. If so, then it reads > > the > > > >> > > missing > > > >> > > > > > > changelog > > > >> > > > > > > > > records before starting processing. > > > >> > > > > > > > > > > > >> > > > > > > > > Thus, it would not restore any old window data. > > > >> > > > > > > > > > > > >> > > > > > > > > There might be a few different things going on to > > explain > > > >> your > > > >> > > > > > > observation: > > > >> > > > > > > > > * if there is more than one instance in your Streams > > > >> cluster, > > > >> > > > > maybe the > > > >> > > > > > > > > task is "flopping" between instances, so each instance > > > >> still > > > >> > > has to > > > >> > > > > > > recover > > > >> > > > > > > > > state, since it wasn't the last one actively > > processing > > > >> it. 
> > > >> > > > > > > > > * if the application isn't stopped gracefully, it > > might > > > >> not > > > >> > > get a > > > >> > > > > > > chance > > > >> > > > > > > > > to record its offset in that local file, so on > > restart it > > > >> has > > > >> > > to > > > >> > > > > > > restore > > > >> > > > > > > > > some or all of the state store from changelog. > > > >> > > > > > > > > > > > >> > > > > > > > > Or it could be something else; that's just what comes > > to > > > >> mind. > > > >> > > > > > > > > > > > >> > > > > > > > > If you want to get to the bottom of it, you can take a > > > >> look at > > > >> > > the > > > >> > > > > > > logs, > > > >> > > > > > > > > paying close attention to which tasks are assigned to > > > >> which > > > >> > > > > instances > > > >> > > > > > > after > > > >> > > > > > > > > each restart. You can also look into the logs from > > > >> > > > > > > > > > > > >> > > > > `org.apache.kafka.streams.processor.internals.StoreChangelogReader` > > > >> > > > > > > (might > > > >> > > > > > > > > want to set it to DEBUG or TRACE level to really see > > > >> what's > > > >> > > > > happening). > > > >> > > > > > > > > > > > >> > > > > > > > > I hope this helps! > > > >> > > > > > > > > -John > > > >> > > > > > > > > > > > >> > > > > > > > > On Sun, Dec 1, 2019, at 21:25, Alessandro Tagliapietra > > > >> wrote: > > > >> > > > > > > > > > Hello everyone, > > > >> > > > > > > > > > > > > >> > > > > > > > > > we're having a problem with bandwidth usage on > > streams > > > >> > > > > application > > > >> > > > > > > > > startup, > > > >> > > > > > > > > > our current setup does this: > > > >> > > > > > > > > > > > > >> > > > > > > > > > ... > > > >> > > > > > > > > > .groupByKey() > > > >> > > > > > > > > > > > > >> > > .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1))) > > > >> > > > > > > > > > .aggregate( > > > >> > > > > > > > > > { MetricSequenceList(ArrayList()) }, > > > >> > > > > > > > > > { key, value, aggregate -> > > > >> > > > > > > > > > aggregate.getRecords().add(value) > > > >> > > > > > > > > > aggregate > > > >> > > > > > > > > > }, > > > >> > > > > > > > > > Materialized.`as`<String, > > MetricSequenceList, > > > >> > > > > > > WindowStore<Bytes, > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > >> > > > > > >> > > ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde()) > > > >> > > > > > > > > > ) > > > >> > > > > > > > > > .toStream() > > > >> > > > > > > > > > .flatTransform(TransformerSupplier { > > > >> > > > > > > > > > ... > > > >> > > > > > > > > > > > > >> > > > > > > > > > basically in each window we append the new values > > and > > > >> then do > > > >> > > > > some > > > >> > > > > > > other > > > >> > > > > > > > > > logic with the array of windowed values. > > > >> > > > > > > > > > The aggregate-store changelog topic configuration > > uses > > > >> > > > > > > compact,delete as > > > >> > > > > > > > > > cleanup policy and has 12 hours of retention. > > > >> > > > > > > > > > > > > >> > > > > > > > > > What we've seen is that on application startup it > > takes > > > >> a > > > >> > > couple > > > >> > > > > > > minutes > > > >> > > > > > > > > to > > > >> > > > > > > > > > rebuild the state store, even if the state store > > > >> directory is > > > >> > > > > > > persisted > > > >> > > > > > > > > > across restarts. 
That along with an exception that > > > >> caused the > > > >> > > > > docker > > > >> > > > > > > > > > container to be restarted a couple hundreds times > > > >> caused a > > > >> > > big > > > >> > > > > > > confluent > > > >> > > > > > > > > > cloud bill compared to what we usually spend (1/4 > > of a > > > >> full > > > >> > > > > month in > > > >> > > > > > > 1 > > > >> > > > > > > > > day). > > > >> > > > > > > > > > > > > >> > > > > > > > > > What I think is happening is that the topic is > > keeping > > > >> all > > > >> > > the > > > >> > > > > > > previous > > > >> > > > > > > > > > windows even with the compacting policy because each > > > >> key is > > > >> > > the > > > >> > > > > > > original > > > >> > > > > > > > > > key + the timestamp not just the key. Since we don't > > > >> care > > > >> > > about > > > >> > > > > > > previous > > > >> > > > > > > > > > windows as the flatTransform after the toStream() > > makes > > > >> sure > > > >> > > > > that we > > > >> > > > > > > > > don't > > > >> > > > > > > > > > process old windows (a custom suppressor basically) > > is > > > >> there > > > >> > > a > > > >> > > > > way to > > > >> > > > > > > > > only > > > >> > > > > > > > > > keep the last window so that the store rebuilding > > goes > > > >> > > faster and > > > >> > > > > > > without > > > >> > > > > > > > > > rebuilding old windows too? Or should I create a > > custom > > > >> > > window > > > >> > > > > using > > > >> > > > > > > the > > > >> > > > > > > > > > original key as key so that the compaction keeps > > only > > > >> the > > > >> > > last > > > >> > > > > window > > > >> > > > > > > > > data? > > > >> > > > > > > > > > > > > >> > > > > > > > > > Thank you > > > >> > > > > > > > > > > > > >> > > > > > > > > > -- > > > >> > > > > > > > > > Alessandro Tagliapietra > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > > > > > > >
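For readers reaching the end of the thread, here is a rough sketch of the "keep only the latest window per key" transformer the discussion converges on. Everything in it is illustrative rather than the code actually used: the store name, the WindowedAggregate wrapper, and the emit-on-newer-window rule are assumptions.

    import org.apache.kafka.streams.KeyValue
    import org.apache.kafka.streams.kstream.Transformer
    import org.apache.kafka.streams.processor.ProcessorContext
    import org.apache.kafka.streams.state.KeyValueStore

    // Illustrative wrapper: a per-window aggregate plus the start of its window.
    data class WindowedAggregate<A>(val windowStart: Long, val aggregate: A)

    // Keyed by the plain sensor id, so a compact-only changelog keeps exactly one
    // entry per key instead of one entry per key per window.
    class LastWindowTransformer<A> :
        Transformer<String, WindowedAggregate<A>, KeyValue<String, WindowedAggregate<A>>?> {

        private lateinit var store: KeyValueStore<String, WindowedAggregate<A>>

        @Suppress("UNCHECKED_CAST")
        override fun init(context: ProcessorContext) {
            store = context.getStateStore("last-window-store") as KeyValueStore<String, WindowedAggregate<A>>
        }

        override fun transform(
            key: String,
            value: WindowedAggregate<A>
        ): KeyValue<String, WindowedAggregate<A>>? {
            val previous = store.get(key)
            return when {
                // First window for this key, or an update to the current window.
                previous == null || previous.windowStart == value.windowStart -> {
                    store.put(key, value)
                    null
                }
                // A newer window arrived: emit the finished one, keep the new one.
                previous.windowStart < value.windowStart -> {
                    store.put(key, value)
                    KeyValue(key, previous)
                }
                // Out-of-order window; the thread assumes this cannot happen per key.
                else -> null
            }
        }

        override fun close() {}
    }

The backing key-value store would be registered with StreamsBuilder.addStateStore(...) with a compact-only changelog, and the transformer attached via KStream.transform(...) naming "last-window-store"; both details depend on the rest of the topology and are omitted here.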