Hi All,

I am new to kafka.

I have some fundamental questions related to choosing architecture for m
requirement can please suggest me.

I am developing some IOT project, please find the details below.

1. I have moving objects which are  continuously sending their position +
some other information  to central cloud every second and application VM is
running on cloud server to receive the data
2. Every moving object information application VM need to distribute to all
other objects with in 200mt's radius of that particular object


I am doing in this way, please suggest me .

My Question is *"How can i access central state store in all threads to
access all the objects"*

[image: Inline image 1]


all threads state store information is very dynamic and every thread
requires Global DB and should be persistent.

Can you please provide me gobal state store example code with stream.

Every kafka stream will decode this data and will do some processing also
will this be a good design ?

Thanks & Regards,
Ranjit

















On Fri, Nov 10, 2017 at 4:06 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi Ranjit, it sounds like you might want to use a global table for this.
> You can use StreamsBuilder#globalTable(String, Materialized) to create the
> global table. You could do something like:
>
> KeyValueBytesStoreSupplier supplier =
> Stores.inMemoryKeyValueStore("global-store");
> Materialized<String, String, KeyValueStore<Bytes, byte[]>>
> materialized = Materialized.as(supplier);
> builder.globalTable("topic",
> materialized.withKeySerde(Serdes.String()).withValueSerde(
> Serdes.String()));
>
>
> On Fri, 10 Nov 2017 at 09:24 Ranjit Kumar <ranjit...@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for the information.
> >
> > My requirement is some thing like this.
> >
> > 1. i want to read the data from one topic (which is continuously
> feeding),
> > so i though of using the kafka streams with threads
> > 2. want to store the data in one in memory data base (not the local data
> > store per thread)
> >
> > If i have to write my own Statestore logic with handling of
> synchronization
> > is it equal to having my own global data structure in all threads ?
> >
> > Any performance impact will be their with our own sync ? Can you pelase
> > share if you have any sample programs or links describing on this .
> >
> > Thanks & Regards,
> > Ranjit
> >
> > On Fri, Nov 10, 2017 at 4:38 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Ranjit,
> > >
> > > Note that the "testStore" instance you are passing is a
> > StateStoreSupplier
> > > which will generate a new StateStore instance for each thread's task.
> > >
> > > If you really want to have all the thread's share the same state store
> > you
> > > should implement your own StateStoreSupplier that only return the same
> > > StateStore instance in its "get()" call; however, keep in mind that in
> > this
> > > case this state store could be concurrently accessed by multi-threads
> > which
> > > is not protected by the library itself (by default single-thread access
> > is
> > > guaranteed on the state stores).
> > >
> > >
> > > Guozhang
> > >
> > > On Thu, Nov 9, 2017 at 2:51 AM, Ranjit Kumar <ranjit...@gmail.com>
> > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I want to use one state store in all my kafka stream threads in my
> > > > application, how can i do it.
> > > >
> > > > 1. i created one topic (name: test2) with 3 partitions .
> > > > 2. wrote kafka stream with num.stream.threads = 3 in java code
> > > > 3. using state store (name: count2) in my application.
> > > >
> > > > But state store (count2) is acting like local to thread, but it
> should
> > be
> > > > unique to entire application and the same value to be reflected every
> > > where
> > > > how can i do it ?
> > > >
> > > > Do i need to take care any synch also ?
> > > >
> > > > Code:
> > > > ====
> > > > package com.javatpoint;
> > > > import org.apache.kafka.common.serialization.Serdes;
> > > > import org.apache.kafka.streams.KafkaStreams;
> > > > import org.apache.kafka.streams.StreamsConfig;
> > > > import org.apache.kafka.streams.processor.Processor;
> > > > import org.apache.kafka.streams.processor.ProcessorContext;
> > > > import org.apache.kafka.streams.processor.StateStoreSupplier;
> > > > import org.apache.kafka.streams.processor.TopologyBuilder;
> > > > import org.apache.kafka.streams.state.Stores;
> > > >
> > > > import org.apache.kafka.streams.kstream.KStreamBuilder;
> > > > import org.apache.kafka.streams.processor.StateStoreSupplier;
> > > > import org.apache.kafka.streams.state.KeyValueStore;
> > > >
> > > > import java.util.Properties;
> > > > import java.lang.*;
> > > >
> > > > /**
> > > >  * Hello world!
> > > >  *
> > > >  */
> > > > public class App
> > > > {
> > > >     public static void main( String[] args )
> > > >     {
> > > > /*        StateStoreSupplier testStore = Stores.create("count2")
> > > >                 .withKeys(Serdes.String())
> > > >                 .withValues(Serdes.Long())
> > > >                 .persistent()
> > > >                 .build();*/
> > > >         StateStoreSupplier testStore = Stores.create("count2")
> > > >                 .withStringKeys()
> > > >                 .withLongValues()
> > > >                 .persistent()
> > > >                 .build();
> > > >
> > > > //        TopologyBuilder builder = new TopologyBuilder();
> > > >         final KStreamBuilder builder = new KStreamBuilder();
> > > >
> > > >         builder.addSource("source", "test2").addProcessor("process",
> > > > TestProcessor::new, "source").addStateStore(testStore, "process");
> > > >
> > > >         Properties props = new Properties();
> > > >         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
> > > >         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > > > "localhost:9092");
> > > >         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > > Serdes.String().getClass());
> > > >         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > > Serdes.String().getClass());
> > > > //        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > > Serdes.ByteArray().getClass().getName());
> > > > //        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > > Serdes.ByteArray().getClass().getName());
> > > >
> > > >         props.put("auto.offset.reset", "latest");
> > > >         props.put("num.stream.threads", 3);
> > > >
> > > >             System.out.println("test1");
> > > >         KafkaStreams streams = new KafkaStreams(builder, props);
> > > >             System.out.println("test2");
> > > >         streams.start();
> > > >     }
> > > >
> > > > //    public static class TestProcessor implements Processor<byte[],
> > > > byte[]> {
> > > >     public static class TestProcessor implements Processor<String,
> > > String>
> > > > {
> > > >          private  KeyValueStore<String, Long> kvStore;
> > > >             private ProcessorContext context;
> > > >
> > > >         @Override
> > > >         public void init(ProcessorContext context) {
> > > >                 this.context = context;
> > > > //            context.getStateStore("count2");
> > > >             System.out.println("Initialized");
> > > >                 this.kvStore = (KeyValueStore<String, Long>)
> > > >                                     context.getStateStore("count2");
> > > >
> > > >         }
> > > >
> > > >         @Override
> > > >        public void process(String k, String v) {
> > > >  //       public void process(byte[] k, byte[] v) {
> > > >             System.out.println("Processing " + k + " -> " + v);
> > > >                 try {
> > > >
> > > >
> > > >                         Long oldValue = this.kvStore.get(v);
> > > >             System.out.println("Oldval " + oldValue + " -> Key " +
> v);
> > > >                         if (oldValue == null) {
> > > >                                 this.kvStore.put(v, 1L);
> > > >                         }
> > > >                         else
> > > >                         {
> > > >                                 this.kvStore.put(v, oldValue + 1L);
> > > >                         }
> > > >                           Thread.sleep(10000);
> > > >            } catch (Exception e) {
> > > >              System.out.println(e);
> > > >            }
> > > >         }
> > > >
> > > >         @Override
> > > >         public void punctuate(long timestamp) {
> > > >
> > > >         }
> > > >
> > > >         @Override
> > > >         public void close() {
> > > >
> > > >         }
> > > >     }
> > > > }
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>

Reply via email to