Just a follow-up: we identified a bug in the "skipped records" metric. The reported value is not correct.
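(Editor's note: for anyone who wants to check this themselves, one way to eyeball the skipped-records numbers from inside the application is to poll KafkaStreams#metrics(). Below is a minimal sketch, assuming a recent Kafka Streams client where Metric#metricValue() exists; the exact metric names and groups have changed across versions, so it simply matches any metric whose name contains "skipped".)

```java
import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class SkippedRecordsProbe {

    // Print every metric whose name mentions "skipped". Metric names and
    // groups differ across Kafka versions, so we match loosely instead of
    // hard-coding a specific name.
    public static void printSkippedRecordMetrics(KafkaStreams streams) {
        for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            if (entry.getKey().name().contains("skipped")) {
                System.out.println(entry.getKey() + " = " + entry.getValue().metricValue());
            }
        }
    }
}
```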
On 4/28/17 9:12 PM, Matthias J. Sax wrote:
> Ok. That makes sense.
>
> Question: why do you use .aggregate() instead of .count()?
>
> Also, can you share the code of your AggregatorFunction()? Did you change
> any default setting of StreamsConfig?
>
> I still have no idea what could go wrong. Maybe you can run with log
> level TRACE? Maybe we can get some insight from those logs.
>
>
> -Matthias
>
> On 4/27/17 11:41 PM, Mahendra Kariya wrote:
>> Oh, good point!
>>
>> The reason why there is only one row corresponding to each time window is
>> that it only contains the latest value for the time window. What we did
>> was dump the data present in the sink topic to a DB using an upsert
>> query. The primary key of the table was the time window. The file that I
>> attached is actually the data present in the DB. And we know that there
>> is no bug in our DB dump code because we have been using it for a long
>> time in production without any issues.
>>
>> The reason the count is zero for some time windows is that I subtracted a
>> random number from the actual values and rounded the result off to zero,
>> for privacy reasons. The actual data doesn't have any zero values. I
>> should have mentioned this earlier. My bad!
>>
>> The stream topology code looks something like this:
>>
>> stream
>>     .filter()
>>     .map((key, value) -> new KeyValue<>(transform(key), value))
>>     .groupByKey()
>>     .aggregate(HashSet::new, AggregatorFunction(),
>>         TimeWindows.of(60000).until(3600000))
>>     .mapValues(HashSet::size)
>>     .toStream()
>>     .map((key, value) -> convertToProtobufObject(key, value))
>>     .to()
>>
>> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Thanks for the details (sorry that I forgot that you had already shared
>>> the output).
>>>
>>> Might be a dumb question, but what is the count for the missing windows
>>> in your second implementation?
>>>
>>> If there is no data for a window, it should not emit a window with count
>>> zero; it should emit nothing.
>>>
>>> Thus, looking at your output, I am wondering how it could contain a line
>>> like:
>>>
>>>> 2017-04-27T04:53:00 0
>>>
>>> I am also wondering why your output only contains a single value per
>>> window. As Streams outputs multiple updates per window while the count
>>> is increasing, you should actually see multiple records per window.
>>>
>>> Your code is like this:
>>>
>>> stream.filter().groupByKey().count(TimeWindows.of(60000)).to();
>>>
>>> Or do you have something more complex?
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
>>>>> Can you somehow verify your output?
>>>>
>>>> Do you mean the Kafka Streams output? In the Kafka Streams output, we
>>>> do see some missing values. I have attached the Kafka Streams output
>>>> (for a few hours) in the very first email of this thread for reference.
>>>>
>>>> Let me also summarise what we have done so far.
>>>>
>>>> We took a dump of the raw data present in the source topic. We wrote a
>>>> script to read this data and do the exact same aggregations that we do
>>>> using Kafka Streams. And then we compared the output from Kafka Streams
>>>> and our script.
>>>>
>>>> The difference that we observed in the two outputs is that a few rows
>>>> (corresponding to some time windows) were missing in the Streams
>>>> output. For the time windows for which the data was present, the
>>>> aggregated numbers matched exactly.
>>>>
>>>> This means that either all the records for a particular time window are
>>>> being skipped, or none are. Now, this is highly unlikely to happen.
>>>> Maybe there is a bug somewhere in the RocksDB state stores? Just
>>>> speculation, not sure though. And there could even be a bug in the
>>>> reported metric.
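(Editor's note: for readers who want to reproduce the topology under discussion, here is a self-contained sketch. It is an assumption-laden reconstruction, not the original code: it targets a recent Kafka Streams API (3.x) rather than the 0.10.x API quoted above, assumes String keys and values, and substitutes hypothetical stand-ins for the elided filter(), transform(), the protobuf conversion, and the HashSet serde. It also shows why .aggregate() was presumably chosen over .count(): count() counts records per window, whereas the HashSet aggregate counts distinct values per window.)

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class WindowedDistinctCount {

    // Hypothetical stand-in for the key transformation in the thread.
    private static String transform(String key) {
        return key;
    }

    // Crude Serde for HashSet<String> (NUL-delimited UTF-8). Good enough for
    // a sketch; not safe if values can contain the delimiter.
    private static Serde<HashSet<String>> setSerde() {
        return Serdes.serdeFrom(
            (topic, set) -> String.join("\u0000", set).getBytes(StandardCharsets.UTF_8),
            (topic, bytes) -> bytes == null || bytes.length == 0
                ? new HashSet<>()
                : new HashSet<>(Arrays.asList(
                      new String(bytes, StandardCharsets.UTF_8).split("\u0000"))));
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .filter((key, value) -> value != null)               // placeholder predicate
            .map((key, value) -> KeyValue.pair(transform(key), value))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            // 1-minute tumbling windows, as in TimeWindows.of(60000)
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            // Collect the distinct values seen per key and window; the
            // retention mirrors .until(3600000) from the thread.
            .aggregate(
                HashSet::new,
                (key, value, set) -> { set.add(value); return set; },
                Materialized.<String, HashSet<String>, WindowStore<Bytes, byte[]>>with(
                        Serdes.String(), setSerde())
                    .withRetention(Duration.ofHours(1)))
            .mapValues(HashSet::size)                            // distinct count per window
            .toStream()
            // Flatten the windowed key into "key@windowStartMillis" instead of
            // the protobuf conversion used in the thread.
            .map((windowedKey, count) -> KeyValue.pair(
                windowedKey.key() + "@" + windowedKey.window().start(), count))
            .to("output-topic", Produced.with(Serdes.String(), Serdes.Integer()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-distinct-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Disabling the record cache makes Streams emit an update on every
        // record, which is the "multiple records per window" behavior
        // Matthias describes above.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        new KafkaStreams(builder.build(), props).start();
    }
}
```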
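(Editor's note: the upsert approach mentioned in the thread explains why the DB holds only the latest value per window: every Streams update for a window overwrites the previous row keyed by that window. Below is a minimal sketch, assuming PostgreSQL's ON CONFLICT syntax and a hypothetical window_counts(window_start, cnt) table; other databases spell upserts differently, e.g. MySQL's ON DUPLICATE KEY UPDATE.)

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;

public class WindowCountUpserter {

    // Hypothetical table: window_counts(window_start TIMESTAMP PRIMARY KEY, cnt BIGINT).
    private static final String UPSERT =
        "INSERT INTO window_counts (window_start, cnt) VALUES (?, ?) "
            + "ON CONFLICT (window_start) DO UPDATE SET cnt = EXCLUDED.cnt";

    // Because window_start is the primary key, a later (larger) count for the
    // same window simply overwrites the earlier row; that is why the table
    // ends up holding only the latest value per window.
    public static void upsert(String jdbcUrl, long windowStartMillis, long count)
            throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             PreparedStatement ps = conn.prepareStatement(UPSERT)) {
            ps.setTimestamp(1, new Timestamp(windowStartMillis));
            ps.setLong(2, count);
            ps.executeUpdate();
        }
    }
}
```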