[ https://issues.apache.org/jira/browse/KAFKA-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16205175#comment-16205175 ]
Matthias J. Sax commented on KAFKA-6063: ---------------------------------------- It's doubt that it's related to KAFKA-6047 but Streams fails for this scenario by design (the error message is not very good and we need to improve it though). You cannot change the number of input partitions on the fly -- this breaks your application and you need to either change the application id (as you did already) or reset you application using {{bin/kafka-streams-application-reset.sh}} (note, you will also need do clean you local state via {{KafkaStreams#cleanup()}} for a proper reset (cf. https://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool and https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/) > StreamsException is thrown after the changing `partitions` > ---------------------------------------------------------- > > Key: KAFKA-6063 > URL: https://issues.apache.org/jira/browse/KAFKA-6063 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.11.0.0 > Environment: macOS 10.12 > kafka 0.11.0.1 > Reporter: Akihito Nakano > > Hi. > "org.apache.kafka.streams.errors.StreamsException" is thrown in following > case. > h3. Create topic > {code:java} > $ bin/kafka-topics.sh --create --zookeeper localhost:2181 > --replication-factor 1 --partitions 6 --topic word-count-input > {code} > h3. Create Kafka Streams Application > {code:java} > public class WordCountApp { > public static void main(String[] args) { > Properties config = new Properties(); > config.put(StreamsConfig.APPLICATION_ID_CONFIG, > "wordcount-application"); > ... > ... > {code} > h3. Ensure that it works fine > {code:java} > $ java -jar wordcount.jar > KafkaStreams processID: b4a559cb-7075-4ece-a718-5043a432900b > StreamsThread appId: wordcount-application > ... > ... > {code} > h3. Change "partitions" > {code:java} > $ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 8 > --topic word-count-input > Adding partitions succeeded! > {code} > h3. When I start Application, StreamsException is thrown > {code:java} > $ java -jar wordcount.jar > KafkaStreams processID: 8a9cbf03-b841-4cb2-9d44-6456b4520522 > StreamsThread appId: wordcount-applicationn > StreamsThread clientId: > wordcount-applicationn-8a9cbf03-b841-4cb2-9d44-6456b4520522 > StreamsThread threadId: > wordcount-applicationn-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1 > Active tasks: > Running: > Suspended: > Restoring: > New: > Standby tasks: > Running: > Suspended: > Restoring: > New: > Exception in thread > "wordcount-application-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1" > org.apache.kafka.streams.errors.StreamsException: Could not create internal > topics. > at > org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:82) > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:660) > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:398) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:365) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:522) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > {code} > If I change the application id, Application works again. > Thank you. -- This message was sent by Atlassian JIRA (v6.4.14#64029)