Hi Matthias,

We changed our timestamp extractor code to this:

public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
    // Extract the event timestamp embedded in the message payload.
    Message message = (Message) record.value();
    long timeInMillis = Timestamps.toMillis(message.getEventTimestamp());

    // Log invalid timestamps. Note that the negative value is still returned
    // below, so Streams will still skip such records.
    if (timeInMillis < 0) {
        LOGGER.error("Negative timestamp: {}", timeInMillis);
    }

    return timeInMillis;
}


We don't see any errors in the logs. However, the skipped-records-rate has
not come down.
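
In case some timestamps do turn out to be invalid after all, one option we are considering is to fall back instead of returning the negative value, so that Streams does not drop those records. A rough sketch only (Message, Timestamps, and LOGGER are the same as above; falling back to previousTimestamp, and to wall-clock time before any valid record has been seen, is just one possible policy):

public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
    Message message = (Message) record.value();
    long timeInMillis = Timestamps.toMillis(message.getEventTimestamp());

    if (timeInMillis < 0) {
        LOGGER.error("Negative timestamp: {}", timeInMillis);
        // Fall back rather than return the invalid value; previousTimestamp
        // is -1 until a valid timestamp has been extracted for the partition,
        // in which case we fall back to wall-clock time instead.
        return previousTimestamp >= 0 ? previousTimestamp : System.currentTimeMillis();
    }

    return timeInMillis;
}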




On Fri, Apr 28, 2017 at 5:17 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Streams skips records with timestamp -1.
>
> The metric you mentioned reports the number of skipped records.
>
> Are you sure that `getEventTimestamp()` never returns -1?
>
>
>
> -Matthias
>
> On 4/27/17 10:33 AM, Mahendra Kariya wrote:
> > Hey Eno,
> >
> > We are using a custom TimestampExtractor class. All messages that we have
> > in Kafka have a timestamp field. That is what we are using.
> > The code looks like this.
> >
> > public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
> >     Message message = (Message) record.value();
> >     return Timestamps.toMillis(message.getEventTimestamp());
> > }
> >
> >
> >
> >
> >
> > On Fri, Apr 28, 2017 at 12:40 AM, Eno Thereska <eno.there...@gmail.com>
> > wrote:
> >
> >> Hi Mahendra,
> >>
> >> We are currently looking at the skipped-records-rate metric as part of
> >> https://issues.apache.org/jira/browse/KAFKA-5055. Could you let us know
> >> if you use any custom TimestampExtractor class, or if it is the default?
> >>
> >> Thanks
> >> Eno
> >> On 27 Apr 2017, at 13:46, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:
> >>>
> >>> Hey All,
> >>>
> >>> We have a Kafka Streams application which ingests from a topic to which
> >>> more than 15K messages are generated per second. The app filters a few of
> >>> them, counts the number of unique filtered messages (based on one
> >>> particular field) within a 1-minute time window, and dumps it back to
> >>> Kafka.
> >>>
> >>> The issue that we are facing is that for certain minutes, there is no
> >>> data in the sink topic. I have attached the data from 03:30 AM to 10:00 AM
> >>> this morning with this mail. If you look closely, the data for quite a
> >>> few minutes is missing.
> >>>
> >>> One thing that we have noticed is that the skipped-records-rate metric
> >>> emitted by Kafka is around 200 for each thread. By the way, what does this
> >>> metric indicate? Does it represent the filtered-out messages?
> >>>
> >>> We have checked the raw data in the source topic and didn't find any
> >>> discrepancy.
> >>>
> >>> We even checked the logs on the stream app boxes and the only errors we
> >>> found were GC errors.
> >>>
> >>>
> >>> Other relevant info:
> >>>
> >>> Kafka version: 0.10.2.0
> >>> Number of partitions for source topic: 50
> >>> Stream App cluster: 5 machines with 10 threads each
> >>>
> >>> How do we debug this? What could be the cause?
> >>>
> >>>
> >>>
> >>> <data.txt>
