Hi All,

I want to use one state store in all my kafka stream threads in my
application, how can i do it.

1. i created one topic (name: test2) with 3 partitions .
2. wrote kafka stream with num.stream.threads = 3 in java code
3. using state store (name: count2) in my application.

But state store (count2) is acting like local to thread, but it should be
unique to entire application and the same value to be reflected every where
how can i do it ?

Do i need to take care any synch also ?

Code:
====
package com.javatpoint;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.Stores;

import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Properties;
import java.lang.*;

/**
 * Hello world!
 *
 */
public class App
{
    public static void main( String[] args )
    {
/*        StateStoreSupplier testStore = Stores.create("count2")
                .withKeys(Serdes.String())
                .withValues(Serdes.Long())
                .persistent()
                .build();*/
        StateStoreSupplier testStore = Stores.create("count2")
                .withStringKeys()
                .withLongValues()
                .persistent()
                .build();

//        TopologyBuilder builder = new TopologyBuilder();
        final KStreamBuilder builder = new KStreamBuilder();

        builder.addSource("source", "test2").addProcessor("process",
TestProcessor::new, "source").addStateStore(testStore, "process");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
//        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.ByteArray().getClass().getName());
//        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.ByteArray().getClass().getName());

        props.put("auto.offset.reset", "latest");
        props.put("num.stream.threads", 3);

            System.out.println("test1");
        KafkaStreams streams = new KafkaStreams(builder, props);
            System.out.println("test2");
        streams.start();
    }

//    public static class TestProcessor implements Processor<byte[],
byte[]> {
    public static class TestProcessor implements Processor<String, String> {
         private  KeyValueStore<String, Long> kvStore;
            private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
                this.context = context;
//            context.getStateStore("count2");
            System.out.println("Initialized");
                this.kvStore = (KeyValueStore<String, Long>)
                                    context.getStateStore("count2");

        }

        @Override
       public void process(String k, String v) {
 //       public void process(byte[] k, byte[] v) {
            System.out.println("Processing " + k + " -> " + v);
                try {


                        Long oldValue = this.kvStore.get(v);
            System.out.println("Oldval " + oldValue + " -> Key " + v);
                        if (oldValue == null) {
                                this.kvStore.put(v, 1L);
                        }
                        else
                        {
                                this.kvStore.put(v, oldValue + 1L);
                        }
                          Thread.sleep(10000);
           } catch (Exception e) {
             System.out.println(e);
           }
        }

        @Override
        public void punctuate(long timestamp) {

        }

        @Override
        public void close() {

        }
    }
}

Reply via email to