Thorsten Hake created KAFKA-10515:
-------------------------------------

             Summary: NPE: Foreign key join serde may not be initialized with default serde if application is distributed
                 Key: KAFKA-10515
                 URL: https://issues.apache.org/jira/browse/KAFKA-10515
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.5.1, 2.6.0
            Reporter: Thorsten Hake


The fix for KAFKA-9517 corrected the initialization of the foreign-key join 
serdes for Kafka Streams applications that do not run distributed over 
multiple instances.

However, if an application runs distributed over multiple instances, the 
foreign-key join serdes may still not be initialized, leading to the following 
NPE:
{noformat}
Encountered the following error during processing:java.lang.NullPointerException: null
        at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:85)
        at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:52)
        at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
        at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
        at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
        at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
        at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:102)
        at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:55)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
        at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670){noformat}

This happens because the processors of a foreign-key join are distributed 
across multiple tasks. The serde is initialized with the default serde only 
during the initialization of the task containing the sink node 
("subscription-registration-sink"). So if the task containing the 
SubscriptionStoreReceiveProcessor ("subscription-receive") is not assigned to 
the same instance as the task containing the sink node, an NPE is thrown 
because the serde of the state store used within the 
SubscriptionStoreReceiveProcessor has not been initialized.
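The failure mode can be modeled in plain Java. This is a minimal sketch under stated assumptions: `WrapperSerializer`, `setIfUnset`, `ForeignKeySerdeDemo` and `receiveTaskSucceeds` are hypothetical names, not Kafka Streams classes. It relies only on the fact that each application instance builds its own copy of the topology, so a serde object filled in by the sink task on one instance is never seen by the receive task on another. The "initialize in the receive task too" branch is an assumed remedy for illustration, not the committed fix.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// HYPOTHETICAL stand-in for a wrapper serializer built without an explicit key serde.
// Not a Kafka Streams class.
class WrapperSerializer {
    // Primary-key serializer; stays null until setIfUnset(...) is called on THIS object.
    private Function<String, byte[]> primaryKeySerializer;

    WrapperSerializer(Function<String, byte[]> primaryKeySerializer) {
        this.primaryKeySerializer = primaryKeySerializer;
    }

    // Fall back to the default serializer only if none was supplied explicitly.
    void setIfUnset(Function<String, byte[]> defaultSerializer) {
        if (primaryKeySerializer == null) {
            primaryKeySerializer = defaultSerializer;
        }
    }

    // Throws NullPointerException if the inner serializer was never set.
    byte[] serialize(String primaryKey) {
        return primaryKeySerializer.apply(primaryKey);
    }
}

// HYPOTHETICAL simulation of one application instance hosting the "subscription-receive" task.
class ForeignKeySerdeDemo {
    // Assumed default key serializer (UTF-8 strings), standing in for the configured default serde.
    static final Function<String, byte[]> DEFAULT_KEY_SERIALIZER =
            s -> s.getBytes(StandardCharsets.UTF_8);

    // Returns true if the receive task on this instance can serialize a value.
    static boolean receiveTaskSucceeds(boolean sinkTaskOnSameInstance, boolean initInReceiveTask) {
        // Each instance constructs its own serializer object from its own topology.
        WrapperSerializer serializer = new WrapperSerializer(null);
        if (sinkTaskOnSameInstance) {
            serializer.setIfUnset(DEFAULT_KEY_SERIALIZER); // done by the sink task's init
        }
        if (initInReceiveTask) {
            serializer.setIfUnset(DEFAULT_KEY_SERIALIZER); // assumed remedy: receive task initializes too
        }
        try {
            serializer.serialize("pk-1");
            return true;
        } catch (NullPointerException e) {
            return false; // the NPE reported in this issue
        }
    }

    public static void main(String[] args) {
        System.out.println(receiveTaskSucceeds(true, false));  // true: sink and receive tasks co-located
        System.out.println(receiveTaskSucceeds(false, false)); // false: tasks on different instances
        System.out.println(receiveTaskSucceeds(false, true));  // true: receive task initializes its own serde
    }
}
```

The sketch makes the point that whether the NPE occurs depends only on task placement, which is why single-instance deployments covered by KAFKA-9517 do not hit it.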



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
