Hi Michael,

Thanks for reviewing and for the link.

As I understand your post, especially

> My question is: how much of the TTL idea is about (1) a more granular,
per-key expiration of state than what we currently provide, vs. (2)
decision-making for whether or not another downstream update should be sent
to a specific consumer ("we only wish to generate such an update if we last
informed the client about the property during some time span").

I am finding that stream/stream and table.toStream()/stream windowed joins
provide this functionality. I am really loving it. It took me a few days to
notice that stream/stream joins reflexively iterate over ALL the keys that
are available for the join window, which was a great discovery for me.

Generally, I am attempting to do the inside-out database thing so I have
code that looks like this: https://gist.github.com/
dminkovsky/fb249c59a2446bf18f9a7b9a24ef7f50. All mutations are written to
changelog topics of flat-normalized entities. These changelogs are joined
by the topology into deep nested views, which are queryable via interactive
queries. Doing mutations like this means that if a client wants to know
when a mutation is complete, that client needs to subscribe to a certain
view-state (that's where the TTL window joins come in).

Keeping incomplete or duplicate view-state from query and
mutation-completion clients has been challenging. Some table join
operations result in the same view-state as before. Some join operations
result in incomplete view state which I don't want exposed, such as when a
view is the result of several denormalizations. I solve this mostly by
having denormalized views built "bottom-up": nested entity denormalizations
are built and joined into parent entities up to the root of the view, so
exposed incomplete view state can only happen at the root view level when
left joining. But I have encountered such situations and it's been a bit
tricky for me. FWIW I am still discovering techniques and learning things
about the library every day, so a lot of these struggles are related to my
experience and understanding levels.

But yeah, in building this system I've found it necessary or desirable at
times to have access to new and old state, as in the gist example above,
and as mentioned in the use cases in my previous post.



On Wed, Feb 22, 2017 at 3:05 AM, Michael Noll <mich...@confluent.io> wrote:

> Dmitry,
>
> I think your use case is similar to the one I described in the link below
> (discussion in the kafka-dev mailing list):
> http://search-hadoop.com/m/uyzND1rVOQ12OJ84U&subj=Re+Streams+TTLCacheStore
>
> Could you take a quick look?
>
> -Michael
>
>
>
>
> On Wed, Feb 22, 2017 at 12:39 AM, Dmitry Minkovsky <dminkov...@gmail.com>
> wrote:
>
> > Hi Eno,
> >
> > Thank you. I don't think I'm advanced enough to imagine a good API. But I
> > can elaborate my use-cases further.
> >
> > So say I have two tables:
> >
> >         KTable<String,String> left = topology.table(stringSerde,
> > stringSerde, topicLeft, topicLeft);
> >         KTable<String,String> right = topology.table(stringSerde,
> > stringSerde, topicRight, topicRight);
> >
> >         left
> >           .leftJoin(right, (l, r) -> asList(l, r))
> >           .to(topicView)
> >
> >
> >    - I'd like to filter duplicates out of the change stream. I only want
> >    topicView to receive proper updates.
> >
> >    - I'd like to be able to detect change type easy:
> >
> >    - oldValue == null and newValue != null => create
> >       - oldValue != null and newValue == null => delete
> >       - oldValue != null and newValue != null => update
> >
> >       - I'd like to be able to update indices when records are deleted or
> >    updated. Old values are needed to determine which index keys which
> > should
> >    be updated or removed.
> >
> >
> > I can do all these things now, mostly with groupBy()/reduce(),
> > groupBy/aggregate() and transform().
> >
> >
> > Best,
> > Dmitry
> >
> > On Tue, Feb 21, 2017 at 5:21 PM, Eno Thereska <eno.there...@gmail.com>
> > wrote:
> >
> > > Hi Dmitry,
> > >
> > > Could you tell us more on the exact API you'd like? Perhaps if others
> > find
> > > it useful too we/you can do a KIP.
> > >
> > > Thanks
> > > Eno
> > >
> > > > On 21 Feb 2017, at 22:01, Dmitry Minkovsky <dminkov...@gmail.com>
> > wrote:
> > > >
> > > > At KAFKA-2984: ktable sends old values when required
> > > > <https://github.com/apache/kafka/pull/672#issue-122101101>,
> @ymatsuda
> > > > writes:
> > > >
> > > >> NOTE: This is meant to be used by aggregation. But, if there is a
> use
> > > > case like a SQL database trigger, we can add a new KTable method to
> > > expose
> > > > this.
> > > >
> > > > Looking through the source it does not seem that this API was ever
> > > exposed.
> > > > Not finding anything on Google on this subject either. The SQL
> database
> > > > trigger is my exact use case. Enabling change-streaming for some
> tables
> > > > would help simplify my code. Is this possible? Is this scheduled for
> a
> > > > future version?
> > > >
> > > > Thank you,
> > > > Dmitry
> > >
> > >
> >
>
>
>
> --
> *Michael G. Noll*
> Product Manager | Confluent
> +1 650 453 5860 | @miguno <https://twitter.com/miguno>
> Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
> <http://www.confluent.io/blog>
>

Reply via email to