[ 
https://issues.apache.org/jira/browse/KAFKA-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17119253#comment-17119253
 ] 

Alex Sun commented on KAFKA-10036:
----------------------------------

Hi [~mjsax], just to confirm the root cause:

Sharing the same user-defined processor (UDP) instance causes multiple 
StreamThreads to initialize their ProcessorContext (in 
StreamTask.initializeTopology()) on that one UDP instance, so the instance 
ends up bound to whichever context was set last. The TaskManagers bound to 
the other threads have no way of knowing that the UDP no longer calls back 
into their ProcessorContext. By the time a TaskManager actually hands 
records to the UDP, the UDP calls back into a different ProcessorContext 
(owned by another StreamThread) whose currentNode is null.
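
To make the sequence above concrete, here is a minimal sketch of how a shared instance could end up bound to the wrong context. FakeContext, FakeProcessor and run() are simplified, hypothetical stand-ins written for this illustration; they are not the Kafka Streams classes, and the real threads are simulated sequentially.

```java
import java.util.function.Supplier;

class SharedInstanceDemo {
    // Hypothetical stand-in for a per-thread ProcessorContext.
    static final class FakeContext {
        final String owner;   // which (simulated) thread owns this context
        String currentNode;   // stays null unless the owning thread is processing
        FakeContext(String owner) { this.owner = owner; }
    }

    // Hypothetical stand-in for a user-defined processor that keeps its context.
    static final class FakeProcessor {
        private FakeContext context;
        void init(FakeContext context) { this.context = context; } // last caller wins
        String contextOwner() { return context.owner; }
        String currentNodeSeen() { return context.currentNode; }
    }

    // Simulates two threads initializing their topology from the same supplier,
    // then thread-1 starting to process a record. Returns what thread-1's
    // processor sees: { owner of its context, currentNode of that context }.
    static String[] run(Supplier<FakeProcessor> supplier) {
        FakeContext ctx1 = new FakeContext("thread-1");
        FakeContext ctx2 = new FakeContext("thread-2");
        FakeProcessor p1 = supplier.get();
        p1.init(ctx1);
        FakeProcessor p2 = supplier.get();
        p2.init(ctx2);              // if p2 == p1, this replaces ctx1 inside p1
        ctx1.currentNode = "node";  // thread-1 sets its current node and processes
        return new String[] { p1.contextOwner(), p1.currentNodeSeen() };
    }

    public static void main(String[] args) {
        FakeProcessor shared = new FakeProcessor();
        String[] broken = run(() -> shared);        // supplier violates the pattern
        String[] correct = run(FakeProcessor::new); // new instance on every get()
        System.out.println("shared: " + broken[0] + " / " + broken[1]);
        System.out.println("fresh:  " + correct[0] + " / " + correct[1]);
    }
}
```

With the shared instance, thread-1's processor reports the context of thread-2 and a null current node; with a fresh instance per get() it reports its own context and node.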

> Improve error message if user violates `Supplier` pattern
> ---------------------------------------------------------
>
>                 Key: KAFKA-10036
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10036
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Alex Sun
>            Priority: Minor
>              Labels: newbie
>
> Using the Processor API, users need to pass in a `ProcessorSupplier` that 
> needs to return a new `Processor` instance each time `get()` is called.
> Users violate this rule on a regular basis and return the same instance on 
> `get()`. This mistake leads to a (not very informative) 
> `NullPointerException` during runtime. (Cf: 
> [https://stackoverflow.com/questions/61790984/kafka-stream-forward-method-throwing-nullpointerexception-because-processornode/61978396])
> We could improve the error message by checking if `currentNode()` returns 
> `null` 
> ([https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L183])
>  and throw an informative error message for this case.
> Furthermore, we could do a "sanity" check within `KafkaStreams` constructor 
> before we start the process threads: we get all `Suppliers` for the 
> `Topology` and call `get()` two times on each supplier to compare if the 
> returned object references are different – if they are the same, we throw an 
> informative error message.
> We should improve the JavaDocs, too: 
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java#L34]
>  (also for `Transformer` et al.); it seems to be too subtle what "new" means. 
> Similarly for 
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/Topology.java]
>  and 
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java]
>  (`process()`, `transform()` etc.)
> Furthermore, we should improve the docs to explain the supplier pattern 
> explicitly: 
> [https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html]
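
The "sanity" check proposed in the description (call `get()` twice and compare the returned references) could look roughly like the sketch below. It uses plain `java.util.function.Supplier` rather than the Kafka Streams supplier types, and the helper name verifyReturnsNewInstances and the wording of the error message are assumptions made for illustration, not existing Kafka code.

```java
import java.util.function.Supplier;

class SupplierSanityCheck {
    // Hypothetical helper: calls get() twice and rejects suppliers that
    // hand out the same object reference both times.
    static <T> void verifyReturnsNewInstances(String name, Supplier<T> supplier) {
        T first = supplier.get();
        T second = supplier.get();
        if (first == second) { // reference comparison on purpose, not equals()
            throw new IllegalArgumentException(
                "Supplier '" + name + "' returned the same object on two get() calls; "
                    + "it must return a new instance each time get() is called.");
        }
    }

    public static void main(String[] args) {
        // A supplier that follows the pattern passes silently.
        verifyReturnsNewInstances("fresh", Object::new);

        // A supplier that always returns one shared instance is rejected.
        Object shared = new Object();
        try {
            verifyReturnsNewInstances("shared", () -> shared);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

The check compares references with `==` because the rule is about object identity; two distinct instances that happen to be equal would still be valid.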



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
