Hi,

I found that doing a windowed aggregation on records generated by
flatMapValues produces incorrect results.

Take this topology as an example:

myStream.flatMapValues(...)
                .groupByKey(...)
                .aggregate(...)

Inside KStreamWindowAggregateProcessor, the timestamp used to look up
the aggregation window is retrieved via context.timestamp(). However,
flatMapValues does not reset the record context, so every record it emits
carries the timestamp of the input record. This gives the wrong result when
the new records generated by flatMapValues should fall into different windows.

I'm not sure whether this is a bug or by design.

To me, flatMapValues feels somewhat counter-intuitive in this case, and it
seems the aggregate processor should extract timestamps from the records
being aggregated instead of retrieving them from the context.
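A minimal sketch of the suggested alternative, again in plain Java and entirely hypothetical (PerRecordWindowedCount and countPerWindow are not Kafka Streams APIs): the windowed aggregation asks each record for its own timestamp through a caller-supplied extractor function, rather than reading one shared context timestamp. It assumes non-negative timestamps and uses a simple count as the aggregate.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToLongFunction;

// Hypothetical sketch: a tumbling-window count that derives each record's
// window from a per-record timestamp instead of a shared context timestamp.
public class PerRecordWindowedCount {
    // Counts records per tumbling window, keyed by window start (ms).
    static <V> Map<Long, Long> countPerWindow(List<V> records,
                                              ToLongFunction<V> timestampExtractor,
                                              long windowSizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (V record : records) {
            long ts = timestampExtractor.applyAsLong(record); // per-record timestamp
            long start = ts - (ts % windowSizeMs);             // assumes ts >= 0
            counts.merge(start, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Event times of records emitted by flatMapValues from one input record.
        List<Long> flattened = List.of(1_000L, 65_000L, 70_000L, 130_000L);
        // Assumption: the value itself is the event time, so the extractor is identity.
        Map<Long, Long> counts = countPerWindow(flattened, v -> v, 60_000L);
        System.out.println(counts); // {0=1, 60000=2, 120000=1}
    }
}
```

Passing the extractor as a function keeps the aggregation independent of where the timestamp lives in the value, loosely similar in spirit to how a TimestampExtractor is configured for source records.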

Thanks,
Sicheng
