Thanks for reporting back! As Eno mentioned, we do have a JIRA tracking the same issue you reported: https://issues.apache.org/jira/browse/KAFKA-5055
We are investigating... Can you somehow verify your output? Do you see
missing values? (Not sure how easy that is to verify -- we are not sure yet
whether the reported metric is just wrong.)

Thanks a lot!

-Matthias

On 4/27/17 8:22 PM, Mahendra Kariya wrote:
> Hi Matthias,
>
> We changed our timestamp extractor code to this:
>
> public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
>     Message message = (Message) record.value();
>     long timeInMillis = Timestamps.toMillis(message.getEventTimestamp());
>
>     if (timeInMillis < 0) {
>         LOGGER.error("Negative timestamp: {}", timeInMillis);
>     }
>
>     return timeInMillis;
> }
>
> We don't see any errors in the logs. However, the skipped-records-rate
> has not come down.
>
> On Fri, Apr 28, 2017 at 5:17 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Streams skips records with timestamp -1.
>>
>> The metric you mentioned reports the number of skipped records.
>>
>> Are you sure that `getEventTimestamp()` never returns -1?
>>
>> -Matthias
>>
>> On 4/27/17 10:33 AM, Mahendra Kariya wrote:
>>> Hey Eno,
>>>
>>> We are using a custom TimestampExtractor class. All messages that we
>>> have in Kafka have a timestamp field; that is what we are using.
>>> The code looks like this:
>>>
>>> public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
>>>     Message message = (Message) record.value();
>>>     return Timestamps.toMillis(message.getEventTimestamp());
>>> }
>>>
>>> On Fri, Apr 28, 2017 at 12:40 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>
>>>> Hi Mahendra,
>>>>
>>>> We are currently looking at the skipped-records-rate metric as part of
>>>> https://issues.apache.org/jira/browse/KAFKA-5055. Could you let us know
>>>> if you use any special TimestampExtractor class, or if it is the default?
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>>> On 27 Apr 2017, at 13:46, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:
>>>>>
>>>>> Hey All,
>>>>>
>>>>> We have a Kafka Streams application which ingests from a topic to which
>>>>> more than 15K messages are produced per second. The app filters a few
>>>>> of them, counts the number of unique filtered messages (based on one
>>>>> particular field) within a 1-minute time window, and writes the counts
>>>>> back to Kafka.
>>>>>
>>>>> The issue we are facing is that for certain minutes there is no data in
>>>>> the sink topic. I have attached the data from 3:30 AM to 10:00 AM this
>>>>> morning with this mail. If you look closely, the data for quite a few
>>>>> minutes is missing.
>>>>>
>>>>> One thing we have noticed is that the skipped-records-rate metric
>>>>> emitted by Kafka is around 200 for each thread. By the way, what does
>>>>> this metric indicate? Does it represent the filtered-out messages?
>>>>>
>>>>> We have checked the raw data in the source topic and didn't find any
>>>>> discrepancy.
>>>>>
>>>>> We even checked the logs on the stream app boxes, and the only errors
>>>>> we found were GC errors.
>>>>>
>>>>> Other relevant info:
>>>>>
>>>>> Kafka version: 0.10.2.0
>>>>> Number of partitions for source topic: 50
>>>>> Stream app cluster: 5 machines with 10 threads each
>>>>>
>>>>> How do we debug this? What could be the cause?
>>>>>
>>>>> <data.txt>
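
[Editor's note] A minimal sketch of an extractor that avoids the skips
discussed above by never returning a negative timestamp. Message and
Timestamps are the application-specific types quoted earlier in this
thread; falling back to previousTimestamp (or wall-clock time for the very
first record) is one possible mitigation, not a fix confirmed here:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class SafeEventTimestampExtractor implements TimestampExtractor {

        @Override
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            // Message and Timestamps are the application's own classes,
            // as used in the code quoted in this thread.
            Message message = (Message) record.value();
            long timeInMillis = Timestamps.toMillis(message.getEventTimestamp());

            if (timeInMillis < 0) {
                // Streams (0.10.2) silently drops records whose extracted
                // timestamp is negative, which shows up in skipped-records-rate.
                // Reuse the previous valid timestamp instead; fall back to
                // wall-clock time if there is none yet.
                return previousTimestamp >= 0 ? previousTimestamp : System.currentTimeMillis();
            }
            return timeInMillis;
        }
    }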
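
[Editor's note] To check whether the skips line up with the missing
minutes, the per-thread metric can also be read from inside the
application. A sketch, assuming a running KafkaStreams instance named
`streams`; note that on clients newer than this thread's 0.10.2,
metric.value() is deprecated in favour of metricValue():

    // Print the skipped-records rate reported by each stream thread.
    streams.metrics().forEach((name, metric) -> {
        if ("skipped-records-rate".equals(name.name())) {
            System.out.printf("%s %s = %f%n", name.name(), name.tags(), metric.value());
        }
    });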