I'm using the version 10.0 ________________________________ De : Hamza HACHANI Envoyé : lundi 19 septembre 2016 19:20:23 À : users@kafka.apache.org Objet : RE: Error kafka-stream method punctuate in context.forward()
Hi Guozhang, Here is the code for the two concerned classes If this can help i fugure out that the instances of ProcessorStatsByHourSupplier and ProcessorStatsByMinuteSupplier which are returned are the same. I think this is the problem. I tried to fix it but i was not to do it. Thanks Guozhang Hamza ---------------------- public class StatsByMinute { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processorByMinute"); // Where to find Kafka broker(s). props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.82:9094,192.168.1.89:9093,192.168.1.82:9092"); // Where to find the corresponding ZooKeeper ensemble. props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "192.168.1.82:2181"); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); TopologyBuilder builder = new TopologyBuilder(); builder.addSource("Source", "uplink"); String countStoreName= "CountsStore" + System.currentTimeMillis(); //// builder.addProcessor("Process", new ProcessorStatsByMinuteSupplier(new ProcessorStatsByMinute(1, countStoreName)), "Source"); builder.addStateStore(Stores.create(countStoreName).withStringKeys().withStringValues().inMemory().build(), "Process"); builder.addSink("Sink", "statsM", Serdes.String().serializer(), Serdes.String().serializer(), "Process"); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); } } --------------------- public class StatsByHour { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processorByHour"); // Where to find Kafka broker(s). props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.82:9094,192.168.1.89:9093,192.168.1.82:9092"); // Where to find the corresponding ZooKeeper ensemble. props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "192.168.1.82:2181"); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); TopologyBuilder builder = new TopologyBuilder(); builder.addSource("Source", "statsM"); String countStoreName= "CountsStore" + System.currentTimeMillis(); //// ProcessorStatsByHourSupplier processorStatsByHourSupplier = new ProcessorStatsByHourSupplier(new ProcessorStatsByHour(3, countStoreName)); System.out.println(processorStatsByHourSupplier); builder.addProcessor("Process", processorStatsByHourSupplier, "Source"); builder.addStateStore(Stores.create(countStoreName).withStringKeys().withStringValues().inMemory().build(), "Process"); builder.addSink("Sink", "statsH", Serdes.String().serializer(), Serdes.String().serializer(), "Process"); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); } } --------------- public class ProcessorStatsByHour extends BaseProcessor { public ProcessorStatsByHour(int countTimeUnit, String countStoreName) { super(TimeUnit.MINUTES, countTimeUnit, countStoreName); } @Override public void process(String key, String json) { Stat stat = deserializeStat(json); if(stat != null) { if ((stat.getNetworkPartnerId() == null) && (stat.getBaseStationId() == null)) { String opKey = stat.getOperatorId(); Stat statOp = deserializeStat(this.kvStore.get(opKey)); if (statOp == null) { statOp = new Stat(); statOp.setCount(stat.getCount()); statOp.setOperatorId(stat.getOperatorId()); this.kvStore.put(opKey, serializeStat(statOp)); } else { statOp.setCount(statOp.getCount() + stat.getCount()); this.kvStore.put(opKey, serializeStat(statOp)); } } else if (stat.getBaseStationId() == null) { String npKey = stat.getOperatorId() + "_" + stat.getNetworkPartnerId(); Stat statNp = deserializeStat(this.kvStore.get(npKey)); if (statNp == null) { statNp = new Stat(); statNp.setCount(stat.getCount()); statNp.setOperatorId(stat.getOperatorId()); statNp.setNetworkPartnerId(stat.getNetworkPartnerId()); this.kvStore.put(npKey, serializeStat(statNp)); } else { statNp.setCount(statNp.getCount() + stat.getCount()); this.kvStore.put(npKey, serializeStat(statNp)); } } else { String bsKey = stat.getOperatorId() + "_" + stat.getNetworkPartnerId() + "_" + stat.getBaseStationId(); Stat statBs = deserializeStat(this.kvStore.get(bsKey)); if (statBs == null) { statBs = new Stat(); statBs.setCount(stat.getCount()); statBs.setOperatorId(stat.getOperatorId()); statBs.setNetworkPartnerId(stat.getNetworkPartnerId()); statBs.setBaseStationId(stat.getBaseStationId()); this.kvStore.put(bsKey, serializeStat(statBs)); } else { statBs.setCount(statBs.getCount() + stat.getCount()); this.kvStore.put(bsKey, serializeStat(statBs)); } } } } } ---------------------------------------------------------------- public class ProcessorStatsByDay extends BaseProcessor { public ProcessorStatsByDay(int countTimeUnit, String countStoreName) { super(TimeUnit.MINUTES, countTimeUnit, countStoreName); } @Override public void process(String key, String json) { Stat stat = deserializeStat(json); if(stat != null) { if ((stat.getNetworkPartnerId() == null) && (stat.getBaseStationId() == null)) { String opKey = stat.getOperatorId(); Stat statOp = deserializeStat(this.kvStore.get(opKey)); if (statOp == null) { statOp = new Stat(); statOp.setCount(stat.getCount()); statOp.setOperatorId(stat.getOperatorId()); this.kvStore.put(opKey, serializeStat(statOp)); } else { statOp.setCount(statOp.getCount() + stat.getCount()); this.kvStore.put(opKey, serializeStat(statOp)); } } else if (stat.getBaseStationId() == null) { String npKey = stat.getOperatorId() + "_" + stat.getNetworkPartnerId(); Stat statNp = deserializeStat(this.kvStore.get(npKey)); if (statNp == null) { statNp = new Stat(); statNp.setCount(stat.getCount()); statNp.setOperatorId(stat.getOperatorId()); statNp.setNetworkPartnerId(stat.getNetworkPartnerId()); this.kvStore.put(npKey, serializeStat(statNp)); } else { statNp.setCount(statNp.getCount() + stat.getCount()); this.kvStore.put(npKey, serializeStat(statNp)); } } else { String bsKey = stat.getOperatorId() + "_" + stat.getNetworkPartnerId() + "_" + stat.getBaseStationId(); Stat statBs = deserializeStat(this.kvStore.get(bsKey)); if (statBs == null) { statBs = new Stat(); statBs.setCount(stat.getCount()); statBs.setOperatorId(stat.getOperatorId()); statBs.setNetworkPartnerId(stat.getNetworkPartnerId()); statBs.setBaseStationId(stat.getBaseStationId()); this.kvStore.put(bsKey, serializeStat(statBs)); } else { statBs.setCount(statBs.getCount() + stat.getCount()); this.kvStore.put(bsKey, serializeStat(statBs)); } } } } } ----------------------------------------------------- public class ProcessorStatsByHourSupplier extends ProcessorStatsByTimeSupplier<ProcessorStatsByHour> { public ProcessorStatsByHourSupplier(ProcessorStatsByHour processor) { super(processor); } @Override public Processor<String, String> get() { return processor; } } ----------------------------------------- public abstract class BaseProcessor implements Processor<String, String> { private static ObjectMapper objectMapper = new ObjectMapper(); private TimeUnit timeUnit; private int countTimeUnit; private String countStoreName; protected ProcessorContext context; protected KeyValueStore<String, String> kvStore; public BaseProcessor(TimeUnit timeUnit, int countTimeUnit, String countStoreName) { this.timeUnit = timeUnit; this.countTimeUnit = countTimeUnit; this.countStoreName = countStoreName; } @Override @SuppressWarnings("unchecked") public void init(ProcessorContext context) { this.context = context; this.context.schedule(TimeUnit.MILLISECONDS.convert(countTimeUnit, timeUnit)); this.kvStore = (KeyValueStore<String, String>) context.getStateStore(countStoreName); } @Override public void punctuate(long timestamp) { try (KeyValueIterator<String, String> iter = this.kvStore.all()) { System.out.println("----------- " + timestamp + " ----------- "); while (iter.hasNext()) { System.out.println("----------- pass1 ----------- "); KeyValue<String, String> entry = iter.next(); Stat stat = deserializeStat(entry.value); if (stat != null) { System.out.println("----------- pass2 ----------- "); stat.setTimestamp(timestamp); } System.out.println("----------- pass3 ----------- "); System.out.println("key"+entry.key); System.out.println("stat"+serializeStat(stat)); System.out.println("context"+context); context.forward(entry.key, serializeStat(stat)); System.out.println("[" + entry.key + ", " + serializeStat(stat) + "]"); iter.remove(); } } finally { context.commit(); } } @Override public void close() { this.kvStore.close(); } protected static Uplink deserialize(String json) { try { return objectMapper.readValue(json, Uplink.class); } catch (IOException e) { System.out.println(e.getMessage()); return new Uplink().setOperatorId("fake").setNetworkPartnerId("fake").setBaseStationId("fake").setTimeStampProduce(60L); } } protected static Stat deserializeStat(String json) { if (json == null) { return null; } try { return objectMapper.readValue(json, Stat.class); } catch (IOException e) { System.out.println(e.getMessage()); return new Stat().setOperatorId("fake").setNetworkPartnerId("fake").setBaseStationId("fake").setTimestamp(System.currentTimeMillis()).setCount(-1L); } } protected static String serializeStat(Stat stat) { try { String s = objectMapper.writeValueAsString(stat); return s; } catch (IOException e) { System.out.println(e.getMessage()); return "{'operatorId':'fake','networkPartnerId':'fake','baseStationId':'fake','count':-1,'timestamp':5}"; } } } ________________________________ De : Guozhang Wang <wangg...@gmail.com> Envoyé : lundi 19 septembre 2016 12:19:36 À : users@kafka.apache.org Objet : Re: Error kafka-stream method punctuate in context.forward() Hello Hamza, Which Kafka version are you using with this application? Also could you share your code skeleton of the StatsByDay processor implementation? Guozhang On Fri, Sep 16, 2016 at 6:58 AM, Hamza HACHANI <hamza.hach...@supcom.tn> wrote: > Good morning, > > I have a problem with a kafka-stream application. > > In fact I 've created already two kafka stream applications : > > StatsByMinute : entry topic : uplinks, out topic : statsM. > > StatsByHour : entrey topic : statsM, out topic : statsH. > > StatsByDay : entry topic : statsH, out topic : statsD. > > > > The three of these application hava naerly the same struture ( > StatsByMinute and StatsBy Hour/Stats By Day are only different in the > application ID KVstore and the mthos process() ). > > StatsBy Day and Stats BY Hour have exactly the same structure (the only > exception is the ID parameters) . > > > The Problem is that stastByMinute and StatsByHour works parfectly. > > But this this not the case for StatsByDay where i verified that i do > receive data and process it (so process works). but in the line > context.forward in punctuate there is a problem. > > I get the following error : > > > [2016-09-16 15:44:24,467] ERROR Streams application error during > processing in thread [StreamThread-1]: (org.apache.kafka.streams. > processor.internals.StreamThread) > java.lang.NullPointerException > at org.apache.kafka.streams.processor.internals. > StreamTask.forward(StreamTask.java:336) > at org.apache.kafka.streams.processor.internals. > ProcessorContextImpl.forward(ProcessorContextImpl.java:187) > at com.actility.tpk.stat.BaseProcessor.punctuate( > BaseProcessor.java:54) > at org.apache.kafka.streams.processor.internals. > StreamTask.punctuate(StreamTask.java:227) > at org.apache.kafka.streams.processor.internals. > PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > at org.apache.kafka.streams.processor.internals. > StreamTask.maybePunctuate(StreamTask.java:212) > at org.apache.kafka.streams.processor.internals. > StreamThread.maybePunctuate(StreamThread.java:407) > at org.apache.kafka.streams.processor.internals. > StreamThread.runLoop(StreamThread.java:325) > at org.apache.kafka.streams.processor.internals. > StreamThread.run(StreamThread.java:218) > Exception in thread "StreamThread-1" java.lang.NullPointerException > at org.apache.kafka.streams.processor.internals. > StreamTask.forward(StreamTask.java:336) > at org.apache.kafka.streams.processor.internals. > ProcessorContextImpl.forward(ProcessorContextImpl.java:187) > at com.actility.tpk.stat.BaseProcessor.punctuate( > BaseProcessor.java:54) > at org.apache.kafka.streams.processor.internals. > StreamTask.punctuate(StreamTask.java:227) > at org.apache.kafka.streams.processor.internals. > PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > at org.apache.kafka.streams.processor.internals. > StreamTask.maybePunctuate(StreamTask.java:212) > at org.apache.kafka.streams.processor.internals. > StreamThread.maybePunctuate(StreamThread.java:407) > at org.apache.kafka.streams.processor.internals. > StreamThread.runLoop(StreamThread.java:325) > at org.apache.kafka.streams.processor.internals. > StreamThread.run(StreamThread.java:218) > > > -- -- Guozhang