Hi Guozhang,

Here is the code for the two classes concerned.

In case it helps: I figured out that the instances of

ProcessorStatsByHourSupplier and ProcessorStatsByMinuteSupplier that are
returned are the same.

I think this is the problem. I tried to fix it but I was not able to do it.


Thanks Guozhang

Hamza


----------------------

/**
 * Launcher for the "processorByMinute" Kafka Streams application.
 *
 * <p>Wires a single-processor topology: the {@code uplink} topic feeds a
 * {@code ProcessorStatsByMinute} node backed by an in-memory String/String
 * store, and the node's output is written to the {@code statsM} topic.
 */
public class StatsByMinute {

    /**
     * Builds the topology and starts the streams instance.
     *
     * @param args unused
     * @throws Exception declared for any failure while building or starting
     */
    public static void main(String[] args) throws Exception {
        final Properties config = streamsProperties();

        // The store name is suffixed with the start time, so it differs on every run.
        final String storeName = "CountsStore" + System.currentTimeMillis();

        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource("Source", "uplink");
        topology.addProcessor(
                "Process",
                new ProcessorStatsByMinuteSupplier(new ProcessorStatsByMinute(1, storeName)),
                "Source");
        topology.addStateStore(
                Stores.create(storeName)
                        .withStringKeys()
                        .withStringValues()
                        .inMemory()
                        .build(),
                "Process");
        topology.addSink(
                "Sink",
                "statsM",
                Serdes.String().serializer(),
                Serdes.String().serializer(),
                "Process");

        new KafkaStreams(topology, config).start();
    }

    /** Assembles the streams configuration used by this application. */
    private static Properties streamsProperties() {
        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "processorByMinute");
        // Where to find Kafka broker(s).
        config.put(
                StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.82:9094,192.168.1.89:9093,192.168.1.82:9092");
        // Where to find the corresponding ZooKeeper ensemble.
        config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "192.168.1.82:2181");
        // Keys and values are both plain strings by default.
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return config;
    }
}


---------------------

/**
 * Launcher for the "processorByHour" Kafka Streams application.
 *
 * <p>Wires a single-processor topology: the {@code statsM} topic feeds a
 * {@code ProcessorStatsByHour} node backed by an in-memory String/String
 * store, and the node's output is written to the {@code statsH} topic.
 */
public class StatsByHour {

    /**
     * Builds the topology and starts the streams instance.
     *
     * @param args unused
     * @throws Exception declared for any failure while building or starting
     */
    public static void main(String[] args) throws Exception {
        final Properties config = streamsProperties();

        // The store name is suffixed with the start time, so it differs on every run.
        final String storeName = "CountsStore" + System.currentTimeMillis();

        final TopologyBuilder topology = new TopologyBuilder();
        topology.addSource("Source", "statsM");

        final ProcessorStatsByHourSupplier supplier =
                new ProcessorStatsByHourSupplier(new ProcessorStatsByHour(3, storeName));
        // Debug output: prints the supplier that is about to be registered.
        System.out.println(supplier);
        topology.addProcessor("Process", supplier, "Source");

        topology.addStateStore(
                Stores.create(storeName)
                        .withStringKeys()
                        .withStringValues()
                        .inMemory()
                        .build(),
                "Process");
        topology.addSink(
                "Sink",
                "statsH",
                Serdes.String().serializer(),
                Serdes.String().serializer(),
                "Process");

        new KafkaStreams(topology, config).start();
    }

    /** Assembles the streams configuration used by this application. */
    private static Properties streamsProperties() {
        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "processorByHour");
        // Where to find Kafka broker(s).
        config.put(
                StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.82:9094,192.168.1.89:9093,192.168.1.82:9092");
        // Where to find the corresponding ZooKeeper ensemble.
        config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "192.168.1.82:2181");
        // Keys and values are both plain strings by default.
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return config;
    }
}


---------------

/**
 * Processor that folds incoming {@code Stat} records into the key-value store
 * inherited from {@link BaseProcessor}.
 *
 * <p>Each record is stored under a key derived from the identifiers it
 * carries: operator only, operator + network partner, or operator + network
 * partner + base station. Counts of records sharing a key are summed.
 */
public class ProcessorStatsByHour extends BaseProcessor {

    /**
     * @param countTimeUnit  length of the schedule interval, expressed in minutes
     *                       ({@code TimeUnit.MINUTES} is passed to the base class);
     *                       NOTE(review): minutes despite the class name — confirm intended
     * @param countStoreName name of the state store to aggregate into
     */
    public ProcessorStatsByHour(int countTimeUnit, String countStoreName) {
        super(TimeUnit.MINUTES, countTimeUnit, countStoreName);
    }

