This is answered at: https://stackoverflow.com/questions/51991805/samza-0-14-1-not-correctly-handling-offsetoutofrangeexception-exception/52028830#52028830
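For readers following the package-mismatch hypothesis raised in the quoted message below, here is a minimal sketch of why a catch clause can miss an exception that has the same simple name. It assumes the two exception classes are unrelated types. OldApi, NewApi and CatchMismatchSketch are hypothetical names made up for illustration, standing in for kafka.common and org.apache.kafka.common.errors. This is not Samza's or Kafka's actual code.

```scala
// Hypothetical stand-ins for two unrelated exception classes that share a
// simple name (compare kafka.common.* with org.apache.kafka.common.errors.*).
object OldApi {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}

object NewApi {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}

object CatchMismatchSketch {
  // Only the OldApi class is in scope, so the catch below matches only it.
  import OldApi.OffsetOutOfRangeException

  // Returns false when the fetch signals an out-of-range offset with the
  // OldApi exception; any other exception propagates to the caller.
  def isValidOffset(fetch: () => Unit): Boolean =
    try {
      fetch()
      true
    } catch {
      case _: OffsetOutOfRangeException => false
    }

  def main(args: Array[String]): Unit = {
    val handled = isValidOffset(() => throw new OldApi.OffsetOutOfRangeException("too old"))
    println(s"OldApi exception handled, isValidOffset = $handled")

    val escaped = scala.util.Try(
      isValidOffset(() => throw new NewApi.OffsetOutOfRangeException("too old")))
    println(s"NewApi exception escaped the catch: ${escaped.isFailure}")
  }
}
```

If the real classes behave like these stand-ins, the caller never sees a false return value and never reaches its fallback branch, which would be consistent with the missing "It appears that..." log line described in the quoted message.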
On Fri, Aug 24, 2018 at 9:55 AM Debraj Manna <subharaj.ma...@gmail.com> wrote:

> Hi
>
> We are facing an issue with Samza 0.14.1 and Kafka 1.1.0. The details have
> been posted to the Samza mailing list
> <https://lists.apache.org/thread.html/1fa014de0f57e31c877420b42df6d2fb9e2768492a9a2943d321c0e3@%3Cdev.samza.apache.org%3E>
> and Stack Overflow
> <https://stackoverflow.com/questions/51991805/samza-0-14-1-not-correctly-handling-offsetoutofrangeexception-exception>.
> We did not get any response there, so we are posting the details here
> again. Any help is greatly appreciated.
>
> We are facing a problem identical to the one described in this thread
> <https://www.mail-archive.com/dev@samza.apache.org/msg06740.html>.
> Here, Samza is requesting a Kafka partition offset that is too old
> (i.e. the Kafka log has moved ahead). We are setting the property
> consumer.auto.offset.reset to smallest and therefore expect that Samza
> will reset its checkpoint to the earliest available partition offset in
> such a scenario. But that is not happening; we are continually getting
> exceptions of this form:
>
> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] kafka.producer.SyncProducer:[Logging_class:info:66] - [main] - Disconnecting from vrni-platform-release:9092
> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset 56443499 for topic and partition Topic3-0
> WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658] system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While refreshing brokers for Topic3 0:org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying
>
> Version Details
>
> - Samza: 2.11-0.14.1
> - Kafka Client: 1.1.0
> - Kafka Server: 1.1.0 Scala 2.11
>
> Browsing through the code, it appears that GetOffset::isValidOffset
> should be able to catch the OffsetOutOfRangeException and convert it to a
> false value. But it appears that this is not happening. Could there be a
> mismatch in the package of the exception? The GetOffset
> <https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala#L56>
> class catches the exception imported as
> kafka.common.OffsetOutOfRangeException, but from the logs it appears that
> the package of the thrown class is different. Could this be the reason?
>
>     def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
>       info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
>       try {
>         val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
>         if (messages.hasError) {
>           KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
>         }
>         info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
>         true
>       } catch {
>         case e: OffsetOutOfRangeException => false
>       }
>     }
>
> Also, it appears that the BrokerProxy
> <https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L85>
> class, the caller of GetOffset, would print a log "It appears that..." if
> it gets a false value, but it is not logging this line (indicating that
> some exception generated in the GetOffset method is going uncaught and
> being propagated up):
>
>     def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
>       debug("Adding new topic and partition %s to queue for %s" format (tp, host))
>       if (nextOffsets.asJava.containsKey(tp)) {
>         toss("Already consuming TopicPartition %s" format tp)
>       }
>       val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
>         nextOffset
>           .get
>           .toLong
>       } else {
>         warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
>         offsetGetter.getResetOffset(simpleConsumer, tp)
>       }
>       debug("Got offset %s for new topic and partition %s." format (offset, tp))
>       nextOffsets += tp -> offset
>       metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
>     }
>
> Could this be due to a mismatch in the Kafka client library version that
> we are using? Is there a recommended Kafka client version we should use
> with Samza 0.14.1 (assuming that the Kafka server is 1.x)?
>
> Any help regarding this will be greatly appreciated.