Hi,

I am doing some experiments with kafka-streams KGroupedTable
aggregation, and admittedly I am not wiping data properly on each
restart, partially because I also wonder what would happen if you
change a streams topology without doing a proper reset.

I've noticed that from time to time, kafka-streams
KGroupedTable.reduce() can call subtractor function with null
aggregator value, and if you try to work around that, by interpreting
null aggregator value as zero for numeric value you get incorrect
aggregation result.

I do understand that the proper way of handling this is to do a reset
on topology changes, but I'd like to understand if there's any
legitmate case when kafka-streams can call an adder or a substractor
with null aggregator value, and should I plan for this, or should I
interpret this as an invalid state, and terminate the application, and
do a proper reset?

Also, I can't seem to find a guide which explains when application
reset is necessary. Intuitively it seems that it should be done every
time a topology changes. Any other cases?

I tried to debug where the null value comes from and it seems that
KTableReduce.process() is getting called with Change<V> value with
newValue == null, and some non-null oldValue. Which leads to and to
subtractor being called with null aggregator value. I wonder how it is
possible to have an old value for a key without a new value (does it
happen because of the auto commit interval?).

I've also noticed that it's possible for an input value from a topic
to bypass aggregation function entirely and be directly transmitted to
the output in certain cases: oldAgg is null, newValue is not null and
oldValue is null - in that case newValue will be transmitted directly
to the output. I suppose it's the correct behaviour, but feels a bit
weird nonetheless. And I've actually been able to observe this
behaviour in practice. I suppose it's also caused by this happening
right before a commit happens, and the message is sent to a changelog
topic.

Please can someone with more knowledge shed some light on these issues?

-- 
Best regards,
Vasily Sulatskov

Reply via email to