Hi Mathieu,

Thanks for the suggestion. Wouldn't the cache introduced in KIP-63 do some of this for you, in that it deduplicates records with the same key and prevents them from being forwarded downstream?

Eno

> On 4 Dec 2016, at 04:13, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
>
> Hey all,
>
> I'd like to contribute a new KTable API that would allow for the
> suppression of redundant KTable forwards, and I'd like to solicit feedback
> before I put together a patch.
>
> A typical use-case of this API would be that you're using mapValues to
> pluck a subset of data out of a topic, but you'd like changes to the record
> value that don't modify the output of mapValues to not cause output that
> triggers expensive and redundant recalculations.
>
> For example, topic "user" contains key:1, value:{"firstName": "Jack",
> "lastName": "Brown"}. builder.topic("user").mapValues((user) ->
> user.get("lastName")) will create a KTable that would forward updates from
> the user topic even if lastName never changed.
>
> My proposed API would be to add a filterRedundant method to KTable; one
> overload takes a Comparator<V> to provide a custom comparison for
> evaluating whether a change is redundant, and one parameterless overload
> would use a comparator backed by the object's equals() method.
>
>     /**
>      * Creates a new instance of {@link KTable} that filters out redundant
>      * updates and prevents "non-updates" from propagating to further
>      * operations on the returned table. A redundant update is one where the
>      * same value is provided more than once for a given key. Object.equals
>      * is used to compare whether a subsequent update has the same value.
>      *
>      * @return a {@link KTable} that contains the same values as this
>      *         table, but suppresses redundant updates
>      */
>     KTable<K, V> filterRedundant();
>
>     /**
>      * Creates a new instance of {@link KTable} that filters out redundant
>      * updates and prevents "non-updates" from propagating to further
>      * operations on the returned table. A redundant update is one where the
>      * same value is provided more than once for a given key. A user-provided
>      * comparator is used to compare whether a subsequent update has the
>      * same value.
>      *
>      * @return a {@link KTable} that contains the same values as this
>      *         table, but suppresses redundant updates
>      */
>     KTable<K, V> filterRedundant(Comparator<V> comparator);
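
To make the proposed semantics concrete for discussion, here is a minimal sketch in plain Java with no Kafka Streams dependency. It is not the patch and not an existing Kafka API: RedundantUpdateFilter, shouldForward and FilterRedundantDemo are hypothetical names, and the sketch assumes "redundant" means "compares equal to the last value forwarded for that key". A null comparator stands in for the parameterless variant and falls back to Objects.equals.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper (not part of Kafka Streams): remembers the last value
// forwarded per key and decides whether a new update is redundant.
final class RedundantUpdateFilter<K, V> {
    private final Map<K, V> lastForwarded = new HashMap<>();
    private final Comparator<V> comparator; // null means "use Objects.equals"

    RedundantUpdateFilter() {
        this(null);
    }

    RedundantUpdateFilter(Comparator<V> comparator) {
        this.comparator = comparator;
    }

    // Returns true if the update differs from the last forwarded value for
    // this key (or the key is new) and should therefore go downstream.
    boolean shouldForward(K key, V value) {
        if (lastForwarded.containsKey(key) && sameValue(lastForwarded.get(key), value)) {
            return false;
        }
        lastForwarded.put(key, value);
        return true;
    }

    private boolean sameValue(V previous, V current) {
        // Nulls (deletes) are compared with Objects.equals so that a custom
        // comparator never has to handle them.
        if (comparator == null || previous == null || current == null) {
            return Objects.equals(previous, current);
        }
        return comparator.compare(previous, current) == 0;
    }
}

final class FilterRedundantDemo {
    // Replays the "user" example: pluck lastName, then suppress non-updates.
    static List<String> run() {
        RedundantUpdateFilter<Integer, String> filter = new RedundantUpdateFilter<>();
        String[][] updates = {
            {"Jack", "Brown"}, // first value for key 1: forwarded
            {"John", "Brown"}, // firstName changed, lastName did not: suppressed
            {"John", "Smith"}  // lastName changed: forwarded
        };
        List<String> forwarded = new ArrayList<>();
        for (String[] user : updates) {
            String lastName = user[1]; // stands in for mapValues(user -> lastName)
            if (filter.shouldForward(1, lastName)) {
                forwarded.add(lastName);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [Brown, Smith]
    }
}
```

One consequence of this approach is that the filter has to keep the previous value per key, so a real implementation would presumably need a state store (or a materialized table) behind it.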