Hello All,

I have been trying to create an application on top of Kafka Streams. I am a newbie to Kafka & Kafka Streams, so please excuse me if my understanding is wrong.
I got the application running fine on a single EC2 instance in AWS. Now I am looking at scaling out and ran into some issues. The application has a global state store and a couple of local ones, all backed by RocksDB. It uses the Processor API, and the topology is built using the TopologyBuilder. The global state store is fed by a topic whose records are key-value pairs (both protobuf objects) and is connected to a processor which transforms the value by applying some logic and finally stores the key and the modified value in the store (a simplified sketch of that processor is included below my sign-off). Similarly, the local stores are connected via processors that are fed by different topics.

The issue is that when I launch a new instance of the app, task re-allocation and state restoration happen, and the stores get replicated onto the new instance. But the global store that is replicated onto the new instance contains different data (I guess that is the raw data from the source topic) instead of the processed data.

*Application Topology*

*Global Store*
Source Topic (Partition Count = 1, Replication Factor = 2, Compacted = false) -> GlobalStoreProcessor (Persistent, Caching enabled, Logging disabled) -> Global Store

*Local Store*
Source Topic (Partition Count = 16, Replication Factor = 2, Compacted = true) -> LocalStoreProcessor (Persistent, Caching enabled, Logging enabled) -> Local state stores on different partitions

*Sample Code (Written in Kotlin)*

val streams: KafkaStreams

init {
    val builder = KStreamBuilder().apply {
        val globalStore = Stores.create(Config.DICTIONARY)
                .withKeys(Serdes.String())
                .withValues(Serdes.String())
                .persistent()
                .enableCaching()
                .disableLogging()
                .build() as StateStoreSupplier<KeyValueStore<*, *>>

        addGlobalStore(globalStore, "dictionary-words-source",
                Serdes.String().deserializer(), Serdes.String().deserializer(),
                Config.DICTIONARY_WORDS_TOPIC, "dictionary-words-processor",
                DictionaryWordsProcessor.Supplier)

        addSource("query-source", Serdes.String().deserializer(),
                Serdes.String().deserializer(), Config.QUERIES_TOPIC)
        addProcessor("query-processor", QueryProcessor.Supplier, "query-source")
    }

    val config = StreamsConfig(mapOf(
            StreamsConfig.APPLICATION_ID_CONFIG to Config.APPLICATION_ID,
            StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to Config.KAFKA_SERVERS,
            StreamsConfig.STATE_DIR_CONFIG to Config.STATE_STORE_DIR))

    streams = KafkaStreams(builder, config)

    Runtime.getRuntime().addShutdownHook(Thread {
        println("Shutting down Kafka Streams...")
        streams.close()
        println("Shut down successfully")
    })
}

fun run() {
    Utils.createTopic(Config.DICTIONARY_WORDS_TOPIC, 1, Config.REPLICATION_FACTOR, true)
    Utils.createTopic(Config.QUERIES_TOPIC, Config.PARTITION_COUNT, Config.REPLICATION_FACTOR, false)
    streams.start()
}

*Environment Details:* 1 ZooKeeper, 2 Brokers, and 1 or 2 application instances.

So I just wanted to understand the process of state store restoration while scaling up and down. How does Streams manage to restore the data? I was expecting that when the new instance is launched, the data would flow through the same processor so that it gets modified using the same logic that was applied when it was stored on instance 1. Could you please help me understand this a little better? Please let me know if there is any way to get the restoration process to route the data via the same processor.

Thanks,
Tony
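P.S. For reference, here is a simplified sketch of what DictionaryWordsProcessor does. The transformation shown is only a placeholder (the real application works with protobuf objects rather than plain strings); the relevant part is that the processor applies the logic and writes the result into the global store obtained via context.getStateStore().

import org.apache.kafka.streams.processor.Processor
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.processor.ProcessorSupplier
import org.apache.kafka.streams.state.KeyValueStore

class DictionaryWordsProcessor : Processor<String, String> {

    private lateinit var store: KeyValueStore<String, String>

    override fun init(context: ProcessorContext) {
        // Look up the global store that was registered via addGlobalStore()
        store = context.getStateStore(Config.DICTIONARY) as KeyValueStore<String, String>
    }

    override fun process(key: String, value: String) {
        // Placeholder for the real transformation logic applied before storing
        val transformed = value.trim()
        store.put(key, transformed)
    }

    // Required by the Processor interface in the Kafka version we are using; not needed here
    override fun punctuate(timestamp: Long) {}

    override fun close() {}

    object Supplier : ProcessorSupplier<String, String> {
        override fun get(): Processor<String, String> = DictionaryWordsProcessor()
    }
}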