Hi Boyang, Hi Alex,

thank you for your reply. I can't use windowing so currently I'm managing
removals by wrapping messages in a delete-aware wrapper whenever I have to
do aggregation but this has a big impact on all the logic.

For me the ideal situation would be to get a handle on the state stores
that are being used during aggregation and other processors of the streams
DSL and programmatically delete them from the store whenever needed. This
way I can keep the changes to my streaming logic minimal and still delete
parts of it whenever needed.

Is there any way to do that? I know I can get a read-only reference to the
state stores using queryable stores but that won't do.

Jan

On Thu, Jan 2, 2020 at 11:17 PM Alex Brekken <brek...@gmail.com> wrote:

> Hi Jan, unfortunately there is no easy or automatic way to do this.
> Publishing null values directly to the changelog topics will remove them
> from the topic, but it won't remove the corresponding row from the RocksDB
> state store.  (though deleting data programmatically from a state-store
> WILL also remove it from the changelog topic)  Given that you want to
> completely remove the data for a given set of keys, your best option might
> be to modify your topology to handle null messages so that they can get
> deleted from your aggregations. (and publish those from an outside app)
> Hopefully this isn't too self-serving, but I actually wrote a blog post
> about managing state-store data not long ago:
>
> https://objectpartners.com/2019/07/31/slimming-down-your-kafka-streams-data/
> .
> Hopefully that might give you some ideas.
>
> Alex
>
> On Thu, Jan 2, 2020 at 4:11 PM Boyang Chen <reluctanthero...@gmail.com>
> wrote:
>
> > Hey Jan,
> >
> > although I believe your case is much more complicated, but would time
> based
> > retention work for you at all? If yes, time window store is like the best
> > option.
> >
> > If no, streams has no out-of-box solution for invalidating the
> aggregation
> > record. It seems at least we could provide an API to inject
> > tombstone records for aggregation logic
> > so that they don't get ignored eventually. This sounds like a good future
> > work.
> >
> > Boyang
> >
> > On Thu, Jan 2, 2020 at 1:47 PM Jan Bols <janb...@telenet.be> wrote:
> >
> > > Hi,
> > > I have a rather complicated kafka streams application involving
> multiple
> > > joins, aggregates, maps etc. At a certain point, parts of the data
> needs
> > to
> > > be removed throughout the entire streams topology, both in the topics,
> > the
> > > changelogs and the rocksdb state stores.
> > >
> > > Managing this requires a lot of effort and things get very complex.
> F.e.
> > > when a KStream has a null value and is aggregated, you first need to
> > > convert it into some optional value instead b/c aggregates ignore
> nulls.
> > >
> > > Is there a better way or a way that does not impact all the existing
> > > streaming logic?
> > >
> > > I was thinking about having an out-of-bound process that sends null
> > values
> > > to all topics with the correct keys. I could then filter out all null
> > > values before doing the rest of the existing stream logic.
> > > Would that make sense?
> > >
> > > I can send null values to all my topics, but how do I get the changelog
> > > topics created by kafka-streams. And what about the state store?
> > >
> > > Best regards
> > > Jan
> > >
> >
>

Reply via email to