Hello Hamza,

Returning the same processor instance from the supplier is not recommended in Kafka Streams: when there are multiple tasks or threads within the same application instance, each of them calls the topology builder, and hence the processor supplier, to construct its own copy of the topology. If the supplier always returns the same processor, multiple threads or tasks end up sharing that single processor, which is risky.
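To make that concrete, here is a minimal sketch of a supplier that builds a fresh processor on every get() call. It reuses the class names and the ProcessorStatsByMinute(1, countStoreName) constructor from the code you posted below, but it implements ProcessorSupplier directly because I have not seen your ProcessorStatsByTimeSupplier base class, so treat it as an illustration rather than a drop-in replacement:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class ProcessorStatsByMinuteSupplier implements ProcessorSupplier<String, String> {

    private final int countTimeUnit;
    private final String countStoreName;

    // Hold only the configuration, never a pre-built Processor instance.
    public ProcessorStatsByMinuteSupplier(int countTimeUnit, String countStoreName) {
        this.countTimeUnit = countTimeUnit;
        this.countStoreName = countStoreName;
    }

    @Override
    public Processor<String, String> get() {
        // Return a new processor on every call, so each task / thread owns its own
        // instance (and its own ProcessorContext once init() is called on it).
        return new ProcessorStatsByMinute(countTimeUnit, countStoreName);
    }
}

The topology wiring would then look like, for example:

    builder.addProcessor("Process", new ProcessorStatsByMinuteSupplier(1, countStoreName), "Source");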
As for your application, since the `ProcessorStatsByMinute` class is not provided I cannot tell for sure what the issue was. But I'd recommend changing the supplier classes so that they always return a different processor when `get()` is called, and seeing whether that fixes the issue.

Guozhang

On Tue, Sep 20, 2016 at 12:20 AM, Hamza HACHANI <hamza.hach...@supcom.tn> wrote:

> Hi Guozhang,
>
> Here is the code for the two classes concerned.
>
> In case it helps: I figured out that the instances which ProcessorStatsByHourSupplier and ProcessorStatsByMinuteSupplier return are always the same.
>
> I think this is the problem. I tried to fix it but was not able to.
>
> Thanks Guozhang
>
> Hamza
>
> ----------------------
>
> public class StatsByMinute {
>
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processorByMinute");
>         // Where to find Kafka broker(s).
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.82:9094,192.168.1.89:9093,192.168.1.82:9092");
>         // Where to find the corresponding ZooKeeper ensemble.
>         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "192.168.1.82:2181");
>         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>
>         TopologyBuilder builder = new TopologyBuilder();
>
>         builder.addSource("Source", "uplink");
>
>         String countStoreName = "CountsStore" + System.currentTimeMillis();
>         ////
>         builder.addProcessor("Process", new ProcessorStatsByMinuteSupplier(new ProcessorStatsByMinute(1, countStoreName)), "Source");
>         builder.addStateStore(Stores.create(countStoreName).withStringKeys().withStringValues().inMemory().build(), "Process");
>         builder.addSink("Sink", "statsM", Serdes.String().serializer(), Serdes.String().serializer(), "Process");
>
>         KafkaStreams streams = new KafkaStreams(builder, props);
>         streams.start();
>     }
> }
>
> ---------------------
>
> public class StatsByHour {
>
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processorByHour");
>         // Where to find Kafka broker(s).
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.82:9094,192.168.1.89:9093,192.168.1.82:9092");
>         // Where to find the corresponding ZooKeeper ensemble.
>         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "192.168.1.82:2181");
>         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>
>         TopologyBuilder builder = new TopologyBuilder();
>
>         builder.addSource("Source", "statsM");
>
>         String countStoreName = "CountsStore" + System.currentTimeMillis();
>         ////
>         ProcessorStatsByHourSupplier processorStatsByHourSupplier = new ProcessorStatsByHourSupplier(new ProcessorStatsByHour(3, countStoreName));
>         System.out.println(processorStatsByHourSupplier);
>         builder.addProcessor("Process", processorStatsByHourSupplier, "Source");
>         builder.addStateStore(Stores.create(countStoreName).withStringKeys().withStringValues().inMemory().build(), "Process");
>         builder.addSink("Sink", "statsH", Serdes.String().serializer(), Serdes.String().serializer(), "Process");
>
>         KafkaStreams streams = new KafkaStreams(builder, props);
>         streams.start();
>     }
> }
>
> ---------------
>
> public class ProcessorStatsByHour extends BaseProcessor {
>
>     public ProcessorStatsByHour(int countTimeUnit, String countStoreName) {
>         super(TimeUnit.MINUTES, countTimeUnit, countStoreName);
>     }
>
>     @Override
>     public void process(String key, String json) {
>         Stat stat = deserializeStat(json);
>         if (stat != null) {
>             if ((stat.getNetworkPartnerId() == null) && (stat.getBaseStationId() == null)) {
>                 String opKey = stat.getOperatorId();
>                 Stat statOp = deserializeStat(this.kvStore.get(opKey));
>                 if (statOp == null) {
>                     statOp = new Stat();
>                     statOp.setCount(stat.getCount());
>                     statOp.setOperatorId(stat.getOperatorId());
>                     this.kvStore.put(opKey, serializeStat(statOp));
>                 } else {
>                     statOp.setCount(statOp.getCount() + stat.getCount());
>                     this.kvStore.put(opKey, serializeStat(statOp));
>                 }
>             } else if (stat.getBaseStationId() == null) {
>                 String npKey = stat.getOperatorId() + "_" + stat.getNetworkPartnerId();
>                 Stat statNp = deserializeStat(this.kvStore.get(npKey));
>                 if (statNp == null) {
>                     statNp = new Stat();
>                     statNp.setCount(stat.getCount());
>                     statNp.setOperatorId(stat.getOperatorId());
>                     statNp.setNetworkPartnerId(stat.getNetworkPartnerId());
>                     this.kvStore.put(npKey, serializeStat(statNp));
>                 } else {
>                     statNp.setCount(statNp.getCount() + stat.getCount());
>                     this.kvStore.put(npKey, serializeStat(statNp));
>                 }
>             } else {
>                 String bsKey = stat.getOperatorId() + "_" + stat.getNetworkPartnerId() + "_" + stat.getBaseStationId();
>                 Stat statBs = deserializeStat(this.kvStore.get(bsKey));
>                 if (statBs == null) {
>                     statBs = new Stat();
>                     statBs.setCount(stat.getCount());
>                     statBs.setOperatorId(stat.getOperatorId());
>                     statBs.setNetworkPartnerId(stat.getNetworkPartnerId());
>                     statBs.setBaseStationId(stat.getBaseStationId());
>                     this.kvStore.put(bsKey, serializeStat(statBs));
>                 } else {
>                     statBs.setCount(statBs.getCount() + stat.getCount());
>                     this.kvStore.put(bsKey, serializeStat(statBs));
>                 }
>             }
>         }
>     }
> }
>
> ----------------------------------------------------------------
>
> public class ProcessorStatsByDay extends BaseProcessor {
>
>     public ProcessorStatsByDay(int countTimeUnit, String countStoreName) {
>         super(TimeUnit.MINUTES, countTimeUnit, countStoreName);
>     }
>
>     @Override
>     public void process(String key, String json) {
>         Stat stat = deserializeStat(json);
>         if (stat != null) {
>             if ((stat.getNetworkPartnerId() == null) && (stat.getBaseStationId() == null)) {
>                 String opKey = stat.getOperatorId();
>                 Stat statOp = deserializeStat(this.kvStore.get(opKey));
>                 if (statOp == null) {
>                     statOp = new Stat();
>                     statOp.setCount(stat.getCount());
>                     statOp.setOperatorId(stat.getOperatorId());
>                     this.kvStore.put(opKey, serializeStat(statOp));
>                 } else {
>                     statOp.setCount(statOp.getCount() + stat.getCount());
>                     this.kvStore.put(opKey, serializeStat(statOp));
>                 }
>             } else if (stat.getBaseStationId() == null) {
>                 String npKey = stat.getOperatorId() + "_" + stat.getNetworkPartnerId();
>                 Stat statNp = deserializeStat(this.kvStore.get(npKey));
>                 if (statNp == null) {
>                     statNp = new Stat();
>                     statNp.setCount(stat.getCount());
>                     statNp.setOperatorId(stat.getOperatorId());
>                     statNp.setNetworkPartnerId(stat.getNetworkPartnerId());
>                     this.kvStore.put(npKey, serializeStat(statNp));
>                 } else {
>                     statNp.setCount(statNp.getCount() + stat.getCount());
>                     this.kvStore.put(npKey, serializeStat(statNp));
>                 }
>             } else {
>                 String bsKey = stat.getOperatorId() + "_" + stat.getNetworkPartnerId() + "_" + stat.getBaseStationId();
>                 Stat statBs = deserializeStat(this.kvStore.get(bsKey));
>                 if (statBs == null) {
>                     statBs = new Stat();
>                     statBs.setCount(stat.getCount());
>                     statBs.setOperatorId(stat.getOperatorId());
>                     statBs.setNetworkPartnerId(stat.getNetworkPartnerId());
>                     statBs.setBaseStationId(stat.getBaseStationId());
>                     this.kvStore.put(bsKey, serializeStat(statBs));
>                 } else {
>                     statBs.setCount(statBs.getCount() + stat.getCount());
>                     this.kvStore.put(bsKey, serializeStat(statBs));
>                 }
>             }
>         }
>     }
> }
>
> -----------------------------------------------------
>
> public class ProcessorStatsByHourSupplier extends ProcessorStatsByTimeSupplier<ProcessorStatsByHour> {
>
>     public ProcessorStatsByHourSupplier(ProcessorStatsByHour processor) {
>         super(processor);
>     }
>
>     @Override
>     public Processor<String, String> get() {
>         return processor;
>     }
> }
>
> -----------------------------------------
>
> public abstract class BaseProcessor implements Processor<String, String> {
>
>     private static ObjectMapper objectMapper = new ObjectMapper();
>
>     private TimeUnit timeUnit;
>     private int countTimeUnit;
>     private String countStoreName;
>
>     protected ProcessorContext context;
>     protected KeyValueStore<String, String> kvStore;
>
>     public BaseProcessor(TimeUnit timeUnit, int countTimeUnit, String countStoreName) {
>         this.timeUnit = timeUnit;
>         this.countTimeUnit = countTimeUnit;
>         this.countStoreName = countStoreName;
>     }
>
>     @Override
>     @SuppressWarnings("unchecked")
>     public void init(ProcessorContext context) {
>         this.context = context;
>         this.context.schedule(TimeUnit.MILLISECONDS.convert(countTimeUnit, timeUnit));
>         this.kvStore = (KeyValueStore<String, String>) context.getStateStore(countStoreName);
>     }
>
>     @Override
>     public void punctuate(long timestamp) {
>         try (KeyValueIterator<String, String> iter = this.kvStore.all()) {
>             System.out.println("----------- " + timestamp + " ----------- ");
>             while (iter.hasNext()) {
>                 System.out.println("----------- pass1 ----------- ");
>                 KeyValue<String, String> entry = iter.next();
>                 Stat stat = deserializeStat(entry.value);
>                 if (stat != null) {
>                     System.out.println("----------- pass2 ----------- ");
>                     stat.setTimestamp(timestamp);
>                 }
>                 System.out.println("----------- pass3 ----------- ");
>                 System.out.println("key" + entry.key);
>                 System.out.println("stat" + serializeStat(stat));
>                 System.out.println("context" + context);
>                 context.forward(entry.key, serializeStat(stat));
>                 System.out.println("[" + entry.key + ", " + serializeStat(stat) + "]");
>                 iter.remove();
>             }
>         } finally {
>             context.commit();
>         }
>     }
>
>     @Override
>     public void close() {
>         this.kvStore.close();
>     }
>
>     protected static Uplink deserialize(String json) {
>         try {
>             return objectMapper.readValue(json, Uplink.class);
>         } catch (IOException e) {
>             System.out.println(e.getMessage());
>             return new Uplink().setOperatorId("fake").setNetworkPartnerId("fake").setBaseStationId("fake").setTimeStampProduce(60L);
>         }
>     }
>
>     protected static Stat deserializeStat(String json) {
>         if (json == null) {
>             return null;
>         }
>
>         try {
>             return objectMapper.readValue(json, Stat.class);
>         } catch (IOException e) {
>             System.out.println(e.getMessage());
>             return new Stat().setOperatorId("fake").setNetworkPartnerId("fake").setBaseStationId("fake").setTimestamp(System.currentTimeMillis()).setCount(-1L);
>         }
>     }
>
>     protected static String serializeStat(Stat stat) {
>         try {
>             String s = objectMapper.writeValueAsString(stat);
>
>             return s;
>         } catch (IOException e) {
>             System.out.println(e.getMessage());
>             return "{'operatorId':'fake','networkPartnerId':'fake','baseStationId':'fake','count':-1,'timestamp':5}";
>         }
>     }
> }
>
>
> ________________________________
> From: Guozhang Wang <wangg...@gmail.com>
> Sent: Monday, 19 September 2016 12:19:36
> To: users@kafka.apache.org
> Subject: Re: Error kafka-stream method punctuate in context.forward()
>
> Hello Hamza,
>
> Which Kafka version are you using with this application? Also, could you share the code skeleton of your StatsByDay processor implementation?
>
>
> Guozhang
>
>
> On Fri, Sep 16, 2016 at 6:58 AM, Hamza HACHANI <hamza.hach...@supcom.tn> wrote:
>
> > Good morning,
> >
> > I have a problem with a Kafka Streams application.
> >
> > I have created the following Kafka Streams applications:
> >
> > StatsByMinute: input topic: uplinks, output topic: statsM.
> >
> > StatsByHour: input topic: statsM, output topic: statsH.
> >
> > StatsByDay: input topic: statsH, output topic: statsD.
> >
> >
> > All three applications have nearly the same structure (StatsByMinute and StatsByHour/StatsByDay differ only in the application ID, the KV store and the process() method).
> >
> > StatsByDay and StatsByHour have exactly the same structure (the only exception is the ID parameters).
> >
> >
> > The problem is that StatsByMinute and StatsByHour work perfectly, but this is not the case for StatsByDay: I verified that I do receive data and process it (so process() works), but the context.forward() call in punctuate() fails.
> >
> > I get the following error:
> >
> >
> > [2016-09-16 15:44:24,467] ERROR Streams application error during processing in thread [StreamThread-1]: (org.apache.kafka.streams.processor.internals.StreamThread)
> > java.lang.NullPointerException
> >         at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336)
> >         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >         at com.actility.tpk.stat.BaseProcessor.punctuate(BaseProcessor.java:54)
> >         at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:227)
> >         at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> >         at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:212)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:407)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> > Exception in thread "StreamThread-1" java.lang.NullPointerException
> >         at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336)
> >         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >         at com.actility.tpk.stat.BaseProcessor.punctuate(BaseProcessor.java:54)
> >         at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:227)
> >         at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> >         at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:212)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:407)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> >
>
>
> --
> -- Guozhang

--
-- Guozhang