Streams skips records with timestamp -1 The metric you mentioned, reports the number of skipped record.
Are you sure that `getEventTimestamp()` never returns -1 ? -Matthias On 4/27/17 10:33 AM, Mahendra Kariya wrote: > Hey Eno, > > We are using a custom TimeStampExtractor class. All messages that we have > in Kafka has a timestamp field. That is what we are using. > The code looks like this. > > public long extract(ConsumerRecord<Object, Object> record, long > previousTimestamp) { > Message message = (Message) record.value(); > return Timestamps.toMillis(message.getEventTimestamp()); > } > > > > > > On Fri, Apr 28, 2017 at 12:40 AM, Eno Thereska <eno.there...@gmail.com> > wrote: > >> Hi Mahendra, >> >> We are currently looking at the skipped-records-rate metric as part of >> https://issues.apache.org/jira/browse/KAFKA-5055 < >> https://issues.apache.org/jira/browse/KAFKA-5055>. Could you let us know >> if you use any special TimeStampExtractor class, or if it is the default? >> >> Thanks >> Eno >>> On 27 Apr 2017, at 13:46, Mahendra Kariya <mahendra.kar...@go-jek.com> >> wrote: >>> >>> Hey All, >>> >>> We have a Kafka Streams application which ingests from a topic to which >> more than 15K messages are generated per second. The app filters a few of >> them, counts the number of unique filtered messages (based on one >> particular field) within a 1 min time window, and dumps it back to Kafka. >>> >>> The issue that we are facing is that for certain minutes, there is no >> data in the sink topic. I have attached the data from 03:30AM to 10:00 AM >> today morning with this mail. And if you notice closely, the data for quite >> a few minutes is missing. >>> >>> One thing that we have noticed is that the skipped-records-rate metrics >> emitted by Kafka is around 200 for each thread. By the way, what does >> metric indicate? Does this represent the filtered out messages? >>> >>> We have checked the raw data in the source topic and didn't find any >> discrepancy. >>> >>> We even checked the logs on the stream app boxes and the only errors we >> found were GC errors. >>> >>> >>> Other relevant info: >>> >>> Kafka version: 0.10.2.0 >>> Number of partitions for source topic: 50 >>> Stream App cluster: 5 machines with 10 threads each >>> >>> How do we debug this? What could be the cause? >>> >>> >>> >>> <data.txt> >> >> >
signature.asc
Description: OpenPGP digital signature