Hi Damian,

Thanks a lot for the response. Just saw your reply when I visited the
mailer-list
archive
<http://mail-archives.apache.org/mod_mbox/kafka-users/201710.mbox/browser>.
Unfortunately I haven't received the same on my inbox and I didn't even see
the update in the archive when I checked earlier today. Anyways once again
thanks a lot for the response. I will raise a JIRA as you suggested and I
hope this isn't the case with local state stores.

Thanks,
Tony

On Wed, Oct 18, 2017 at 9:21 PM, Tony John <tonyjohnant...@gmail.com> wrote:

> Hello All,
>
> I have been trying to create an application on top of Kafka Streams. I am
> newbie to Kafka & Kakfa streams. So please excuse if I my understanding are
> wrong.
>
> I got the application running fine on a single instance ec2 instance in
> AWS. Now I am looking at scaling and ran in to some issues. The application
> has a global state store and couple of other local one's backed by RocksDB.
> It uses the processor API's and the stream is built using the
> TopologyBuilder. The global state store is fed by a topic which send a key
> value pair (both are protobuf objects) and connected to a processor which
> then transforms the value by applying some logic, finally stores the key
> and the modified data to the store. Similarly the local stores are
> connected via processors which are fed by different topics. Now the issue
> is that when I launch a new instance of the app, task re-allocation and
> state restoration happens, and the stores get replicated on to the new
> instance. But the global store which is replicated on to the new instance
> has some other data (I guess thats the raw data) as opposed to the
> processed data.
>
> *Application Topology*
>
> *Global Store*
>
> Source Topic (Partition Count = 1, Replication Factor = 2, Compacted =
> false) -> GlobalStoreProcessor (Persistent, Caching enabled, logging
> disabled) -> Global Store
>
> *Local Store*
>
> Source Topic (Partition Count = 16, Replication Factor = 2, Compacted =
> true)
>
>  -> LocalStoreProcessor (
> Persistent, Caching enabled, Logging enabled
>
> ) -> Local state stores on different partitions
>
> *Sample Code (Written in Kotlin)*
>
> val streams: KafkaStreams
> init {
>     val builder = KStreamBuilder().apply {
>
>         val globalStore = Stores.create(Config.DICTIONARY)
>                                 .withKeys(Serdes.String())
>                                 .withValues(Serdes.String())
>                                 .persistent()
>                                 .enableCaching()
>                                 .disableLogging()
>                                 .build() as 
> StateStoreSupplier<KeyValueStore<*, *>>
>
>         addGlobalStore(globalStore, "dictionary-words-source", 
> Serdes.String().deserializer(), Serdes.String().deserializer(),
>                 Config.DICTIONARY_WORDS_TOPIC, "dictionary-words-processor", 
> DictionaryWordsProcessor.Supplier)
>
>
>         addSource("query-source", Serdes.String().deserializer(), 
> Serdes.String().deserializer(), Config.QUERIES_TOPIC)
>         addProcessor("query-processor", QueryProcessor.Supplier, 
> "query-source")
>
>     }
>
>     val config = StreamsConfig(mapOf(StreamsConfig.APPLICATION_ID_CONFIG to 
> Config.APPLICATION_ID,
>             StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to Config.KAFKA_SERVERS,
>             StreamsConfig.STATE_DIR_CONFIG to Config.STATE_STORE_DIR
>             ))
>     streams = KafkaStreams(builder, config)
>
>     Runtime.getRuntime().addShutdownHook(Thread {
>         println("Shutting down Kafka Streams...")
>         streams.close()
>         println("Shut down successfully")
>     })
> }
>
> fun run() {
>     Utils.createTopic(Config.DICTIONARY_WORDS_TOPIC, 1, 
> Config.REPLICATION_FACTOR, true)
>     Utils.createTopic(Config.QUERIES_TOPIC, Config.PARTITION_COUNT, 
> Config.REPLICATION_FACTOR, false)
>     streams.start()
> }
>
>
> *Environment Details:* 1 ZooKeeper, 2 Brokers, and 1/2 application
> instances.
>
>
> So just wanted to know the process of state store restoration while
> scaling up and down. How does the streams manage to restore the data? I was
> expecting when the new instance gets launched, the data flows through the
> same processor so that it gets modified using the same logic which is
> applied when it was stored in instance 1. Could you please help me
> understand this little better. Please let me know if there is anyway to get
> the restoration process to route the data via the same processor.
>
>
> Thanks,
> Tony
>

Reply via email to