Hi All, We recently observed an issue in our Samza application (version 0.12.0) where we found that the message offsets "jumped back" causing many of the messages to be re-processed.
On digging deeper - here is what we found: - There were some network-related issues at that time causing temporary communication losses between nodes. - BrokerProxy tried to addTopicPartition() and while doing that, its isValidOffset() method failed. - This caused it to reset the offset using the configured setting - which we have set to auto.offset.reset=smallest Now, the validation check simply tries to fetch the message at the given offset and if there is an exception while doing it, it assumes that the offset is invalid. I think that the exception ReplicaNotAvailableException is being considered harmless, but I am not sure what is the expected behavior in case of any other exception. What about network related exceptions (that are transient)? I am pasting below the logs that show a *simple network error,* and immediately after that the message that Samza thinks that the offset was invalid and hence it is falling-back to reset. Is there any workaround to get past this problem? I would have thought that ideally, there could be only a handful of well-known error codes that would indicate that the offset into a kafka topic/partition is invalid. -- thanks, gaurav INFO [2017-05-01 15:19:27,327] [U:1,898,F:1,159,T:3,056,M:3,056] system.kafka.DefaultFetchSimpleConsumer:[Logging$class:info:76] - [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for client samza_consumer-job4-1] - Reconnect due to error: java.io.EOFException at org.apache.kafka.common.network.NetworkReceive. readFromReadableChannel(NetworkReceive.java:83) at kafka.network.BlockingChannel.readCompletely( BlockingChannel.scala:129) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer. 
scala:86) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$ $sendRequest(SimpleConsumer.scala:83) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$ apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$ apply$mcV$sp$1.apply(SimpleConsumer.scala:132) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$ apply$mcV$sp$1.apply(SimpleConsumer.scala:132) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp( SimpleConsumer.scala:131) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply( SimpleConsumer.scala:131) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply( SimpleConsumer.scala:131) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch( DefaultFetchSimpleConsumer.scala:48) at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer. defaultFetch(DefaultFetchSimpleConsumer.scala:41) at org.apache.samza.system.kafka.GetOffset.isValidOffset( GetOffset.scala:60) at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition( BrokerProxy.scala:99) at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$ refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:213) at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$ refreshBrokers$2.apply(KafkaSystemConsumer.scala:226) at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$ refreshBrokers$2.apply(KafkaSystemConsumer.scala:192) at org.apache.samza.util.ExponentialSleepStrategy.run( ExponentialSleepStrategy.scala:82) at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers( KafkaSystemConsumer.scala:191) at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1. 
abdicate(KafkaSystemConsumer.scala:293) at org.apache.samza.system.kafka.BrokerProxy.abdicate( BrokerProxy.scala:207) at org.apache.samza.system.kafka.BrokerProxy$$anonfun$ handleErrors$2.apply(BrokerProxy.scala:245) at org.apache.samza.system.kafka.BrokerProxy$$anonfun$ handleErrors$2.apply(BrokerProxy.scala:245) at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) at org.apache.samza.system.kafka.BrokerProxy.handleErrors( BrokerProxy.scala:245) at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$ system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:186) at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$ run$1.apply(BrokerProxy.scala:147) at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$ run$1.apply(BrokerProxy.scala:134) at org.apache.samza.util.ExponentialSleepStrategy.run( ExponentialSleepStrategy.scala:82) at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run( BrokerProxy.scala:133) at java.lang.Thread.run(Thread.java:745) WARN [2017-05-01 15:19:27,378] [U:1,904,F:1,153,T:3,056,M:3,056] system.kafka.BrokerProxy:[Logging$class:warn:74] - [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for client samza_consumer-job4-1] - It appears that we received an invalid or empty offset Some(45039137) for [Topic3,17]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues. 
INFO [2017-05-01 15:19:27,398] [U:1,904,F:1,152,T:3,056,M:3,056] system.kafka.KafkaSystemConsumer:[Logging$class:info:63] - [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform3:9092 for client samza_consumer-job4-1] - Refreshing brokers for: Map([Topic3,17] -> 45039137) INFO [2017-05-01 15:19:27,405] [U:1,904,F:1,152,T:3,056,M:3,056] system.kafka.GetOffset:[Logging$class:info:63] - [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for client samza_consumer-job4-1] - Checking if auto.offset.reset is defined for topic Topic3 INFO [2017-05-01 15:19:27,432] [U:1,904,F:1,152,T:3,056,M:3,056] system.kafka.GetOffset:[Logging$class:info:63] - [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for client samza_consumer-job4-1] - Got reset of type smallest. def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = { debug("Adding new topic and partition %s to queue for %s" format (tp, host)) if (nextOffsets.containsKey(tp)) { toss("Already consuming TopicPartition %s" format tp) } val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) { nextOffset .get .toLong } else { warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp)) offsetGetter.getResetOffset(simpleConsumer, tp) } debug("Got offset %s for new topic and partition %s." 
format (offset, tp)) nextOffsets += tp -> offset metrics.topicPartitions(host, port).set(nextOffsets.size) } def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = { info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition)) try { val messages = consumer.defaultFetch((topicAndPartition, offset.toLong)) if (messages.hasError) { KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition)) } info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition)) true } catch { case e: OffsetOutOfRangeException => false } }