Hello everyone,

We're having a problem with bandwidth usage when our Streams application
starts up. Our current setup does this:

...
.groupByKey()
.windowedBy<TimeWindow>(TimeWindows.of(Duration.ofMinutes(1)))
.aggregate(
        { MetricSequenceList(ArrayList()) },
        { key, value, aggregate ->
            aggregate.getRecords().add(value)
            aggregate
        },
        Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Settings.getValueSpecificavroSerde())
)
.toStream()
.flatTransform(TransformerSupplier {
...

Basically, in each window we append the new values and then run some other
logic on the array of windowed values.
The aggregate-store changelog topic uses compact,delete as its cleanup
policy and has 12 hours of retention.

What we've seen is that on application startup it takes a couple of minutes
to rebuild the state store, even if the state store directory is persisted
across restarts. That, along with an exception that caused the Docker
container to be restarted a couple hundred times, resulted in a big Confluent
Cloud bill compared to what we usually spend (about a quarter of a full
month's cost in one day).

What I think is happening is that the changelog topic keeps all the previous
windows even with the compaction policy, because each record key is the
original key plus the window timestamp, not just the original key. We don't
care about previous windows: the flatTransform after the toStream() makes
sure that we don't process old windows (it's basically a custom suppressor).
Is there a way to keep only the last window, so that rebuilding the store is
faster and doesn't restore old windows too? Or should I create a custom
window that uses the original key as the key, so that compaction keeps only
the last window's data?
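
To illustrate what I mean about compaction, here is a toy sketch in plain
Kotlin (no Kafka involved). It assumes the changelog key is roughly the
original key plus the window start time; WindowedKey and compact() are
hypothetical names made up for this example, not Kafka Streams APIs, and real
log compaction is of course more involved than this.

```kotlin
// Hypothetical stand-in for the changelog record key (original key + window start).
data class WindowedKey(val key: String, val windowStartMs: Long)

// Toy model of log compaction: keep only the latest value for each distinct key.
fun <K, V> compact(log: List<Pair<K, V>>): Map<K, V> {
    val latest = LinkedHashMap<K, V>()
    for ((k, v) in log) latest[k] = v // a later record replaces an earlier one with the same key
    return latest
}

fun main() {
    val minute = 60_000L
    // Three updates for key "a": two in the first 1-minute window, one in the second.
    val updates = listOf(
        Triple("a", 0L, "v1"),
        Triple("a", 0L, "v1,v2"),
        Triple("a", minute, "v3"),
    )

    // Keyed by (key, window start): each window is a distinct key and survives compaction.
    val windowed = compact(updates.map { (k, w, v) -> WindowedKey(k, w) to v })
    // Keyed by the original key only: compaction keeps just the most recent record.
    val plain = compact(updates.map { (k, _, v) -> k to v })

    println("windowed keys kept: ${windowed.size}") // prints 2
    println("plain keys kept: ${plain.size}")       // prints 1
}
```

With the windowed key, one record per window stays in the compacted log until
retention removes it, which would match the restore traffic we are seeing.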

Thank you

--
Alessandro Tagliapietra
