Hi,

I'm slightly confused by KTable.count(..). The javadoc says:

 Count number of records of this stream by the selected key into a new
instance of {@link KTable}.

So.. if i send 5 records with the same key to the input topic, as per below

final KafkaProducer<String, Integer> producer = new
KafkaProducer<>(producerProperties, new StringSerializer(), new
IntegerSerializer());
for(int i =0;i<5;i++) {
    producer.send(new ProducerRecord<>("input", "A",i));
}
producer.flush();

and then setup my stream like so:

final KStreamBuilder builder = new KStreamBuilder();
final KTable<String, Integer> table = builder.table(Serdes.String(),
Serdes.Integer(), "input");
final KTable<String, Long> count = table.count(new
KeyValueMapper<String, Integer, String>() {
    @Override
    public String apply(final String key, final Integer value) {
        return key;
    }
}, Serdes.String(), Serdes.Integer(),"count");

count.to(Serdes.String(), Serdes.Long(),"count");

And then consume the data from the "count" topic I thought i should
eventually get a record where the key is A and the value is 5, i.e, the
number of times the key A was seen in the input stream. However, this is
not the case. What i receive on the count topic is:
A:1
A:2
A:1
A:2
A:1
A:2
A:1
A:2
A:1

Is this expected behaviour? Have i misunderstood how count is supposed to
work?

Also, KTable.count(KeyValueMapper<K,V,K1> selector, String name) causes a
NullPointerException

Thanks,
Damian

Reply via email to