Just a follow-up: we identified a bug in the "skipped records" metric.
The reported value is not correct.
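
For anyone who wants to double-check this on their side, the metric can
be read off a running application with a sketch like the one below. It
assumes a running KafkaStreams instance named `streams`; the exact
metric name and group can differ between versions.

    import java.util.Map;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    // iterate over all registered metrics and print the skipped-records rate
    for (Map.Entry<MetricName, ? extends Metric> e : streams.metrics().entrySet()) {
        if ("skipped-records-rate".equals(e.getKey().name())) {
            System.out.println(e.getKey().group() + " / "
                + e.getKey().name() + " = " + e.getValue().value());
        }
    }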


On 4/28/17 9:12 PM, Matthias J. Sax wrote:
> Ok. That makes sense.
> 
> Question: why do you use .aggregate() instead of .count()?
> 
> Also, can you share the code of your AggregatorFunction()? Did you change
> any default settings of StreamsConfig?
> 
> I still have no idea what could go wrong. Maybe you can run with log
> level TRACE? Maybe we can get some insight from those logs.
> 
> 
> -Matthias
> 
> On 4/27/17 11:41 PM, Mahendra Kariya wrote:
>> Oh good point!
>>
>> The reason there is only one row for each time window is that the row
>> contains only the latest value for that window. What we did was dump
>> the data present in the sink topic to a DB using an upsert query, with
>> the time window as the primary key of the table. The file that I
>> attached is actually the data present in the DB. And we know that
>> there is no bug in our DB dump code because we have been using it in
>> production for a long time without any issues.
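>>
>> Conceptually, the dump is just one upsert per window, along the lines
>> of the JDBC sketch below (Postgres syntax; the table and column names
>> are placeholders, not our actual schema):
>>
>>     import java.sql.PreparedStatement;
>>
>>     // conn is an open java.sql.Connection; windowStart and count come
>>     // from the record read off the sink topic
>>     String sql = "INSERT INTO window_counts (window_start, cnt) VALUES (?, ?) "
>>         + "ON CONFLICT (window_start) DO UPDATE SET cnt = EXCLUDED.cnt";
>>     try (PreparedStatement ps = conn.prepareStatement(sql)) {
>>         ps.setTimestamp(1, new java.sql.Timestamp(windowStart));
>>         ps.setLong(2, count);
>>         ps.executeUpdate();
>>     }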
>>
>> The reason the count is zero for some time windows is that I
>> subtracted a random number from the actual values and rounded them off
>> to zero, for privacy reasons. The actual data doesn't have any zero
>> values. I should have mentioned this earlier. My bad!
>>
>> The stream topology code looks something like this.
>>
>> stream
>>     .filter()
>>     .map((key, value) -> new KeyValue<>(transform(key), value))
>>     .groupByKey()
>>     .aggregate(HashSet::new, AggregatorFunction(),
>>         TimeWindows.of(60000).until(3600000))
>>     .mapValues(HashSet::size)
>>     .toStream()
>>     .map((key, value) -> convertToProtobufObject(key, value))
>>     .to()
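>>
>> To make that concrete, here is a fuller sketch of the same shape
>> against the 0.10.x DSL. The predicate, aggregator, serde, and topic
>> names are placeholders, not our exact code:
>>
>>     KStreamBuilder builder = new KStreamBuilder();
>>     KStream<String, Event> stream = builder.stream("source-topic");
>>
>>     stream
>>         .filter((key, value) -> value != null)        // placeholder predicate
>>         .map((key, value) -> new KeyValue<>(transform(key), value))
>>         .groupByKey()
>>         .aggregate(
>>             HashSet::new,                             // initializer
>>             (key, value, set) -> { set.add(value); return set; },
>>             TimeWindows.of(60000).until(3600000),     // 1-min windows, 1-h retention
>>             hashSetSerde,                             // a Serde<HashSet<Event>> we provide
>>             "agg-store")
>>         .mapValues(HashSet::size)
>>         .toStream()
>>         .map((window, size) -> convertToProtobufObject(window, size))
>>         .to("sink-topic");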
>>
>>
>> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Thanks for the details (sorry that I forgot that you did share the
>>> output already).
>>>
>>> Might be a dumb question, but what is the count for missing windows in
>>> your second implementation (the verification script)?
>>>
>>> If there is no data for a window, it should not emit a window with
>>> count zero; it should emit nothing at all.
>>>
>>> Thus, looking at your output, I am wondering how it could contain a
>>> line like:
>>>
>>>> 2017-04-27T04:53:00 0
>>>
>>> I am also wondering why your output only contains a single value per
>>> window. As Streams outputs multiple updates per window while the count
>>> is increasing, you should actually see multiple records per window.
>>>
>>> Your code is like this:
>>>
>>> stream.filter().groupByKey().count(TimeWindows.of(60000)).to();
>>>
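>>> Spelled out, that minimal version would look something like this (a
>>> sketch against the 0.10.x DSL; topic and store names are
>>> placeholders):
>>>
>>>     KStreamBuilder builder = new KStreamBuilder();
>>>     builder.stream("source-topic")
>>>         .filter((key, value) -> value != null)   // placeholder predicate
>>>         .groupByKey()
>>>         .count(TimeWindows.of(60000), "count-store")
>>>         .toStream()
>>>         .to("sink-topic");
>>>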
>>> Or do you have something more complex?
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
>>>>> Can you somehow verify your output?
>>>>
>>>>
>>>> Do you mean the Kafka Streams output? In the Kafka Streams output, we do
>>>> see some missing values. I have attached the Kafka Streams output (for a
>>>> few hours) in the very first email of this thread for reference.
>>>>
>>>> Let me also summarise what we have done so far.
>>>>
>>>> We took a dump of the raw data present in the source topic. We wrote a
>>>> script to read this data and do the exact same aggregations that we do
>>>> using Kafka Streams. And then we compared the output from Kafka
>>>> Streams and our script.
>>>>
>>>> The difference that we observed in the two outputs is that there were
>>>> a few rows (corresponding to some time windows) missing in the Streams
>>>> output. For the time windows for which data was present, the
>>>> aggregated numbers matched exactly.
>>>>
>>>> This means that either all the records for a particular time window
>>>> are being skipped, or none are. That is highly unlikely to happen.
>>>> Maybe there is a bug somewhere in the RocksDB state stores? Just
>>>> speculation, not sure though. And there could even be a bug in the
>>>> reported metric.
>>>>
>>>
>>>
>>
> 
