[ https://issues.apache.org/jira/browse/KAFKA-3642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang updated KAFKA-3642: --------------------------------- Labels: architecture (was: ) > Fix NPE from ProcessorStateManager when the changelog topic not exists > ---------------------------------------------------------------------- > > Key: KAFKA-3642 > URL: https://issues.apache.org/jira/browse/KAFKA-3642 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.0.1 > Reporter: Yuto Kawamura > Assignee: Yuto Kawamura > Labels: architecture > Fix For: 0.10.1.0 > > > # Fix NPE from ProcessorStateManager when the changelog topic not exists > When the following two conditions are satisfied, ProcessorStateManager throws an NPE: > - A state is configured with logging enabled but the corresponding -changelog > topic does not exist, > - zookeeper.connect wasn't supplied in the streams config. > So Streams should: > - expect that the -changelog topic may not exist and throw a more meaningful > exception. > - warn users if there's no -changelog topic prepared and zookeeper.connect > wasn't supplied either. > BTW, I think making zookeeper.connect a mandatory argument should be another > option if it doesn't hurt. 
> {code} > $ git diff > > diff --git > a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java > > b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java > index 34c35b7..c5339f1 100644 > --- > a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java > +++ > b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java > @@ -108,7 +108,7 @@ public class WordCountProcessorDemo { > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, > "streams-wordcount-processor"); > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > - props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); > + // props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, > "localhost:2181"); > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > > $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --list 2>/dev/null | grep > '\-changelog' > $ ./bin/kafka-run-class.sh > org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo > ... 
> [2016-04-30 02:25:04,960] ERROR User provided listener > org.apache.kafka.streams.processor.internals.StreamThread$1 for group > streams-wordcount-processor failed on partition assignment > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:189) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116) > at > org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:64) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85) > at > org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81) > at > org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:582) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:609) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:71) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:126) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:226) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:221) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) 
> at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:430) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:416) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:673) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:652) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:397) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:338) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:191) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:161) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:237) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:343) > at > 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:902) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:864) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:327) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:250) > Exception in thread "StreamThread-1" > org.apache.kafka.streams.errors.StreamsException: Failed to rebalance > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:331) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:250) > Caused by: java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:189) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116) > at > org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:64) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85) > at > org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81) > at > org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:582) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:609) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:71) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:126) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:226) > at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:221) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:430) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:416) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:673) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:652) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:397) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:338) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:191) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:161) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:237) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:343) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:902) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:864) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:327) > ... 1 more > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)