Hi, all,

Thanks for digging to the root of it! The patch has been submitted and merged.
Please let us know whether that fixes your problem.

Please also note that SAMZA-1776 is going to deprecate the old consumer lib
in the Samza code base soon.

Thanks!

-Yi

On Sat, Aug 25, 2018 at 5:34 AM, Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> Uploaded Gaurav Agarwal's patch as discussed in this email thread to
> https://issues.apache.org/jira/browse/SAMZA-1822 .
>
> On Sat, Aug 25, 2018 at 4:21 PM Debraj Manna <subharaj.ma...@gmail.com>
> wrote:
>
>> https://issues.apache.org/jira/browse/SAMZA-1822 is filed for this.
>>
>> On Sat, Aug 25, 2018 at 5:56 AM Yi Pan <nickpa...@gmail.com> wrote:
>>
>>> Hi, Gaurav,
>>>
>>> Thanks for working on the patch for the problem. Could you open a ticket
>>> and a PR for the change? The dev mailing list strips off all attachments,
>>> so the change is hard to follow if it is not embedded as text in the email.
>>>
>>> And to Debraj's comment: yes, we are aware of it and are working on
>>> removing the old consumer usage from the Samza code base in the coming
>>> release (https://issues.apache.org/jira/browse/SAMZA-1776).
>>>
>>> Thank you all!
>>>
>>> On Fri, Aug 24, 2018 at 4:33 AM, Debraj Manna <subharaj.ma...@gmail.com>
>>> wrote:
>>>
>>> > It seems the above issue is occurring because of KIP-109
>>> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-109%3A+Old+Consumer+Deprecation>
>>> > and its related PR <https://github.com/apache/kafka/pull/2328> (KAFKA-3264
>>> > <https://issues.apache.org/jira/browse/KAFKA-3264>). Filed
>>> > https://issues.apache.org/jira/browse/SAMZA-1822 for this.
>>> >
>>> > On Fri, Aug 24, 2018 at 2:39 PM Gaurav Agarwal <gauravagarw...@gmail.com>
>>> > wrote:
>>> >
>>> > > Hi All,
>>> > >
>>> > > We found that by patching the Samza codebase locally, this error goes
>>> > > away. The patch involves changing the import for the
>>> > > OffsetOutOfRangeException class in the file
>>> > > *samza/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala*
>>> > > to *import org.apache.kafka.common.errors.OffsetOutOfRangeException*.
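>>> > >
>>> > > For clarity, here is a minimal, self-contained sketch (my own
>>> > > illustration, not the exact patch) of a catch clause that treats both
>>> > > the old core exception and the new client exception as an invalid
>>> > > offset; it assumes both the old kafka core jar and kafka-clients are on
>>> > > the classpath:
>>> > >
>>> > > object PatchedCatchSketch {
>>> > >   // Returns false for either flavor of OffsetOutOfRangeException instead of letting it propagate.
>>> > >   def isValid(fetch: () => Boolean): Boolean =
>>> > >     try {
>>> > >       fetch()
>>> > >     } catch {
>>> > >       case _: kafka.common.OffsetOutOfRangeException => false // old core (Scala) exception
>>> > >       case _: org.apache.kafka.common.errors.OffsetOutOfRangeException => false // new client exception
>>> > >     }
>>> > >
>>> > >   def main(args: Array[String]): Unit = {
>>> > >     val failingFetch = () => throw new org.apache.kafka.common.errors.OffsetOutOfRangeException("requested offset out of range")
>>> > >     println(isValid(failingFetch)) // prints "false" instead of the exception propagating
>>> > >   }
>>> > > }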
>>> > >
>>> > > Can you please confirm whether this change is good? And if so, can a
>>> > > quick patch release with it be made available?
>>> > >
>>> > > Independently, does this release need to be verified for any more
>>> > > similar errors (possibly due to changes in class packages, etc.)? Not
>>> > > trying to cast aspersions on this release, just asking the next thing
>>> > > that naturally comes to mind :-)
>>> > >
>>> > > --
>>> > > thanks,
>>> > > gaurav
>>> > >
>>> > >
>>> > > On Thu, Aug 23, 2018 at 7:06 PM Gaurav Agarwal <gauravagarw...@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > A few more notes (based on reading a similar thread from a few days ago):
>>> > > > - this exception occurs while initializing the offset for the data
>>> > > > topic partition (not Samza's checkpoint topic/partition)
>>> > > > - we have manually verified that, due to some issue, the Kafka data
>>> > > > logs have rolled over and the earliest available offset is greater
>>> > > > than what Samza has in its checkpoint; hence, when Samza queries Kafka
>>> > > > with the offset it checkpointed last, it sees this error (a quick way
>>> > > > to check this is sketched below).
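>>> > > >
>>> > > > As a sanity check, the following stand-alone sketch (hypothetical, not
>>> > > > part of our job; host and topic are the ones from our logs, and the
>>> > > > kafka-clients consumer is assumed to be on the classpath) prints the
>>> > > > earliest available offset so it can be compared against the
>>> > > > checkpointed one:
>>> > > >
>>> > > > import java.util.Properties
>>> > > > import org.apache.kafka.clients.consumer.KafkaConsumer
>>> > > > import org.apache.kafka.common.TopicPartition
>>> > > > import org.apache.kafka.common.serialization.ByteArrayDeserializer
>>> > > >
>>> > > > object EarliestOffsetCheck {
>>> > > >   def main(args: Array[String]): Unit = {
>>> > > >     val props = new Properties()
>>> > > >     props.put("bootstrap.servers", "vrni-platform-release:9092")
>>> > > >     props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
>>> > > >     props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)
>>> > > >     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
>>> > > >     try {
>>> > > >       val tp = new TopicPartition("Topic3", 0)
>>> > > >       // beginningOffsets returns the first offset still available on the broker for each partition.
>>> > > >       val earliest = consumer.beginningOffsets(java.util.Collections.singletonList(tp)).get(tp)
>>> > > >       println(s"Earliest available offset for $tp: $earliest")
>>> > > >     } finally {
>>> > > >       consumer.close()
>>> > > >     }
>>> > > >   }
>>> > > > }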
>>> > > >
>>> > > > Please let me know if more logs are required.
>>> > > >
>>> > > >
>>> > > > On Thu, Aug 23, 2018 at 4:30 PM Gaurav Agarwal <gauravagarw...@gmail.com>
>>> > > > wrote:
>>> > > >
>>> > > >> Hi All,
>>> > > >>
>>> > > >> We are facing an identical problem to the one described in the thread
>>> > > >> https://www.mail-archive.com/dev@samza.apache.org/msg06740.html
>>> > > >>
>>> > > >> Here, Samza is requesting a Kafka partition offset that is too old
>>> > > >> (i.e. the Kafka log has moved ahead). We are setting the property
>>> > > >> *consumer.auto.offset.reset to smallest* and are therefore expecting
>>> > > >> Samza to reset its checkpoint to the earliest available partition
>>> > > >> offset in such a scenario. But that is not happening; we are getting
>>> > > >> exceptions of this form continually:
>>> > > >>
>>> > > >> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] kafka.producer.SyncProducer:[Logging_class:info:66] - [main] - Disconnecting from vrni-platform-release:9092
>>> > > >> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset 56443499 for topic and partition Topic3-0
>>> > > >> WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658] system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While refreshing brokers for Topic3-0: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying.
>>> > > >>
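>>> > > >> For reference, this is roughly how the property is set in our job
>>> > > >> config (a sketch; the system name "kafka" is a placeholder for our
>>> > > >> actual system name):
>>> > > >>
>>> > > >> systems.kafka.consumer.auto.offset.reset=smallest
>>> > > >>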
>>> > > >> *Version Details:*
>>> > > >>
>>> > > >> *Samza: 2.11-0.14.1*
>>> > > >> *Kafka Client: 1.1.0 *
>>> > > >> *Kafka Server: 1.1.0 Scala 2.11 *
>>> > > >>
>>> > > >>
>>> > > >> Browsing through the code, it appears that GetOffset::isValidOffset
>>> > > >> should be able to catch the OffsetOutOfRangeException and convert it
>>> > > >> into a false value, but it appears that this is not happening. Could
>>> > > >> there be a mismatch in the package of the exception? This class
>>> > > >> catches kafka.common.OffsetOutOfRangeException, but from the logs it
>>> > > >> appears that the package of the thrown exception is different. Could
>>> > > >> this be the reason?
>>> > > >>
>>> > > >>>   def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
>>> > > >>>     info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
>>> > > >>>
>>> > > >>>     try {
>>> > > >>>       val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
>>> > > >>>
>>> > > >>>       if (messages.hasError) {
>>> > > >>>         KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
>>> > > >>>       }
>>> > > >>>
>>> > > >>>       info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
>>> > > >>>
>>> > > >>>       true
>>> > > >>>     } catch {
>>> > > >>>       case e: OffsetOutOfRangeException => false
>>> > > >>>     }
>>> > > >>>   }
>>> > > >>
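>>> > > >> To illustrate the suspicion, here is a tiny stand-alone check
>>> > > >> (hypothetical, assuming both the old kafka core jar and kafka-clients
>>> > > >> are on the classpath) showing that the two OffsetOutOfRangeException
>>> > > >> classes are unrelated, so a catch clause typed on the old core class
>>> > > >> cannot match the client's exception:
>>> > > >>
>>> > > >> object ExceptionPackageCheck {
>>> > > >>   def main(args: Array[String]): Unit = {
>>> > > >>     val oldCore = classOf[kafka.common.OffsetOutOfRangeException]
>>> > > >>     val newClient = classOf[org.apache.kafka.common.errors.OffsetOutOfRangeException]
>>> > > >>     // Neither class is a subtype of the other, so a "case e: kafka.common.OffsetOutOfRangeException"
>>> > > >>     // clause will never match the exception reported in the WARN line above.
>>> > > >>     println(oldCore.isAssignableFrom(newClient)) // false
>>> > > >>     println(newClient.isAssignableFrom(oldCore)) // false
>>> > > >>   }
>>> > > >> }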
>>> > > >>
>>> > > >> Also, it appears that the BrokerProxy class (the caller of
>>> > > >> GetOffset) would print a log line ("*It appears that...*") in case it
>>> > > >> gets a false value, but it is not logging this line, indicating that
>>> > > >> some exception generated in the GetOffset method is going uncaught
>>> > > >> and being propagated up:
>>> > > >>
>>> > > >>
>>> > > >>>   def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
>>> > > >>>     debug("Adding new topic and partition %s to queue for %s" format (tp, host))
>>> > > >>>
>>> > > >>>     if (nextOffsets.asJava.containsKey(tp)) {
>>> > > >>>       toss("Already consuming TopicPartition %s" format tp)
>>> > > >>>     }
>>> > > >>>
>>> > > >>>     val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
>>> > > >>>       nextOffset
>>> > > >>>         .get
>>> > > >>>         .toLong
>>> > > >>>     } else {
>>> > > >>>       warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
>>> > > >>>
>>> > > >>>       offsetGetter.getResetOffset(simpleConsumer, tp)
>>> > > >>>     }
>>> > > >>>
>>> > > >>>     debug("Got offset %s for new topic and partition %s." format (offset, tp))
>>> > > >>>
>>> > > >>>     nextOffsets += tp -> offset
>>> > > >>>
>>> > > >>>     metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
>>> > > >>>   }
>>> > > >>
>>> > > >>
>>> > > >> *Could this be due to a mismatch in the Kafka client library version
>>> > > >> that we are using? Is there a recommended Kafka client version we
>>> > > >> should use with Samza 0.14.1 (assuming that the Kafka server is 1.x)?*
>>> > > >> Any help regarding this will be greatly appreciated.
>>> > > >>
>>> > > >>
>>> > > >> - -
>>> > > >> thanks,
>>> > > >> gaurav
>>> > > >>
>>> > > >>
>>> > >
>>> >
>>>
>>
