Hi Ranjit, it sounds like you might want to use a global table for this.
You can use StreamsBuilder#globalTable(String, Materialized) to create the
global table. You could do something like:

KeyValueBytesStoreSupplier supplier =
Stores.inMemoryKeyValueStore("global-store");
Materialized<String, String, KeyValueStore<Bytes, byte[]>>
materialized = Materialized.as(supplier);
builder.globalTable("topic",
materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));


On Fri, 10 Nov 2017 at 09:24 Ranjit Kumar <ranjit...@gmail.com> wrote:

> Hi Guozhang,
>
> Thanks for the information.
>
> My requirement is some thing like this.
>
> 1. i want to read the data from one topic (which is continuously feeding),
> so i though of using the kafka streams with threads
> 2. want to store the data in one in memory data base (not the local data
> store per thread)
>
> If i have to write my own Statestore logic with handling of synchronization
> is it equal to having my own global data structure in all threads ?
>
> Any performance impact will be their with our own sync ? Can you pelase
> share if you have any sample programs or links describing on this .
>
> Thanks & Regards,
> Ranjit
>
> On Fri, Nov 10, 2017 at 4:38 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Ranjit,
> >
> > Note that the "testStore" instance you are passing is a
> StateStoreSupplier
> > which will generate a new StateStore instance for each thread's task.
> >
> > If you really want to have all the thread's share the same state store
> you
> > should implement your own StateStoreSupplier that only return the same
> > StateStore instance in its "get()" call; however, keep in mind that in
> this
> > case this state store could be concurrently accessed by multi-threads
> which
> > is not protected by the library itself (by default single-thread access
> is
> > guaranteed on the state stores).
> >
> >
> > Guozhang
> >
> > On Thu, Nov 9, 2017 at 2:51 AM, Ranjit Kumar <ranjit...@gmail.com>
> wrote:
> >
> > > Hi All,
> > >
> > > I want to use one state store in all my kafka stream threads in my
> > > application, how can i do it.
> > >
> > > 1. i created one topic (name: test2) with 3 partitions .
> > > 2. wrote kafka stream with num.stream.threads = 3 in java code
> > > 3. using state store (name: count2) in my application.
> > >
> > > But state store (count2) is acting like local to thread, but it should
> be
> > > unique to entire application and the same value to be reflected every
> > where
> > > how can i do it ?
> > >
> > > Do i need to take care any synch also ?
> > >
> > > Code:
> > > ====
> > > package com.javatpoint;
> > > import org.apache.kafka.common.serialization.Serdes;
> > > import org.apache.kafka.streams.KafkaStreams;
> > > import org.apache.kafka.streams.StreamsConfig;
> > > import org.apache.kafka.streams.processor.Processor;
> > > import org.apache.kafka.streams.processor.ProcessorContext;
> > > import org.apache.kafka.streams.processor.StateStoreSupplier;
> > > import org.apache.kafka.streams.processor.TopologyBuilder;
> > > import org.apache.kafka.streams.state.Stores;
> > >
> > > import org.apache.kafka.streams.kstream.KStreamBuilder;
> > > import org.apache.kafka.streams.processor.StateStoreSupplier;
> > > import org.apache.kafka.streams.state.KeyValueStore;
> > >
> > > import java.util.Properties;
> > > import java.lang.*;
> > >
> > > /**
> > >  * Hello world!
> > >  *
> > >  */
> > > public class App
> > > {
> > >     public static void main( String[] args )
> > >     {
> > > /*        StateStoreSupplier testStore = Stores.create("count2")
> > >                 .withKeys(Serdes.String())
> > >                 .withValues(Serdes.Long())
> > >                 .persistent()
> > >                 .build();*/
> > >         StateStoreSupplier testStore = Stores.create("count2")
> > >                 .withStringKeys()
> > >                 .withLongValues()
> > >                 .persistent()
> > >                 .build();
> > >
> > > //        TopologyBuilder builder = new TopologyBuilder();
> > >         final KStreamBuilder builder = new KStreamBuilder();
> > >
> > >         builder.addSource("source", "test2").addProcessor("process",
> > > TestProcessor::new, "source").addStateStore(testStore, "process");
> > >
> > >         Properties props = new Properties();
> > >         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
> > >         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > > "localhost:9092");
> > >         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > Serdes.String().getClass());
> > >         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > Serdes.String().getClass());
> > > //        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > Serdes.ByteArray().getClass().getName());
> > > //        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > Serdes.ByteArray().getClass().getName());
> > >
> > >         props.put("auto.offset.reset", "latest");
> > >         props.put("num.stream.threads", 3);
> > >
> > >             System.out.println("test1");
> > >         KafkaStreams streams = new KafkaStreams(builder, props);
> > >             System.out.println("test2");
> > >         streams.start();
> > >     }
> > >
> > > //    public static class TestProcessor implements Processor<byte[],
> > > byte[]> {
> > >     public static class TestProcessor implements Processor<String,
> > String>
> > > {
> > >          private  KeyValueStore<String, Long> kvStore;
> > >             private ProcessorContext context;
> > >
> > >         @Override
> > >         public void init(ProcessorContext context) {
> > >                 this.context = context;
> > > //            context.getStateStore("count2");
> > >             System.out.println("Initialized");
> > >                 this.kvStore = (KeyValueStore<String, Long>)
> > >                                     context.getStateStore("count2");
> > >
> > >         }
> > >
> > >         @Override
> > >        public void process(String k, String v) {
> > >  //       public void process(byte[] k, byte[] v) {
> > >             System.out.println("Processing " + k + " -> " + v);
> > >                 try {
> > >
> > >
> > >                         Long oldValue = this.kvStore.get(v);
> > >             System.out.println("Oldval " + oldValue + " -> Key " + v);
> > >                         if (oldValue == null) {
> > >                                 this.kvStore.put(v, 1L);
> > >                         }
> > >                         else
> > >                         {
> > >                                 this.kvStore.put(v, oldValue + 1L);
> > >                         }
> > >                           Thread.sleep(10000);
> > >            } catch (Exception e) {
> > >              System.out.println(e);
> > >            }
> > >         }
> > >
> > >         @Override
> > >         public void punctuate(long timestamp) {
> > >
> > >         }
> > >
> > >         @Override
> > >         public void close() {
> > >
> > >         }
> > >     }
> > > }
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>

Reply via email to