Hi Damian,

Thanks a lot for the response. I only saw your reply when I visited the mailing-list archive <http://mail-archives.apache.org/mod_mbox/kafka-users/201710.mbox/browser>. Unfortunately it never arrived in my inbox, and I didn't see the update in the archive when I checked earlier today either. Anyway, thanks again for the response. I will raise a JIRA as you suggested, and I hope this isn't the case with local state stores.
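In case it helps anyone who finds this thread later: if the restore really does copy the global store's source topic verbatim into the store, bypassing the processor (which would explain the raw data I saw on the new instance), then one workaround is to publish already-processed values to the source topic and keep the global-store processor a plain pass-through. A rough sketch of that idea; the topic name stands in for Config.DICTIONARY_WORDS_TOPIC from the code quoted below, and transformValue is a hypothetical placeholder for the real dictionary logic:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

// Hypothetical stand-in for the real dictionary transformation.
fun transformValue(raw: String): String = raw.trim().toLowerCase()

// Reused across calls; "dictionary-words" stands in for Config.DICTIONARY_WORDS_TOPIC.
val producer = KafkaProducer(
        Properties().apply { put("bootstrap.servers", "localhost:9092") },
        StringSerializer(), StringSerializer())

// Transform *before* producing, so the source topic itself holds the final
// data. A restore that copies the topic straight into the store then still
// ends up with processed values, even if the processor is bypassed.
fun publishDictionaryWord(key: String, rawValue: String) {
    producer.send(ProducerRecord("dictionary-words", key, transformValue(rawValue)))
}

The global-store processor would then just put the key and value unchanged, so live traffic and restoration produce identical store contents.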
Thanks,
Tony

On Wed, Oct 18, 2017 at 9:21 PM, Tony John <tonyjohnant...@gmail.com> wrote:

> Hello All,
>
> I have been trying to create an application on top of Kafka Streams. I am
> a newbie to Kafka and Kafka Streams, so please excuse me if my
> understanding is wrong.
>
> I got the application running fine on a single EC2 instance in AWS. Now I
> am looking at scaling and ran into some issues. The application has a
> global state store and a couple of other local ones backed by RocksDB. It
> uses the Processor API, and the stream is built using the TopologyBuilder.
> The global state store is fed by a topic which sends key-value pairs (both
> are protobuf objects) and is connected to a processor which transforms the
> value by applying some logic and finally stores the key and the modified
> data in the store. Similarly, the local stores are connected via
> processors which are fed by different topics. Now the issue is that when I
> launch a new instance of the app, task re-allocation and state restoration
> happen, and the stores get replicated onto the new instance. But the
> global store which is replicated onto the new instance has some other data
> (I guess that's the raw data) as opposed to the processed data.
>
> *Application Topology*
>
> *Global Store*
>
> Source Topic (Partition Count = 1, Replication Factor = 2, Compacted =
> false) -> GlobalStoreProcessor (Persistent, Caching enabled, Logging
> disabled) -> Global Store
>
> *Local Store*
>
> Source Topic (Partition Count = 16, Replication Factor = 2, Compacted =
> true) -> LocalStoreProcessor (Persistent, Caching enabled, Logging
> enabled) -> Local state stores on different partitions
>
> *Sample Code (Written in Kotlin)*
>
> val streams: KafkaStreams
>
> init {
>     val builder = KStreamBuilder().apply {
>         val globalStore = Stores.create(Config.DICTIONARY)
>                 .withKeys(Serdes.String())
>                 .withValues(Serdes.String())
>                 .persistent()
>                 .enableCaching()
>                 .disableLogging()
>                 .build() as StateStoreSupplier<KeyValueStore<*, *>>
>
>         addGlobalStore(globalStore, "dictionary-words-source",
>                 Serdes.String().deserializer(), Serdes.String().deserializer(),
>                 Config.DICTIONARY_WORDS_TOPIC, "dictionary-words-processor",
>                 DictionaryWordsProcessor.Supplier)
>
>         addSource("query-source", Serdes.String().deserializer(),
>                 Serdes.String().deserializer(), Config.QUERIES_TOPIC)
>         addProcessor("query-processor", QueryProcessor.Supplier, "query-source")
>     }
>
>     val config = StreamsConfig(mapOf(
>             StreamsConfig.APPLICATION_ID_CONFIG to Config.APPLICATION_ID,
>             StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to Config.KAFKA_SERVERS,
>             StreamsConfig.STATE_DIR_CONFIG to Config.STATE_STORE_DIR))
>     streams = KafkaStreams(builder, config)
>
>     Runtime.getRuntime().addShutdownHook(Thread {
>         println("Shutting down Kafka Streams...")
>         streams.close()
>         println("Shut down successfully")
>     })
> }
>
> fun run() {
>     Utils.createTopic(Config.DICTIONARY_WORDS_TOPIC, 1,
>             Config.REPLICATION_FACTOR, true)
>     Utils.createTopic(Config.QUERIES_TOPIC, Config.PARTITION_COUNT,
>             Config.REPLICATION_FACTOR, false)
>     streams.start()
> }
>
> *Environment Details:* 1 ZooKeeper, 2 brokers, and 1 or 2 application
> instances.
>
> So I just wanted to understand the process of state store restoration
> while scaling up and down. How does Streams manage to restore the data? I
> was expecting that when the new instance gets launched, the data would
> flow through the same processor, so that it gets modified using the same
> logic which was applied when it was stored on instance 1. Could you please
> help me understand this a little better? Please let me know if there is
> any way to get the restoration process to route the data via the same
> processor.
>
> Thanks,
> Tony
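PS: Regarding my hope above about the local stores — since they have logging enabled, my understanding is that they restore from their changelog topic, and the changelog records whatever the processor put() into the store, i.e. the already-processed values. A rough sketch of the LocalStoreProcessor pattern to illustrate why a restore should replay processed data (the store name and the transform are hypothetical placeholders):

import org.apache.kafka.streams.processor.AbstractProcessor
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Sketch of a local-store processor. With logging enabled, every put() is
// also written to the store's changelog topic, so a new instance restores
// these processed values rather than re-reading the raw source topic.
class LocalStoreProcessor : AbstractProcessor<String, String>() {

    private lateinit var store: KeyValueStore<String, String>

    override fun init(context: ProcessorContext) {
        super.init(context)
        @Suppress("UNCHECKED_CAST")
        store = context.getStateStore("local-store") as KeyValueStore<String, String>
    }

    override fun process(key: String, value: String) {
        // Hypothetical transformation; whatever lands in the store here is
        // what the changelog (and therefore a future restore) will contain.
        store.put(key, value.trim().toLowerCase())
    }
}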