Some more logs from Kafka:

WARN [2017-05-01 15:21:19,132] kafka.server.ReplicaFetcherThread:[Logging$class:warn:83] - [ReplicaFetcherThread-0-3] - [ReplicaFetcherThread-0-3], Replica 0 for partition [Topic3,17] reset its fetch offset from 45039137 to current leader 3's latest offset 45039132
INFO [2017-05-01 15:21:19,150] kafka.log.Log:[Logging$class:info:68] - [ReplicaFetcherThread-0-3] - Truncating log Topic3-17 to offset 45039132.
ERROR [2017-05-01 15:21:19,248] kafka.server.ReplicaFetcherThread:[Logging$class:error:97] - [ReplicaFetcherThread-0-3] - [ReplicaFetcherThread-0-3], Current offset 45039137 for partition [Topic3,17] out of range; reset offset to 45039132

So it appears that Samza does not have an issue here. It was Kafka itself that went back on offsets, leading to an out-of-range offset query by Samza. We now need to dig a bit more into Kafka!

On Tue, May 2, 2017 at 11:35 PM, Gaurav Agarwal <gauravagarw...@gmail.com> wrote:

> Looking further, the reason for this "jump back" seems not so
> straightforward. In Kafka's SimpleConsumer code:
>
> private def sendRequest(request: RequestOrResponse): NetworkReceive = {
>   lock synchronized {
>     var response: NetworkReceive = null
>     try {
>       getOrMakeConnection()
>       blockingChannel.send(request)
>       response = blockingChannel.receive()
>     } catch {
>       case e: ClosedByInterruptException =>
>         throw e
>       // Should not observe this exception when running Kafka with Java 1.8
>       case e: AsynchronousCloseException =>
>         throw e
>       case e: Throwable =>
>         info("Reconnect due to error:", e)
>         // retry once
>         try {
>           reconnect()
>           blockingChannel.send(request)
>           response = blockingChannel.receive()
>         } catch {
>           case e: Throwable =>
>             disconnect()
>             throw e
>         }
>     }
>     response
>   }
> }
>
> Note that the first exception is only logged (which is what we see in the
> logs), but the Kafka client retries once and throws back whatever exception
> the retry produced. It could be that this second exception was an instance
> of OffsetOutOfRangeException, which caused Samza to think that the offset
> is invalid.
>
> However, I am unable to understand what could have caused this. Did Kafka
> return this exception accidentally, or did Samza ask for an offset beyond
> what was present in the Kafka queue?
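The retry shape described above can be sketched as follows (invented names, not the actual Kafka source): the first failure is only logged, the request is retried once, and whatever the retry throws is what the caller actually sees. So the "Reconnect due to error: java.io.EOFException" log line tells us only that the first attempt hit a transient network error; the exception Samza acted on came from the second attempt.

```scala
import java.io.EOFException

// Sketch of the retry-once pattern in SimpleConsumer.sendRequest.
// RetryOnce and sendWithRetry are hypothetical names for illustration.
object RetryOnce {
  def sendWithRetry[T](attempt: Int => T, log: String => Unit): T = {
    try {
      attempt(1)
    } catch {
      case e: Throwable =>
        // This produces the "Reconnect due to error" line seen in the
        // logs, but it describes only the *first* failure.
        log("Reconnect due to error: " + e.getMessage)
        attempt(2) // an exception from the retry propagates unchanged
    }
  }

  def main(args: Array[String]): Unit = {
    val logged = scala.collection.mutable.Buffer[String]()
    try {
      sendWithRetry(
        (n: Int) =>
          if (n == 1) throw new EOFException("connection dropped")
          else throw new RuntimeException("offset out of range"),
        logged += _)
    } catch {
      case e: RuntimeException => println("caller saw: " + e.getMessage)
    }
    println("log shows:  " + logged.head)
  }
}
```

Under this reading, the logged EOFException and the exception that drove Samza's offset-reset decision need not be the same exception at all.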
>
> On Tue, May 2, 2017 at 9:31 AM, Gaurav Agarwal <gauravagarw...@gmail.com> wrote:
>
>> This also seems somewhat related to the mail on this group a few days
>> back with the subject 'Messages lost after broker failure'.
>>
>> If someone had set auto.offset.reset to largest, then the reverse would
>> happen - i.e. Samza skipping over part of the Kafka partition queue in
>> the face of such failures.
>>
>> On Tue, May 2, 2017 at 9:17 AM, Gaurav Agarwal <gauravagarw...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> We recently observed an issue in our Samza application (version 0.12.0)
>>> where we found that the message offsets "jumped back", causing many of
>>> the messages to be re-processed.
>>>
>>> On digging deeper, here is what we found:
>>>
>>> - There were some network-related issues at the time, causing temporary
>>>   communication losses between nodes.
>>> - BrokerProxy tried to addTopicPartition(), and while doing that, its
>>>   isValidOffset() check failed.
>>> - This caused it to reset the offset using the configured setting -
>>>   which we have set to auto.offset.reset=smallest.
>>>
>>> Now, the validation check simply tries to fetch the message at the given
>>> offset, and if there is an exception while doing so, it assumes that the
>>> offset is invalid. I think that ReplicaNotAvailableException is being
>>> considered harmless, but I am not sure what the expected behavior is in
>>> case of any other exception. What about network-related exceptions (that
>>> are transient)?
>>>
>>> I am pasting below the logs that show a *simple network error*, and
>>> immediately after that the message where Samza decides that the offset
>>> is invalid and hence falls back to a reset.
>>>
>>> Is there any workaround to get past this problem? I would have thought
>>> that, ideally, there would be only a handful of well-known error codes
>>> indicating that an offset into a Kafka topic/partition is invalid.
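The two auto.offset.reset outcomes contrasted above can be sketched as follows (invented helper, not Samza or Kafka code): when a checkpointed offset is rejected, "smallest" rewinds to the earliest retained offset (the re-processing observed here), while "largest" jumps to the head (the silent data loss suspected in the 'Messages lost after broker failure' thread).

```scala
// Hypothetical illustration of the two reset policies' effects.
object OffsetReset {
  def resetTo(policy: String, earliest: Long, latest: Long): Long =
    policy match {
      case "smallest" => earliest // re-deliver everything still retained
      case "largest"  => latest   // skip everything up to the head
      case other      => throw new IllegalArgumentException("unknown policy: " + other)
    }
}
```

Either way, a single spurious "invalid offset" verdict translates into a large jump, backward or forward, rather than a localized correction.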
>>>
>>> --
>>> thanks,
>>> gaurav
>>>
>>>
>>> INFO [2017-05-01 15:19:27,327] [U:1,898,F:1,159,T:3,056,M:3,056]
>>> system.kafka.DefaultFetchSimpleConsumer:[Logging$class:info:76] -
>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
>>> client samza_consumer-job4-1] - Reconnect due to error:
>>> java.io.EOFException
>>> at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
>>> at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
>>> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
>>> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
>>> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>>> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
>>> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>>> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>>> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
>>> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>>> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>>> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
>>> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
>>> at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
>>> at org.apache.samza.system.kafka.GetOffset.isValidOffset(GetOffset.scala:60)
>>> at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(BrokerProxy.scala:99)
>>> at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:213)
>>> at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:226)
>>> at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:192)
>>> at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>>> at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:191)
>>> at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.abdicate(KafkaSystemConsumer.scala:293)
>>> at org.apache.samza.system.kafka.BrokerProxy.abdicate(BrokerProxy.scala:207)
>>> at org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$2.apply(BrokerProxy.scala:245)
>>> at org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$2.apply(BrokerProxy.scala:245)
>>> at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>>> at org.apache.samza.system.kafka.BrokerProxy.handleErrors(BrokerProxy.scala:245)
>>> at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:186)
>>> at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
>>> at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
>>> at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>>> at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> WARN [2017-05-01 15:19:27,378] [U:1,904,F:1,153,T:3,056,M:3,056]
>>> system.kafka.BrokerProxy:[Logging$class:warn:74] -
>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
platform1:9092 for
>>> client samza_consumer-job4-1] - It appears that we received an invalid or
>>> empty offset Some(45039137) for [Topic3,17]. Attempting to use Kafka's
>>> auto.offset.reset setting. This can result in data loss if processing
>>> continues.
>>> INFO [2017-05-01 15:19:27,398] [U:1,904,F:1,152,T:3,056,M:3,056]
>>> system.kafka.KafkaSystemConsumer:[Logging$class:info:63] -
>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform3:9092 for
>>> client samza_consumer-job4-1] - Refreshing brokers for: Map([Topic3,17] -> 45039137)
>>> INFO [2017-05-01 15:19:27,405] [U:1,904,F:1,152,T:3,056,M:3,056]
>>> system.kafka.GetOffset:[Logging$class:info:63] -
>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
>>> client samza_consumer-job4-1] - Checking if auto.offset.reset is defined
>>> for topic Topic3
>>> INFO [2017-05-01 15:19:27,432] [U:1,904,F:1,152,T:3,056,M:3,056]
>>> system.kafka.GetOffset:[Logging$class:info:63] -
>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
>>> client samza_consumer-job4-1] - Got reset of type smallest.
>>>
>>> def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
>>>   debug("Adding new topic and partition %s to queue for %s" format (tp, host))
>>>   if (nextOffsets.containsKey(tp)) {
>>>     toss("Already consuming TopicPartition %s" format tp)
>>>   }
>>>
>>>   val offset = if (nextOffset.isDefined &&
>>>     offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
>>>     nextOffset.get.toLong
>>>   } else {
>>>     warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
>>>     offsetGetter.getResetOffset(simpleConsumer, tp)
>>>   }
>>>
>>>   debug("Got offset %s for new topic and partition %s." format (offset, tp))
>>>   nextOffsets += tp -> offset
>>>   metrics.topicPartitions(host, port).set(nextOffsets.size)
>>> }
>>>
>>> def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
>>>   info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
>>>
>>>   try {
>>>     val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
>>>
>>>     if (messages.hasError) {
>>>       KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition))
>>>     }
>>>
>>>     info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
>>>
>>>     true
>>>   } catch {
>>>     case e: OffsetOutOfRangeException => false
>>>   }
>>> }
>>>
>>>
>>
>
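Given the log evidence above (the leader itself truncated from 45039137 back to 45039132, so the OffsetOutOfRangeException was genuine), one possible workaround is to not treat every out-of-range offset as "start over from auto.offset.reset", but to clamp the requested offset to the broker's current earliest/latest range. This is only a sketch of the idea with invented names, not an existing Samza API:

```scala
// Hypothetical offset-clamping policy: compare the checkpointed offset
// against the broker's current earliest/latest offsets and move to the
// nearer end instead of applying a blanket auto.offset.reset.
object OffsetClamp {
  def resolve(requested: Long, earliest: Long, latest: Long): Long =
    if (requested < earliest) earliest // data aged out: skip forward minimally
    else if (requested > latest) latest // leader truncated: back up minimally
    else requested // still valid: keep it
}
```

In the truncation case above, resolve(45039137, earliest, 45039132) would resume from 45039132, re-processing at most the five truncated messages instead of rewinding the whole partition to smallest.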