Ranjit,

Note that the "testStore" instance you are passing is a StateStoreSupplier
which will generate a new StateStore instance for each thread's task.

If you really want to have all the thread's share the same state store you
should implement your own StateStoreSupplier that only return the same
StateStore instance in its "get()" call; however, keep in mind that in this
case this state store could be concurrently accessed by multi-threads which
is not protected by the library itself (by default single-thread access is
guaranteed on the state stores).


Guozhang

On Thu, Nov 9, 2017 at 2:51 AM, Ranjit Kumar <ranjit...@gmail.com> wrote:

> Hi All,
>
> I want to use one state store in all my kafka stream threads in my
> application, how can i do it.
>
> 1. i created one topic (name: test2) with 3 partitions .
> 2. wrote kafka stream with num.stream.threads = 3 in java code
> 3. using state store (name: count2) in my application.
>
> But state store (count2) is acting like local to thread, but it should be
> unique to entire application and the same value to be reflected every where
> how can i do it ?
>
> Do i need to take care any synch also ?
>
> Code:
> ====
> package com.javatpoint;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.processor.StateStoreSupplier;
> import org.apache.kafka.streams.processor.TopologyBuilder;
> import org.apache.kafka.streams.state.Stores;
>
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.processor.StateStoreSupplier;
> import org.apache.kafka.streams.state.KeyValueStore;
>
> import java.util.Properties;
> import java.lang.*;
>
> /**
>  * Hello world!
>  *
>  */
> public class App
> {
>     public static void main( String[] args )
>     {
> /*        StateStoreSupplier testStore = Stores.create("count2")
>                 .withKeys(Serdes.String())
>                 .withValues(Serdes.Long())
>                 .persistent()
>                 .build();*/
>         StateStoreSupplier testStore = Stores.create("count2")
>                 .withStringKeys()
>                 .withLongValues()
>                 .persistent()
>                 .build();
>
> //        TopologyBuilder builder = new TopologyBuilder();
>         final KStreamBuilder builder = new KStreamBuilder();
>
>         builder.addSource("source", "test2").addProcessor("process",
> TestProcessor::new, "source").addStateStore(testStore, "process");
>
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
>         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass());
>         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass());
> //        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.ByteArray().getClass().getName());
> //        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.ByteArray().getClass().getName());
>
>         props.put("auto.offset.reset", "latest");
>         props.put("num.stream.threads", 3);
>
>             System.out.println("test1");
>         KafkaStreams streams = new KafkaStreams(builder, props);
>             System.out.println("test2");
>         streams.start();
>     }
>
> //    public static class TestProcessor implements Processor<byte[],
> byte[]> {
>     public static class TestProcessor implements Processor<String, String>
> {
>          private  KeyValueStore<String, Long> kvStore;
>             private ProcessorContext context;
>
>         @Override
>         public void init(ProcessorContext context) {
>                 this.context = context;
> //            context.getStateStore("count2");
>             System.out.println("Initialized");
>                 this.kvStore = (KeyValueStore<String, Long>)
>                                     context.getStateStore("count2");
>
>         }
>
>         @Override
>        public void process(String k, String v) {
>  //       public void process(byte[] k, byte[] v) {
>             System.out.println("Processing " + k + " -> " + v);
>                 try {
>
>
>                         Long oldValue = this.kvStore.get(v);
>             System.out.println("Oldval " + oldValue + " -> Key " + v);
>                         if (oldValue == null) {
>                                 this.kvStore.put(v, 1L);
>                         }
>                         else
>                         {
>                                 this.kvStore.put(v, oldValue + 1L);
>                         }
>                           Thread.sleep(10000);
>            } catch (Exception e) {
>              System.out.println(e);
>            }
>         }
>
>         @Override
>         public void punctuate(long timestamp) {
>
>         }
>
>         @Override
>         public void close() {
>
>         }
>     }
> }
>



-- 
-- Guozhang

Reply via email to