This is answered on Stack Overflow:

https://stackoverflow.com/questions/51991805/samza-0-14-1-not-correctly-handling-offsetoutofrangeexception-exception/52028830#52028830

On Fri, Aug 24, 2018 at 9:55 AM Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> Hi
>
> We are facing an issue with Samza 0.14.1 and Kafka 1.1.0. The details have
> been posted to the Samza mailing list
> <https://lists.apache.org/thread.html/1fa014de0f57e31c877420b42df6d2fb9e2768492a9a2943d321c0e3@%3Cdev.samza.apache.org%3E> and
> Stack Overflow
> <https://stackoverflow.com/questions/51991805/samza-0-14-1-not-correctly-handling-offsetoutofrangeexception-exception>.
> We did not get any response there, so we are posting the details here again.
> Any help is greatly appreciated.
>
> *We are facing a problem identical to the one described in this thread
> <https://www.mail-archive.com/dev@samza.apache.org/msg06740.html>.*
> *Samza is requesting a Kafka partition offset that is too old
> (i.e. the Kafka log has moved ahead). We set the
> property consumer.auto.offset.reset to smallest and therefore expect
> Samza to reset its checkpoint to the earliest available partition offset in
> such a scenario. That is not happening; instead, we continually get
> exceptions of this form:*
>
> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] kafka.producer.SyncProducer:[Logging_class:info:66] - [main] - Disconnecting from vrni-platform-release:9092
> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset 56443499 for topic and partition Topic3-0
> WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658] system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While refreshing brokers for Topic3 0:org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying
>
> *Version Details*
>
>    - Samza: 2.11-0.14.1
>    - Kafka Client: 1.1.0
>    - Kafka Server: 1.1.0 Scala 2.11
>
> *Browsing through the code, it appears that GetOffset::isValidOffset
> should catch the OffsetOutOfRangeException and convert
> it to a false value, but this does not seem to be happening. Could there be
> a mismatch in the package of the exception? The GetOffset
> <https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala#L56>
> class imports and catches kafka.common.OffsetOutOfRangeException, but the
> logs show org.apache.kafka.common.errors.OffsetOutOfRangeException, which is
> in a different package. Could this be the reason?*
>
>   def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
>     info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
>     try {
>       val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
>       if (messages.hasError) {
>         KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
>       }
>       info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
>       true
>     } catch {
>       case e: OffsetOutOfRangeException => false
>     }
>   }
>
> *Also, it appears that the BrokerProxy
> <https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L85>
> class, the caller of GetOffset, would log "It appears that..." if
> it got a false value. That line is not being logged, which suggests
> that some exception raised in the GetOffset method is going uncaught and
> propagating up:*
>
>   def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
>     debug("Adding new topic and partition %s to queue for %s" format (tp, host))
>     if (nextOffsets.asJava.containsKey(tp)) {
>       toss("Already consuming TopicPartition %s" format tp)
>     }
>     val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
>       nextOffset
>         .get
>         .toLong
>     } else {
>       warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
>       offsetGetter.getResetOffset(simpleConsumer, tp)
>     }
>     debug("Got offset %s for new topic and partition %s." format (offset, tp))
>     nextOffsets += tp -> offset
>     metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
>   }
>
> *Could this be due to a mismatch in the Kafka client library version that we
> are using? Is there a recommended Kafka client version to use with
> Samza 0.14.1 (assuming that the Kafka server is 1.x)?*
>
> *Any help regarding this will be greatly appreciated.*
>
>
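
The package-mismatch hypothesis raised in the quoted message can be illustrated with a minimal, self-contained Scala sketch. It does not use Kafka or Samza: `OldClientErrors`, `NewClientErrors`, `CatchMismatchDemo`, and `isValid` are hypothetical stand-ins, assumed here only to show that a `catch` clause matches on the imported type, not on the simple class name. Whether this is what actually happens in Samza 0.14.1 with the 1.1.0 client is not confirmed by this sketch.

```scala
// Hypothetical stand-ins for two same-named exception classes living in
// different packages. These are NOT the real Kafka classes.
object OldClientErrors {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}

object NewClientErrors {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}

object CatchMismatchDemo {
  // Only this type is in scope for the catch clause below.
  import OldClientErrors.OffsetOutOfRangeException

  // Mirrors the shape of isValidOffset: the imported exception type is
  // converted to false; anything else propagates to the caller.
  def isValid(fetch: () => Unit): Boolean =
    try {
      fetch()
      true
    } catch {
      case _: OffsetOutOfRangeException => false
    }

  def main(args: Array[String]): Unit = {
    // Thrown type equals the caught type: the result is false.
    println(isValid(() => throw new OldClientErrors.OffsetOutOfRangeException("out of range")))

    // Same simple name but a different type: the catch clause does not
    // match, so the exception escapes isValid.
    try {
      isValid(() => throw new NewClientErrors.OffsetOutOfRangeException("out of range"))
    } catch {
      case e: RuntimeException => println("escaped: " + e.getClass.getName)
    }
  }
}
```

If the same thing happens in the real code path, the caller would never see a false value, which would be consistent with the "It appears that..." warning never being logged.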
