Hello again, kafka-users,

When I aggregate a KTable, a new input record that updates the
KTable's value for an existing key causes the aggregate's subtractor
to be invoked, and then its adder.  This part is great and works
exactly as I expected.

But what I didn't expect is that the intermediate result of the
subtractor would be sent downstream.  This value doesn't reflect the
reality of the inputs to the aggregator, so sending it downstream is
effectively sending "corrupt" data to the next processing node.  Is
this the expected behavior, or is this a bug?

Take, for example, a table of blog articles and an aggregator that
counts the number of words in each category of the blog:

topic: articles
  K1, {"category": "kafka", "text": "word1, word2, word3"}
  K2, {"category": "kafka", "text": "word1, word2"}

articles.groupBy((k, v) -> KeyValue.pair(v.category, v))
  .aggregate(() -> 0,
    (k, v, t) -> t + v.text.split(" ").length,  // adder
    (k, v, t) -> t - v.text.split(" ").length   // subtractor
  )

This aggregator will produce {key: "kafka", value: 3}, then {key:
"kafka", value: 5}.  If I update one of the blog articles and send a
new message to the articles topic:

  K1, {"category": "kafka", "text": "word1, word2, word3, word4"}

The aggregator will first produce {key: "kafka", value: 2} when the
subtractor is called, then will produce {key: "kafka", value: 6} when
the adder is called.  The intermediate value of 2 does not match
reality: K1 was never deleted, only updated, so the "kafka" category
never actually contained just two words.
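
To make the sequence concrete, here is a minimal sketch in plain Java,
with no Kafka dependency, that imitates the behavior I'm describing.
The class SubtractThenAddSketch and its upsert/run methods are made up
for illustration and are not Kafka Streams APIs.  It only assumes that
an update to an existing key is processed as a subtract of the old
value followed by an add of the new value, with the result of each
step forwarded downstream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the aggregation; not Kafka Streams code.
public class SubtractThenAddSketch {
    // Latest text per article key (plays the role of the articles table).
    private final Map<String, String> articles = new HashMap<>();
    // Running word count for the single "kafka" category.
    private int total = 0;
    // Every value that would be sent downstream, in order.
    private final List<Integer> emitted = new ArrayList<>();

    private static int words(String text) {
        return text.split(" ").length;
    }

    // Insert or update an article, emitting after each aggregation step.
    public void upsert(String key, String text) {
        String old = articles.put(key, text);
        if (old != null) {
            total -= words(old);   // subtractor runs on the old value
            emitted.add(total);    // intermediate result goes downstream
        }
        total += words(text);      // adder runs on the new value
        emitted.add(total);
    }

    // Replays the three records from the example above.
    public static List<Integer> run() {
        SubtractThenAddSketch agg = new SubtractThenAddSketch();
        agg.upsert("K1", "word1, word2, word3");
        agg.upsert("K2", "word1, word2");
        agg.upsert("K1", "word1, word2, word3, word4");
        return agg.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run());  // prints [3, 5, 2, 6]
    }
}
```

The 2 in that output is the value I'm asking about: it appears
downstream even though no input state ever corresponded to it.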

Mathieu
