Hi Michelle,

Are you sure you are not passing a null instead of your custom store to your topology by mistake? What does the implementation of the `build()` method of your `MyCustomStoreBuilder` look like?
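
To illustrate what I mean (this is only a guess, since I have not seen your code): if `build()` wraps the constructor in a try-catch that swallows the `RemoteException`, it can end up returning null, which would be consistent with an NPE when the stores are registered. The sketch below is plain Java and does not use the real Kafka Streams interfaces. `SketchStore`, `buildSwallowing`, and `buildFailFast` are hypothetical names, and the `failExport` flag only simulates a constructor that fails.

```java
import java.rmi.RemoteException;

// Hypothetical sketch; these are NOT the real MyCustomStore / MyCustomStoreBuilder.
class StoreBuilderSketch {

    // Stand-in for a store whose constructor may throw RemoteException.
    static class SketchStore {
        private final String name;

        SketchStore(String name, boolean failExport) throws RemoteException {
            if (failExport) {
                // Simulates a failure inside the constructor (assumption for the demo).
                throw new RemoteException("could not export store " + name);
            }
            this.name = name;
        }

        String name() {
            return name;
        }
    }

    // Problematic pattern: the exception is swallowed and null is returned.
    static SketchStore buildSwallowing(String name, boolean failExport) {
        SketchStore store = null;
        try {
            store = new SketchStore(name, failExport);
        } catch (RemoteException e) {
            System.err.println("ignored: " + e.getMessage());
        }
        return store; // null if the constructor threw
    }

    // Safer pattern: fail immediately with the original cause attached.
    static SketchStore buildFailFast(String name, boolean failExport) {
        try {
            return new SketchStore(name, failExport);
        } catch (RemoteException e) {
            throw new IllegalStateException("Could not create store " + name, e);
        }
    }

    public static void main(String[] args) {
        // The swallowing variant hands back null when construction fails.
        System.out.println("swallowing, failure -> " + buildSwallowing("my-store", true));

        // The fail-fast variant surfaces the real cause instead of a later NPE.
        try {
            buildFailFast("my-store", true);
        } catch (IllegalStateException e) {
            System.out.println("fail-fast, failure -> " + e.getMessage());
        }
    }
}
```

If your `build()` looks like the first variant, rethrowing as in the second one should make the original exception show up in the logs instead of the NPE.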

Best,
Bruno

On Mon, Dec 30, 2019 at 12:06 AM Michelle Francois <mic.fra...@gmail.com> wrote:
>
> Hello,
> I want to have two-way communication in Apache Kafka, and since an Apache
> Kafka topology cannot be cyclic, my supervisor suggested that I use
> State Stores as remote objects.
>
> I created custom State Stores as described here:
> https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
>
> (I did not use interactive queries; I just created some custom state stores
> for my Kafka Processors.)
>
> If I declare the state store as:
>
>   public class MyCustomStore<K,V> implements StateStore,
>   MyWriteableCustomStore<String, String>, Serializable
>
> and pass my custom State Store to a remote method of another remote
> object, it is passed by value because it implements Serializable, but I
> want it to be passed by reference.
>
> I think that, in order to be passed by reference, my custom State Store
> should extend UnicastRemoteObject like this:
>
>   public class MyCustomStore<K,V> extends UnicastRemoteObject implements
>   StateStore, MyWriteableCustomStore<String, String>
>   ...
>   public MyCustomStore(...) throws RemoteException{
>   ...
>   }
>
> but now I get this exception at initialization:
>
> ERROR stream-thread [site-client-StreamThread-1] Encountered the following
> error during processing:
> (org.apache.kafka.streams.processor.internals.StreamThread:744)
> java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:226)
>     at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:225)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:88)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:314)
>     at org.apache.kafka.streams.processor.StreamThread.runOnce(StreamThread.java:824)
>     at org.apache.kafka.streams.processor.StreamThread.runLoop(StreamThread.java:767)
>     at org.apache.kafka.streams.processor.StreamThread.run(StreamThread.java:736)
>
> I am not posting the code of my State Stores since it consists of hundreds
> of lines. If I have to, I will post it.
>
> The version of Apache Kafka Streams in the cluster I use is:
> kafka-streams-2.0.0.3.1.0.0-78.jar (in the kafka-broker/libs folder)
>
> Whenever an instance of my custom State Store is created, I use a try-catch
> clause, since the constructor throws RemoteException.
>
> Thanks in advance