Hi, all,

Thanks for digging to the root of it! The patch is merged and submitted. Please let us know whether that fixes your problem.
Please also note that SAMZA-1776 is going to deprecate the old consumer lib from the Samza code base soon.

Thanks!
-Yi

On Sat, Aug 25, 2018 at 5:34 AM, Debraj Manna <subharaj.ma...@gmail.com> wrote:

Uploaded Gaurav Agarwala's patch, as discussed in this email thread, to
https://issues.apache.org/jira/browse/SAMZA-1822.

On Sat, Aug 25, 2018 at 4:21 PM Debraj Manna <subharaj.ma...@gmail.com> wrote:

https://issues.apache.org/jira/browse/SAMZA-1822 has been filed for this.

On Sat, Aug 25, 2018 at 5:56 AM Yi Pan <nickpa...@gmail.com> wrote:

Hi, Gaurav,

Thanks for working on the patch for the problem. Could you open a ticket
and PR for the change? The dev mailing list strips off all attachments,
and the change is hard to follow if it is not embedded as text in the
email.

And to Debraj's comment: yes, we are aware of it and are working on
removing the old consumer usage from the Samza code base for the coming
release (https://issues.apache.org/jira/browse/SAMZA-1776).

Thank you all!

On Fri, Aug 24, 2018 at 4:33 AM, Debraj Manna <subharaj.ma...@gmail.com> wrote:

It seems the above issue is coming from KIP-109
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-109%3A+Old+Consumer+Deprecation>
and its related PR <https://github.com/apache/kafka/pull/2328> (KAFKA-3264
<https://issues.apache.org/jira/browse/KAFKA-3264>). Filed
https://issues.apache.org/jira/browse/SAMZA-1822 for this.

On Fri, Aug 24, 2018 at 2:39 PM Gaurav Agarwal <gauravagarw...@gmail.com> wrote:

Hi All,

We found that by patching the Samza codebase locally this error goes away.
The patch involves changing the import for the OffsetOutOfRangeException
class in the file
samza/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
to: import org.apache.kafka.common.errors.OffsetOutOfRangeException

Can you please confirm whether this change is good? And if so, can a quick
patch release with it be made available?

Independently, does this release need to be verified for any more such
similar errors (possibly due to changes in class packages etc.)? Not trying
to cast aspersions on this release, just asking the next thing that
naturally comes to mind :-)

--
thanks,
gaurav

On Thu, Aug 23, 2018 at 7:06 PM Gaurav Agarwal <gauravagarw...@gmail.com> wrote:

A few more notes (based on reading a similar thread from a few days ago):
- This exception occurs while initializing the offset for the data topic
  partition (not Samza's checkpoint topic/partition).
- We have manually verified that, due to some issue, the Kafka data logs
  have rolled over and the earliest available offset is greater than what
  Samza has in its checkpoint; hence, when Samza queries Kafka with the
  offset it checkpointed last, it sees this error.

Please let me know if more logs are required.
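For reference, the situation described above (earliest available offset ahead of the
checkpointed one) can be confirmed by asking the broker directly. Below is a minimal
sketch using the 1.1.0 Java client, not part of the patch; the object name is arbitrary,
and the broker address, topic, partition, and offset are simply the values from the logs
quoted below.

    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    object EarliestOffsetCheck {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "vrni-platform-release:9092") // broker from the logs below
        props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
        props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

        val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
        try {
          val tp = new TopicPartition("Topic3", 0)   // topic/partition from the logs below
          val checkpointedOffset = 56443499L          // offset Samza is trying to validate
          // beginningOffsets returns the earliest offset still available on the broker.
          val earliest: Long = consumer.beginningOffsets(List(tp).asJava).get(tp)
          // If the checkpointed offset is below the earliest available one, fetches at
          // that offset are answered with OffsetOutOfRangeException.
          println(s"earliest=$earliest checkpointed=$checkpointedOffset " +
            s"outOfRange=${checkpointedOffset < earliest}")
        } finally {
          consumer.close()
        }
      }
    }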
On Thu, Aug 23, 2018 at 4:30 PM Gaurav Agarwal <gauravagarw...@gmail.com> wrote:

Hi All,

We are facing an identical problem to the one described in the thread
https://www.mail-archive.com/dev@samza.apache.org/msg06740.html

Here, Samza is requesting a Kafka partition offset that is too old (i.e.
the Kafka log has moved ahead). We are setting the property
consumer.auto.offset.reset to smallest and therefore expecting that Samza
will reset its checkpoint to the earliest available partition offset in
such a scenario. But that is not happening; we are getting exceptions of
this form continually:

INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] kafka.producer.SyncProducer:[Logging_class:info:66] - [main] - Disconnecting from vrni-platform-release:9092
INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset 56443499 for topic and partition Topic3-0
WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658] system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While refreshing brokers for Topic3-0: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying.

Version details:
Samza: 2.11-0.14.1
Kafka Client: 1.1.0
Kafka Server: 1.1.0 Scala 2.11

Browsing through the code, it appears that GetOffset::isValidOffset should
catch the OffsetOutOfRangeException and convert it to a false value, but
that does not seem to be happening. Could there be a mismatch in the
package of the exception? This method catches
kafka.common.OffsetOutOfRangeException, but from the logs it appears that
the thrown exception belongs to a different package. Could this be the
reason?

    def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
      info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))

      try {
        val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))

        if (messages.hasError) {
          KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
        }

        info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))

        true
      } catch {
        case e: OffsetOutOfRangeException => false
      }
    }

Also, it appears that the BrokerProxy class (the caller of GetOffset) would
print a log line ("It appears that...") if it received a false value, but it
is not logging this line, indicating that some exception generated in the
GetOffset method is going uncaught and being propagated up:

    def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
      debug("Adding new topic and partition %s to queue for %s" format (tp, host))

      if (nextOffsets.asJava.containsKey(tp)) {
        toss("Already consuming TopicPartition %s" format tp)
      }

      val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
        nextOffset
          .get
          .toLong
      } else {
        warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))

        offsetGetter.getResetOffset(simpleConsumer, tp)
      }

      debug("Got offset %s for new topic and partition %s." format (offset, tp))

      nextOffsets += tp -> offset

      metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
    }

Could this be due to a mismatch in the Kafka client library version that we
are using? Is there a recommended Kafka client version we should use with
Samza 0.14.1 (assuming that the Kafka server is 1.x)? Any help regarding
this will be greatly appreciated.

--
thanks,
gaurav
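The package mismatch suspected above is easy to reproduce in isolation. A minimal
sketch follows, assuming both the old Scala client class
(kafka.common.OffsetOutOfRangeException, pulled in via samza-kafka) and the 1.1.0
Java client class (org.apache.kafka.common.errors.OffsetOutOfRangeException) are on
the classpath; the object name is arbitrary.

    object OffsetExceptionMismatch {
      def main(args: Array[String]): Unit = {
        try {
          // The 1.x client surfaces this exception class on an out-of-range fetch...
          throw new org.apache.kafka.common.errors.OffsetOutOfRangeException(
            "The requested offset is not within the range of offsets maintained by the server.")
        } catch {
          // ...but a catch clause written against the old Scala client's class,
          // as in GetOffset.isValidOffset above, does not match it,
          case _: kafka.common.OffsetOutOfRangeException =>
            println("caught: would have returned false")
          // so the exception escapes isValidOffset instead of becoming `false`.
          case e: Throwable =>
            println("not caught by the old-client clause: " + e.getClass.getName)
        }
      }
    }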