Some more logs from Kafka

WARN [2017-05-01 15:21:19,132]
kafka.server.ReplicaFetcherThread:[Logging$class:warn:83] -
[ReplicaFetcherThread-0-3] - [ReplicaFetcherThread-0-3], Replica 0 for
partition [Topic3,17] reset its fetch offset from 45039137 to current
leader 3's latest offset 45039132


INFO [2017-05-01 15:21:19,150] kafka.log.Log:[Logging$class:info:68] -
[ReplicaFetcherThread-0-3] - Truncating log Topic3-17 to offset 45039132.


ERROR [2017-05-01 15:21:19,248]
kafka.server.ReplicaFetcherThread:[Logging$class:error:97] -
[ReplicaFetcherThread-0-3] - [ReplicaFetcherThread-0-3], Current offset
45039137 for partition [Topic3,17] out of range; reset offset to 45039132


So it appears that Samza does not have an issue here. It was Kafka itself
that went back on offsets, leading to the out-of-range offset query by
Samza.


We now need to dig a bit deeper into Kafka!
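
The log sequence above can be condensed into a tiny sketch (hypothetical names, Java just for illustration): the fetcher sits at offset 45039137, the new leader's log ends at 45039132, so the fetch is out of range and the offset falls back behind where the fetcher already was, re-delivering everything in between.

```java
// Hypothetical sketch (names invented) of the reset seen in the Kafka logs above.
public class OffsetResetSketch {
    // After an out-of-range fetch, fall back to the leader's log end offset,
    // mimicking "reset its fetch offset from 45039137 to ... latest offset 45039132".
    static long resolveFetchOffset(long fetcherOffset, long leaderLogEndOffset) {
        if (fetcherOffset > leaderLogEndOffset) {
            // Moving backwards: everything between the two offsets is re-delivered.
            return leaderLogEndOffset;
        }
        return fetcherOffset;
    }
}
```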

On Tue, May 2, 2017 at 11:35 PM, Gaurav Agarwal <gauravagarw...@gmail.com>
wrote:

> Looking further, the reason for this "jump back" seems not so
> straightforward.
> In Kafka's Simple Consumer code:
>
> private def sendRequest(request: RequestOrResponse): NetworkReceive = {
>   lock synchronized {
>     var response: NetworkReceive = null
>     try {
>       getOrMakeConnection()
>       blockingChannel.send(request)
>       response = blockingChannel.receive()
>     } catch {
>       case e : ClosedByInterruptException =>
>         throw e
>       // Should not observe this exception when running Kafka with Java 1.8
>       case e: AsynchronousCloseException =>
>         throw e
>       case e : Throwable =>
>         info("Reconnect due to error:", e)
>         // retry once
>         try {
>           reconnect()
>           blockingChannel.send(request)
>           response = blockingChannel.receive()
>         } catch {
>           case e: Throwable =>
>             disconnect()
>             throw e
>         }
>     }
>     response
>   }
> }
>
>
> Note that the first exception is only logged (which is what appears in the
> log), but the Kafka client retries once and throws back whatever exception
> the retry received. It could be that this exception was an instance of
> OffsetOutOfRangeException, which caused Samza to think that the offset is
> invalid.
>
> However, I am unable to understand what could have caused this. Did Kafka
> return this exception accidentally, or did Samza ask for an offset that was
> beyond what was present in the Kafka partition?
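
A minimal sketch of that retry shape (hypothetical helper, not the actual Kafka code): because the first failure is only logged, the logged error and the exception the caller finally observes can be two entirely different things.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of the retry shape in sendRequest above (not the real code):
// the first failure is only logged; the caller only ever sees what the retry does.
public class RetryOnce {
    static String logged = null;

    static <T> T sendWithRetry(Callable<T> first, Callable<T> retry) throws Exception {
        try {
            return first.call();
        } catch (Exception e) {
            logged = e.getMessage(); // "Reconnect due to error: ..." in the Samza log
            return retry.call();     // may throw something else, e.g. offset-out-of-range
        }
    }
}
```

So a log showing only an EOFException is consistent with the caller receiving an OffsetOutOfRangeException from the retried attempt.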
>
>
> On Tue, May 2, 2017 at 9:31 AM, Gaurav Agarwal <gauravagarw...@gmail.com>
> wrote:
>
>> This also seems related to a mail on this group a few days back, with
>> the subject 'Messages lost after broker failure'.
>>
>> If someone had set auto.offset.reset to largest, the reverse would
>> happen, i.e. Samza skipping over part of the Kafka partition queue in the
>> face of such failures.
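
For reference, the reset behaviour is configured per system in the Samza job config (assuming the Kafka system is named `kafka`; key pattern from the Samza configuration table):

```properties
# Fall back to the start of the partition on an invalid offset (our setting);
# can cause re-processing, as seen here.
systems.kafka.consumer.auto.offset.reset=smallest

# The alternative falls forward to the end of the partition and skips messages:
# systems.kafka.consumer.auto.offset.reset=largest
```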
>>
>> On Tue, May 2, 2017 at 9:17 AM, Gaurav Agarwal <gauravagarw...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> We recently observed an issue in our Samza application (version
>>> 0.12.0) where we found that the message offsets "jumped back", causing
>>> many messages to be re-processed.
>>>
>>> On digging deeper - here is what we found:
>>>
>>> - There were some network-related issues at that time, causing
>>> temporary communication losses between nodes.
>>> - BrokerProxy tried to addTopicPartition() and, while doing that, its
>>> isValidOffset() method failed.
>>> - This caused it to reset the offset using the configured
>>> setting, which we have set to auto.offset.reset=smallest.
>>>
>>> Now, the validation check simply tries to fetch the message at the given
>>> offset and, if there is an exception while doing so, it assumes that the
>>> offset is invalid. I think that ReplicaNotAvailableException is being
>>> considered harmless, but I am not sure what the expected behavior is in
>>> case of any other exception. What about network-related exceptions
>>> (that are transient)?
>>>
>>> I am pasting below the logs that show a *simple network error*, and
>>> immediately after it the message where Samza decides that the offset was
>>> invalid and hence falls back to the reset.
>>>
>>> Is there any workaround to get past this problem? I would have thought
>>> that, ideally, only a handful of well-known error codes would
>>> indicate that an offset into a Kafka topic/partition is invalid.
>>>
>>> --
>>> thanks,
>>> gaurav
>>>
>>>
>>> INFO [2017-05-01 15:19:27,327] [U:1,898,F:1,159,T:3,056,M:3,056]
>>> system.kafka.DefaultFetchSimpleConsumer:[Logging$class:info:76] -
>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
>>> client samza_consumer-job4-1] - Reconnect due to error:
>>> java.io.EOFException
>>>         at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
>>>         at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
>>>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
>>>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
>>>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
>>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
>>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
>>>         at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
>>>         at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
>>>         at org.apache.samza.system.kafka.GetOffset.isValidOffset(GetOffset.scala:60)
>>>         at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(BrokerProxy.scala:99)
>>>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:213)
>>>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:226)
>>>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:192)
>>>         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>>>         at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:191)
>>>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.abdicate(KafkaSystemConsumer.scala:293)
>>>         at org.apache.samza.system.kafka.BrokerProxy.abdicate(BrokerProxy.scala:207)
>>>         at org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$2.apply(BrokerProxy.scala:245)
>>>         at org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$2.apply(BrokerProxy.scala:245)
>>>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>>>         at org.apache.samza.system.kafka.BrokerProxy.handleErrors(BrokerProxy.scala:245)
>>>         at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:186)
>>>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
>>>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
>>>         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>>>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> WARN [2017-05-01 15:19:27,378] [U:1,904,F:1,153,T:3,056,M:3,056]
>>> system.kafka.BrokerProxy:[Logging$class:warn:74] -
>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
>>> client samza_consumer-job4-1] - It appears that we received an invalid or
>>> empty offset Some(45039137) for [Topic3,17]. Attempting to use Kafka's
>>> auto.offset.reset setting. This can result in data loss if processing
>>> continues.
>>> INFO [2017-05-01 15:19:27,398] [U:1,904,F:1,152,T:3,056,M:3,056]
>>> system.kafka.KafkaSystemConsumer:[Logging$class:info:63] -
>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform3:9092 for
>>> client samza_consumer-job4-1] - Refreshing brokers for: Map([Topic3,17] ->
>>> 45039137)
>>> INFO [2017-05-01 15:19:27,405] [U:1,904,F:1,152,T:3,056,M:3,056]
>>> system.kafka.GetOffset:[Logging$class:info:63] -
>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
>>> client samza_consumer-job4-1] - Checking if auto.offset.reset is defined
>>> for topic Topic3
>>> INFO [2017-05-01 15:19:27,432] [U:1,904,F:1,152,T:3,056,M:3,056]
>>> system.kafka.GetOffset:[Logging$class:info:63] -
>>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
>>> client samza_consumer-job4-1] - Got reset of type smallest.
>>>
>>> def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
>>>   debug("Adding new topic and partition %s to queue for %s" format (tp, host))
>>>   if (nextOffsets.containsKey(tp)) {
>>>     toss("Already consuming TopicPartition %s" format tp)
>>>   }
>>>
>>>   val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
>>>     nextOffset
>>>       .get
>>>       .toLong
>>>   } else {
>>>     warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
>>>     offsetGetter.getResetOffset(simpleConsumer, tp)
>>>   }
>>>
>>>   debug("Got offset %s for new topic and partition %s." format (offset, tp))
>>>   nextOffsets += tp -> offset
>>>   metrics.topicPartitions(host, port).set(nextOffsets.size)
>>> }
>>>
>>> def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
>>>   info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
>>>
>>>   try {
>>>     val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
>>>
>>>     if (messages.hasError) {
>>>       KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition))
>>>     }
>>>
>>>     info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
>>>
>>>     true
>>>   } catch {
>>>     case e: OffsetOutOfRangeException => false
>>>   }
>>> }
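
One possible tightening, sketched below (hypothetical method and values, not Samza's API): decide validity from the fetch response's error code alone, treating only the well-known OffsetOutOfRange code as "invalid" and surfacing anything else, so a transient failure triggers a retry rather than an offset reset.

```java
// Hypothetical sketch: treat only the well-known out-of-range error code as
// proof that the offset is invalid; propagate every other error so the caller
// can retry instead of silently resetting the offset.
public class OffsetValidation {
    static final short NONE = 0;
    static final short OFFSET_OUT_OF_RANGE = 1; // Kafka protocol error code 1

    static boolean isValidOffset(short fetchErrorCode) {
        if (fetchErrorCode == NONE) return true;
        if (fetchErrorCode == OFFSET_OUT_OF_RANGE) return false;
        // Any other code (network error, not-leader-for-partition, ...) says
        // nothing about the offset itself; surface it so the caller can retry.
        throw new IllegalStateException("fetch failed with error code " + fetchErrorCode);
    }
}
```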
>>>
>>>
>>
>
