> Hello All,
> I have been trying to create an application on top of Kafka Streams. I am a
> newbie to Kafka and Kafka Streams, so please excuse me if my understanding is
> wrong.
> I got the application running fine on a single EC2 instance in AWS. Now I
> am looking at scaling and ran into some issues. The application has a
> global state store and a couple of other local ones, backed by RocksDB.
> It uses the Processor API, and the topology is built using the
> TopologyBuilder. The global state store is fed by a topic that sends
> key-value pairs (both key and value are protobuf objects) and is connected
> to a processor, which transforms the value by applying some logic and
> finally stores the key and the modified value in the store. Similarly, the
> local stores are connected via processors that are fed by different topics.
> The issue is that when I launch a new instance of the app, task
> reallocation and state restoration happen, and the stores get replicated
> onto the new instance. But the global store that is replicated onto the new
> instance contains different data (I guess that's the raw data) instead of
> the processed data.
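To make the two paths in the paragraph above concrete, here is a minimal, dependency-free Kotlin model (no Kafka classes are used). `transform` is a hypothetical stand-in for the logic in DictionaryWordsProcessor, a list of pairs stands in for the source topic, and a map stands in for the RocksDB-backed global store. `restoreRaw` models only the symptom described above, where the new instance's copy holds the topic's raw values rather than the processor's output; it is a sketch of the observed outcome, not of Kafka Streams' internals.

```kotlin
// Plain-Kotlin model of the post's global-store data flow; no Kafka dependency.
// All names here are hypothetical stand-ins, not Kafka Streams APIs.

// Stand-in for the logic inside DictionaryWordsProcessor (assumed: uppercase the value).
fun transform(raw: String): String = raw.uppercase()

// Live path on instance 1: every record passes through the processor before it is stored.
fun processAll(sourceTopic: List<Pair<String, String>>): Map<String, String> {
    val store = LinkedHashMap<String, String>()
    for ((key, value) in sourceTopic) store[key] = transform(value)
    return store
}

// Observed path on the new instance: the store ends up holding the topic's raw values.
fun restoreRaw(sourceTopic: List<Pair<String, String>>): Map<String, String> {
    val store = LinkedHashMap<String, String>()
    for ((key, value) in sourceTopic) store[key] = value
    return store
}

fun main() {
    val topic = listOf("hello" to "world", "foo" to "bar")
    println(processAll(topic)) // {hello=WORLD, foo=BAR}
    println(restoreRaw(topic)) // {hello=world, foo=bar}
}
```

Whenever `transform` is not the identity function, the two maps differ, which matches the raw-versus-processed mismatch described in the post.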
> *Application Topology*
> *Global Store*
> Source Topic (Partition Count = 1, Replication Factor = 2, Compacted = false)
>  -> GlobalStoreProcessor (Persistent, Caching enabled, Logging disabled)
>  -> Global Store
> *Local Store*
> Source Topic (Partition Count = 16, Replication Factor = 2, Compacted = true)
>  -> LocalStoreProcessor (Persistent, Caching enabled, Logging enabled)
>  -> Local state stores on different partitions
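A simplified model of the two layouts above, again in plain Kotlin with no Kafka dependency: each of the 16 source partitions becomes a task that lives on exactly one instance, while the single-partition global topic is read in full by every instance. The round-robin `assignTasks` is an assumption for illustration only; Kafka Streams' own assignor is more sophisticated (for example, it tries to keep tasks on instances that already hold their state), so the exact mapping will differ.

```kotlin
// Simplified model of task placement; NOT Kafka Streams' real assignor.
// Function and instance names are hypothetical.

// Local stores: each partition (task) is placed on exactly one instance, round-robin.
fun assignTasks(partitionCount: Int, instances: List<String>): Map<String, List<Int>> =
    (0 until partitionCount).groupBy { instances[it % instances.size] }

// Global store: every instance reads all partitions of the global topic.
fun globalPartitionsPerInstance(globalPartitions: Int, instances: List<String>): Map<String, List<Int>> =
    instances.associateWith { (0 until globalPartitions).toList() }

fun main() {
    val one = listOf("instance-1")
    val two = listOf("instance-1", "instance-2")
    println(assignTasks(16, one).mapValues { it.value.size }) // {instance-1=16}
    println(assignTasks(16, two).mapValues { it.value.size }) // {instance-1=8, instance-2=8}
    println(globalPartitionsPerInstance(1, two))              // {instance-1=[0], instance-2=[0]}
}
```

Scaling from one to two instances halves the local-store tasks per instance in this model, but the global store is still copied in full onto the new instance.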
> *Sample Code (Written in Kotlin)*
> val streams: KafkaStreams
> init {
>     val builder = KStreamBuilder().apply {
>         val globalStore = Stores.create(Config.DICTIONARY)
>                                 .withKeys(Serdes.String())
>                                 .withValues(Serdes.String())
>                                 .persistent()
>                                 .enableCaching()
>                                 .disableLogging()
>                                 .build() as StateStoreSupplier<KeyValueStore<*, *>>
>         // Global store, populated from DICTIONARY_WORDS_TOPIC via DictionaryWordsProcessor
>         addGlobalStore(globalStore, "dictionary-words-source",
>                 Serdes.String().deserializer(), Serdes.String().deserializer(),
>                 Config.DICTIONARY_WORDS_TOPIC, "dictionary-words-processor",
>                 DictionaryWordsProcessor.Supplier)
>         // Regular (partitioned) source and processor for the queries topic
>         addSource("query-source", Serdes.String().deserializer(),
>                 Serdes.String().deserializer(), Config.QUERIES_TOPIC)
>         addProcessor("query-processor", QueryProcessor.Supplier, "query-source")
>     }
>     val config = StreamsConfig(mapOf(
>             StreamsConfig.APPLICATION_ID_CONFIG to …, // value cut off in the original post
>             StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to Config.KAFKA_SERVERS,
>             StreamsConfig.STATE_DIR_CONFIG to Config.STATE_STORE_DIR
>     ))
>     streams = KafkaStreams(builder, config)
>     Runtime.getRuntime().addShutdownHook(Thread {
>         println("Shutting down Kafka Streams...")
>         streams.close()
>         println("Shut down successfully")
>     })
> }
> fun run() {
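>     Utils.createTopic(Config.DICTIONARY_WORDS_TOPIC, 1, …) // rest of the call cut off in the original post
>     Utils.createTopic(Config.QUERIES_TOPIC, Config.PARTITION_COUNT, …) // rest of the call cut off in the original post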
>     streams.start()
> }
> *Environment Details:* 1 ZooKeeper node, 2 brokers, and 1 or 2 application
> instances.
> So I just wanted to know how state store restoration works when scaling up
> and down. How does Kafka Streams restore the data? I was expecting that when
> the new instance is launched, the data flows through the same processor, so
> that it gets modified using the same logic that was applied when it was
> stored on instance 1. Could you please help me understand this a little
> better? Please let me know if there is any way to get the restoration
> process to route the data through the same processor.
> Thanks,
> Tony

