The NPE in KafkaStreams 2.4.0 looks like this:

2020-02-05 14:54:09.392 [mico-repartitioner-665ab051-b233-4a45-88af-5dab135fde8b-StreamThread-2] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [mico-repartitioner-665ab051-b233-4a45-88af-5dab135fde8b-StreamThread-2] State transition from STARTING to PARTITIONS_ASSIGNED
2020-02-05 14:54:09.400 [mico-repartitioner-665ab051-b233-4a45-88af-5dab135fde8b-StreamThread-2] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [mico-repartitioner-665ab051-b233-4a45-88af-5dab135fde8b-StreamThread-2] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance
java.lang.NullPointerException: null
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:234) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:176) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:355) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:313) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:298) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77) [app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272) [app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400) [app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421) [app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) [app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471) [app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) [app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) [app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [app.jar:?]
2020-02-05 14:54:09.405 [mico-repartitioner-665ab051-b233-4a45-88af-5dab135fde8b-StreamThread-2] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [mico-repartitioner-665ab051-b233-4a45-88af-5dab135fde8b-StreamThread-2] partition assignment took 13 ms.
    currently assigned active tasks: []
    currently assigned standby tasks: []
    revoked active tasks: []
    revoked standby tasks: []

On Wed, 5 Feb 2020 at 13:34, Murilo Tavares <murilo...@gmail.com> wrote:

> Hi
> I have a KafkaStreams application that's pretty simple, and acts as a
> repartitioner... It reads from input topics and sends to output topics,
> based on an input-to-output topics map. It has a custom Repartitioner that
> will be responsible for assigning new partitions for the data in the output
> topics.
>
> The topology is simple:
> protected Topology buildTopology(Map<String, String> topics) {
>   StreamsBuilder builder = new StreamsBuilder();
>   for (Entry<String, String> entry : topics.entrySet()) {
>     builder
>       .stream(entry.getKey(), Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
>       .to(entry.getValue(), Produced.with(Serdes.ByteArray(), Serdes.ByteArray()));
>   }
>   return builder.build();
> }
>
> The resulting topology will look like this:
>   Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [kafka.inputA])
>       --> KSTREAM-SINK-0000000001
>     Sink: KSTREAM-SINK-0000000001 (topic: kafka.inputA-repartitioned)
>       <-- KSTREAM-SOURCE-0000000000
>
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000002 (topics: [kafka.inputB])
>       --> KSTREAM-SINK-0000000003
>     Sink: KSTREAM-SINK-0000000003 (topic: kafka.inputB-repartitioned)
>       <-- KSTREAM-SOURCE-0000000002
>
>   Sub-topology: 2
>     Source: KSTREAM-SOURCE-0000000004 (topics: [kafka.inputC])
>       --> KSTREAM-SINK-0000000005
>     Sink: KSTREAM-SINK-0000000005 (topic: kafka.inputC-repartitioned)
>       <-- KSTREAM-SOURCE-0000000004
>   ... (about 25 different sub-topologies in total)
>
> I am seeing an intermittent NPE in some environments, which I'll send in
> the next e-mail so as not to pollute this one any further.
>
> But I am wondering if I can really do that, and what would happen if I:
> 1) add a new sub-topology;
> 2) adding the new sub-topology breaks the order of the iteration;
>
> Will any of the above operations require a reset to the KafkaStreams
> application (or new app.id)?
>
> Thanks
> Murilo
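
To make the "order of the iteration" concern in the quoted message concrete, here is a minimal sketch in plain Java, with no Kafka dependency. It assumes, based only on the topology description quoted above, that the numeric suffix of each generated node name comes from a single counter that advances in the order the builder calls are made. The class StableTopicOrder and its methods are hypothetical helpers for illustration, not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class StableTopicOrder {

    // Simulation only (an assumption, not Kafka Streams code): mimics the
    // names seen in the quoted description (KSTREAM-SOURCE-0000000000,
    // KSTREAM-SINK-0000000001, ...) with one counter that advances per node,
    // in map iteration order.
    static List<String> generatedNames(Map<String, String> topics) {
        List<String> names = new ArrayList<>();
        int index = 0;
        for (Map.Entry<String, String> entry : topics.entrySet()) {
            names.add(String.format("KSTREAM-SOURCE-%010d (%s)", index++, entry.getKey()));
            names.add(String.format("KSTREAM-SINK-%010d (%s)", index++, entry.getValue()));
        }
        return names;
    }

    // Copies the map into a TreeMap so that iteration follows the sorted
    // input topic names, independent of how the original map was built.
    static Map<String, String> sorted(Map<String, String> topics) {
        return new TreeMap<>(topics);
    }

    public static void main(String[] args) {
        // Insertion order deliberately differs from alphabetical order.
        Map<String, String> topics = new LinkedHashMap<>();
        topics.put("kafka.inputB", "kafka.inputB-repartitioned");
        topics.put("kafka.inputA", "kafka.inputA-repartitioned");

        System.out.println("As inserted:");
        generatedNames(topics).forEach(System.out::println);

        System.out.println("Sorted:");
        generatedNames(sorted(topics)).forEach(System.out::println);
    }
}
```

Passing sorted(topics) to a method like buildTopology would only make the iteration order reproducible across runs and environments. A new topic whose name sorts before existing ones would still shift every later index, so this sketch does not by itself answer whether a reset or a new app.id is needed.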