Hi Matthias,

We changed our timestamp extractor code to this:
public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
    Message message = (Message) record.value();
    long timeInMillis = Timestamps.toMillis(message.getEventTimestamp());
    if (timeInMillis < 0) {
        LOGGER.error("Negative timestamp: {}", timeInMillis);
    }
    return timeInMillis;
}

We don't see any errors in the logs. However, the skipped-records-rate has not come down.

On Fri, Apr 28, 2017 at 5:17 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> Streams skips records with timestamp -1.
>
> The metric you mentioned reports the number of skipped records.
>
> Are you sure that `getEventTimestamp()` never returns -1?
>
> -Matthias
>
> On 4/27/17 10:33 AM, Mahendra Kariya wrote:
> > Hey Eno,
> >
> > We are using a custom TimestampExtractor class. All messages that we have
> > in Kafka have a timestamp field. That is what we are using.
> > The code looks like this:
> >
> > public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
> >     Message message = (Message) record.value();
> >     return Timestamps.toMillis(message.getEventTimestamp());
> > }
> >
> > On Fri, Apr 28, 2017 at 12:40 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> >
> >> Hi Mahendra,
> >>
> >> We are currently looking at the skipped-records-rate metric as part of
> >> https://issues.apache.org/jira/browse/KAFKA-5055. Could you let us know
> >> if you use any special TimestampExtractor class, or if it is the default?
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 27 Apr 2017, at 13:46, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:
> >>>
> >>> Hey All,
> >>>
> >>> We have a Kafka Streams application which ingests from a topic to which
> >>> more than 15K messages are produced per second. The app filters a few of
> >>> them, counts the number of unique filtered messages (based on one
> >>> particular field) within a 1 min time window, and dumps the result back to Kafka.
> >>>
> >>> The issue that we are facing is that for certain minutes, there is no
> >>> data in the sink topic. I have attached the data from 03:30 AM to 10:00 AM
> >>> this morning with this mail. If you look closely, the data for quite
> >>> a few minutes is missing.
> >>>
> >>> One thing that we have noticed is that the skipped-records-rate metric
> >>> emitted by Kafka is around 200 for each thread. By the way, what does this
> >>> metric indicate? Does it represent the filtered-out messages?
> >>>
> >>> We have checked the raw data in the source topic and didn't find any
> >>> discrepancy.
> >>>
> >>> We also checked the logs on the stream app boxes and the only errors we
> >>> found were GC errors.
> >>>
> >>> Other relevant info:
> >>>
> >>> Kafka version: 0.10.2.0
> >>> Number of partitions for source topic: 50
> >>> Stream app cluster: 5 machines with 10 threads each
> >>>
> >>> How do we debug this? What could be the cause?
> >>>
> >>> <data.txt>
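[Editor's note: since, as Matthias points out, Streams skips any record whose extracted timestamp is negative, an extractor can substitute a fallback value instead of returning the negative timestamp. The sketch below illustrates one possible fallback policy only; `TimestampSanitizer` and `sanitize` are hypothetical names, not code from this thread. In a real extractor, this logic would wrap the result of `Timestamps.toMillis(message.getEventTimestamp())`.]

```java
// Sketch of a fallback policy for negative event timestamps, so that
// Kafka Streams does not silently skip the record.
public class TimestampSanitizer {
    // Return the extracted timestamp if it is valid; otherwise fall back
    // to the previous record's timestamp, or 0 if that is also unknown (-1).
    static long sanitize(long extractedMillis, long previousTimestamp) {
        if (extractedMillis < 0) {
            return Math.max(previousTimestamp, 0L);
        }
        return extractedMillis;
    }

    public static void main(String[] args) {
        System.out.println(sanitize(1493357820000L, -1L));  // valid timestamp passes through
        System.out.println(sanitize(-1L, 1493357820000L));  // falls back to previousTimestamp
    }
}
```

Whether falling back to `previousTimestamp` (or the wall clock) is acceptable depends on how much out-of-order data the windowed count can tolerate; dropping to a log-and-skip policy may be preferable if event time must be exact.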