Hi Damian,

The semantics of count() still depend on the KTable, whose records form a changelog keyed by the record key. So in your example the KTable is conceptually an updating "source table" with only one record (key A), whose value is updated five times (0 through 4 in your producer loop). KTable.count(...) essentially means "count the records per key in the source table", which will always return:

A: 1

You see "A: 2" because the results are actually emitted as Change<> objects, each of which is a pair of new / old values, so what you actually get is:

New: A=>1, Old: null
New: A=>2, Old: A=>1 (after applying the addition)
New: A=>1, Old: A=>2 (after applying the subtraction)
New: A=>2, Old: A=>1 (after applying the addition)
New: A=>1, Old: A=>2 (after applying the subtraction)
....
New: A=>1, Old: A=>2 (after applying the subtraction)

and the final result should always be A=>1.

Generally speaking, any aggregation grouped by the table's own primary key is effectively meaningless, since the table holds only one record per key.

"KTable.count(KeyValueMapper<K,V,K1> selector, String name) causes a NullPointerException": this looks like a bug. Would you like to file a JIRA to keep track of the investigation?

Guozhang

On Mon, Apr 18, 2016 at 3:12 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Liquan,
>
> Thanks for getting back to me and pointing me to the Confluent docs. Based
> on what I read and my own assumptions, I'd expect the data consumed from
> the output topic to be:
>
> A:1
> A:2
> A:3
> A:4
> A:5
>
> What am I missing?
>
> Thanks,
> Damian
>
> On Sun, 17 Apr 2016 at 19:20 Liquan Pei <liquan...@gmail.com> wrote:
>
> > Hi Damian,
> >
> > I am new to KStreams as well, so my answer might not be 100% precise. In
> > a KTable, records with the same key are treated as updates rather than
> > events, so aggregating on the same key effectively de-duplicates them.
> > The docs for the tech preview explain this behavior:
> >
> > http://docs.confluent.io/2.1.0-alpha1/streams/concepts.html#ktable-changelog-stream
> > http://docs.confluent.io/2.1.0-alpha1/streams/quickstart.html#inspect-the-output-data
> >
> > Maybe we can update the Javadoc to make this behavior more explicit?
> >
> > Thanks,
> > Liquan
> >
> > On Sun, Apr 17, 2016 at 9:59 AM, Damian Guy <damian....@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I'm slightly confused by KTable.count(..). The javadoc says:
> > >
> > > Count number of records of this stream by the selected key into a new
> > > instance of {@link KTable}.
> > >
> > > So... if I send 5 records with the same key to the input topic, as
> > > below:
> > >
> > > final KafkaProducer<String, Integer> producer = new KafkaProducer<>(
> > >         producerProperties, new StringSerializer(), new IntegerSerializer());
> > > for (int i = 0; i < 5; i++) {
> > >     producer.send(new ProducerRecord<>("input", "A", i));
> > > }
> > > producer.flush();
> > >
> > > and then set up my stream like so:
> > >
> > > final KStreamBuilder builder = new KStreamBuilder();
> > > final KTable<String, Integer> table = builder.table(Serdes.String(),
> > >         Serdes.Integer(), "input");
> > > final KTable<String, Long> count = table.count(
> > >         new KeyValueMapper<String, Integer, String>() {
> > >             @Override
> > >             public String apply(final String key, final Integer value) {
> > >                 return key;
> > >             }
> > >         }, Serdes.String(), Serdes.Integer(), "count");
> > >
> > > count.to(Serdes.String(), Serdes.Long(), "count");
> > >
> > > and then consume the data from the "count" topic, I thought I should
> > > eventually get a record where the key is A and the value is 5, i.e., the
> > > number of times the key A was seen in the input stream. However, this is
> > > not the case. What I receive on the count topic is:
> > >
> > > A:1
> > > A:2
> > > A:1
> > > A:2
> > > A:1
> > > A:2
> > > A:1
> > > A:2
> > > A:1
> > >
> > > Is this expected behaviour? Have I misunderstood how count is supposed
> > > to work?
> > >
> > > Also, KTable.count(KeyValueMapper<K,V,K1> selector, String name) causes
> > > a NullPointerException.
> > >
> > > Thanks,
> > > Damian
> > >
> >
> > --
> > Liquan Pei
> > Software Engineer, Confluent Inc
> >
>

--
-- Guozhang
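To make the difference between counting events and counting table updates concrete, here is a minimal, Kafka-free Java sketch. It does not use the Kafka Streams API; the class CountSemantics and its methods are hypothetical. It assumes, following Guozhang's explanation, that each table update is applied to the count as an addition for the new value followed by a subtraction for the old value, both landing on key A because the selector returns the key unchanged. That add-then-subtract ordering mirrors the output Damian observed and is an assumption here, not a documented guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountSemantics {

    // Event (stream) semantics: every record is an independent event, so the
    // count for a key keeps growing and each new count is emitted.
    public static List<String> countStream(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String key : keys) {
            long c = counts.merge(key, 1L, Long::sum);
            emitted.add(key + ":" + c);
        }
        return emitted;
    }

    // Changelog (table) semantics: a record whose key already exists replaces
    // the old value. Assumption: the count is updated by an "adder" for the new
    // value and then a "subtractor" for the old value, emitting after each step.
    public static List<String> countTable(List<String> keys, List<Integer> values) {
        Map<String, Integer> table = new HashMap<>(); // current source-table contents
        Map<String, Long> counts = new HashMap<>();   // count per selected key
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            String key = keys.get(i);
            Integer oldValue = table.put(key, values.get(i));
            // Adder: the new value maps to the same key (identity selector).
            long c = counts.merge(key, 1L, Long::sum);
            emitted.add(key + ":" + c);
            if (oldValue != null) {
                // Subtractor: retract the value that was just replaced.
                c = counts.merge(key, -1L, Long::sum);
                emitted.add(key + ":" + c);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Same input as Damian's producer loop: key "A" with values 0..4.
        List<String> keys = Arrays.asList("A", "A", "A", "A", "A");
        List<Integer> values = Arrays.asList(0, 1, 2, 3, 4);
        System.out.println("stream: " + countStream(keys));
        System.out.println("table:  " + countTable(keys, values));
    }
}
```

Under these assumptions, countStream yields [A:1, A:2, A:3, A:4, A:5], the sequence Damian expected, while countTable yields the alternating [A:1, A:2, A:1, A:2, A:1, A:2, A:1, A:2, A:1] he actually saw, ending at A:1 as Guozhang describes: the source table only ever holds one record for A.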