Re: "UnsupportedOperationException: null org.apache.kafka.streams. processor.internals.StandbyContextImpl.recordCollector( StandyContextImpl.java:81)": I think this is a known issue that has been fixed in trunk:
https://github.com/apache/kafka/commit/a4592a18641f84a1983c5fe7e697a8b0ab43eb25

Guozhang

On Fri, Dec 16, 2016 at 11:53 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> I guess. They're bugs, so it's always hard to be 100% sure.
>
> We know about a null-pointer bug in task assignment/creation, so I
> assume that's what you're seeing.
>
> -Matthias
>
> On 12/16/16 11:19 AM, Jon Yeargers wrote:
> > And these bugs would cause the behaviors I'm seeing?
> >
> > On Fri, Dec 16, 2016 at 10:45 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> We just discovered a couple of bugs with regard to standby tasks. Not
> >> all of the bug-fix PRs have been merged yet.
> >>
> >> You can try running on trunk to get those fixes. It should only be a
> >> few days until the fixes get merged.
> >>
> >> -Matthias
> >>
> >> On 12/16/16 9:10 AM, Jon Yeargers wrote:
> >>> I've started having this issue with another KStream-based app. Digging
> >>> through the logs, I ran across this message:
> >>>
> >>> When I've seen it before, it certainly does kill the application. At
> >>> the end of the SNIP you can see the exit process starting.
> >>>
> >>> 2016-12-16 17:04:51,507 [StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] creating new standby task 0_0
> >>>
> >>> 2016-12-16 17:04:51,507 [StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Creating new standby task 0_0 with assigned partitions [[rtdetail_breakout-0]]
> >>>
> >>> 2016-12-16 17:04:51,508 [StreamThread-1] INFO o.a.k.s.p.internals.StandbyTask - standby-task [0_0] Initializing state stores
> >>>
> >>> 2016-12-16 17:04:51,508 [StreamThread-1] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor fetching committed offsets for partitions: [rtdetail_breakout-0]
> >>>
> >>> 2016-12-16 17:04:51,819 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group RtDetailBreakoutProcessor failed on partition assignment
> >>>
> >>> java.lang.UnsupportedOperationException: null
> >>>     at org.apache.kafka.streams.processor.internals.StandbyContextImpl.recordCollector(StandbyContextImpl.java:81)
> >>>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:54)
> >>>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:46)
> >>>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:197)
> >>>     at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
> >>>     at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
> >>>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> >>>     at org.apache.kafka.streams.processor.internals.StandbyTask.<init>(StandbyTask.java:68)
> >>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStandbyTask(StreamThread.java:733)
> >>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:757)
> >>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$200(StreamThread.java:69)
> >>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:125)
> >>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:229)
> >>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
> >>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
> >>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:260)
> >>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
> >>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
> >>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:442)
> >>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> >>>
> >>> 2016-12-16 17:04:51,820 [StreamThread-1] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor fetching committed offsets for partitions: [rtdetail_breakout-2, rtdetail_breakout-1, rtdetail_breakout-6, rtdetail_breakout-5, rtdetail_breakout_filtered-1]
> >>>
> >>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout-2 to the committed offset 1989670807
> >>>
> >>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout-1 to the committed offset 1991427117
> >>>
> >>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout-6 to the committed offset 1986565752
> >>>
> >>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout-5 to the committed offset 1982149459
> >>>
> >>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition rtdetail_breakout_filtered-1 to the committed offset 92917
> >>>
> >>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.kafka.clients.NetworkClient - Initiating connection to node 115 at 54.154.234.110:9092.
> >>>
> >>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.kafka.clients.NetworkClient - Initiating connection to node 114 at 54.194.192.105:9092.
> >>>
> >>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.kafka.clients.NetworkClient - Initiating connection to node 112 at 54.171.236.113:9092.
> >>>
> >>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.kafka.clients.NetworkClient - Initiating connection to node 111 at 54.154.115.41:9092.
> >>>
> >>> 2016-12-16 17:04:51,821 [StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Shutting down
> >>>
> >>> 2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] shutdownTasksAndState: shutting down all active tasks [[0_1, 0_2, 1_1, 0_5, 0_6]] and standby tasks [[]]
> >>>
> >>> On Fri, Dec 16, 2016 at 4:53 AM, Jon Yeargers <jon.yearg...@cedexis.com>
> >>> wrote:
> >>>
> >>>> FWIW I put a .warn in my shutdown hook, to make sure it wasn't being
> >>>> called unknowingly.
> >>>>
> >>>> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
> >>>>     try {
> >>>>         LOGGER.warn("ShutdownHook");
> >>>>         kafkaStreams.close();
> >>>>     } catch (Exception e) {
> >>>>         // ignored
> >>>>     }
> >>>> }));
> >>>>
> >>>> I ran another test and the app closed after ~40 min. The above message
> >>>> appears third from the end (several seconds after the shutdown process
> >>>> had commenced).
> >>>>
> >>>> (attaching log section)
> >>>>
> >>>> This has *got* to be something that I've set up improperly... I just
> >>>> can't seem to see it.
> >>>>
> >>>> On Fri, Dec 16, 2016 at 2:10 AM, Jon Yeargers <jon.yearg...@cedexis.com>
> >>>> wrote:
> >>>>
> >>>>> I'm seeing instances where my apps are exiting (gracefully, mind you)
> >>>>> without any obvious errors or cause. I have debug logs from many
> >>>>> instances of this and have yet to find a reason to explain what's
> >>>>> happening.
> >>>>>
> >>>>> - nothing in the app log
> >>>>> - nothing in /var/log/messages (i.e., not OOM-killed)
> >>>>> - not being closed via /etc/init.d
> >>>>> - nothing in the broker logs
> >>>>>
> >>>>> Running 0.10.1.0
> >>>>>
> >>>>> Example log:
> >>>>>
> >>>>> https://drive.google.com/file/d/0B0KWMAAevNSteTh4aWs3SkRURU0/view?usp=sharing

--
-- Guozhang
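One plausible reading of the log above (an interpretation, not something confirmed in the thread): the UnsupportedOperationException raised while creating standby task 0_0 escapes the rebalance listener, StreamThread-1 shuts down its active tasks and dies, and once no non-daemon threads remain the JVM exits normally and runs the shutdown hook. That would explain why the exit looks "graceful" and why the hook's warning appears a few seconds after "Shutting down". The JDK-only sketch below illustrates the mechanism under that assumption: a worker thread named after the stream thread throws the same exception type, and a Thread.UncaughtExceptionHandler records the failure so it is not silent. The class StreamThreadDeathSketch and the method runFailingWorker are hypothetical names for illustration only.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, JDK-only sketch: simulates a stream thread that dies during
// partition assignment and shows how an UncaughtExceptionHandler surfaces it.
public class StreamThreadDeathSketch {

    /** Starts a worker that throws, waits for it, and returns what the handler captured. */
    public static Throwable runFailingWorker() {
        AtomicReference<Throwable> captured = new AtomicReference<>();

        // Stand-in for StreamThread-1 failing while adding a standby task (hypothetical).
        Thread worker = new Thread(() -> {
            throw new UnsupportedOperationException("simulated standby task failure");
        }, "StreamThread-1");

        // Without a handler, the default behavior only prints the stack trace to stderr;
        // here the failure is logged and recorded explicitly.
        worker.setUncaughtExceptionHandler((thread, error) -> {
            System.err.println("Thread " + thread.getName() + " died: " + error);
            captured.set(error);
        });

        worker.start();
        try {
            // The handler runs on the dying thread before it terminates,
            // so join() returns only after the failure has been recorded.
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return captured.get();
    }

    public static void main(String[] args) {
        Throwable failure = runFailingWorker();
        System.out.println(failure == null
                ? "worker exited cleanly"
                : "worker died with " + failure.getClass().getSimpleName());
    }
}
```

Running main prints "worker died with UnsupportedOperationException" on stdout. In a real application, KafkaStreams exposes setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler); registering a handler like the one above before calling kafkaStreams.start() should make a dying stream thread show up in the application log instead of only surfacing indirectly through the shutdown hook.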