Hi All,

We recently observed an issue in our Samza application (version
0.12.0) where the message offsets "jumped back", causing many of the
messages to be re-processed.

On digging deeper - here is what we found:

-  There were some network-related issues at that time, causing temporary
communication losses between nodes.
-  BrokerProxy tried to addTopicPartition() and, while doing that, the
isValidOffset() check it calls (in GetOffset) failed.
-  This caused it to reset the offset using the configured setting,
which we have set to auto.offset.reset=smallest

Now, the validation check simply tries to fetch the message at the given
offset and, if there is an exception while doing so, assumes that the
offset is invalid. I think that ReplicaNotAvailableException is being
considered harmless, but I am not sure what the expected behavior is
in case of any other exception. What about network-related exceptions
(that are transient)?

I am pasting below the logs that show a *simple network error*, followed
immediately by the message saying that Samza thinks the offset was
invalid and hence is falling back to a reset.

Is there any workaround to get past this problem? I would have thought that,
ideally, there would be only a handful of well-known error codes that
indicate that the offset into a Kafka topic/partition is invalid.

--
thanks,
gaurav


INFO [2017-05-01 15:19:27,327] [U:1,898,F:1,159,T:3,056,M:3,056]
system.kafka.DefaultFetchSimpleConsumer:[Logging$class:info:76] -
[SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for client
samza_consumer-job4-1] - Reconnect due to error:
java.io.EOFException
        at org.apache.kafka.common.network.NetworkReceive.
readFromReadableChannel(NetworkReceive.java:83)
        at kafka.network.BlockingChannel.readCompletely(
BlockingChannel.scala:129)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.
scala:86)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
$sendRequest(SimpleConsumer.scala:83)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
SimpleConsumer.scala:131)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
SimpleConsumer.scala:131)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
SimpleConsumer.scala:131)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
        at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(
DefaultFetchSimpleConsumer.scala:48)
        at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.
defaultFetch(DefaultFetchSimpleConsumer.scala:41)
        at org.apache.samza.system.kafka.GetOffset.isValidOffset(
GetOffset.scala:60)
        at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(
BrokerProxy.scala:99)
        at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$
refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:213)
        at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$
refreshBrokers$2.apply(KafkaSystemConsumer.scala:226)
        at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$
refreshBrokers$2.apply(KafkaSystemConsumer.scala:192)
        at org.apache.samza.util.ExponentialSleepStrategy.run(
ExponentialSleepStrategy.scala:82)
        at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(
KafkaSystemConsumer.scala:191)
        at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.
abdicate(KafkaSystemConsumer.scala:293)
        at org.apache.samza.system.kafka.BrokerProxy.abdicate(
BrokerProxy.scala:207)
        at org.apache.samza.system.kafka.BrokerProxy$$anonfun$
handleErrors$2.apply(BrokerProxy.scala:245)
        at org.apache.samza.system.kafka.BrokerProxy$$anonfun$
handleErrors$2.apply(BrokerProxy.scala:245)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
        at org.apache.samza.system.kafka.BrokerProxy.handleErrors(
BrokerProxy.scala:245)
        at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$
system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:186)
        at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$
run$1.apply(BrokerProxy.scala:147)
        at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$
run$1.apply(BrokerProxy.scala:134)
        at org.apache.samza.util.ExponentialSleepStrategy.run(
ExponentialSleepStrategy.scala:82)
        at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(
BrokerProxy.scala:133)
        at java.lang.Thread.run(Thread.java:745)

WARN [2017-05-01 15:19:27,378] [U:1,904,F:1,153,T:3,056,M:3,056]
system.kafka.BrokerProxy:[Logging$class:warn:74] -
[SAMZA-BROKER-PROXY-BrokerProxy
thread pointed at platform1:9092 for client samza_consumer-job4-1] - It
appears that we received an invalid or empty offset Some(45039137) for
[Topic3,17]. Attempting to use Kafka's auto.offset.reset setting. This can
result in data loss if processing continues.
INFO [2017-05-01 15:19:27,398] [U:1,904,F:1,152,T:3,056,M:3,056]
system.kafka.KafkaSystemConsumer:[Logging$class:info:63] -
[SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform3:9092 for client
samza_consumer-job4-1] - Refreshing brokers for: Map([Topic3,17] ->
45039137)
INFO [2017-05-01 15:19:27,405] [U:1,904,F:1,152,T:3,056,M:3,056]
system.kafka.GetOffset:[Logging$class:info:63] -
[SAMZA-BROKER-PROXY-BrokerProxy
thread pointed at platform1:9092 for client samza_consumer-job4-1] -
Checking if auto.offset.reset is defined for topic Topic3
INFO [2017-05-01 15:19:27,432] [U:1,904,F:1,152,T:3,056,M:3,056]
system.kafka.GetOffset:[Logging$class:info:63] -
[SAMZA-BROKER-PROXY-BrokerProxy
thread pointed at platform1:9092 for client samza_consumer-job4-1] - Got
reset of type smallest.

def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
  debug("Adding new topic and partition %s to queue for %s" format (tp, host))
  if (nextOffsets.containsKey(tp)) {
    toss("Already consuming TopicPartition %s" format tp)
  }

  val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
    nextOffset
      .get
      .toLong
  } else {
    warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
    offsetGetter.getResetOffset(simpleConsumer, tp)
  }

  debug("Got offset %s for new topic and partition %s." format (offset, tp))
  nextOffsets += tp -> offset
  metrics.topicPartitions(host, port).set(nextOffsets.size)
}

def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
  info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))

  try {
    val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))

    if (messages.hasError) {
      KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition))
    }

    info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))

    true
  } catch {
    case e: OffsetOutOfRangeException => false
  }
}
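
To make the question concrete, here is a rough sketch of the behavior I
would have expected. It is not Samza code: OffsetCheck, Undetermined,
OffsetValidation, classify and validate are hypothetical names, and the
OffsetOutOfRangeException class below is only a local stand-in for the
Kafka one so that the snippet compiles on its own. The idea is that the
check has three outcomes instead of two, so that an I/O error such as the
EOFException in the log is never treated as "offset is invalid".

```scala
import java.io.IOException
import scala.annotation.tailrec

// Local stand-in for kafka.common.OffsetOutOfRangeException (assumption:
// used here only so the sketch compiles without Kafka on the classpath).
class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)

// Three outcomes instead of a Boolean.
sealed trait OffsetCheck
case object Valid extends OffsetCheck
case object OutOfRange extends OffsetCheck
final case class Undetermined(cause: Throwable) extends OffsetCheck

object OffsetValidation {
  // Runs one fetch attempt and classifies the result.
  // Only an explicit out-of-range error marks the offset as invalid;
  // I/O failures say nothing about the offset itself.
  // Any other exception type propagates to the caller unchanged.
  def classify(fetch: () => Unit): OffsetCheck =
    try {
      fetch()
      Valid
    } catch {
      case _: OffsetOutOfRangeException => OutOfRange
      case e: IOException               => Undetermined(e)
    }

  // Retries while the outcome is undetermined, up to maxAttempts in total.
  // A real version would presumably sleep/back off between attempts.
  @tailrec
  def validate(fetch: () => Unit, maxAttempts: Int): OffsetCheck =
    classify(fetch) match {
      case Undetermined(_) if maxAttempts > 1 => validate(fetch, maxAttempts - 1)
      case result                             => result
    }
}
```

With something like this, the caller would fall back to auto.offset.reset
only on OutOfRange, and on Undetermined would keep the checkpointed offset
and retry later (or fail), rather than jumping back to the smallest offset.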
