Hi all, I want to use one state store across all the Kafka Streams threads in my application. How can I do that?
1. I created one topic (name: test2) with 3 partitions.
2. I wrote a Kafka Streams application with num.stream.threads = 3 in the Java code.
3. I use a state store (name: count2) in my application.

However, the state store (count2) behaves as if it were local to each thread. I want it to be unique to the entire application, so that the same value is reflected everywhere. How can I do that? Do I also need to handle any synchronization myself?

Code:
====
package com.javatpoint;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class App {

    public static void main(String[] args) {
        // Persistent key/value store named "count2" with String keys and Long values.
        StateStoreSupplier testStore = Stores.create("count2")
                .withStringKeys()
                .withLongValues()
                .persistent()
                .build();

        final KStreamBuilder builder = new KStreamBuilder();
        builder.addSource("source", "test2")
               .addProcessor("process", TestProcessor::new, "source")
               .addStateStore(testStore, "process");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put("auto.offset.reset", "latest");
        props.put("num.stream.threads", 3);

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }

    public static class TestProcessor implements Processor<String, String> {

        private KeyValueStore<String, Long> kvStore;
        private ProcessorContext context;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            System.out.println("Initialized");
            this.kvStore = (KeyValueStore<String, Long>) context.getStateStore("count2");
        }

        @Override
        public void process(String k, String v) {
            System.out.println("Processing " + k + " -> " + v);
            try {
                // Count how many times each value has been seen.
                Long oldValue = this.kvStore.get(v);
                System.out.println("Oldval " + oldValue + " -> Key " + v);
                if (oldValue == null) {
                    this.kvStore.put(v, 1L);
                } else {
                    this.kvStore.put(v, oldValue + 1L);
                }
                Thread.sleep(10000);
            } catch (Exception e) {
                System.out.println(e);
            }
        }

        @Override
        public void punctuate(long timestamp) {
        }

        @Override
        public void close() {
        }
    }
}
====
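In case it helps clarify what I am after: as far as I understand, interactive queries (the KafkaStreams#store API, available since 0.10.1) can merge all the local partitions of "count2" into a single read-only view, roughly like the sketch below. But that only gives a read-only view for querying from outside the topology; what I would like is for the processors on all three threads to see and update one shared count. The CountReader class and printCounts method below are just hypothetical names I made up for illustration.

====
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class CountReader {

    // Prints the merged, read-only view over every partition of "count2"
    // that is hosted by this application instance.
    public static void printCounts(KafkaStreams streams) {
        ReadOnlyKeyValueStore<String, Long> view =
                streams.store("count2", QueryableStoreTypes.keyValueStore());

        KeyValueIterator<String, Long> it = view.all();
        try {
            while (it.hasNext()) {
                KeyValue<String, Long> entry = it.next();
                System.out.println(entry.key + " -> " + entry.value);
            }
        } finally {
            it.close();
        }
    }
}
====

Is something like this the intended way to get an application-wide view of the store, or is there a way to make the store itself shared between the threads?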