This also seems somewhat related to the mail on this group a few days back
- with subject 'Messages lost after broker failure'.

If someone had set auto.offset.reset to largest, then the reverse would happen
- i.e. Samza skipping ahead in the Kafka partition queue in the face of such failures.
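
For illustration, here is a rough sketch of how a reset offset is typically
resolved against the legacy SimpleConsumer offset API that Samza 0.12 uses
(the helper name is mine, not Samza's GetOffset code): "smallest" maps to the
earliest retained offset, so the consumer rewinds and re-processes, while
"largest" maps to the log-end offset, so everything in between is skipped.

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// Sketch only: resolve an auto.offset.reset policy to a concrete offset.
def resolveResetOffset(consumer: SimpleConsumer, tp: TopicAndPartition, policy: String): Long = {
  val time =
    if (policy == "smallest") OffsetRequest.EarliestTime // oldest retained offset -> re-processing
    else OffsetRequest.LatestTime                        // log-end offset -> intervening messages skipped
  val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(time, 1)))
  val response = consumer.getOffsetsBefore(request)
  response.partitionErrorAndOffsets(tp).offsets.head
}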

On Tue, May 2, 2017 at 9:17 AM, Gaurav Agarwal <gauravagarw...@gmail.com>
wrote:

> Hi All,
>
> We recently observed an issue in our Samza application (version
> 0.12.0) where we found that the message offsets "jumped back", causing
> many of the messages to be re-processed.
>
> On digging deeper - here is what we found:
>
> - There were some network-related issues at that time, causing temporary
> communication losses between nodes.
> - BrokerProxy tried to addTopicPartition(), and while doing that, its
> isValidOffset() check failed.
> - This caused it to reset the offset using the configured
> setting - which we have set to auto.offset.reset=smallest.
>
> Now, the validation check simply tries to fetch the message at the given
> offset, and if there is an exception while doing so, it assumes that the
> offset is invalid. I think the exception ReplicaNotAvailableException is
> considered harmless, but I am not sure what the expected behavior is
> for any other exception. What about network-related exceptions (which
> are transient)?
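>
> To make this concrete, here is a rough sketch of what I mean (the helper
> name is mine, not Samza's): only a couple of error codes really say "this
> offset no longer exists"; broker or network hiccups look transient and
> should be retried rather than trigger auto.offset.reset.
>
> import kafka.common.ErrorMapping
>
> // Sketch only: classify the error code from the probe fetch.
> def offsetDefinitelyInvalid(errorCode: Short): Boolean = errorCode match {
>   case ErrorMapping.OffsetOutOfRangeCode      => true  // offset outside the retained range
>   case ErrorMapping.NotLeaderForPartitionCode => false // leadership moved - transient
>   case ErrorMapping.LeaderNotAvailableCode    => false // election in progress - transient
>   case ErrorMapping.ReplicaNotAvailableCode   => false // the "harmless" case above
>   case ErrorMapping.RequestTimedOutCode       => false // slow broker/network - transient
>   case _                                      => false // unknown: safer to retry than reset
> }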
>
> I am pasting below the logs that show a *simple network error*, and
> immediately after it the message where Samza decides that the offset is
> invalid and falls back to the reset setting.
>
> Is there any workaround to get past this problem? I would have thought
> that, ideally, only a handful of well-known error codes would indicate
> that the offset into a Kafka topic/partition is invalid.
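>
> One direction I can imagine (a sketch only, untested, helper name is mine)
> would be to validate the checkpointed offset against the partition's
> advertised earliest..log-end range via the offset API rather than a probe
> fetch, so that a transient fetch failure cannot be mistaken for a bad offset:
>
> import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
> import kafka.common.TopicAndPartition
> import kafka.consumer.SimpleConsumer
>
> // Sketch only: an offset is valid iff it lies within [earliest, logEnd],
> // where logEnd is the next offset to be written. Network errors propagate
> // and can be retried; they never cause a fallback to auto.offset.reset.
> def offsetWithinRange(consumer: SimpleConsumer, tp: TopicAndPartition, offset: Long): Boolean = {
>   def offsetAt(time: Long): Long = {
>     val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(time, 1)))
>     consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets.head
>   }
>   val earliest = offsetAt(OffsetRequest.EarliestTime)
>   val logEnd = offsetAt(OffsetRequest.LatestTime)
>   offset >= earliest && offset <= logEnd
> }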
>
> --
> thanks,
> gaurav
>
>
> INFO [2017-05-01 15:19:27,327] [U:1,898,F:1,159,T:3,056,M:3,056]
> system.kafka.DefaultFetchSimpleConsumer:[Logging$class:info:76] -
> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
> client samza_consumer-job4-1] - Reconnect due to error:
> java.io.EOFException
>         at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
>         at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
>         at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
>         at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
>         at org.apache.samza.system.kafka.GetOffset.isValidOffset(GetOffset.scala:60)
>         at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(BrokerProxy.scala:99)
>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:213)
>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:226)
>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:192)
>         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>         at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:191)
>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.abdicate(KafkaSystemConsumer.scala:293)
>         at org.apache.samza.system.kafka.BrokerProxy.abdicate(BrokerProxy.scala:207)
>         at org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$2.apply(BrokerProxy.scala:245)
>         at org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$2.apply(BrokerProxy.scala:245)
>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>         at org.apache.samza.system.kafka.BrokerProxy.handleErrors(BrokerProxy.scala:245)
>         at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:186)
>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
>         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
>         at java.lang.Thread.run(Thread.java:745)
>
> WARN [2017-05-01 15:19:27,378] [U:1,904,F:1,153,T:3,056,M:3,056]
> system.kafka.BrokerProxy:[Logging$class:warn:74] -
> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
> client samza_consumer-job4-1] - It appears that we received an invalid or
> empty offset Some(45039137) for [Topic3,17]. Attempting to use Kafka's
> auto.offset.reset setting. This can result in data loss if processing
> continues.
> INFO [2017-05-01 15:19:27,398] [U:1,904,F:1,152,T:3,056,M:3,056]
> system.kafka.KafkaSystemConsumer:[Logging$class:info:63] -
> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform3:9092 for
> client samza_consumer-job4-1] - Refreshing brokers for: Map([Topic3,17] ->
> 45039137)
> INFO [2017-05-01 15:19:27,405] [U:1,904,F:1,152,T:3,056,M:3,056]
> system.kafka.GetOffset:[Logging$class:info:63] -
> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
> client samza_consumer-job4-1] - Checking if auto.offset.reset is defined
> for topic Topic3
> INFO [2017-05-01 15:19:27,432] [U:1,904,F:1,152,T:3,056,M:3,056]
> system.kafka.GetOffset:[Logging$class:info:63] -
> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
> client samza_consumer-job4-1] - Got reset of type smallest.
>
> def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
>   debug("Adding new topic and partition %s to queue for %s" format (tp, host))
>   if (nextOffsets.containsKey(tp)) {
>     toss("Already consuming TopicPartition %s" format tp)
>   }
>
>   val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
>     nextOffset
>       .get
>       .toLong
>   } else {
>     warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
>     offsetGetter.getResetOffset(simpleConsumer, tp)
>   }
>
>   debug("Got offset %s for new topic and partition %s." format (offset, tp))
>   nextOffsets += tp -> offset
>   metrics.topicPartitions(host, port).set(nextOffsets.size)
> }
>
> def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
>   info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
>
>   try {
>     val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
>
>     if (messages.hasError) {
>       KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition))
>     }
>
>     info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
>
>     true
>   } catch {
>     case e: OffsetOutOfRangeException => false
>   }
> }
>
>
