Hello,

I want two-way communication in Apache Kafka Streams. Since a Kafka Streams topology does not permit cycles, my supervisor suggested that I use state stores as remote objects.
I created custom state stores as described here:
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
(I did not use interactive queries; I only created some custom state stores for my Kafka processors.)

If I declare the state store as:

    public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<String, String>, Serializable

and pass it to a remote method of another remote object, it is passed by value because it implements Serializable. I want it to be passed by reference instead.

I think that, in order to be passed by reference, my custom state store should extend UnicastRemoteObject, like this:

    public class MyCustomStore<K,V> extends UnicastRemoteObject implements StateStore, MyWriteableCustomStore<String, String>
    ...
    public MyCustomStore(...) throws RemoteException {
    ...
    }

but now I get this exception at initialization:

    ERROR stream-thread [site-client-StreamThread-1] Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread:744)
    java.lang.NullPointerException
        at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:226)
        at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:225)
        at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:88)
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:314)
        at org.apache.kafka.streams.processor.StreamThread.runOnce(StreamThread.java:824)
        at org.apache.kafka.streams.processor.StreamThread.runLoop(StreamThread.java:767)
        at org.apache.kafka.streams.processor.StreamThread.run(StreamThread.java:736)

I have not included my state store code because it runs to hundreds of lines. I can post it if needed.
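To make the pass-by-value versus pass-by-reference distinction concrete, here is a minimal sketch that uses only java.io and java.rmi, with no Kafka classes. All names in it (RemoteKeyValue, SerializableStore, ExportedStore, RmiPassingDemo) are hypothetical stand-ins and not my real store code. The second half exports the object with UnicastRemoteObject.exportObject(...) instead of extending UnicastRemoteObject, which is another way java.rmi offers to obtain a remote reference. I have not verified whether exporting that way changes anything about the NullPointerException above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.Map;

// Hypothetical remote view of a store; not part of Kafka Streams.
interface RemoteKeyValue extends Remote {
    void put(String key, String value) throws RemoteException;
    String get(String key) throws RemoteException;
}

// Hypothetical stand-in for a store that implements Serializable.
class SerializableStore implements Serializable {
    private static final long serialVersionUID = 1L;
    final HashMap<String, String> data = new HashMap<>();
}

// Hypothetical stand-in for a store that is exported rather than
// made a subclass of UnicastRemoteObject.
class ExportedStore implements RemoteKeyValue {
    private final Map<String, String> data = new HashMap<>();

    @Override
    public synchronized void put(String key, String value) {
        data.put(key, value);
    }

    @Override
    public synchronized String get(String key) {
        return data.get(key);
    }
}

class RmiPassingDemo {
    // Mimics what RMI does with a Serializable argument: the receiver
    // gets a deserialized copy, not the original object.
    static SerializableStore copyBySerialization(SerializableStore original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerializableStore) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Writes to the copy and reads from the original.
    static String writeThroughCopy() {
        SerializableStore original = new SerializableStore();
        SerializableStore copy = copyBySerialization(original);
        copy.data.put("k", "v");
        return original.data.get("k"); // the original never sees the write
    }

    // Writes through an RMI stub and reads from the original.
    static String writeThroughStub() {
        // Assumption: loopback is reachable; this pins the address in the stub.
        System.setProperty("java.rmi.server.hostname", "127.0.0.1");
        ExportedStore store = new ExportedStore();
        try {
            // Port 0 lets RMI pick an anonymous port.
            RemoteKeyValue stub =
                    (RemoteKeyValue) UnicastRemoteObject.exportObject(store, 0);
            try {
                stub.put("k", "v"); // travels through RMI to the exported object
                return store.get("k");
            } finally {
                // Unexport so the RMI runtime does not keep the JVM alive.
                UnicastRemoteObject.unexportObject(store, true);
            }
        } catch (RemoteException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("after write to copy: " + writeThroughCopy());
        System.out.println("after write via stub: " + writeThroughStub());
    }
}
```

In this sketch the write to the serialized copy is invisible to the original, while the write through the stub lands in the original object. That second behaviour is what I want for my state store.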
The Apache Kafka Streams version on the cluster I use is kafka-streams-2.0.0.3.1.0.0-78.jar (in the kafka-broker/libs folder).

Wherever I create an instance of my custom state store, I wrap the call in a try-catch block, since the constructor throws RemoteException.

Thanks in advance