Don't get scared, this if perfectly normal and easily fixed. :-) The second
topology attempted to fetch messages from an offset in Kafka that does not
exists. This could happen due to Kafka retention policies (messages
deleted) or a bug in your code. Your code needs to catch this exception,
and then ask Kafka for the earliest -- or latest offset (take your pick) --
and then re-issue the fetch using the returned offset.

Are you using a separate path in ZK for the second topology? It is of a
completely different nature than the first?

Philip




On Mon, Nov 18, 2013 at 7:40 PM, Oleg Ruchovets <oruchov...@gmail.com>wrote:

> We are working with kafka  (0.7.2) + storm.
>    1) We deployed 1st topology which subscribed on Kafka topic and it is
> working fine already couple of weeks.
>     2) Yesterday we deploy 2nd topology which subscribed on the  same Kafka
> topic , but 2nd topology immediately failed with exception:
>
> *What can cause such behavior and how we can resolve the issue: *
>
>
> java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException
>
>                 at
>
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
>
>                 at
>
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
>
>                 at
>
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
>
>                 at
>
> backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
>
>                 at
> backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
>
>                 at clojure.lang.AFn.run(AFn.java:24)
>
>                 at java.lang.Thread.run(Thread.java:662)
>
> Caused by: kafka.common.OffsetOutOfRangeException
>
>                 at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>
>                 at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
>
>                 at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
>
>                 at
> java.lang.reflect.Constructor.newInstance(Constructor.java:513)
>
>                 at java.lang.Class.newInstance0(Class.java:355)
>
>                 at java.lang.Class.newInstance(Class.java:308)
>
>                 at
> kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
>
>                 at
>
> kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99)
>
>                 at
> kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82)
>
>                 at
>
> kafka.javaapi.message.ByteBufferMessageSet$$anon$1.(ByteBufferMessageSet.scala:51)
>
>                 at
>
> kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50)
>
>                 at
> storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36)
>
>                 at
>
> storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75)
>
>                 at
>
> storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64)
>
>                 at
>
> backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90)
>
>                 at
>
> backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47)
>
>                 at
>
> backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307)
>
>                 at
>
> backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
>
>                 at
>
> backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
>
>                 at
>
> backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
>
>                 at
>
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
>
>                 ... 6 more
>
> Mon, 18 Nov 2013 12:36:25 +0000
>
> java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException
>
>                 at
>
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
>
>                 at
>
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
>
>                 at
>
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
>
>                 at
>
> backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
>
>                 at
> backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
>
>                 at clojure.lang.AFn.run(AFn.java:24)
>
>                 at java.lang.Thread.run(Thread.java:662)
>
> Caused by: kafka.common.OffsetOutOfRangeException
>
>                 at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>
>                 at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
>
>                 at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
>
>                 at
> java.lang.reflect.Constructor.newInstance(Constructor.java:513)
>
>                 at java.lang.Class.newInstance0(Class.java:355)
>
>                 at java.lang.Class.newInstance(Class.java:308)
>
>                 at
> kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
>
>                 at
>
> kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99)
>
>                 at
> kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82)
>
>                 at
>
> kafka.javaapi.message.ByteBufferMessageSet$$anon$1.(ByteBufferMessageSet.scala:51)
>
>                 at
>
> kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50)
>
>                 at
> storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36)
>
>                 at
>
> storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75)
>
>                 at
>
> storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64)
>
>                 at
>
> backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90)
>
>                 at
>
> backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47)
>
>                 at
>
> backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307)
>
>                 at
>
> backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
>
>                 at
>
> backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
>
>                 at
>
> backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
>
>                 at
>
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
>
>                 ... 6 more
>
> Mon, 18 Nov 2013 12:35:49 +0000
>
> java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException
>
>                 at
>
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
>
>                 at
>
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
>
>                 at
>
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
>
>                 at
>
> backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
>
>                 at
> backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
>
>                 at clojure.lang.AFn.run(AFn.java:24)
>
>                 at java.lang.Thread.run(Thread.java:662)
>
> Caused by: kafka.common.OffsetOutOfRangeException
>
>                 at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>
>                 at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
>
>                 at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
>
>                 at
> java.lang.reflect.Constructor.newInstance(Constructor.java:513)
>
>                 at java.lang.Class.newInstance0(Class.java:355)
>
>                 at java.lang.Class.newInstance(Class.java:308)
>
>                 at
> kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
>
>                 at
>
> kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99)
>
>                 at
> kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82)
>
>                 at
>
> kafka.javaapi.message.ByteBufferMessageSet$$anon$1.(ByteBufferMessageSet.scala:51)
>
>                 at
>
> kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50)
>
>                 at
> storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36)
>
>                 at
>
> storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75)
>
>                 at
>
> storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64)
>
>                 at
>
> backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90)
>
>                 at
>
> backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47)
>
>                 at
>
> backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307)
>
>                 at
>
> backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
>
>                 at
>
> backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
>
>                 at
>
> backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
>
>                 at
>
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
>
>                 ... 6 more
>

Reply via email to