And it seems that, for some reason, after a while caching starts working again without a restart of the streams application.
[image: Screen Shot 2019-12-08 at 11.59.30 PM.png]

I'll try to enable debug metrics and see if I can find something useful there. Any idea is appreciated in the meantime :)

-- Alessandro Tagliapietra


On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

It seems that even with caching enabled, after a while the sent bytes still go up.

[image: Screen Shot 2019-12-08 at 12.52.31 PM.png]

You can see the deploy when I've enabled caching, but it looks like it's still a temporary solution.

-- Alessandro Tagliapietra


On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

Could be, but since we have a limited amount of input keys (~30), and windowing generates new keys while old ones are never touched again (the data per key is in order), I assume it shouldn't be a big deal for it to handle 30 keys.
I'll have a look at the cache metrics and see if something pops out.

Thanks

-- Alessandro Tagliapietra


On Sat, Dec 7, 2019 at 10:02 AM John Roesler <vvcep...@apache.org> wrote:

Hmm, that's a good question. Now that we're talking about caching, I wonder if the cache was just too small. It's not very big by default.


On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote:

Ok, I'll check on that!

Now I can see that with caching we went from 3-4 MB/s to 400 KB/s, that will help with the bill.

Last question: any reason why, after a while, the regular windowed stream starts sending every update instead of caching? Could it be because it doesn't have any more memory available? Any other possible reason?

Thank you so much for your help

-- Alessandro Tagliapietra


On Sat, Dec 7, 2019 at 9:14 AM John Roesler <vvcep...@apache.org> wrote:

Ah, yes. Glad you figured it out!

Caching does not reduce EOS guarantees at all. I highly recommend using it. You might even want to take a look at the caching metrics to make sure you have a good hit ratio.

-John


On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra wrote:

Never mind, I've found out I can use `.withCachingEnabled` on the store builder to achieve the same thing as in the windowing example, as `Materialized.as` turns that on by default.

Does caching in any way reduce the EOS guarantees?

-- Alessandro Tagliapietra
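For reference, a minimal sketch of that approach on a key-value store builder (the store name and value serde are taken from the snippets later in the thread; caching buffers repeated updates to the same key and only flushes them downstream and to the changelog on commit or cache eviction):

// Sketch only: same store as below, but with caching enabled on the builder.
val suppressStoreSupplier =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("suppress-store"),
        Serdes.String(),
        Settings.getValueSpecificavroSerde()   // value serde as used elsewhere in this thread
    )
    .withCachingEnabled()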
On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

Seems my journey with this isn't done just yet.

This seems very complicated to me, but I'll try to explain it as best I can. To better understand the streams network usage I've used Prometheus with the JMX exporter to export Kafka metrics. To check the amount of data we use, I'm looking at the increments of kafka_producer_topic_metrics_byte_total and kafka_producer_producer_topic_metrics_record_send_total.

Our current (before the change mentioned above) code looks like this:

// This transformer just pairs a value with the previous one, storing the temporary one in a store
val pairsStream = metricStream
    .transformValues(ValueTransformerWithKeySupplier { PairTransformer() }, "LastValueStore")
    .filter { _, value: MetricSequence? -> value != null }

// Create a store to keep suppressed windows until a new one is received
val suppressStoreSupplier =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"), ......

// Window and aggregate data in 1 minute intervals
val aggregatedStream = pairsStream
    .groupByKey()
    .windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
    .aggregate(
        { MetricSequenceList(ArrayList()) },
        { key, value, aggregate ->
            aggregate.getRecords().add(value)
            aggregate
        },
        Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Settings.getValueSpecificavroSerde())
    )
    .toStream()
    .flatTransform(TransformerSupplier {
        // This transformer basically waits until a new window is received to emit the previous one
    }, "suppress-store")
    .map { sensorId: String, suppressedOutput: SuppressedOutput ->
        .... etc ....

Basically:
- all data goes through the LastValueStore store, which stores each message and emits a pair with the previous one
- the aggregate-store is used to store the per-window list of messages in the aggregate method
- the suppress-store is used to store each received window, which is emitted only after a newer one is received

What I'm experiencing is that:
- during normal execution, the streams app sends about 5k messages/min to the LastValueStore changelog topic, and only about 100 to the aggregate and suppress store changelog topics
- at some point (after many hours of operation), the streams app starts sending to the aggregate and suppress store changelog topics the same amount of messages that go through the LastValueStore
- if I restart the streams app, it goes back to the initial behavior

You can see the behavior in this graph: https://imgur.com/dJcUNSf
You can also see that after a restart everything goes back to normal levels.
Regarding other metrics, process latency increases, poll latency decreases, poll rate decreases, and commit rate stays the same while commit latency increases.

Now I have these questions:
- why isn't the aggregate/suppress store changelog topic throughput the same as the LastValueStore's? Shouldn't every aggregation send a record to the changelog?
- is the windowing doing some internal caching, like not sending every aggregation record until the window time has passed? (If so, where can I find that code, since I would like to use that also for our new implementation.)

Thank you in advance

-- Alessandro Tagliapietra
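On the internal-caching question: the batching of aggregation updates comes from the DSL's record caches (which, as noted earlier in the thread, `Materialized` turns on by default), and those caches are bounded by two Streams configs. A minimal sketch, with illustrative values:

// Sketch only: the two configs that bound how long the record cache can hold
// updates before forwarding them downstream / to the changelog.
// Values are illustrative, not recommendations.
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = Properties()
props[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 50 * 1024 * 1024L  // default is 10 MB for the whole instance
props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 30_000L                   // caches are flushed at least on every commit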
On Wed, Dec 4, 2019 at 7:57 AM John Roesler <vvcep...@apache.org> wrote:

Oh, good!


On Tue, Dec 3, 2019, at 23:29, Alessandro Tagliapietra wrote:

Testing on staging shows that a restart on exception is much faster and the stream starts right away, which I think means we're reading way less data than before!

> What I was referring to is that, in Streams, the key for window-aggregation state is actually composed of both the window itself and the record key. In the DSL, it looks like "Windowed<K>". That results in the store having a unique key per window for each K, which is why we need retention as well as compaction for our changelogs. But for you, if you just make the key "K", then compaction alone should do the trick.

Yes, we had compact,delete as the cleanup policy, but it probably still had too long a retention value; also, the RocksDB store is probably much faster now, having only one key per key instead of one key per window per key.

Thanks a lot for helping! I'm now going to set up prometheus-jmx monitoring so we can keep better track of what's going on :)

-- Alessandro Tagliapietra


On Tue, Dec 3, 2019 at 9:12 PM John Roesler <vvcep...@apache.org> wrote:

Oh, yeah, I remember that conversation!

Yes, then, I agree: if you're only storing state of the most recent window for each key, and the key you use for that state is actually the key of the records, then an aggressive compaction policy plus your custom transformer seems like a good way forward.

What I was referring to is that, in Streams, the key for window-aggregation state is actually composed of both the window itself and the record key. In the DSL, it looks like "Windowed<K>". That results in the store having a unique key per window for each K, which is why we need retention as well as compaction for our changelogs. But for you, if you just make the key "K", then compaction alone should do the trick.
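A minimal sketch of a compact-only changelog for such a store, for the case where Streams creates the changelog topic itself (store name illustrative; with a manually created topic, as on Confluent Cloud later in the thread, the same cleanup.policy would be set on the topic directly):

// Sketch only: a store keyed by the plain record key whose changelog is
// configured with a compact-only cleanup policy, so the broker keeps just
// the latest aggregate per key.
val lastWindowStoreBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("last-window-store"),
        Serdes.String(),
        Settings.getValueSpecificavroSerde()
    )
    .withLoggingEnabled(mapOf("cleanup.policy" to "compact"))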
And yes, if you manage the topic yourself, then Streams won't adjust the retention time. I think it might validate that the retention isn't too short, but I don't remember offhand.

Cheers, and let me know how it goes!
-John


On Tue, Dec 3, 2019, at 23:03, Alessandro Tagliapietra wrote:

Hi John,

As far as I know, the grace period uses stream time (https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html), which is per partition. Unfortunately we process data that's not in sync between keys, so each key needs to be independent and one key can have much older data than another.

Having a small grace period would probably close old windows sooner than expected. That's also why, in my use case, a custom store that just keeps the last window's data for each key might work better. I had the same issue with suppression and it has been reported here: https://issues.apache.org/jira/browse/KAFKA-8769
Oh, I just saw that you're the one who helped me on Slack and created the issue (thanks again for that).

The behavior you mention about Streams setting the changelog retention time: is that something it does on creation of the topic when the broker has auto-creation enabled? Because we're using Confluent Cloud and I had to create it manually.

Regarding the change in the recovery behavior: with a compact cleanup policy, shouldn't the changelog only keep the last value? That would make the recovery faster and "cheaper", as it would only need to read a single value per key (if the cleanup just happened), right?

-- Alessandro Tagliapietra


On Tue, Dec 3, 2019 at 8:51 PM John Roesler <vvcep...@apache.org> wrote:

Hey Alessandro,

That also sounds like it would work. I'm wondering if it would actually change what you observe w.r.t. recovery behavior, though. Streams already sets the retention time on the changelog to equal the retention time of the windows for windowed aggregations, so you shouldn't be loading a lot of window data for old windows you no longer care about.

Have you set the "grace period" on your window definition? By default, it is set to 24 hours, but you can set it as low as you like. E.g., if you want to commit to having in-order data only, then you can set the grace period to zero. This _should_ let the broker clean up the changelog records as soon as the window ends.
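For reference, a minimal sketch of setting the grace period on the one-minute windows used elsewhere in this thread (durations illustrative):

// Sketch only: one-minute windows that accept no out-of-order data, so, per
// the suggestion above, the broker can clean up the window's changelog
// records as soon as the window ends.
val windows = TimeWindows
    .of(Duration.ofMinutes(1))
    .grace(Duration.ZERO)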
Of course, the log cleaner doesn't run all the time, so there's some extra delay in which "expired" data would still be visible in the changelog, but it would actually be just the same as if you manage the store yourself.

Hope this helps!
-John


On Tue, Dec 3, 2019, at 22:22, Alessandro Tagliapietra wrote:

Thanks John for the explanation,

I thought that with EOS enabled (which we have) it would, in the worst case, find a valid checkpoint and start the restore from there until it reached the last committed status, not completely from scratch. What you say definitely makes sense now.

Since we don't really need old time windows, and we ensure data is ordered when processed, I think I'll just write a custom transformer to keep only the last window, store intermediate aggregation results in the store, and emit a new value only when we receive data belonging to a new window. That, with a compact-only changelog topic, should keep the rebuild data to a minimum, as it would have only the last value for each key.

Hope that makes sense.

Thanks again

-- Alessandro Tagliapietra
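For what it's worth, a minimal sketch of the "keep only the last window per key" transformer described above, assuming (as stated in this thread) that data per key arrives in order and that the store is keyed by the plain record key, so a compact-only changelog retains exactly one entry per key. The class, the WindowAggregate value type, the "last-window-store" name, and the window-alignment arithmetic are all illustrative, not the actual code from this thread:

import java.time.Duration
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Illustrative per-key aggregate: the start of the window being built plus the values seen so far.
data class WindowAggregate<V>(val windowStart: Long, val records: MutableList<V> = mutableListOf())

// Emits the previous window's aggregate for a key only when a record belonging
// to a newer window arrives; until then it just updates the per-key state.
class LastWindowTransformer<V>(
    private val storeName: String,
    private val windowSizeMs: Long = Duration.ofMinutes(1).toMillis()
) : Transformer<String, V, KeyValue<String, WindowAggregate<V>>?> {

    private lateinit var context: ProcessorContext
    private lateinit var store: KeyValueStore<String, WindowAggregate<V>>

    override fun init(context: ProcessorContext) {
        this.context = context
        @Suppress("UNCHECKED_CAST")
        store = context.getStateStore(storeName) as KeyValueStore<String, WindowAggregate<V>>
    }

    override fun transform(key: String, value: V): KeyValue<String, WindowAggregate<V>>? {
        // Align the record's timestamp to the start of its one-minute window.
        val windowStart = context.timestamp() / windowSizeMs * windowSizeMs
        val current = store.get(key)
        return if (current == null || current.windowStart == windowStart) {
            // Same window (or first record for this key): keep aggregating, emit nothing.
            val updated = current ?: WindowAggregate<V>(windowStart)
            updated.records.add(value)
            store.put(key, updated)
            null
        } else {
            // The record belongs to a newer window (data per key is assumed in order):
            // start a fresh aggregate and emit the finished one.
            store.put(key, WindowAggregate<V>(windowStart, mutableListOf(value)))
            KeyValue(key, current)
        }
    }

    override fun close() {}
}

It would be attached with something like pairsStream.transform(TransformerSupplier { LastWindowTransformer<MetricSequence>("last-window-store") }, "last-window-store"), with the store registered via a builder like the compact-logging sketch earlier in the thread.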
On Tue, Dec 3, 2019 at 3:04 PM John Roesler <vvcep...@apache.org> wrote:

Hi Alessandro,

To take a stab at your question: maybe it first doesn't find it, but then restores some data, writes the checkpoint, and then later on it has to re-initialize the task for some reason, and that's why it does find a checkpoint then?

More to the heart of the issue: if you have EOS enabled, Streams _only_ records the checkpoint when the store is in a known-consistent state. For example, if you have a graceful shutdown, Streams will flush all the stores, commit all the transactions, and then write the checkpoint file. Then, on re-start, it will pick up from that checkpoint.

But as soon as it starts processing records, it removes the checkpoint file, so if it crashes while it was processing, there is no checkpoint file there, and it will have to restore from the beginning of the changelog.

This design is there on purpose, because otherwise we cannot actually guarantee correctness... For example, say you are maintaining a count operation: we process an input record i, increment the count, and write it to the state store and to the changelog topic, but we crash before we commit that transaction. Then the write to the changelog would be aborted, and we would re-process record i. However, we've already updated the local state store, so when we increment it again, it results in double-counting i. The key point here is that there's no way to do an atomic operation across two different systems (the local state store and the changelog topic). Since we can't guarantee that we roll back the incremented count when the changelog transaction is aborted, we can't keep the local store consistent with the changelog.

After a crash, the only way to ensure the local store is consistent with the changelog is to discard the entire thing and rebuild it. This is why we have an invariant that the checkpoint file only exists when we _know_ that the local store is consistent with the changelog, and this is why you're seeing so much bandwidth when re-starting from an unclean shutdown.

Note that it's definitely possible to do better than this, and we would very much like to improve it in the future.
Thanks,
-John


On Tue, Dec 3, 2019, at 16:16, Alessandro Tagliapietra wrote:

Hi John,

thanks a lot for helping. Regarding your message:
- no, we only have 1 instance of the stream application, and it always re-uses the same state folder
- yes, we're seeing most issues when restarting non-gracefully due to an exception

I've enabled trace logging and, filtering by a single state store, the StoreChangelogReader messages are:

Added restorer for changelog sensors-stream-aggregate-store-changelog-0
Added restorer for changelog sensors-stream-aggregate-store-changelog-1
Added restorer for changelog sensors-stream-aggregate-store-changelog-2
Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store, rewinding to beginning.
Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store, rewinding to beginning.
Did not find checkpoint from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store, rewinding to beginning.
No checkpoint found for task 0_2 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-2 with EOS turned on. Reinitializing the task and restore its state from the beginning.
No checkpoint found for task 0_1 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-1 with EOS turned on. Reinitializing the task and restore its state from the beginning.
No checkpoint found for task 0_0 state store aggregate-store changelog sensors-stream-aggregate-store-changelog-0 with EOS turned on. Reinitializing the task and restore its state from the beginning.
Found checkpoint 709937 from changelog sensors-stream-aggregate-store-changelog-2 for store aggregate-store.
Restoring partition sensors-stream-aggregate-store-changelog-2 from offset 709937 to endOffset 742799
Found checkpoint 3024234 from changelog sensors-stream-aggregate-store-changelog-1 for store aggregate-store.
Restoring partition sensors-stream-aggregate-store-changelog-1 from offset 3024234 to endOffset 3131513
Found checkpoint 14514072 from changelog sensors-stream-aggregate-store-changelog-0 for store aggregate-store.
Restoring partition sensors-stream-aggregate-store-changelog-0 from offset 14514072 to endOffset 17116574
Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 966 records, ending offset is 711432, next starting position is 711434
Restored from sensors-stream-aggregate-store-changelog-2 to aggregate-store with 914 records, ending offset is 712711, next starting position is 712713
Restored from sensors-stream-aggregate-store-changelog-1 to aggregate-store with 18 records, ending offset is 3024261, next starting position is 3024262

Why does it first say it didn't find the checkpoint, and then it does find it?
It seems it loaded about 2.7M records (the sum of the offset differences in the "Restoring partition ..." messages), right?
Maybe I should try to reduce the checkpoint interval?

Regards

-- Alessandro Tagliapietra
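For reference, the three restore ranges quoted above do sum to roughly 2.7M records:

(742799 - 709937) + (3131513 - 3024234) + (17116574 - 14514072)
  = 32862 + 107279 + 2602502
  = 2742643 records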
On Mon, Dec 2, 2019 at 9:18 AM John Roesler <vvcep...@apache.org> wrote:

Hi Alessandro,

I'm sorry to hear that.

The restore process only takes one factor into account: the current offset position of the changelog topic is stored in a local file alongside the state stores. On startup, the app checks if the recorded position lags the latest offset in the changelog. If so, then it reads the missing changelog records before starting processing.

Thus, it would not restore any old window data.

There might be a few different things going on to explain your observation:
* if there is more than one instance in your Streams cluster, maybe the task is "flopping" between instances, so each instance still has to recover state, since it wasn't the last one actively processing it.
* if the application isn't stopped gracefully, it might not get a chance to record its offset in that local file, so on restart it has to restore some or all of the state store from changelog.

Or it could be something else; that's just what comes to mind.

If you want to get to the bottom of it, you can take a look at the logs, paying close attention to which tasks are assigned to which instances after each restart. You can also look into the logs from `org.apache.kafka.streams.processor.internals.StoreChangelogReader` (might want to set it to DEBUG or TRACE level to really see what's happening).

I hope this helps!
-John
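A minimal sketch of raising that logger, assuming the application logs through a log4j 1.x properties file (adjust the syntax for logback or log4j2):

# Sketch only: raise just the changelog-restore logging, leave everything else at its current level
log4j.logger.org.apache.kafka.streams.processor.internals.StoreChangelogReader=TRACE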
On Sun, Dec 1, 2019, at 21:25, Alessandro Tagliapietra wrote:

Hello everyone,

we're having a problem with bandwidth usage on streams application startup. Our current setup does this:

...
.groupByKey()
.windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
.aggregate(
    { MetricSequenceList(ArrayList()) },
    { key, value, aggregate ->
        aggregate.getRecords().add(value)
        aggregate
    },
    Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Settings.getValueSpecificavroSerde())
)
.toStream()
.flatTransform(TransformerSupplier {
...

Basically, in each window we append the new values and then do some other logic with the array of windowed values.
The aggregate-store changelog topic configuration uses compact,delete as the cleanup policy and has 12 hours of retention.

What we've seen is that on application startup it takes a couple of minutes to rebuild the state store, even though the state store directory is persisted across restarts. That, along with an exception that caused the docker container to be restarted a couple hundred times, caused a big Confluent Cloud bill compared to what we usually spend (1/4 of a full month in 1 day).

What I think is happening is that the topic is keeping all the previous windows, even with the compacting policy, because each key is the original key + the timestamp, not just the key. Since we don't care about previous windows (the flatTransform after the toStream() makes sure that we don't process old windows; a custom suppressor, basically), is there a way to only keep the last window, so that the store rebuilding goes faster and without rebuilding old windows too? Or should I create a custom window using the original key as the key, so that the compaction keeps only the last window's data?

Thank you

-- Alessandro Tagliapietra