The NPE in KafkaStreams 2.4.0 looks like this:

2020-02-05 14:54:09.392 [mico-repartitioner-665ab051-b233-4a45-88af-5dab135fde8b-StreamThread-2] INFO  org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [mico-repartitioner-665ab051-b233-4a45-88af-5dab135fde8b-StreamThread-2] State transition from STARTING to PARTITIONS_ASSIGNED
2020-02-05 14:54:09.400 [mico-repartitioner-665ab051-b233-4a45-88af-5dab135fde8b-StreamThread-2] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [mico-repartitioner-665ab051-b233-4a45-88af-5dab135fde8b-StreamThread-2] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance
java.lang.NullPointerException: null
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:234) ~[app.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:176) ~[app.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:355) ~[app.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:313) ~[app.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:298) ~[app.jar:?]
        at org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160) ~[app.jar:?]
        at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120) ~[app.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77) [app.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272) [app.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400) [app.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421) [app.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) [app.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471) [app.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) [app.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) [app.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) [app.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843) [app.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743) [app.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) [app.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [app.jar:?]
2020-02-05 14:54:09.405 [mico-repartitioner-665ab051-b233-4a45-88af-5dab135fde8b-StreamThread-2] INFO  org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [mico-repartitioner-665ab051-b233-4a45-88af-5dab135fde8b-StreamThread-2] partition assignment took 13 ms.
        currently assigned active tasks: []
        currently assigned standby tasks: []
        revoked active tasks: []
        revoked standby tasks: []

On Wed, 5 Feb 2020 at 13:34, Murilo Tavares <murilo...@gmail.com> wrote:

> Hi
> I have a pretty simple KafkaStreams application that acts as a
> repartitioner. It reads from input topics and sends to output topics,
> based on an input-to-output topic map. It has a custom Repartitioner that
> is responsible for assigning new partitions to the data in the output
> topics.
>
> The topology is simple:
>   protected Topology buildTopology(Map<String, String> topics) {
>     StreamsBuilder builder = new StreamsBuilder();
>     for (Entry<String, String> entry : topics.entrySet()) {
>       builder
>           .stream(entry.getKey(), Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
>           .to(entry.getValue(), Produced.with(Serdes.ByteArray(), Serdes.ByteArray()));
>     }
>     return builder.build();
>   }
>
> The resulting topology will look like this:
> Sub-topology: 0
>  Source: KSTREAM-SOURCE-0000000000 (topics: [kafka.inputA])
>    --> KSTREAM-SINK-0000000001
>  Sink: KSTREAM-SINK-0000000001 (topic: kafka.inputA-repartitioned)
>    <-- KSTREAM-SOURCE-0000000000
>
> Sub-topology: 1
>  Source: KSTREAM-SOURCE-0000000002 (topics: [kafka.inputB])
>    --> KSTREAM-SINK-0000000003
>  Sink: KSTREAM-SINK-0000000003 (topic: kafka.inputB-repartitioned)
>    <-- KSTREAM-SOURCE-0000000002
>
> Sub-topology: 2
>  Source: KSTREAM-SOURCE-0000000004 (topics: [kafka.inputC])
>    --> KSTREAM-SINK-0000000005
>  Sink: KSTREAM-SINK-0000000005 (topic: kafka.inputC-repartitioned)
>    <-- KSTREAM-SOURCE-0000000004
> ... (about 25 different sub-topologies in total)
>
> I am seeing an intermittent NPE in some environments, which I'll send in
> the next e-mail so as not to clutter this one.
>
> But I am wondering if I can really do that, and what would happen if:
> 1) I add a new sub-topology;
> 2) adding the new sub-topology changes the order of the iteration.
>
> Will either of the above require a reset of the KafkaStreams
> application (or a new app.id)?
>
> Thanks
> Murilo
>
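
To make the iteration-order question concrete, here is a minimal sketch of how the generated node names in the quoted description depend on the order in which the topics map is iterated. It is a stand-in only: it uses no Kafka Streams classes, and the class TopologyNamingSketch and the method sourceNames are hypothetical names made up for this illustration. The only thing it assumes is the counter pattern visible in the description above (each source takes the next index and its sink the one after).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Stand-in for illustration only: no Kafka Streams classes are used here.
class TopologyNamingSketch {

    // Assigns each input topic a source-node name from a running counter,
    // following the iteration order of the map. This mimics the numbering
    // in the quoted description (source takes index n, its sink n + 1).
    static Map<String, String> sourceNames(Map<String, String> topics) {
        Map<String, String> names = new LinkedHashMap<>();
        int index = 0;
        for (String inputTopic : topics.keySet()) {
            names.put(inputTopic, String.format("KSTREAM-SOURCE-%010d", index));
            index += 2; // skip the index used by the matching sink
        }
        return names;
    }

    public static void main(String[] args) {
        // Sorted map: a new topic that sorts first shifts every later name.
        Map<String, String> sorted = new TreeMap<>();
        sorted.put("kafka.inputB", "kafka.inputB-repartitioned");
        sorted.put("kafka.inputC", "kafka.inputC-repartitioned");
        System.out.println("sorted, before:   " + sourceNames(sorted));
        sorted.put("kafka.inputA", "kafka.inputA-repartitioned");
        System.out.println("sorted, after:    " + sourceNames(sorted));

        // Insertion-ordered map: appending keeps the existing names as they were.
        Map<String, String> appended = new LinkedHashMap<>();
        appended.put("kafka.inputB", "kafka.inputB-repartitioned");
        appended.put("kafka.inputC", "kafka.inputC-repartitioned");
        System.out.println("appended, before: " + sourceNames(appended));
        appended.put("kafka.inputA", "kafka.inputA-repartitioned");
        System.out.println("appended, after:  " + sourceNames(appended));
    }
}
```

In the sorted case kafka.inputB moves from index 0 to index 2 once kafka.inputA is added, while in the insertion-ordered case it stays at index 0. A plain HashMap gives no ordering guarantee at all. The sketch only shows when the generated names move; whether such a move requires a reset is exactly the open question above.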
