Dmitry, I think your use case is similar to the one I described in the link below (discussion in the kafka-dev mailing list): http://search-hadoop.com/m/uyzND1rVOQ12OJ84U&subj=Re+Streams+TTLCacheStore
Could you take a quick look? -Michael On Wed, Feb 22, 2017 at 12:39 AM, Dmitry Minkovsky <dminkov...@gmail.com> wrote: > Hi Eno, > > Thank you. I don't think I'm advanced enough to imagine a good API. But I > can elaborate my use-cases further. > > So say I have two tables: > > KTable<String,String> left = topology.table(stringSerde, > stringSerde, topicLeft, topicLeft); > KTable<String,String> right = topology.table(stringSerde, > stringSerde, topicRight, topicRight); > > left > .leftJoin(right, (l, r) -> asList(l, r)) > .to(topicView) > > > - I'd like to filter duplicates out of the change stream. I only want > topicView to receive proper updates. > > - I'd like to be able to detect change type easy: > > - oldValue == null and newValue != null => create > - oldValue != null and newValue == null => delete > - oldValue != null and newValue != null => update > > - I'd like to be able to update indices when records are deleted or > updated. Old values are needed to determine which index keys which > should > be updated or removed. > > > I can do all these things now, mostly with groupBy()/reduce(), > groupBy/aggregate() and transform(). > > > Best, > Dmitry > > On Tue, Feb 21, 2017 at 5:21 PM, Eno Thereska <eno.there...@gmail.com> > wrote: > > > Hi Dmitry, > > > > Could you tell us more on the exact API you'd like? Perhaps if others > find > > it useful too we/you can do a KIP. > > > > Thanks > > Eno > > > > > On 21 Feb 2017, at 22:01, Dmitry Minkovsky <dminkov...@gmail.com> > wrote: > > > > > > At KAFKA-2984: ktable sends old values when required > > > <https://github.com/apache/kafka/pull/672#issue-122101101>, @ymatsuda > > > writes: > > > > > >> NOTE: This is meant to be used by aggregation. But, if there is a use > > > case like a SQL database trigger, we can add a new KTable method to > > expose > > > this. > > > > > > Looking through the source it does not seem that this API was ever > > exposed. > > > Not finding anything on Google on this subject either. The SQL database > > > trigger is my exact use case. Enabling change-streaming for some tables > > > would help simplify my code. Is this possible? Is this scheduled for a > > > future version? > > > > > > Thank you, > > > Dmitry > > > > > -- *Michael G. Noll* Product Manager | Confluent +1 650 453 5860 | @miguno <https://twitter.com/miguno> Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog <http://www.confluent.io/blog>