By default, Kafka uses the record metadata timestamp. Thus, I assume you are using a custom timestamp extractor? If yes, which timestamp do you extract, since you seem to have multiple?
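(For reference, a custom extractor is just a class implementing the TimestampExtractor interface, registered globally via StreamsConfig. A minimal sketch; MyEventTimeExtractor is an illustrative name, not from this thread:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class MyEventTimeExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record,
                            long previousTimestamp) {
            // Default behavior: use the record metadata timestamp.
            // A custom extractor would instead return a timestamp
            // taken from the record key or value.
            return record.timestamp();
        }
    }

    // Registered globally, it applies to all source topics:
    Properties props = new Properties();
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
              MyEventTimeExtractor.class);

)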
To make it work, you will need to write the data back to Kafka into a new topic after the flatMap() and read each record back individually. This allows you, via a custom timestamp extractor, to get the timestamp from the value:

    stream.flatMap(...).to("myTopic");
    builder.stream("myTopic",
        Consumed.with( /* pass in custom timestamp extractor */ ));

Hope this helps.

-Matthias

On 7/10/18 10:32 AM, Sicheng Liu wrote:
> This is a specific example:
>
> We are sending metrics to Kafka Streams with the following layout:
>
> Record Key:
> -----------------------
> | metric name | tags |
> -----------------------
>
> Record Value:
> --------------------------------------------------------------------------
> | timestamp1 | value1 | timestamp2 | value2 | ... | timestampN | valueN |
> --------------------------------------------------------------------------
>
> The record value contains a list of timestamps and values. The
> timestamps may span a range of 10 minutes.
>
> You may ask why we do this: the reason is to save Kafka disk space by
> avoiding sending the large record key repeatedly. (We know Kafka can do
> compression on the producer side, but this format, combined with
> compression, uses less space.)
>
> So, in this case, we want to aggregate metrics into 5-minute windows,
> and we used flatMapValues to flatten the metric series into individual
> metrics:
>
> myStream.flatMapValues(...)
>         .groupByKey(...)
>         .aggregate(...)
>
> Because with flatMapValues all flattened metrics share the same
> timestamp, they are all aggregated into the same window (instead of
> two), which gives an incorrect result.
>
> But when I looked at the API documentation, I had the impression that
> it would behave like sending individual metrics.
>
> Hopefully I gave a better explanation of what I am trying to achieve.
>
> Thanks,
> Sicheng
>
> On Tue, Jul 10, 2018 at 9:32 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Not sure what you mean by "does not reset recordContext".
>>
>> Note that the "contract" for `flatMapValues` is that the output
>> records inherit the timestamp of the input record.
>>
>> Not sure what behavior you expect? Maybe you can elaborate?
>>
>>
>> -Matthias
>>
>> On 7/9/18 7:27 PM, Sicheng Liu wrote:
>>> Hi,
>>>
>>> I found out that doing windowed aggregation on records generated by
>>> flatMapValues gets an incorrect result.
>>>
>>> Take this topology as an example:
>>>
>>> myStream.flatMapValues(...)
>>>         .groupByKey(...)
>>>         .aggregate(...)
>>>
>>> Inside KStreamWindowAggregateProcessor, the timestamp used to query
>>> the aggregation window is retrieved via context.timestamp(). However,
>>> flatMapValues does not reset the recordContext, which gives the wrong
>>> result if the new records generated by flatMapValues fall into
>>> different windows.
>>>
>>> Not sure whether this is a bug or by design.
>>>
>>> It feels to me that flatMapValues is somewhat counter-intuitive in
>>> this case, and that the aggregate processor should extract timestamps
>>> from the records being aggregated instead of retrieving them from the
>>> context.
>>>
>>> Thanks,
>>> Sicheng
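(A sketch of the repartition-based workaround described at the top of this message, assuming each flattened value carries its own epoch-millis timestamp. The SingleMetric type, the "metrics-flattened" topic name, and the splitting helper are illustrative, not from this thread; the aggregation logic stays elided as in the thread:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Hypothetical single-point value emitted by flatMapValues.
    class SingleMetric {
        final long timestampMs;   // event time taken from the multi-point value
        final double value;
        SingleMetric(long timestampMs, double value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    // Reads the event time out of the value instead of the record metadata.
    class SingleMetricTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record,
                            long previousTimestamp) {
            Object v = record.value();
            if (v instanceof SingleMetric) {
                return ((SingleMetric) v).timestampMs;
            }
            return record.timestamp();  // fall back to the metadata timestamp
        }
    }

    // Step 1: flatten and write back to an intermediate topic.
    myStream.flatMapValues(value -> splitIntoSingleMetrics(value))  // helper elided
            .to("metrics-flattened");

    // Step 2: re-read; each record now gets its own event timestamp from
    // the value, so records land in the correct 5-minute windows.
    builder.stream("metrics-flattened",
                   Consumed.with(new SingleMetricTimestampExtractor()))
           .groupByKey()
           .windowedBy(TimeWindows.of(5 * 60 * 1000L))  // 5-minute windows
           .aggregate(...);

The cost of this pattern is one extra round trip through Kafka, but it restores per-record timestamps so that the windowed aggregation behaves as if each metric had been produced individually.)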