Hello All,

I have been trying to build an application on top of Kafka Streams. I am
new to Kafka and Kafka Streams, so please excuse me if my understanding is
wrong.

I got the application running fine on a single EC2 instance in AWS. Now I
am looking at scaling and have run into some issues. The application has a
global state store and a couple of local ones backed by RocksDB. It uses
the Processor API, and the topology is built using the TopologyBuilder. The
global state store is fed by a topic that sends key-value pairs (both are
protobuf objects). The topic is connected to a processor, which transforms
the value by applying some logic and then stores the key and the modified
value in the store. Similarly, the local stores are connected via
processors that are fed by different topics. The issue is that when I
launch a new instance of the app, task reallocation and state restoration
happen, and the stores get replicated onto the new instance. But the global
store that is replicated onto the new instance contains different data (I
guess that's the raw data) as opposed to the processed data.

*Application Topology*

*Global Store*

Source Topic (Partition Count = 1, Replication Factor = 2, Compacted =
false) -> GlobalStoreProcessor (Persistent, Caching enabled, logging
disabled) -> Global Store

*Local Store*

Source Topic (Partition Count = 16, Replication Factor = 2, Compacted =
true) -> LocalStoreProcessor (Persistent, Caching enabled, Logging
enabled) -> Local state stores on different partitions

*Sample Code (Written in Kotlin)*

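import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.streams.processor.StateStoreSupplier
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.Stores

val streams: KafkaStreams
init {
    val builder = KStreamBuilder().apply {

        // Global store: persistent, cached, no changelog topic.
        val globalStore = Stores.create(Config.DICTIONARY)
                                .withKeys(Serdes.String())
                                .withValues(Serdes.String())
                                .persistent()
                                .enableCaching()
                                .disableLogging()
                                .build() as StateStoreSupplier<KeyValueStore<*, *>>

        // Feed the global store from the dictionary topic through the processor.
        addGlobalStore(globalStore, "dictionary-words-source",
                Serdes.String().deserializer(), Serdes.String().deserializer(),
                Config.DICTIONARY_WORDS_TOPIC,
                "dictionary-words-processor", DictionaryWordsProcessor.Supplier)

        addSource("query-source", Serdes.String().deserializer(),
                Serdes.String().deserializer(), Config.QUERIES_TOPIC)
        addProcessor("query-processor", QueryProcessor.Supplier, "query-source")
    }

    val config = StreamsConfig(mapOf(
            StreamsConfig.APPLICATION_ID_CONFIG to Config.APPLICATION_ID,
            StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to Config.KAFKA_SERVERS,
            StreamsConfig.STATE_DIR_CONFIG to Config.STATE_STORE_DIR
    ))
    streams = KafkaStreams(builder, config)

    // Close the streams instance cleanly when the JVM shuts down.
    Runtime.getRuntime().addShutdownHook(Thread {
        println("Shutting down Kafka Streams...")
        streams.close()
        println("Shut down successfully")
    })
}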

fun run() {
    Utils.createTopic(Config.DICTIONARY_WORDS_TOPIC, 1,
            Config.REPLICATION_FACTOR, true)
    Utils.createTopic(Config.QUERIES_TOPIC, Config.PARTITION_COUNT,
            Config.REPLICATION_FACTOR, false)
    streams.start()
}


*Environment Details:* 1 ZooKeeper, 2 brokers, and 1 or 2 application
instances.


So I just wanted to understand the process of state store restoration while
scaling up and down. How does Kafka Streams restore the data? I was
expecting that when the new instance is launched, the data would flow
through the same processor, so that it gets modified using the same logic
that was applied when it was stored on instance 1. Could you please help me
understand this a little better? Please let me know if there is any way to
get the restoration process to route the data via the same processor.
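
To make the difference concrete, here is a toy model of what I expected
versus what I seem to be observing. It is plain Kotlin with no Kafka
involved, and every name in it (transform, buildViaProcessor,
restoreWithoutProcessor) is made up for illustration. The transform
function is only a stand-in for my real processor logic, and the second
function reflects my guess about the restored contents, not confirmed
Kafka Streams behaviour.

```kotlin
// Toy model only: plain Kotlin, no Kafka. All names here are hypothetical.

// Stand-in for whatever logic the real processor applies to each value.
fun transform(raw: String): String = "processed(" + raw.trim() + ")"

// Live path: every record from the source topic goes through the processor,
// which stores the transformed value.
fun buildViaProcessor(topic: List<Pair<String, String>>): Map<String, String> {
    val store = mutableMapOf<String, String>()
    for ((key, value) in topic) store[key] = transform(value)
    return store
}

// What restoration appears to do on the new instance (my guess): copy the
// records from the source topic into the store without calling the processor.
fun restoreWithoutProcessor(topic: List<Pair<String, String>>): Map<String, String> {
    val store = mutableMapOf<String, String>()
    for ((key, value) in topic) store[key] = value
    return store
}

fun main() {
    val topic = listOf("k1" to "  Hello ", "k2" to "WORLD")
    println(buildViaProcessor(topic))        // contents I see on instance 1
    println(restoreWithoutProcessor(topic))  // contents I seem to get on instance 2
}
```

I expected the new instance to end up with the first map, but what I see
looks like the second one.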


Thanks,
Tony
