Looking further, the reason for this "jump back" does not seem so
straightforward.
In Kafka's SimpleConsumer code:

private def sendRequest(request: RequestOrResponse): NetworkReceive = {
  lock synchronized {
    var response: NetworkReceive = null
    try {
      getOrMakeConnection()
      blockingChannel.send(request)
      response = blockingChannel.receive()
    } catch {
      case e : ClosedByInterruptException =>
        throw e
      // Should not observe this exception when running Kafka with Java 1.8
      case e: AsynchronousCloseException =>
        throw e
      case e : Throwable =>
        info("Reconnect due to error:", e)
        // retry once
        try {
          reconnect()
          blockingChannel.send(request)
          response = blockingChannel.receive()
        } catch {
          case e: Throwable =>
            disconnect()
            throw e
        }
    }
    response
  }
}


Note that only the first exception is printed (that is what gets logged); the
Kafka client then retries once and rethrows whatever exception the retry
receives. It could be that this second exception was an instance of
OffsetOutOfRangeException, which caused Samza to think that the offset is invalid.
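
To make the log-first, throw-second behaviour concrete, here is a minimal
sketch of the same control flow. It is not Kafka's code: RetryOnceSketch,
FakeOffsetOutOfRange, sendWithOneRetry and the logged buffer are all
hypothetical names made up for illustration.

```scala
object RetryOnceSketch {
  // Hypothetical stand-in for kafka.common.OffsetOutOfRangeException.
  final class FakeOffsetOutOfRange(msg: String) extends RuntimeException(msg)

  // Collects what would have gone to the log, so it can be inspected.
  val logged = scala.collection.mutable.ListBuffer.empty[String]

  // Same shape as sendRequest above: log the first failure, retry once,
  // and let whatever the retry throws propagate to the caller.
  def sendWithOneRetry[A](attempt: () => A): A =
    try attempt()
    catch {
      case first: Throwable =>
        logged += s"Reconnect due to error: $first"
        attempt() // a failure here reaches the caller without being logged
    }
}
```

If the first attempt fails with an EOFException and the retry fails with a
different exception, the log only shows the EOFException while the caller
only sees the second one.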

However, I am unable to understand what could have caused this. Did Kafka
return this exception accidentally, or did Samza ask for an offset that was
beyond what was present in the Kafka queue?
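
One way to tell these two cases apart would be to compare the checkpointed
offset with the earliest and latest offsets the broker reports for the
partition. A rough sketch of that comparison follows; OffsetRangeSketch,
Position and classify are hypothetical names, and how the two bounds are
obtained from the broker is left out.

```scala
object OffsetRangeSketch {
  sealed trait Position
  case object BelowEarliest extends Position // data already removed by retention
  case object WithinLog extends Position     // a fetch at this offset should succeed
  case object BeyondLatest extends Position  // offset is ahead of the broker's log end

  // earliest = first offset still retained on the broker;
  // latest   = next offset to be written (the log end offset).
  def classify(requested: Long, earliest: Long, latest: Long): Position =
    if (requested < earliest) BelowEarliest
    else if (requested > latest) BeyondLatest
    else WithinLog
}
```

A WithinLog result for the offset Samza asked for would suggest the exception
was spurious; BeyondLatest or BelowEarliest would suggest the request itself
was outside the log.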


On Tue, May 2, 2017 at 9:31 AM, Gaurav Agarwal <gauravagarw...@gmail.com>
wrote:

> This also seems somewhat related to the mail on this group a few days back
> with the subject 'Messages lost after broker failure'.
>
> If someone had set auto.offset.reset to largest, then the reverse would
> happen, i.e. Samza skipping over messages in the Kafka partition queue in the
> face of such failures.
>
> On Tue, May 2, 2017 at 9:17 AM, Gaurav Agarwal <gauravagarw...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> We recently observed an issue in our Samza application (version
>> 0.12.0) where we found that the message offsets "jumped back", causing
>> many of the messages to be re-processed.
>>
>> On digging deeper - here is what we found:
>>
>> -  There were some network-related issues at that time, causing temporary
>> communication losses between nodes.
>> -  BrokerProxy tried to addTopicPartition() and, while doing that, its
>> isValidOffset() method failed.
>> -  This caused it to reset the offset using the configured
>> setting, which we have set to auto.offset.reset=smallest
>>
>> Now, the validation check simply tries to fetch the message at the given
>> offset and if there is an exception while doing it, it assumes that the
>> offset is invalid. I think that the exception ReplicaNotAvailableException
>> is being considered harmless, but I am not sure what the expected
>> behavior is in case of any other exception. What about network-related
>> exceptions (that are transient)?
>>
>> I am pasting below the logs that show a *simple network error*, and
>> immediately after that the message showing that Samza thinks the offset was
>> invalid and hence is falling back to the reset setting.
>>
>> Is there any workaround to get past this problem? I would have thought
>> that ideally, there would be only a handful of well-known error codes that
>> indicate that the offset into a Kafka topic/partition is invalid.
>>
>> --
>> thanks,
>> gaurav
>>
>>
>> INFO [2017-05-01 15:19:27,327] [U:1,898,F:1,159,T:3,056,M:3,056]
>> system.kafka.DefaultFetchSimpleConsumer:[Logging$class:info:76] -
>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
>> client samza_consumer-job4-1] - Reconnect due to error:
>> java.io.EOFException
>>         at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
>>         at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
>>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
>>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
>>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
>>         at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
>>         at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
>>         at org.apache.samza.system.kafka.GetOffset.isValidOffset(GetOffset.scala:60)
>>         at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(BrokerProxy.scala:99)
>>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:213)
>>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:226)
>>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:192)
>>         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>>         at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:191)
>>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.abdicate(KafkaSystemConsumer.scala:293)
>>         at org.apache.samza.system.kafka.BrokerProxy.abdicate(BrokerProxy.scala:207)
>>         at org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$2.apply(BrokerProxy.scala:245)
>>         at org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$2.apply(BrokerProxy.scala:245)
>>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>>         at org.apache.samza.system.kafka.BrokerProxy.handleErrors(BrokerProxy.scala:245)
>>         at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:186)
>>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
>>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
>>         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> WARN [2017-05-01 15:19:27,378] [U:1,904,F:1,153,T:3,056,M:3,056]
>> system.kafka.BrokerProxy:[Logging$class:warn:74] -
>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
>> client samza_consumer-job4-1] - It appears that we received an invalid or
>> empty offset Some(45039137) for [Topic3,17]. Attempting to use Kafka's
>> auto.offset.reset setting. This can result in data loss if processing
>> continues.
>> INFO [2017-05-01 15:19:27,398] [U:1,904,F:1,152,T:3,056,M:3,056]
>> system.kafka.KafkaSystemConsumer:[Logging$class:info:63] -
>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform3:9092 for
>> client samza_consumer-job4-1] - Refreshing brokers for: Map([Topic3,17] ->
>> 45039137)
>> INFO [2017-05-01 15:19:27,405] [U:1,904,F:1,152,T:3,056,M:3,056]
>> system.kafka.GetOffset:[Logging$class:info:63] -
>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
>> client samza_consumer-job4-1] - Checking if auto.offset.reset is defined
>> for topic Topic3
>> INFO [2017-05-01 15:19:27,432] [U:1,904,F:1,152,T:3,056,M:3,056]
>> system.kafka.GetOffset:[Logging$class:info:63] -
>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
>> client samza_consumer-job4-1] - Got reset of type smallest.
>>
>> def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
>>   debug("Adding new topic and partition %s to queue for %s" format (tp, host))
>>   if (nextOffsets.containsKey(tp)) {
>>     toss("Already consuming TopicPartition %s" format tp)
>>   }
>>
>>   val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
>>     nextOffset
>>       .get
>>       .toLong
>>   } else {
>>     warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
>>     offsetGetter.getResetOffset(simpleConsumer, tp)
>>   }
>>
>>   debug("Got offset %s for new topic and partition %s." format (offset, tp))
>>   nextOffsets += tp -> offset
>>   metrics.topicPartitions(host, port).set(nextOffsets.size)
>> }
>>
>> def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
>>   info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
>>
>>   try {
>>     val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
>>
>>     if (messages.hasError) {
>>       KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition))
>>     }
>>
>>     info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
>>
>>     true
>>   } catch {
>>     case e: OffsetOutOfRangeException => false
>>   }
>> }
>>
>>
>