    /**
     * Merges one JSON-encoded {@code Stat} into the store.
     *
     * @param key  record key (not used)
     * @param json JSON form of the incoming stat; a {@code null} value is ignored
     */
    @Override
    public void process(String key, String json) {
        final Stat incoming = deserializeStat(json);
        if (incoming == null) {
            return;
        }

        final boolean hasPartner = incoming.getNetworkPartnerId() != null;
        final boolean hasStation = incoming.getBaseStationId() != null;
        // A base station implies the full three-part key, even if the partner id is null.
        final boolean partnerInKey = hasPartner || hasStation;

        String storeKey = incoming.getOperatorId();
        if (partnerInKey) {
            storeKey = storeKey + "_" + incoming.getNetworkPartnerId();
        }
        if (hasStation) {
            storeKey = storeKey + "_" + incoming.getBaseStationId();
        }

        Stat aggregate = deserializeStat(this.kvStore.get(storeKey));
        if (aggregate != null) {
            // Key already known: accumulate the count.
            aggregate.setCount(aggregate.getCount() + incoming.getCount());
        } else {
            // First record for this key: seed the aggregate with the identifiers in the key.
            aggregate = new Stat();
            aggregate.setCount(incoming.getCount());
            aggregate.setOperatorId(incoming.getOperatorId());
            if (partnerInKey) {
                aggregate.setNetworkPartnerId(incoming.getNetworkPartnerId());
            }
            if (hasStation) {
                aggregate.setBaseStationId(incoming.getBaseStationId());
            }
        }
        this.kvStore.put(storeKey, serializeStat(aggregate));
    }
}

----------------------------------------------------------------

/**
 * Processor that folds incoming {@code Stat} records into the key-value store
 * inherited from {@link BaseProcessor}.
 *
 * <p>Records are grouped by operator, by operator + network partner, or by
 * operator + network partner + base station, depending on which identifiers
 * are present; counts within a group are summed.
 */
public class ProcessorStatsByDay extends BaseProcessor {

    /**
     * @param countTimeUnit  length of the schedule interval, expressed in minutes
     *                       ({@code TimeUnit.MINUTES} is passed to the base class);
     *                       NOTE(review): minutes despite the class name — confirm intended
     * @param countStoreName name of the state store to aggregate into
     */
    public ProcessorStatsByDay(int countTimeUnit, String countStoreName) {
        super(TimeUnit.MINUTES, countTimeUnit, countStoreName);
    }

    /**
     * Merges one JSON-encoded {@code Stat} into the store.
     *
     * @param key  record key (not used)
     * @param json JSON form of the incoming stat; a {@code null} value is ignored
     */
    @Override
    public void process(String key, String json) {
        final Stat incoming = deserializeStat(json);
        if (incoming == null) {
            return;
        }

        final boolean stationLevel = incoming.getBaseStationId() != null;
        final boolean operatorLevel = !stationLevel && incoming.getNetworkPartnerId() == null;

        final String storeKey = storeKeyFor(incoming, operatorLevel, stationLevel);
        Stat total = deserializeStat(this.kvStore.get(storeKey));

        if (total == null) {
            // Nothing stored yet for this key: start from the incoming record's values.
            total = new Stat();
            total.setCount(incoming.getCount());
            total.setOperatorId(incoming.getOperatorId());
            if (!operatorLevel) {
                total.setNetworkPartnerId(incoming.getNetworkPartnerId());
            }
            if (stationLevel) {
                total.setBaseStationId(incoming.getBaseStationId());
            }
        } else {
            total.setCount(total.getCount() + incoming.getCount());
        }

        this.kvStore.put(storeKey, serializeStat(total));
    }

    /** Builds the "_"-joined store key for the aggregation level of {@code stat}. */
    private static String storeKeyFor(Stat stat, boolean operatorLevel, boolean stationLevel) {
        if (operatorLevel) {
            return stat.getOperatorId();
        }
        final String partnerKey = stat.getOperatorId() + "_" + stat.getNetworkPartnerId();
        return stationLevel ? partnerKey + "_" + stat.getBaseStationId() : partnerKey;
    }
}

-----------------------------------------------------

/**
 * Supplier that hands out the {@link ProcessorStatsByHour} it was constructed with.
 *
 * <p>NOTE(review): {@link #get()} returns the same processor object on every
 * call. Kafka Streams' {@code ProcessorSupplier} contract is understood to
 * expect a fresh {@code Processor} per call (one per task) — confirm whether
 * sharing a single instance here is intended.
 */
