Hi, I'm slightly confused by KTable.count(..). The javadoc says:

    Count number of records of this stream by the selected key into a new instance of {@link KTable}.

So, if I send 5 records with the same key to the input topic, as per below:

    final KafkaProducer<String, Integer> producer =
        new KafkaProducer<>(producerProperties, new StringSerializer(), new IntegerSerializer());
    for (int i = 0; i < 5; i++) {
        producer.send(new ProducerRecord<>("input", "A", i));
    }
    producer.flush();

and then set up my stream like so:

    final KStreamBuilder builder = new KStreamBuilder();
    final KTable<String, Integer> table = builder.table(Serdes.String(), Serdes.Integer(), "input");
    final KTable<String, Long> count = table.count(new KeyValueMapper<String, Integer, String>() {
        @Override
        public String apply(final String key, final Integer value) {
            return key;
        }
    }, Serdes.String(), Serdes.Integer(), "count");
    count.to(Serdes.String(), Serdes.Long(), "count");

and then consume the data from the "count" topic, I thought I should eventually get a record where the key is A and the value is 5, i.e. the number of times the key A was seen in the input stream.

However, this is not the case. What I receive on the count topic is:

    A:1
    A:2
    A:1
    A:2
    A:1
    A:2
    A:1
    A:2
    A:1

Is this expected behaviour? Have I misunderstood how count is supposed to work?

Also, KTable.count(KeyValueMapper<K,V,K1> selector, String name) causes a NullPointerException.

Thanks,
Damian