Hi Eno, Thank you. I don't think I'm advanced enough to imagine a good API. But I can elaborate my use-cases further.
So say I have two tables: KTable<String,String> left = topology.table(stringSerde, stringSerde, topicLeft, topicLeft); KTable<String,String> right = topology.table(stringSerde, stringSerde, topicRight, topicRight); left .leftJoin(right, (l, r) -> asList(l, r)) .to(topicView) - I'd like to filter duplicates out of the change stream. I only want topicView to receive proper updates. - I'd like to be able to detect change type easy: - oldValue == null and newValue != null => create - oldValue != null and newValue == null => delete - oldValue != null and newValue != null => update - I'd like to be able to update indices when records are deleted or updated. Old values are needed to determine which index keys which should be updated or removed. I can do all these things now, mostly with groupBy()/reduce(), groupBy/aggregate() and transform(). Best, Dmitry On Tue, Feb 21, 2017 at 5:21 PM, Eno Thereska <eno.there...@gmail.com> wrote: > Hi Dmitry, > > Could you tell us more on the exact API you'd like? Perhaps if others find > it useful too we/you can do a KIP. > > Thanks > Eno > > > On 21 Feb 2017, at 22:01, Dmitry Minkovsky <dminkov...@gmail.com> wrote: > > > > At KAFKA-2984: ktable sends old values when required > > <https://github.com/apache/kafka/pull/672#issue-122101101>, @ymatsuda > > writes: > > > >> NOTE: This is meant to be used by aggregation. But, if there is a use > > case like a SQL database trigger, we can add a new KTable method to > expose > > this. > > > > Looking through the source it does not seem that this API was ever > exposed. > > Not finding anything on Google on this subject either. The SQL database > > trigger is my exact use case. Enabling change-streaming for some tables > > would help simplify my code. Is this possible? Is this scheduled for a > > future version? > > > > Thank you, > > Dmitry > >