Hi Ranjit,

It sounds like you might want to use a global table for this. You can use StreamsBuilder#globalTable(String, Materialized) to create the global table. You could do something like:

    KeyValueBytesStoreSupplier supplier = Stores.inMemoryKeyValueStore("global-store");
    Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as(supplier);
    builder.globalTable("topic", materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));

On Fri, 10 Nov 2017 at 09:24 Ranjit Kumar <ranjit...@gmail.com> wrote:

> Hi Guozhang,
>
> Thanks for the information.
>
> My requirement is something like this:
>
> 1. I want to read the data from one topic (which is continuously fed),
>    so I thought of using Kafka Streams with threads.
> 2. I want to store the data in one in-memory database (not the local data
>    store per thread).
>
> If I have to write my own StateStore logic with handling of synchronization,
> is it equal to having my own global data structure in all threads?
>
> Will there be any performance impact with our own sync? Can you please
> share any sample programs or links describing this?
>
> Thanks & Regards,
> Ranjit
>
> On Fri, Nov 10, 2017 at 4:38 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Ranjit,
> >
> > Note that the "testStore" instance you are passing is a StateStoreSupplier
> > which will generate a new StateStore instance for each thread's task.
> >
> > If you really want to have all the threads share the same state store, you
> > should implement your own StateStoreSupplier that only returns the same
> > StateStore instance in its "get()" call; however, keep in mind that in this
> > case this state store could be concurrently accessed by multiple threads,
> > which is not protected by the library itself (by default single-thread
> > access is guaranteed on the state stores).
> >
> >
> > Guozhang
> >
> > On Thu, Nov 9, 2017 at 2:51 AM, Ranjit Kumar <ranjit...@gmail.com> wrote:
> >
> > > Hi All,
> > >
> > > I want to use one state store in all my Kafka Streams threads in my
> > > application. How can I do it?
> > >
> > > 1. I created one topic (name: test2) with 3 partitions.
> > > 2. I wrote a Kafka Streams application with num.stream.threads = 3 in Java.
> > > 3. I am using a state store (name: count2) in my application.
> > >
> > > But the state store (count2) is acting as if it were local to each thread.
> > > It should be unique to the entire application, with the same value
> > > reflected everywhere. How can I do it?
> > >
> > > Do I need to take care of any synchronization as well?
> > >
> > > Code:
> > > ====
> > > package com.javatpoint;
> > > import org.apache.kafka.common.serialization.Serdes;
> > > import org.apache.kafka.streams.KafkaStreams;
> > > import org.apache.kafka.streams.StreamsConfig;
> > > import org.apache.kafka.streams.processor.Processor;
> > > import org.apache.kafka.streams.processor.ProcessorContext;
> > > import org.apache.kafka.streams.processor.StateStoreSupplier;
> > > import org.apache.kafka.streams.processor.TopologyBuilder;
> > > import org.apache.kafka.streams.state.Stores;
> > >
> > > import org.apache.kafka.streams.kstream.KStreamBuilder;
> > > import org.apache.kafka.streams.state.KeyValueStore;
> > >
> > > import java.util.Properties;
> > > import java.lang.*;
> > >
> > > /**
> > >  * Hello world!
> > >  *
> > >  */
> > > public class App
> > > {
> > >     public static void main( String[] args )
> > >     {
> > >         /* StateStoreSupplier testStore = Stores.create("count2")
> > >                 .withKeys(Serdes.String())
> > >                 .withValues(Serdes.Long())
> > >                 .persistent()
> > >                 .build();*/
> > >         StateStoreSupplier testStore = Stores.create("count2")
> > >                 .withStringKeys()
> > >                 .withLongValues()
> > >                 .persistent()
> > >                 .build();
> > >
> > >         // TopologyBuilder builder = new TopologyBuilder();
> > >         final KStreamBuilder builder = new KStreamBuilder();
> > >
> > >         builder.addSource("source", "test2")
> > >                .addProcessor("process", TestProcessor::new, "source")
> > >                .addStateStore(testStore, "process");
> > >
> > >         Properties props = new Properties();
> > >         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
> > >         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > >         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> > >         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> > >         // props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
> > >         // props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
> > >
> > >         props.put("auto.offset.reset", "latest");
> > >         props.put("num.stream.threads", 3);
> > >
> > >         System.out.println("test1");
> > >         KafkaStreams streams = new KafkaStreams(builder, props);
> > >         System.out.println("test2");
> > >         streams.start();
> > >     }
> > >
> > >     // public static class TestProcessor implements Processor<byte[], byte[]> {
> > >     public static class TestProcessor implements Processor<String, String>
> > >     {
> > >         private KeyValueStore<String, Long> kvStore;
> > >         private ProcessorContext context;
> > >
> > >         @Override
> > >         public void init(ProcessorContext context) {
> > >             this.context = context;
> > >             // context.getStateStore("count2");
> > >             System.out.println("Initialized");
> > >             this.kvStore = (KeyValueStore<String, Long>) context.getStateStore("count2");
> > >         }
> > >
> > >         @Override
> > >         public void process(String k, String v) {
> > >         // public void process(byte[] k, byte[] v) {
> > >             System.out.println("Processing " + k + " -> " + v);
> > >             try {
> > >                 Long oldValue = this.kvStore.get(v);
> > >                 System.out.println("Oldval " + oldValue + " -> Key " + v);
> > >                 if (oldValue == null) {
> > >                     this.kvStore.put(v, 1L);
> > >                 }
> > >                 else
> > >                 {
> > >                     this.kvStore.put(v, oldValue + 1L);
> > >                 }
> > >                 Thread.sleep(10000);
> > >             } catch (Exception e) {
> > >                 System.out.println(e);
> > >             }
> > >         }
> > >
> > >         @Override
> > >         public void punctuate(long timestamp) {
> > >
> > >         }
> > >
> > >         @Override
> > >         public void close() {
> > >
> > >         }
> > >     }
> > > }
> > >
> >
> > --
> > -- Guozhang
>
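
On the synchronization question in the quoted thread: if one structure really is shared by several stream threads, the get-then-put pair in process() becomes a race, because two threads can read the same old value and both write old + 1. Below is a minimal sketch of an atomic alternative that uses only the JDK. SharedCountStore and countConcurrently are hypothetical names made up for illustration. This is not a Kafka Streams StateStore implementation, just the counting logic on its own.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a structure shared by all threads; not a Kafka Streams API.
class SharedCountStore {
    private final ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();

    // Atomic read-modify-write that replaces the separate get() and put() calls.
    long increment(String key) {
        return counts.merge(key, 1L, Long::sum);
    }

    // Returns the current count, or null if the key was never incremented.
    Long get(String key) {
        return counts.get(key);
    }

    // Runs `threads` workers that each increment the same key `perThread` times
    // and returns the final count for that key.
    static long countConcurrently(int threads, int perThread) {
        SharedCountStore store = new SharedCountStore();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    store.increment("k");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return store.get("k");
    }

    public static void main(String[] args) {
        // Three workers, mirroring num.stream.threads = 3 in the quoted code.
        System.out.println(countConcurrently(3, 10_000));
    }
}
```

Because merge() performs the update atomically per key, no increments are lost and the final count equals threads times perThread. With a global table as suggested at the top of this mail, the library maintains the store for you, so this kind of hand-rolled sharing should only be needed if you go the custom StateStoreSupplier route.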