Hi Philip. Noted, and I really appreciate your input. There is no problem patching the code; I just didn't want to be coupled to a forked version of the storm-kafka spout, since my patch will not be in the main branch of the code.
Thanks,
Oleg

On Tue, Nov 19, 2013 at 9:44 PM, Philip O'Toole <phi...@loggly.com> wrote:
> The Storm mailing list is probably a better place for this thread. If I
> understand the issue, it is not a ZK issue, nor a Kafka config issue. We
> run multiple topos draining the same topics all the time.
>
> In any event, you just need to patch the Kafka Spout code to catch this
> exception, ask Kafka for the earliest or latest offset (make that choice
> configurable), and reset with that. We did that here at Loggly.
>
> I don't have a patch to hand, but to be honest, if you want to have any
> hope of running Kafka and Storm in production you should attempt to code
> the patch yourself. It will teach you things you absolutely need to
> understand about these two pieces of software, and offset management is
> particularly important.
>
> It's not a difficult change, though. Just build and instrument the code to
> start. You'll thank me when you hit your next offset-related issue.
>
> Philip
>
> On Nov 18, 2013, at 11:10 PM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
>
> > Hi Philip.
> >
> > It looks like this is our case:
> > https://github.com/nathanmarz/storm-contrib/pull/15
> >
> > It is interesting that the issue is still open (after more than a year),
> > so I am curious how people are able to run in production without the
> > ability to deploy another topology.
> > Can the community please share whether this patch resolves the issue,
> > and who is using it in production?
> >
> > Also, a question: should I change the ZooKeeper or Kafka configuration
> > to resolve the issue? If yes, please share what should be changed.
> >
> > Thanks,
> > Oleg.
> >
> > On Tue, Nov 19, 2013 at 11:51 AM, Philip O'Toole <phi...@loggly.com> wrote:
> >
> >> Don't get scared, this is perfectly normal and easily fixed. :-) The
> >> second topology attempted to fetch messages from an offset in Kafka
> >> that does not exist.
> >> This could happen due to Kafka retention policies (messages deleted)
> >> or a bug in your code. Your code needs to catch this exception, then
> >> ask Kafka for the earliest -- or latest -- offset (take your pick),
> >> and re-issue the fetch using the returned offset.
> >>
> >> Are you using a separate path in ZK for the second topology? Is it of
> >> a completely different nature than the first?
> >>
> >> Philip
> >>
> >> On Mon, Nov 18, 2013 at 7:40 PM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
> >>
> >>> We are working with Kafka (0.7.2) + Storm.
> >>> 1) We deployed a 1st topology which subscribed to a Kafka topic; it
> >>> has been working fine for a couple of weeks already.
> >>> 2) Yesterday we deployed a 2nd topology subscribed to the same Kafka
> >>> topic, but the 2nd topology immediately failed with an exception.
> >>>
> >>> What can cause such behavior, and how can we resolve the issue?
> >>>
> >>> java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException
> >>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
> >>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
> >>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> >>>     at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
> >>>     at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
> >>>     at clojure.lang.AFn.run(AFn.java:24)
> >>>     at java.lang.Thread.run(Thread.java:662)
> >>> Caused by: kafka.common.OffsetOutOfRangeException
> >>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
> >>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
> >>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
> >>>     at java.lang.Class.newInstance0(Class.java:355)
> >>>     at java.lang.Class.newInstance(Class.java:308)
> >>>     at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
> >>>     at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99)
> >>>     at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82)
> >>>     at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:51)
> >>>     at kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50)
> >>>     at storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36)
> >>>     at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75)
> >>>     at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64)
> >>>     at backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90)
> >>>     at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47)
> >>>     at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307)
> >>>     at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
> >>>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
> >>>     at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
> >>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
> >>>     ... 6 more
> >>>
> >>> [The identical stack trace repeats at Mon, 18 Nov 2013 12:36:25 +0000
> >>> and Mon, 18 Nov 2013 12:35:49 +0000.]
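For anyone following along, the recovery strategy Philip describes (catch OffsetOutOfRangeException, ask the broker for the earliest or latest valid offset depending on a configurable policy, and retry the fetch) can be sketched as below. This is a minimal, self-contained illustration against a simulated broker, not a patch to storm-kafka: the class and method names (FakeBroker, recoverOffset, ResetPolicy) are hypothetical, and a real patch would use Kafka's consumer/offset API instead.

```java
// Sketch of the catch-and-reset offset recovery discussed in this thread.
// The broker is simulated; all names here are hypothetical, not Kafka's API.
public class OffsetResetSketch {

    static class OffsetOutOfRangeException extends RuntimeException {}

    // Stand-in for a Kafka partition where retention has deleted
    // everything before `earliest`; `latest` is the next offset to be written.
    static class FakeBroker {
        final long earliest = 100, latest = 105;
        final String[] log = {"m100", "m101", "m102", "m103", "m104"};

        String fetch(long offset) {
            if (offset < earliest || offset >= latest) {
                throw new OffsetOutOfRangeException();
            }
            return log[(int) (offset - earliest)];
        }
    }

    enum ResetPolicy { EARLIEST, LATEST }

    // Try the stored offset; on an out-of-range error, fall back to a
    // valid offset chosen by the (configurable) reset policy.
    static long recoverOffset(FakeBroker broker, long storedOffset, ResetPolicy policy) {
        try {
            broker.fetch(storedOffset);
            return storedOffset; // stored offset is still valid
        } catch (OffsetOutOfRangeException e) {
            return policy == ResetPolicy.EARLIEST ? broker.earliest : broker.latest;
        }
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        // Offset 42 was removed by retention: reset to earliest, then re-fetch.
        long offset = recoverOffset(broker, 42, ResetPolicy.EARLIEST);
        System.out.println(offset);               // prints 100
        System.out.println(broker.fetch(offset)); // prints m100
    }
}
```

Whether to reset to earliest (reprocess what's still retained, possibly duplicating work) or latest (skip ahead, possibly dropping messages) is exactly the trade-off that should be exposed as spout configuration rather than hard-coded.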