Not sure what you mean by "does not reset recordContext". Note, that the "contract" for `flatMapValues` is that the output records inherit the timestamp of the input record.
Not sure what behavior you expect? Maybe you can elaborate? -Matthias On 7/9/18 7:27 PM, Sicheng Liu wrote: > Hi, > > I found out that doing windowed aggregation on records generated by > flatMapValues gets incorrect result. > > Take this topology as an example: > > myStream.flatMapValues(...) > .groupByKey(...) > .aggregate(...) > > Since inside KStreamWindowAggregateProcessor, the timestamp used to query > the aggregation window is retrieved by context.timestamp(). However, > flatMapValues does not reset recordContext, which gives the wrong result if > the new records generated by flatMapValues fall into different windows. > > Not sure this is a bug or by-design. > > It feels like to me that the flatMapValues is somewhat counter-intuitive in > this case and the aggregate processor should extract timestamps from the > records being aggregated instead of retrieving from the context. > > Thanks, > Sicheng >
signature.asc
Description: OpenPGP digital signature