Hi Mathieu,

You are correct in that the de-duping only occurs within the commit
interval.

I can understand and appreciate the use-case you have. So I think the right
approach for this is to create a KIP with your suggested changes and put it
to the community. Are you happy to do that?

Thanks,
Damian

On Sun, 4 Dec 2016 at 15:20 Mathieu Fenniak <mathieu.fenn...@replicon.com>
wrote:

> Hi Eno,
>
> I'm not sure.  My understanding is that the cache would prevent two
> immediate updates for the same key from being forwarded, but that only
> applies when records arrive within commit.interval.ms of each other.  Is
> that understanding correct?
>
> filterRedundant compares the newValue and oldValue in a Change, so it works
> regardless of the time between records.
>
> https://github.com/apache/kafka/compare/trunk...mfenniak:filter-redundant
>
>
> The use-case that is currently kicking me is a piece of source data that
> contains multiple unrelated configuration fields in a single record; it's
> not a great design, but it's the data I have to work with.  I'm plucking
> out only a single relevant field with mapValues, but changes to the other
> fields within the record are causing excessive, expensive recomputations
> that are redundant.
>
> Mathieu
>
>
> On Sun, Dec 4, 2016 at 4:34 AM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
> > Hi Mathieu,
> >
> > Thanks for the suggestion. Wouldn't the cache introduced in KIP-63 do some
> > of this for you, in that it dedups records with the same key and prevents
> > them from being forwarded downstream?
> >
> > Eno
> > > On 4 Dec 2016, at 04:13, Mathieu Fenniak <mathieu.fenn...@replicon.com>
> > > wrote:
> > >
> > > Hey all,
> > >
> > > > I'd like to contribute a new KTable API that would allow for the
> > > > suppression of redundant KTable forwards, and I'd like to solicit
> > > > feedback before I put together a patch.
> > >
> > > > A typical use-case of this API would be that you're using mapValues to
> > > > pluck a subset of data out of a topic, but you don't want changes to the
> > > > record value that leave the output of mapValues unchanged to be forwarded
> > > > and trigger expensive, redundant recalculations.
> > >
> > > > For example, topic "user" contains key:1, value:{"firstName": "Jack",
> > > > "lastName": "Brown"}.  builder.topic("user").mapValues((user) ->
> > > > user.get("lastName"))  will create a KTable that would forward updates
> > > > from the user topic even if lastName never changed.
> > >
> > > > My proposed API would be to add a filterRedundant method to KTable; one
> > > > overload takes a Comparator<V> to provide a custom comparison for
> > > > evaluating whether a change is redundant, and one parameterless overload
> > > > would use a comparator backed by the object's equals() method.
> > >
> > >    /**
> > >     * Creates a new instance of {@link KTable} that filters out
> redundant
> > > updates and prevents "non-updates" from
> > >     * propagating to further operations on the returned table.  A
> > > redundant update onewhere the same value is provided
> > >     * more than once for a given key.  Object.equals is used to compare
> > > whether a subsequent update has the same value.
> > >
> > >     * @return a {@link KTable} that contains the same values as this
> > > table, but suppresses redundant updates
> > >     */
> > >    KTable<K, V> filterRedundant();
> > >
> > >    /**
> > >     * Creates a new instance of {@link KTable} that filters out
> redundant
> > > updates and prevents "non-updates" from
> > >     * propagating to further operations on the returned table.  A
> > > redundant update onewhere the same value is provided
> > >     * more than once for a given key.  A user-provided comparator is
> used
> > > to compare whether a subsequent update has
> > >     * the same value.
> > >
> > >     * @return a {@link KTable} that contains the same values as this
> > > table, but suppresses redundant updates
> > >     */
> > >    KTable<K, V> filterRedundant(Comparator<V> comparator);
> >
> >
>
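
To illustrate the semantics discussed in this thread, here is a minimal sketch in plain Java with no Kafka dependency. It assumes the filter keeps the last forwarded value per key and drops any update that compares equal to it, no matter how far apart the records arrive. The class name RedundantFilter, its process method, and the downstream callback are hypothetical names chosen for illustration; this is neither the patch linked above nor part of the Kafka Streams API.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the proposed filterRedundant step; not Kafka Streams code.
class RedundantFilter<K, V> {
    // Last value forwarded downstream for each key.
    private final Map<K, V> lastForwarded = new HashMap<>();
    // Optional custom comparison; null means "fall back to equals()".
    private final Comparator<V> comparator;
    // Receives only the updates that actually changed the value.
    private final BiConsumer<K, V> downstream;

    RedundantFilter(Comparator<V> comparator, BiConsumer<K, V> downstream) {
        this.comparator = comparator;
        this.downstream = downstream;
    }

    // Forwards (key, newValue) unless it matches the last forwarded value for key.
    void process(K key, V newValue) {
        if (lastForwarded.containsKey(key) && isSame(lastForwarded.get(key), newValue)) {
            return; // redundant update: suppressed regardless of elapsed time
        }
        lastForwarded.put(key, newValue);
        downstream.accept(key, newValue);
    }

    private boolean isSame(V oldValue, V newValue) {
        if (oldValue == null || newValue == null) {
            return oldValue == newValue; // avoid handing nulls to the comparator
        }
        return comparator == null
                ? oldValue.equals(newValue)
                : comparator.compare(oldValue, newValue) == 0;
    }

    public static void main(String[] args) {
        RedundantFilter<Integer, String> filter =
                new RedundantFilter<>(null, (k, v) -> System.out.println(k + "=" + v));
        filter.process(1, "Brown"); // forwarded
        filter.process(1, "Brown"); // suppressed: lastName unchanged
        filter.process(1, "Smith"); // forwarded
    }
}
```

Passing a Comparator such as String.CASE_INSENSITIVE_ORDER in place of null corresponds to the second proposed overload: two values are treated as the same whenever compare returns 0.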
