Hi Mathieu,

You are correct that the de-duping only happens for updates that arrive within the same commit interval.
I can understand and appreciate the use-case you have. So I think the right approach is to create a KIP with your suggested changes and put it to the community. Are you happy to do that?

Thanks,
Damian

On Sun, 4 Dec 2016 at 15:20 Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:

> Hi Eno,
>
> I'm not sure. My understanding is that the cache would prevent two
> immediate updates for the same key from being forwarded, but that only
> applies when records arrive within commit.interval.ms of each other. Is
> that understanding correct?
>
> filterRedundant compares the newValue and oldValue of a Change, so it works
> regardless of the time between records.
>
> https://github.com/apache/kafka/compare/trunk...mfenniak:filter-redundant
>
> The use-case that is currently biting me is a piece of source data that
> contains multiple unrelated configuration fields in a single record; it's
> not a great design, but it's the data I have to work with. I'm plucking
> out only a single relevant field with mapValues, but changes to the other
> fields within the record are causing excessive, expensive recomputations
> that are redundant.
>
> Mathieu
>
> On Sun, Dec 4, 2016 at 4:34 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>
> > Hi Mathieu,
> >
> > Thanks for the suggestion. Wouldn't the cache introduced in KIP-63 do some
> > of this for you, in that it dedups records with the same key and prevents
> > them from being forwarded downstream?
> >
> > Eno
> >
> > On 4 Dec 2016, at 04:13, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> >
> > > Hey all,
> > >
> > > I'd like to contribute a new KTable API that would allow for the
> > > suppression of redundant KTable forwards, and I'd like to solicit feedback
> > > before I put together a patch.
> > >
> > > A typical use-case of this API would be that you're using mapValues to
> > > pluck a subset of data out of a topic, but you'd like changes to the record
> > > value that don't affect the output of mapValues not to produce output that
> > > triggers expensive and redundant recalculations.
> > >
> > > For example, topic "user" contains key:1, value:{"firstName": "Jack",
> > > "lastName": "Brown"}. builder.table("user").mapValues((user) ->
> > > user.get("lastName")) will create a KTable that would forward updates from
> > > the user topic even if lastName never changed.
> > >
> > > My proposed API would be to add a filterRedundant method to KTable; one
> > > overload takes a Comparator<V> to provide a custom comparison for
> > > evaluating whether a change is redundant, and one parameterless overload
> > > would use a comparator backed by the object's equals() method.
> > >
> > > /**
> > >  * Creates a new instance of {@link KTable} that filters out redundant
> > >  * updates and prevents "non-updates" from propagating to further
> > >  * operations on the returned table. A redundant update is one where the
> > >  * same value is provided more than once for a given key. Object.equals
> > >  * is used to compare whether a subsequent update has the same value.
> > >  *
> > >  * @return a {@link KTable} that contains the same values as this table,
> > >  *         but suppresses redundant updates
> > >  */
> > > KTable<K, V> filterRedundant();
> > >
> > > /**
> > >  * Creates a new instance of {@link KTable} that filters out redundant
> > >  * updates and prevents "non-updates" from propagating to further
> > >  * operations on the returned table. A redundant update is one where the
> > >  * same value is provided more than once for a given key. A user-provided
> > >  * comparator is used to compare whether a subsequent update has the same
> > >  * value.
> > >  *
> > >  * @param comparator the comparator used to decide whether a subsequent
> > >  *                   update has the same value
> > >  * @return a {@link KTable} that contains the same values as this table,
> > >  *         but suppresses redundant updates
> > >  */
> > > KTable<K, V> filterRedundant(Comparator<V> comparator);
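To make the proposed semantics concrete, here is a minimal stand-alone Java sketch (Java 9+ for List.of, Map.of and Map.entry). It does not use Kafka Streams at all: Change, RedundantFilter and forwarded are hypothetical names that model a KTable forwarding a (newValue, oldValue) pair, as described in the thread, plus a filterRedundant-style step that drops the change when the two values are the same, either by equals() or by a user-supplied Comparator returning 0. Unlike the KIP-63 cache, which per the discussion above only collapses updates arriving within one commit interval, this check looks only at the previous value for the key, so the time between records does not matter.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Hypothetical, Kafka-free model of the proposed filterRedundant() semantics.
public class FilterRedundantSketch {

    // Simplified stand-in for the (newValue, oldValue) pair a KTable forwards.
    static final class Change<V> {
        final V newValue;
        final V oldValue;

        Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Decides whether a change is redundant, i.e. newValue is "the same" as oldValue.
    static final class RedundantFilter<V> {
        private final BiPredicate<V, V> same;

        // Parameterless variant: sameness is decided by Object.equals (null-safe).
        RedundantFilter() {
            this.same = Objects::equals;
        }

        // Comparator variant: compare(...) == 0 means "same"; a null on only one
        // side is treated as a real change (an insert or a delete).
        RedundantFilter(Comparator<V> comparator) {
            this.same = (a, b) -> (a == null || b == null) ? a == b : comparator.compare(a, b) == 0;
        }

        // Forward only when the new value differs from the previous one; the time
        // elapsed between the two updates plays no role.
        boolean shouldForward(Change<V> change) {
            return !same.test(change.newValue, change.oldValue);
        }
    }

    // Simulates table.mapValues(mapper) followed by the redundancy filter and
    // returns the mapped values that would be forwarded downstream, in order.
    static <K, V, VR> List<VR> forwarded(List<Map.Entry<K, V>> updates,
                                         Function<V, VR> mapper,
                                         RedundantFilter<VR> filter) {
        Map<K, V> upstream = new HashMap<>();
        List<VR> out = new ArrayList<>();
        for (Map.Entry<K, V> update : updates) {
            // HashMap.put returns the previous upstream value for the key (or null).
            V previous = upstream.put(update.getKey(), update.getValue());
            VR oldMapped = previous == null ? null : mapper.apply(previous);
            VR newMapped = mapper.apply(update.getValue());
            if (filter.shouldForward(new Change<>(newMapped, oldMapped))) {
                out.add(newMapped);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The "user" topic example from the thread, keyed by user id 1.
        List<Map.Entry<Integer, Map<String, String>>> updates = List.of(
                Map.entry(1, Map.of("firstName", "Jack", "lastName", "Brown")),
                Map.entry(1, Map.of("firstName", "John", "lastName", "Brown")),
                Map.entry(1, Map.of("firstName", "John", "lastName", "Smith")));
        List<String> out = forwarded(updates, user -> user.get("lastName"),
                new RedundantFilter<String>());
        System.out.println(out); // prints [Brown, Smith]
    }
}
```

Running main feeds the "user" example through a lastName mapper: the update that only changes firstName is suppressed, so only Brown and Smith are forwarded. Passing String.CASE_INSENSITIVE_ORDER instead would also suppress a change from "Brown" to "BROWN", which is the kind of custom comparison the Comparator<V> overload is meant to allow.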