Hi, I found out that doing windowed aggregation on records generated by flatMapValues gets incorrect result.
Take this topology as an example: myStream.flatMapValues(...) .groupByKey(...) .aggregate(...) Since inside KStreamWindowAggregateProcessor, the timestamp used to query the aggregation window is retrieved by context.timestamp(). However, flatMapValues does not reset recordContext, which gives the wrong result if the new records generated by flatMapValues fall into different windows. Not sure this is a bug or by-design. It feels like to me that the flatMapValues is somewhat counter-intuitive in this case and the aggregate processor should extract timestamps from the records being aggregated instead of retrieving from the context. Thanks, Sicheng