We are working with kafka (0.7.2) + storm. 1) We deployed 1st topology which subscribed on Kafka topic and it is working fine already couple of weeks. 2) Yesterday we deploy 2nd topology which subscribed on the same Kafka topic , but 2nd topology immediately failed with exception:
*What can cause such behavior and how we can resolve the issue: * java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58) at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658) at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:662) Caused by: kafka.common.OffsetOutOfRangeException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27) at java.lang.reflect.Constructor.newInstance(Constructor.java:513) at java.lang.Class.newInstance0(Class.java:355) at java.lang.Class.newInstance(Class.java:308) at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53) at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99) at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82) at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.(ByteBufferMessageSet.scala:51) at kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50) at storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36) at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75) at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64) at backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90) at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47) at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307) at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566) at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345) at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43) at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79) ... 6 more Mon, 18 Nov 2013 12:36:25 +0000 java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58) at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658) at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:662) Caused by: kafka.common.OffsetOutOfRangeException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27) at java.lang.reflect.Constructor.newInstance(Constructor.java:513) at java.lang.Class.newInstance0(Class.java:355) at java.lang.Class.newInstance(Class.java:308) at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53) at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99) at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82) at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.(ByteBufferMessageSet.scala:51) at kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50) at storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36) at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75) at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64) at backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90) at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47) at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307) at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566) at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345) at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43) at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79) ... 6 more Mon, 18 Nov 2013 12:35:49 +0000 java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58) at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658) at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:662) Caused by: kafka.common.OffsetOutOfRangeException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27) at java.lang.reflect.Constructor.newInstance(Constructor.java:513) at java.lang.Class.newInstance0(Class.java:355) at java.lang.Class.newInstance(Class.java:308) at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53) at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99) at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82) at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.(ByteBufferMessageSet.scala:51) at kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50) at storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36) at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75) at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64) at backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90) at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47) at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307) at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566) at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345) at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43) at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79) ... 6 more