Hi Mathieu,

Thanks for the suggestion. Wouldn't the cache introduced in KIP-63 do some of 
this for you, in that it dedups records with the same key and prevents them 
from being forwarded downstream?
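
For reference, here is a minimal sketch of the two settings that govern that cache. It assumes the `cache.max.bytes.buffering` and `commit.interval.ms` Streams configs; the class name and the values are only illustrative. As far as I understand it, the cache collapses successive updates to the same key between flushes and does not compare values, so it may reduce redundant forwards without eliminating them.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): collects the cache-related settings.
final class CacheConfigSketch {
    static Properties cacheProps() {
        Properties props = new Properties();
        // Total bytes available for record caches across all threads (illustrative value).
        props.setProperty("cache.max.bytes.buffering", Long.toString(10L * 1024 * 1024));
        // Caches are flushed, and the latest value per key forwarded, at commit time (illustrative value).
        props.setProperty("commit.interval.ms", "30000");
        return props;
    }
}
```

These properties would be merged into the application's usual Streams configuration; a larger cache or a longer commit interval means more updates per key are collapsed before anything is forwarded.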

Eno
> On 4 Dec 2016, at 04:13, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> 
> Hey all,
> 
> I'd like to contribute a new KTable API that would allow for the
> suppression of redundant KTable forwards, and I'd like to solicit feedback
> before I put together a patch.
> 
> A typical use-case of this API would be that you're using mapValues to
> pluck a subset of data out of a topic, but you don't want changes to the
> record value that leave the output of mapValues unchanged to produce output
> that triggers expensive and redundant recalculations.
> 
> For example, topic "user" contains key:1, value:{"firstName": "Jack",
> "lastName": "Brown"}.  builder.topic("user").mapValues((user) ->
> user.get("lastName"))  will create a KTable that would forward updates from
> the user topic even if lastName never changed.
> 
> My proposed API would be to add a filterRedundant method to KTable; one
> overload takes a Comparator<V> to provide a custom comparison for
> evaluating whether a change is redundant, and one parameterless overload
> would use a comparator backed by the object's equals() method.
> 
>    /**
>     * Creates a new instance of {@link KTable} that filters out redundant
>     * updates and prevents "non-updates" from propagating to further
>     * operations on the returned table.  A redundant update is one where the
>     * same value is provided more than once for a given key.  Object.equals
>     * is used to compare whether a subsequent update has the same value.
>     *
>     * @return a {@link KTable} that contains the same values as this table,
>     *         but suppresses redundant updates
>     */
>    KTable<K, V> filterRedundant();
> 
>    /**
>     * Creates a new instance of {@link KTable} that filters out redundant
>     * updates and prevents "non-updates" from propagating to further
>     * operations on the returned table.  A redundant update is one where the
>     * same value is provided more than once for a given key.  A user-provided
>     * comparator is used to compare whether a subsequent update has the same
>     * value.
>     *
>     * @param comparator compares the previous and new value for a key; a
>     *                   result of 0 marks the update as redundant
>     * @return a {@link KTable} that contains the same values as this table,
>     *         but suppresses redundant updates
>     */
>    KTable<K, V> filterRedundant(Comparator<V> comparator);
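
To make the proposed semantics concrete, here is a rough sketch of the per-key check that filterRedundant would need to perform. It is plain Java with no Kafka dependency: the class RedundantFilterSketch, the method shouldForward, and the in-memory map (standing in for a state store) are all hypothetical and not part of any patch. Treating null values as tombstones that are compared by identity is also an assumption.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the proposed semantics; not Kafka Streams code.
final class RedundantFilterSketch<K, V> {
    // Last value forwarded per key; a real implementation would use a state store.
    private final Map<K, V> lastForwarded = new HashMap<>();
    private final Comparator<V> comparator;

    // Parameterless variant: equality is decided by Object.equals.
    // This lambda is only used to test for 0, never for ordering.
    RedundantFilterSketch() {
        this((a, b) -> a.equals(b) ? 0 : 1);
    }

    // Comparator variant: a result of 0 means "same value".
    RedundantFilterSketch(Comparator<V> comparator) {
        this.comparator = comparator;
    }

    // Returns true if the update should be forwarded downstream.
    boolean shouldForward(K key, V newValue) {
        boolean redundant;
        if (!lastForwarded.containsKey(key)) {
            redundant = false;                      // first update for this key
        } else {
            V old = lastForwarded.get(key);
            if (old == null || newValue == null) {
                redundant = (old == newValue);      // assumption: nulls only match nulls
            } else {
                redundant = comparator.compare(old, newValue) == 0;
            }
        }
        if (!redundant) {
            lastForwarded.put(key, newValue);
        }
        return !redundant;
    }
}
```

With the "user" example above, a second update for key 1 whose lastName is still "Brown" would return false from shouldForward and so would not trigger downstream recalculation, while a change to a different lastName would be forwarded.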
