Hello,
I want two-way communication in Apache Kafka Streams. Since a Kafka Streams
topology cannot contain cycles, my supervisor suggested that I use State
Stores as remote objects.

I created custom State Stores as described here:
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores

(I do not use interactive queries; I only created some custom state stores
for my Kafka Processors.)

If I declare the state store as:

    public class MyCustomStore<K,V> implements StateStore,
            MyWriteableCustomStore<String, String>, Serializable

and pass an instance of it to a remote method of another remote object, it
is passed by value, because it implements Serializable. I want it to be
passed by reference instead.
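
To illustrate the by-value versus by-reference behaviour described above,
here is a minimal, self-contained sketch that uses only java.rmi. It is not
my actual store: RemoteStore, SimpleStore, Writer and WriterImpl are
hypothetical names invented for this example, and no Kafka classes are
involved. It assumes that both objects live in one JVM and that loopback
networking is available.

```java
import java.io.Serializable;
import java.rmi.NoSuchObjectException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;

class RmiPassingDemo {
    // Hypothetical remote view of a store (not a Kafka interface).
    public interface RemoteStore extends Remote {
        void put(String key, String value) throws RemoteException;
        String get(String key) throws RemoteException;
    }

    // Both Remote and Serializable: how it travels depends on whether it is exported.
    public static class SimpleStore implements RemoteStore, Serializable {
        private static final long serialVersionUID = 1L;
        private final HashMap<String, String> data = new HashMap<>();
        @Override public synchronized void put(String key, String value) { data.put(key, value); }
        @Override public synchronized String get(String key) { return data.get(key); }
    }

    // Hypothetical remote object whose method receives a store as an argument.
    public interface Writer extends Remote {
        void writeTo(RemoteStore store) throws RemoteException;
    }

    public static class WriterImpl implements Writer {
        @Override public void writeTo(RemoteStore store) throws RemoteException {
            store.put("k", "v");
        }
    }

    // Returns { value seen in the non-exported store, value seen in the exported store }.
    static String[] demo() {
        // Single-machine demo only: make stubs point at the loopback address.
        System.setProperty("java.rmi.server.hostname", "127.0.0.1");
        WriterImpl writer = new WriterImpl();
        SimpleStore copied = new SimpleStore();
        SimpleStore shared = new SimpleStore();
        try {
            Writer writerStub = (Writer) UnicastRemoteObject.exportObject(writer, 0);
            // Not exported: the argument is serialized, so the callee writes to a copy.
            writerStub.writeTo(copied);
            // Exported: RMI sends a stub instead, so the callee writes to this instance.
            UnicastRemoteObject.exportObject(shared, 0);
            writerStub.writeTo(shared);
            return new String[] { copied.get("k"), shared.get("k") };
        } catch (RemoteException e) {
            throw new IllegalStateException(e);
        } finally {
            unexportQuietly(shared);
            unexportQuietly(writer);
        }
    }

    private static void unexportQuietly(Remote obj) {
        try {
            UnicastRemoteObject.unexportObject(obj, true);
        } catch (NoSuchObjectException ignored) {
            // The object was never exported; nothing to clean up.
        }
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("copied=" + result[0] + ", shared=" + result[1]);
    }
}
```

In this sketch the write made through the serialized argument is lost (the
caller's copy stays empty), while the write made through the exported
instance is visible to the caller. That is the behaviour I am after for my
State Store.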

I think that, in order to be passed by reference, my custom State Store
should extend UnicastRemoteObject, like this:

    public class MyCustomStore<K,V> extends UnicastRemoteObject implements
            StateStore, MyWriteableCustomStore<String, String>
    ...
    public MyCustomStore(...) throws RemoteException {
    ...
    }

But now I get this exception at initialization:

ERROR stream-thread [site-client-StreamThread-1] Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread:744)
java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:226)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:225)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:88)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:314)
    at org.apache.kafka.streams.processor.StreamThread.runOnce(StreamThread.java:824)
    at org.apache.kafka.streams.processor.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.StreamThread.run(StreamThread.java:736)


I have not included the code of my State Stores because it runs to hundreds
of lines. I can post it if needed.

The version of Apache Kafka Streams on the cluster I use is:
kafka-streams-2.0.0.3.1.0.0-78.jar (in kafka-broker/libs folder)

Wherever an instance of my custom State Store is created, I wrap the call in
a try-catch block, since the constructor throws RemoteException.
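
For reference, here is a minimal sketch of the two ways I know of to export
an object with RMI: subclassing UnicastRemoteObject (the constructor then
throws RemoteException, as in my store) and the static
UnicastRemoteObject.exportObject(Remote, int) call, which leaves the
constructor free of checked exceptions. RemoteStore, SubclassedStore and
PlainStore are hypothetical names for this example, not my real classes, and
I do not know whether either variant avoids the NullPointerException above.

```java
import java.rmi.NoSuchObjectException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ExportVariantsSketch {
    // Hypothetical remote interface standing in for the store's remote view.
    public interface RemoteStore extends Remote {
        void put(String key, String value) throws RemoteException;
        String get(String key) throws RemoteException;
    }

    // Variant 1: exported as a side effect of construction, so the constructor throws.
    public static class SubclassedStore extends UnicastRemoteObject implements RemoteStore {
        private static final long serialVersionUID = 1L;
        private final Map<String, String> data = new ConcurrentHashMap<>();
        public SubclassedStore() throws RemoteException { super(); }
        @Override public void put(String key, String value) { data.put(key, value); }
        @Override public String get(String key) { return data.get(key); }
    }

    // Variant 2: a plain class; exporting is a separate, explicit step.
    public static class PlainStore implements RemoteStore {
        private final Map<String, String> data = new ConcurrentHashMap<>();
        @Override public void put(String key, String value) { data.put(key, value); }
        @Override public String get(String key) { return data.get(key); }
    }

    // Exports a PlainStore, writes through its stub, and reads from the local object.
    static String roundTrip() {
        // Single-machine demo only: make the stub point at the loopback address.
        System.setProperty("java.rmi.server.hostname", "127.0.0.1");
        PlainStore store = new PlainStore(); // no try-catch needed here
        try {
            RemoteStore stub = (RemoteStore) UnicastRemoteObject.exportObject(store, 0);
            stub.put("k", "v");              // goes through RMI to the same instance
            return store.get("k");
        } catch (RemoteException e) {
            throw new IllegalStateException(e);
        } finally {
            try {
                UnicastRemoteObject.unexportObject(store, true);
            } catch (NoSuchObjectException ignored) {
                // Export failed earlier; nothing to clean up.
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("value written through the stub: " + roundTrip());
    }
}
```

With the second variant the RemoteException moves from the constructor to
the export call, so the checked exception only needs to be handled in the
one place where the store is exported.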

Thanks in advance
