Hi Boyang, hi Alex,
thank you for your replies. I can't use windowing, so at the moment I'm managing removals by wrapping messages in a delete-aware wrapper whenever I have to do an aggregation, but this has a big impact on all the logic.
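To make that concrete, the wrapper pattern I'm using looks roughly like this (a simplified sketch; the Delete class and the topic/store names are just for illustration):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class TombstoneAwareTopology {

        // Wrapper that lets a deletion survive the trip into aggregate(),
        // which silently drops records whose value is null.
        static class Delete<V> {
            final V value;                      // null marks a deletion
            Delete(V value) { this.value = value; }
            boolean isDelete() { return value == null; }
        }

        static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("events")
                .mapValues(v -> new Delete<>(v))   // nulls get wrapped, not dropped
                .groupByKey()
                .aggregate(
                    () -> 0L,
                    (key, wrapped, agg) ->
                        // Returning null from the aggregator writes a tombstone
                        // to both the RocksDB store and its changelog topic,
                        // so the row is removed completely.
                        wrapped.isDelete() ? null : agg + 1,
                    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));
            return builder.build();
        }
    }

Returning null from the aggregator does remove the row from the store and write a tombstone to the changelog, but it means every aggregation in the topology has to know about the wrapper, and that's the impact I'd like to avoid.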
For me, the ideal situation would be to get a handle on the state stores that are used during aggregation and by the other processors of the streams DSL, and programmatically delete entries from them whenever needed. That way I could keep the changes to my streaming logic minimal and still delete parts of the data when required. Is there any way to do that? I know I can get a read-only reference to the state stores using queryable stores, but that won't do.

Jan

On Thu, Jan 2, 2020 at 11:17 PM Alex Brekken <brek...@gmail.com> wrote:

> Hi Jan, unfortunately there is no easy or automatic way to do this.
> Publishing null values directly to the changelog topics will remove them
> from the topic, but it won't remove the corresponding row from the RocksDB
> state store. (Deleting data programmatically from a state store, though,
> WILL also remove it from the changelog topic.) Given that you want to
> completely remove the data for a given set of keys, your best option might
> be to modify your topology to handle null messages so that they can get
> deleted from your aggregations (and publish those nulls from an outside
> app). Hopefully this isn't too self-serving, but I actually wrote a blog
> post about managing state-store data not long ago:
>
> https://objectpartners.com/2019/07/31/slimming-down-your-kafka-streams-data/
>
> Hopefully that gives you some ideas.
>
> Alex
>
> On Thu, Jan 2, 2020 at 4:11 PM Boyang Chen <reluctanthero...@gmail.com>
> wrote:
>
> > Hey Jan,
> >
> > Your case is much more complicated, I believe, but would time-based
> > retention work for you at all? If yes, a time window store is likely
> > the best option.
> >
> > If no, Streams has no out-of-the-box solution for invalidating
> > aggregation records. It seems we could at least provide an API to
> > inject tombstone records into the aggregation logic so that they don't
> > get ignored. This sounds like good future work.
> >
> > Boyang
> >
> > On Thu, Jan 2, 2020 at 1:47 PM Jan Bols <janb...@telenet.be> wrote:
> >
> > > Hi,
> > > I have a rather complicated Kafka Streams application involving
> > > multiple joins, aggregates, maps etc. At a certain point, parts of
> > > the data need to be removed throughout the entire streams topology:
> > > in the topics, the changelogs and the RocksDB state stores.
> > >
> > > Managing this requires a lot of effort and things get very complex.
> > > For example, when a KStream has a null value and is aggregated, you
> > > first need to convert it into some optional value instead, because
> > > aggregates ignore nulls.
> > >
> > > Is there a better way, or a way that does not impact all the existing
> > > streaming logic?
> > >
> > > I was thinking about having an out-of-band process that sends null
> > > values to all topics with the correct keys. I could then filter out
> > > all null values before doing the rest of the existing stream logic.
> > > Would that make sense?
> > >
> > > I can send null values to all my topics, but how do I get at the
> > > changelog topics created by Kafka Streams? And what about the state
> > > stores?
> > >
> > > Best regards
> > > Jan
> > >
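PS: to be concrete about the out-of-band process mentioned above: it would be nothing more than a plain producer sending tombstones, something like this (bootstrap address and topic name are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TombstoneSender {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // A null value is a tombstone: with log compaction it removes the
                // key from the topic, but as Alex points out it does NOT touch the
                // RocksDB stores by itself; the topology still has to turn it into
                // a store delete (see the wrapper sketch above).
                producer.send(new ProducerRecord<>("events", "key-to-delete", null));
            }
        }
    }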