This is a specific example:

We are sending metrics to the Kafka stream with the following record layout:

Record Key:
--------------------------------------
|    metric name    |    tags    |
--------------------------------------

Record Value:
-------------------------------------------------------------------------------------------------------------------
|    timestamp1    |    value1    |    timestamp2    |    value2    |    ...    |    timestampN    |    valueN    |
-------------------------------------------------------------------------------------------------------------------

The record value contains a list of timestamp/value pairs; the timestamps
within one record may span a range of 10 minutes.
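
For illustration, here is roughly how such a packed value could be decoded
into individual points (a minimal sketch; the fixed-width long/double
encoding and the class names are assumptions, not our actual serde):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch only: assumes each point is an 8-byte timestamp followed by an
// 8-byte double value; the real encoding may differ.
public class MetricSeries {

    public static class MetricPoint {
        public final long timestamp;   // epoch millis of the individual point
        public final double value;

        public MetricPoint(long timestamp, double value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Decode a record value of the form [ts1][v1][ts2][v2]...[tsN][vN]
    public static List<MetricPoint> decode(byte[] recordValue) {
        ByteBuffer buf = ByteBuffer.wrap(recordValue);
        List<MetricPoint> points = new ArrayList<>();
        while (buf.remaining() >= Long.BYTES + Double.BYTES) {
            points.add(new MetricPoint(buf.getLong(), buf.getDouble()));
        }
        return points;
    }
}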

You may ask why we do this: the reason is to save Kafka disk space by not
sending the large record key repeatedly. (We know Kafka can do compression
on the producer side, but this format combined with compression uses less
space.)

So, in this case, we want to aggregate metrics into 5-minute windows, and we
used flatMapValues to flatten the metric series into individual metric points:

myStream.flatMapValues(...)
        .groupByKey(...)
        .aggregate(...)
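
Spelled out a bit more, the topology looks roughly like this (a sketch only:
MetricPoint is the placeholder class from the decoding sketch above, and the
topic name and serdes are assumptions, not our actual implementation):

import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;

public class MetricAggregationTopology {

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();

        // assumes the value serde of the source topic yields List<MetricPoint>
        KStream<String, List<MetricSeries.MetricPoint>> myStream =
                builder.stream("metrics");   // placeholder topic name, default serdes assumed

        myStream
                // one output record per (timestamp, value) point; each output
                // record inherits the timestamp of the original packed input record
                .flatMapValues(points -> points)
                .groupByKey()
                .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
                // sum the values per metric and per 5-minute window
                .aggregate(
                        () -> 0.0,
                        (metricKey, point, sum) -> sum + point.value);

        return builder;
    }
}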

The problem is that after flatMapValues, all of the flattened metric points
share the same record timestamp (the timestamp of the original packed record).
They are therefore all aggregated into the same window (instead of being split
across two windows), which gives an incorrect result.
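
To make it concrete (the timestamps here are made up for illustration): if a
packed record is produced at 10:07 and contains points at 10:01 and 10:07,
then after flatMapValues both flattened records carry the 10:07 record
timestamp, so both land in the [10:05, 10:10) window, whereas the expected
result would put the 10:01 point into [10:00, 10:05) and the 10:07 point into
[10:05, 10:10).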

But when I looked at the API documentation, I had the impression that it would
behave as if the individual metrics had been sent as separate records.

Hopefully this gives a better explanation of what I am trying to achieve.

Thanks,
Sicheng

On Tue, Jul 10, 2018 at 9:32 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Not sure what you mean by "does not reset recordContext".
>
> Note, that the "contract" for `flatMapValues` is that the output records
> inherit the timestamp of the input record.
>
> Not sure what behavior you expect? Maybe you can elaborate?
>
>
> -Matthias
>
> On 7/9/18 7:27 PM, Sicheng Liu wrote:
> > Hi,
> >
> > I found out that doing windowed aggregation on records generated by
> > flatMapValues gives an incorrect result.
> >
> > Take this topology as an example:
> >
> > myStream.flatMapValues(...)
> >                 .groupByKey(...)
> >                 .aggregate(...)
> >
> > Since inside KStreamWindowAggregateProcessor, the timestamp used to
> > query the aggregation window is retrieved by context.timestamp(). However,
> > flatMapValues does not reset recordContext, which gives the wrong result
> > if the new records generated by flatMapValues fall into different windows.
> >
> > Not sure if this is a bug or by design.
> >
> > It feels to me that flatMapValues is somewhat counter-intuitive in
> > this case, and the aggregate processor should extract timestamps from
> > the records being aggregated instead of retrieving them from the
> > context.
> >
> > Thanks,
> > Sicheng
> >
>
>
