Don't get scared, this is perfectly normal and easily fixed. :-) The second topology attempted to fetch messages from an offset in Kafka that does not exist. This can happen because of Kafka retention policies (the messages were deleted) or because of a bug in your code. Your code needs to catch this exception, ask Kafka for the earliest or the latest offset (take your pick), and then re-issue the fetch using the returned offset.
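The catch-and-reset recovery described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `PartitionLog`, `InMemoryLog`, `ResetPolicy`, `FetchResult` and the local `OffsetOutOfRangeException` are hypothetical stand-ins, not the Kafka 0.7 or storm-kafka API, and offsets are modelled as plain message indexes for simplicity.

```java
import java.util.ArrayList;
import java.util.List;

final class OffsetResetSketch {

    /** Hypothetical stand-in for kafka.common.OffsetOutOfRangeException. */
    static final class OffsetOutOfRangeException extends RuntimeException {
        OffsetOutOfRangeException(long offset) {
            super("offset out of range: " + offset);
        }
    }

    /** Hypothetical minimal view of one partition; NOT the real Kafka client API. */
    interface PartitionLog {
        long earliestOffset();
        long latestOffset();
        List<String> fetch(long offset);
    }

    enum ResetPolicy { EARLIEST, LATEST }

    /** The offset that was finally used, plus the messages fetched from it. */
    static final class FetchResult {
        final long offsetUsed;
        final List<String> messages;

        FetchResult(long offsetUsed, List<String> messages) {
            this.offsetUsed = offsetUsed;
            this.messages = messages;
        }
    }

    /** Try the stored offset; if the broker rejects it, reset and fetch again. */
    static FetchResult fetchWithReset(PartitionLog log, long offset, ResetPolicy policy) {
        try {
            return new FetchResult(offset, log.fetch(offset));
        } catch (OffsetOutOfRangeException e) {
            // The stored offset no longer exists: ask for a valid one and retry once.
            long valid = (policy == ResetPolicy.EARLIEST)
                    ? log.earliestOffset()
                    : log.latestOffset();
            return new FetchResult(valid, log.fetch(valid));
        }
    }

    /** In-memory log whose messages before firstOffset were removed by retention. */
    static final class InMemoryLog implements PartitionLog {
        private final long firstOffset;
        private final List<String> messages;

        InMemoryLog(long firstOffset, List<String> messages) {
            this.firstOffset = firstOffset;
            this.messages = new ArrayList<>(messages);
        }

        @Override public long earliestOffset() { return firstOffset; }

        @Override public long latestOffset() { return firstOffset + messages.size(); }

        @Override public List<String> fetch(long offset) {
            if (offset < earliestOffset() || offset > latestOffset()) {
                throw new OffsetOutOfRangeException(offset);
            }
            int from = (int) (offset - firstOffset);
            return new ArrayList<>(messages.subList(from, messages.size()));
        }
    }

    public static void main(String[] args) {
        PartitionLog log = new InMemoryLog(100L, List.of("a", "b", "c"));
        // Offset 5 was deleted long ago, so the fetch is retried from the earliest offset.
        FetchResult result = fetchWithReset(log, 5L, ResetPolicy.EARLIEST);
        System.out.println(result.offsetUsed + " " + result.messages);
    }
}
```

Resetting to the earliest offset replays everything Kafka still retains, while resetting to the latest skips straight to new messages, so the choice depends on whether the second topology can tolerate reprocessing or data loss.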
Are you using a separate path in ZK for the second topology? Is it of a completely different nature than the first?

Philip

On Mon, Nov 18, 2013 at 7:40 PM, Oleg Ruchovets <oruchov...@gmail.com> wrote:

> We are working with kafka (0.7.2) + storm.
> 1) We deployed 1st topology which subscribed on Kafka topic and it is
> working fine already couple of weeks.
> 2) Yesterday we deploy 2nd topology which subscribed on the same Kafka
> topic, but 2nd topology immediately failed with exception:
>
> *What can cause such behavior and how we can resolve the issue:*
>
> java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
>     at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
>     at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
>     at clojure.lang.AFn.run(AFn.java:24)
>     at java.lang.Thread.run(Thread.java:662)
> Caused by: kafka.common.OffsetOutOfRangeException
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
>     at java.lang.Class.newInstance0(Class.java:355)
>     at java.lang.Class.newInstance(Class.java:308)
>     at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
>     at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99)
>     at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82)
>     at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:51)
>     at kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50)
>     at storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36)
>     at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75)
>     at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64)
>     at backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90)
>     at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47)
>     at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307)
>     at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
>     at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
>     ... 6 more