This also seems somewhat related to the mail on this group a few days back with the subject 'Messages lost after broker failure'.

If someone had set auto.offset.reset to largest, the reverse would happen, i.e. Samza skipping ahead over part of the Kafka partition queue in the face of such failures.
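To make the asymmetry concrete, here is a rough illustration (my own sketch, not Samza's code) of how the two reset values translate into a reset target using the old kafka.api.OffsetRequest constants that the 0.12 consumer stack is built on; resetTarget is just a made-up helper name:

    import kafka.api.OffsetRequest

    // Once the checkpointed offset is declared "invalid", the reset setting decides
    // which way the consumer jumps:
    //   smallest -> earliest retained offset   => old messages get re-processed
    //   largest  -> latest offset (log head)   => everything in between gets skipped
    def resetTarget(autoOffsetReset: String): Long = autoOffsetReset match {
      case "smallest" => OffsetRequest.EarliestTime // -2L: rewind to the start of the log
      case "largest"  => OffsetRequest.LatestTime   // -1L: jump to the head of the log
      case other      => throw new IllegalArgumentException("Unknown auto.offset.reset value: " + other)
    }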
On Tue, May 2, 2017 at 9:17 AM, Gaurav Agarwal <gauravagarw...@gmail.com> wrote:
> Hi All,
>
> We recently observed an issue in our Samza application (version 0.12.0)
> where we found that the message offsets "jumped back", causing many of the
> messages to be re-processed.
>
> On digging deeper, here is what we found:
>
> - There were some network-related issues at that time, causing temporary
>   communication losses between nodes.
> - BrokerProxy tried to addTopicPartition(), and while doing that, its
>   isValidOffset() method failed.
> - This caused it to reset the offset using the configured setting, which
>   we have set to auto.offset.reset=smallest.
>
> Now, the validation check simply tries to fetch the message at the given
> offset, and if there is an exception while doing so, it assumes that the
> offset is invalid. I think that ReplicaNotAvailableException is being
> considered harmless, but I am not sure what the expected behavior is in
> case of any other exception. What about network-related exceptions (that
> are transient)?
>
> I am pasting below the logs that show a *simple network error*, and
> immediately after it the message where Samza concludes that the offset was
> invalid and therefore falls back to the reset setting.
>
> Is there any workaround to get past this problem? I would have thought
> that, ideally, only a handful of well-known error codes would indicate
> that an offset into a Kafka topic/partition is invalid.
>
> --
> thanks,
> gaurav
>
>
> INFO [2017-05-01 15:19:27,327] [U:1,898,F:1,159,T:3,056,M:3,056]
> system.kafka.DefaultFetchSimpleConsumer:[Logging$class:info:76] -
> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
> client samza_consumer-job4-1] - Reconnect due to error:
> java.io.EOFException
>         at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
>         at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
>         at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
>         at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
>         at org.apache.samza.system.kafka.GetOffset.isValidOffset(GetOffset.scala:60)
>         at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(BrokerProxy.scala:99)
>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:213)
>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:226)
>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:192)
>         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>         at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:191)
>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.abdicate(KafkaSystemConsumer.scala:293)
>         at org.apache.samza.system.kafka.BrokerProxy.abdicate(BrokerProxy.scala:207)
>         at org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$2.apply(BrokerProxy.scala:245)
>         at org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$2.apply(BrokerProxy.scala:245)
>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>         at org.apache.samza.system.kafka.BrokerProxy.handleErrors(BrokerProxy.scala:245)
>         at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:186)
>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
>         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
>         at java.lang.Thread.run(Thread.java:745)
>
> WARN [2017-05-01 15:19:27,378] [U:1,904,F:1,153,T:3,056,M:3,056]
> system.kafka.BrokerProxy:[Logging$class:warn:74] -
> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
> client samza_consumer-job4-1] - It appears that we received an invalid or
> empty offset Some(45039137) for [Topic3,17]. Attempting to use Kafka's
> auto.offset.reset setting. This can result in data loss if processing
> continues.
> INFO [2017-05-01 15:19:27,398] [U:1,904,F:1,152,T:3,056,M:3,056]
> system.kafka.KafkaSystemConsumer:[Logging$class:info:63] -
> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform3:9092 for
> client samza_consumer-job4-1] - Refreshing brokers for: Map([Topic3,17] ->
> 45039137)
> INFO [2017-05-01 15:19:27,405] [U:1,904,F:1,152,T:3,056,M:3,056]
> system.kafka.GetOffset:[Logging$class:info:63] -
> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
> client samza_consumer-job4-1] - Checking if auto.offset.reset is defined
> for topic Topic3
> INFO [2017-05-01 15:19:27,432] [U:1,904,F:1,152,T:3,056,M:3,056]
> system.kafka.GetOffset:[Logging$class:info:63] -
> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
> client samza_consumer-job4-1] - Got reset of type smallest.
>
> def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
>   debug("Adding new topic and partition %s to queue for %s" format (tp, host))
>
>   if (nextOffsets.containsKey(tp)) {
>     toss("Already consuming TopicPartition %s" format tp)
>   }
>
>   val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
>     nextOffset
>       .get
>       .toLong
>   } else {
>     warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
>
>     offsetGetter.getResetOffset(simpleConsumer, tp)
>   }
>
>   debug("Got offset %s for new topic and partition %s." format (offset, tp))
>
>   nextOffsets += tp -> offset
>
>   metrics.topicPartitions(host, port).set(nextOffsets.size)
> }
>
> def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
>   info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
>
>   try {
>     val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
>
>     if (messages.hasError) {
>       KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition))
>     }
>
>     info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
>
>     true
>   } catch {
>     case e: OffsetOutOfRangeException => false
>   }
> }
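For what it's worth, below is a rough, untested sketch of the kind of check Gaurav is suggesting: treat a checkpointed offset as invalid only when the broker explicitly returns the OffsetOutOfRange error code, and rethrow everything else (including transient failures like the EOFException above) so the normal retry/abdicate path deals with it instead of silently resetting. isValidOffsetStrict is a made-up name, not something that exists in Samza; the only APIs assumed beyond those visible in the quoted code are the kafka.common.ErrorMapping constants/helpers from the old Kafka client.

    import kafka.common.{ErrorMapping, TopicAndPartition}
    import org.apache.samza.system.kafka.DefaultFetchSimpleConsumer

    // Sketch only: decide "invalid offset" purely from the broker's error code.
    def isValidOffsetStrict(consumer: DefaultFetchSimpleConsumer,
                            topicAndPartition: TopicAndPartition,
                            offset: String): Boolean = {
      val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))

      if (!messages.hasError) {
        true
      } else {
        val code = messages.errorCode(topicAndPartition.topic, topicAndPartition.partition)
        if (code == ErrorMapping.OffsetOutOfRangeCode) {
          // Only this code is positive evidence that the offset itself is bad.
          false
        } else {
          // Leader moved, replica not available, timeouts, etc. say nothing about
          // the offset - surface the error and let the caller retry/refresh brokers.
          throw ErrorMapping.exceptionFor(code)
        }
      }
    }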