Hi Michael,

Thanks for reviewing and for the link.

As I understand your post, this is the key question:

> My question is: how much of the TTL idea is about (1) a more granular,
> per-key expiration of state than what we currently provide, vs. (2)
> decision-making for whether or not another downstream update should be
> sent to a specific consumer ("we only wish to generate such an update
> if we last informed the client about the property during some time
> span").

I am finding that stream/stream and table.toStream()/stream windowed
joins provide this functionality, and I am really loving it. It took me
a few days to notice that stream/stream joins reflexively iterate over
ALL the keys that are available for the join window, which was a great
discovery for me.
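Concretely, here is a minimal sketch of the kind of TTL gate I mean,
written against the 0.10.1-era DSL. The topic names, the String serdes,
and the 10-minute window are placeholders, not my real code: an update
is forwarded downstream only if we recently "informed" the client about
that key.

    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    KStreamBuilder builder = new KStreamBuilder();
    Serde<String> stringSerde = Serdes.String();

    // Property updates, keyed by entity id.
    KStream<String, String> updates =
        builder.stream(stringSerde, stringSerde, "property-updates");

    // Markers recording when we last informed a client about an
    // entity, keyed the same way.
    KStream<String, String> informed =
        builder.stream(stringSerde, stringSerde, "clients-informed");

    // Inner windowed join: an update produces output only if an
    // "informed" marker for the same key exists within +/- 10
    // minutes, so the marker acts as a TTL gate on downstream updates.
    updates
        .join(informed,
              (update, marker) -> update,
              JoinWindows.of(TimeUnit.MINUTES.toMillis(10)),
              stringSerde, stringSerde, stringSerde)
        .to(stringSerde, stringSerde, "gated-updates");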
Generally, I am attempting to do the inside-out database thing, so I
have code that looks like this:
https://gist.github.com/dminkovsky/fb249c59a2446bf18f9a7b9a24ef7f50.
All mutations are written to changelog topics of flat, normalized
entities. These changelogs are joined by the topology into deeply
nested views, which are queryable via interactive queries (see the
first sketch below). Doing mutations like this means that if a client
wants to know when a mutation is complete, that client needs to
subscribe to a certain view-state (that's where the TTL window joins
come in).

Keeping incomplete or duplicate view-state away from query and
mutation-completion clients has been challenging. Some table join
operations result in the same view-state as before. Some join
operations result in incomplete view-state that I don't want exposed,
such as when a view is the result of several denormalizations. I solve
this mostly by building denormalized views "bottom-up": nested entity
denormalizations are built and joined into parent entities up to the
root of the view, so incomplete view-state can only be exposed at the
root view level, when left joining. But I have encountered such
situations, and they have been a bit tricky for me.

FWIW, I am still discovering techniques and learning things about the
library every day, so a lot of these struggles are related to my level
of experience and understanding. But yes: in building this system I
have found it necessary or desirable at times to have access to both
new and old state, as in the gist example above (and the second sketch
below), and as mentioned in the use cases in my previous post.
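To make the "bottom-up" point concrete, here is a minimal sketch (not
the actual gist; User, Address, UserView, their serdes, and the topic
names are made up, and I'm pretending each user has at most one
address, already re-keyed by user id upstream). It reuses the builder
and stringSerde from the first sketch:

    // Normalized changelog tables.
    KTable<String, User> users =
        builder.table(stringSerde, userSerde, "users", "users-store");
    KTable<String, Address> addresses =
        builder.table(stringSerde, addressSerde, "addresses-by-user",
                      "addresses-store");

    // Bottom-up: the nested child is joined into the parent, and the
    // only left join is at the root, so incomplete view-state can only
    // appear there.
    KTable<String, UserView> userViews =
        users.leftJoin(addresses,
                       (user, address) -> new UserView(user, address));

    // Materialize the view for interactive queries and for
    // mutation-completion watchers.
    userViews.to(stringSerde, userViewSerde, "user-views");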
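And since the DSL doesn't expose a KTable's old values (that's
KAFKA-2984, quoted below), here is roughly how I get (oldValue,
newValue) pairs with transform() and a state store. The names are
hypothetical, and it again reuses the builder and stringSerde from the
first sketch (remaining imports: KeyValue, Transformer,
ProcessorContext, KeyValueStore, Stores):

    // Hypothetical container for both sides of a change.
    class Change {
        final String oldValue;
        final String newValue;
        Change(String oldValue, String newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    // A store that remembers the previous value for each key.
    builder.addStateStore(
        Stores.create("previous-values")
              .withStringKeys()
              .withStringValues()
              .persistent()
              .build());

    KStream<String, String> changelog =
        builder.stream(stringSerde, stringSerde, "entity-changelog");

    KStream<String, Change> changes = changelog.transform(
        () -> new Transformer<String, String, KeyValue<String, Change>>() {
            private KeyValueStore<String, String> store;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                store = (KeyValueStore<String, String>)
                    context.getStateStore("previous-values");
            }

            @Override
            public KeyValue<String, Change> transform(String key, String value) {
                String old = store.get(key);
                if (value == null) {
                    store.delete(key);
                } else {
                    store.put(key, value);
                }
                // old == null && value != null => create
                // old != null && value == null => delete
                // old != null && value != null => update
                return KeyValue.pair(key, new Change(old, value));
            }

            @Override
            public KeyValue<String, Change> punctuate(long timestamp) {
                return null; // nothing scheduled
            }

            @Override
            public void close() {}
        },
        "previous-values");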
On Wed, Feb 22, 2017 at 3:05 AM, Michael Noll <mich...@confluent.io> wrote:

> Dmitry,
>
> I think your use case is similar to the one I described in the link
> below (discussion in the kafka-dev mailing list):
> http://search-hadoop.com/m/uyzND1rVOQ12OJ84U&subj=Re+Streams+TTLCacheStore
>
> Could you take a quick look?
>
> -Michael
>
> On Wed, Feb 22, 2017 at 12:39 AM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
>
> > Hi Eno,
> >
> > Thank you. I don't think I'm advanced enough to imagine a good API.
> > But I can elaborate my use cases further.
> >
> > So say I have two tables:
> >
> >     KTable<String, String> left = topology.table(stringSerde,
> >         stringSerde, topicLeft, topicLeft);
> >     KTable<String, String> right = topology.table(stringSerde,
> >         stringSerde, topicRight, topicRight);
> >
> >     left
> >         .leftJoin(right, (l, r) -> asList(l, r))
> >         .to(topicView);
> >
> > - I'd like to filter duplicates out of the change stream. I only
> >   want topicView to receive proper updates.
> >
> > - I'd like to be able to detect the change type easily:
> >
> >     - oldValue == null and newValue != null => create
> >     - oldValue != null and newValue == null => delete
> >     - oldValue != null and newValue != null => update
> >
> > - I'd like to be able to update indices when records are deleted or
> >   updated. Old values are needed to determine which index keys
> >   should be updated or removed.
> >
> > I can do all these things now, mostly with groupBy()/reduce(),
> > groupBy()/aggregate() and transform().
> >
> > Best,
> > Dmitry
> >
> > On Tue, Feb 21, 2017 at 5:21 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> >
> > > Hi Dmitry,
> > >
> > > Could you tell us more about the exact API you'd like? Perhaps if
> > > others find it useful too, we/you can do a KIP.
> > >
> > > Thanks,
> > > Eno
> > >
> > > > On 21 Feb 2017, at 22:01, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
> > > >
> > > > At KAFKA-2984 ("ktable sends old values when required",
> > > > <https://github.com/apache/kafka/pull/672#issue-122101101>),
> > > > @ymatsuda writes:
> > > >
> > > > > NOTE: This is meant to be used by aggregation. But, if there
> > > > > is a use case like a SQL database trigger, we can add a new
> > > > > KTable method to expose this.
> > > >
> > > > Looking through the source, it does not seem that this API was
> > > > ever exposed, and I am not finding anything on Google on this
> > > > subject either. The SQL database trigger is my exact use case.
> > > > Enabling change-streaming for some tables would help simplify my
> > > > code. Is this possible? Is it scheduled for a future version?
> > > >
> > > > Thank you,
> > > > Dmitry
>
> --
> *Michael G. Noll*
> Product Manager | Confluent
> +1 650 453 5860 | @miguno <https://twitter.com/miguno>
> Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
> <http://www.confluent.io/blog>