public class ProcessorStatsByHourSupplier
        extends ProcessorStatsByTimeSupplier<ProcessorStatsByHour> {

    /**
     * @param processor the processor instance this supplier will return
     */
    public ProcessorStatsByHourSupplier(ProcessorStatsByHour processor) {
        super(processor);
    }

    /**
     * @return the processor held by the parent supplier class
     */
    @Override
    public Processor<String, String> get() {
        return this.processor;
    }
}

-----------------------------------------

/**
 * Common base for the statistics processors.
 *
 * <p>Subclasses implement {@code process} to accumulate {@code Stat} values in
 * {@link #kvStore}. On every scheduled punctuation this class stamps each
 * stored stat with the punctuation timestamp, forwards it downstream, removes
 * it through the store iterator and finally requests a commit.
 */
public abstract class BaseProcessor implements Processor<String, String> {

    /** Shared JSON mapper used by the static (de)serialization helpers. */
    private static final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Returned by {@link #serializeStat(Stat)} when serialization fails.
     * Uses double quotes so that it is valid JSON and can be read back by
     * {@link #deserializeStat(String)} with a default-configured mapper.
     */
    private static final String FALLBACK_STAT_JSON =
            "{\"operatorId\":\"fake\",\"networkPartnerId\":\"fake\",\"baseStationId\":\"fake\","
                    + "\"count\":-1,\"timestamp\":5}";

    private final TimeUnit timeUnit;
    private final int countTimeUnit;
    private final String countStoreName;

    protected ProcessorContext context;
    protected KeyValueStore<String, String> kvStore;

    /**
     * @param timeUnit       unit in which {@code countTimeUnit} is expressed
     * @param countTimeUnit  number of {@code timeUnit}s between two punctuations
     * @param countStoreName name of the state store looked up in {@link #init}
     */
    public BaseProcessor(TimeUnit timeUnit, int countTimeUnit, String countStoreName) {
        this.timeUnit = timeUnit;
        this.countTimeUnit = countTimeUnit;
        this.countStoreName = countStoreName;
    }

    /**
     * Stores the context, schedules punctuation at the configured interval
     * (converted to milliseconds) and resolves the state store by name.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.schedule(TimeUnit.MILLISECONDS.convert(countTimeUnit, timeUnit));
        this.kvStore = (KeyValueStore<String, String>) context.getStateStore(countStoreName);
    }

    /**
     * Flushes the store downstream: every entry is stamped with
     * {@code timestamp}, forwarded under its store key and then removed via
     * the iterator. A commit is requested even if forwarding fails.
     *
     * @param timestamp punctuation time written into each forwarded stat
     */
    @Override
    public void punctuate(long timestamp) {
        try (KeyValueIterator<String, String> iter = this.kvStore.all()) {
            System.out.println("----------- " + timestamp + " ----------- ");
            while (iter.hasNext()) {
                System.out.println("----------- pass1 ----------- ");
                KeyValue<String, String> entry = iter.next();
                Stat stat = deserializeStat(entry.value);
                if (stat != null) {
                    System.out.println("----------- pass2 ----------- ");
                    stat.setTimestamp(timestamp);
                }
                System.out.println("----------- pass3 ----------- ");
                System.out.println("key"+entry.key);
                // Serialize once and reuse the result for logging and forwarding.
                final String payload = serializeStat(stat);
                System.out.println("stat"+payload);
                System.out.println("context"+context);
                context.forward(entry.key, payload);
                System.out.println("[" + entry.key + ", " + payload + "]");
                iter.remove();
            }
        } finally {
            context.commit();
        }
    }

    /**
     * Closes the state store.
     *
     * <p>NOTE(review): state stores registered with the topology are normally
     * closed by Kafka Streams itself — confirm this explicit close is wanted.
     */
    @Override
    public void close() {
        this.kvStore.close();
    }

    /**
     * Parses an {@code Uplink} from JSON.
     *
     * @param json JSON text to parse
     * @return the parsed uplink, or a placeholder with "fake" identifiers if
     *         parsing fails (the error message is printed to stdout)
     */
    protected static Uplink deserialize(String json) {
        try {
            return objectMapper.readValue(json, Uplink.class);
        } catch (IOException e) {
            System.out.println(e.getMessage());
            return new Uplink()
                    .setOperatorId("fake")
                    .setNetworkPartnerId("fake")
                    .setBaseStationId("fake")
                    .setTimeStampProduce(60L);
        }
    }

    /**
     * Parses a {@code Stat} from JSON.
     *
     * @param json JSON text to parse, may be {@code null}
     * @return {@code null} if {@code json} is {@code null}; otherwise the
     *         parsed stat, or a placeholder with "fake" identifiers and count
     *         {@code -1} if parsing fails (the error message is printed to stdout)
     */
    protected static Stat deserializeStat(String json) {
        if (json == null) {
            return null;
        }

        try {
            return objectMapper.readValue(json, Stat.class);
        } catch (IOException e) {
            System.out.println(e.getMessage());
            return new Stat()
                    .setOperatorId("fake")
                    .setNetworkPartnerId("fake")
                    .setBaseStationId("fake")
                    .setTimestamp(System.currentTimeMillis())
                    .setCount(-1L);
        }
    }

    /**
     * Serializes a {@code Stat} to JSON.
     *
     * @param stat the stat to serialize
     * @return the JSON text, or a fixed placeholder JSON document with "fake"
     *         identifiers if serialization fails (the error message is printed
     *         to stdout)
     */
    protected static String serializeStat(Stat stat) {
        try {
            return objectMapper.writeValueAsString(stat);
        } catch (IOException e) {
            System.out.println(e.getMessage());
            return FALLBACK_STAT_JSON;
        }
    }
}


________________________________
De : Guozhang Wang <wangg...@gmail.com>
Envoyé : lundi 19 septembre 2016 12:19:36
À : users@kafka.apache.org
Objet : Re: Error kafka-stream method punctuate in context.forward()

Hello Hamza,

Which Kafka version are you using with this application? Also could you
share your code skeleton of the StatsByDay processor implementation?


Guozhang


On Fri, Sep 16, 2016 at 6:58 AM, Hamza HACHANI <hamza.hach...@supcom.tn>
wrote:

> Good morning,
>
> I have a problem with a kafka-stream application.
>
> In fact I 've created already two kafka stream applications :
>
> StatsByMinute : entry topic : uplinks, out topic : statsM.
>
> StatsByHour : entry topic : statsM, out topic : statsH.
>
> StatsByDay : entry topic : statsH, out topic : statsD.
>
>
>
> These three applications have nearly the same structure (
> StatsByMinute and StatsByHour/StatsByDay differ only in the
> application ID, the KV store and the method process() ).
>
> StatsByDay and StatsByHour have exactly the same structure (the only
> exception is the ID parameters).
>
>
> The problem is that StatsByMinute and StatsByHour work perfectly.
>
> But this is not the case for StatsByDay, where I verified that I do
> receive data and process it (so process works), but at the line
> context.forward in punctuate there is a problem.
>
> I get the following error :
>
>
> [2016-09-16 15:44:24,467] ERROR Streams application error during
> processing in thread [StreamThread-1]:  (org.apache.kafka.streams.
> processor.internals.StreamThread)
> java.lang.NullPointerException
>         at org.apache.kafka.streams.processor.internals.
> StreamTask.forward(StreamTask.java:336)
>         at org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>         at com.actility.tpk.stat.BaseProcessor.punctuate(
> BaseProcessor.java:54)
>         at org.apache.kafka.streams.processor.internals.
> StreamTask.punctuate(StreamTask.java:227)
>         at org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>         at org.apache.kafka.streams.processor.internals.
> StreamTask.maybePunctuate(StreamTask.java:212)
>         at org.apache.kafka.streams.processor.internals.
> StreamThread.maybePunctuate(StreamThread.java:407)
>         at org.apache.kafka.streams.processor.internals.
> StreamThread.runLoop(StreamThread.java:325)
>         at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:218)
> Exception in thread "StreamThread-1" java.lang.NullPointerException
>         at org.apache.kafka.streams.processor.internals.
> StreamTask.forward(StreamTask.java:336)
>         at org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>         at com.actility.tpk.stat.BaseProcessor.punctuate(
> BaseProcessor.java:54)
>         at org.apache.kafka.streams.processor.internals.
> StreamTask.punctuate(StreamTask.java:227)
>         at org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>         at org.apache.kafka.streams.processor.internals.
> StreamTask.maybePunctuate(StreamTask.java:212)
>         at org.apache.kafka.streams.processor.internals.
> StreamThread.maybePunctuate(StreamThread.java:407)
>         at org.apache.kafka.streams.processor.internals.
> StreamThread.runLoop(StreamThread.java:325)
>         at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:218)
>
>
>


--
-- Guozhang

Reply via email to