Not sure what you mean by "does not reset recordContext".

Note, that the "contract" for `flatMapValues` is that the output records
inherit the timestamp of the input record.

Not sure what behavior you expect? Maybe you can elaborate?


-Matthias

On 7/9/18 7:27 PM, Sicheng Liu wrote:
> Hi,
> 
> I found out that doing windowed aggregation on records generated by
> flatMapValues gets incorrect result.
> 
> Take this topology as an example:
> 
> myStream.flatMapValues(...)
>                 .groupByKey(...)
>                 .aggregate(...)
> 
> Since inside KStreamWindowAggregateProcessor, the timestamp used to query
> the aggregation window is retrieved by context.timestamp(). However,
> flatMapValues does not reset recordContext, which gives the wrong result if
> the new records generated by flatMapValues fall into different windows.
> 
> Not sure this is a bug or by-design.
> 
> It feels like to me that the flatMapValues is somewhat counter-intuitive in
> this case and the aggregate processor should extract timestamps from the
> records being aggregated instead of retrieving from the context.
> 
> Thanks,
> Sicheng
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to