Hi Damian,

I am new to KStreams as well, so my answer might not be 100% precise. In a
KTable, each record with an existing key is treated as an update to that key
rather than as a new event. An aggregation such as count() therefore sees one
current value per key, not one per record sent, which amounts to de-duplication.
The docs for the tech preview contain some explanation of this behavior:

http://docs.confluent.io/2.1.0-alpha1/streams/concepts.html#ktable-changelog-stream
http://docs.confluent.io/2.1.0-alpha1/streams/quickstart.html#inspect-the-output-data

Maybe we can update the Javadoc to make this behavior more explicit?
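
To make the update semantics concrete, here is a rough plain-Java sketch (no
Kafka dependency) of how I understand counting over a changelog. The class and
method names are made up for illustration, and the order in which the new value
is added and the old one is retracted is an assumption chosen to match the
output you saw; it is not taken from the Streams source.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of counting over a changelog; not the Kafka Streams API.
public class KTableCountSketch {

    // Feeds records into a "table" and returns every count update emitted.
    static List<String> simulateCount(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> table = new HashMap<>(); // latest value per key
        Map<String, Long> counts = new HashMap<>();   // current count per key
        List<String> output = new ArrayList<>();

        for (Map.Entry<String, Integer> record : records) {
            String key = record.getKey();
            // put() returns the previous value, or null if the key is new.
            Integer previous = table.put(key, record.getValue());

            // Assumption: the new value is added to the count first ...
            long count = counts.merge(key, 1L, Long::sum);
            output.add(key + ":" + count);

            // ... and, if the key already existed, the old value is retracted.
            if (previous != null) {
                count = counts.merge(key, -1L, Long::sum);
                output.add(key + ":" + count);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Same input as in the original mail: five records, all with key "A".
        List<Map.Entry<String, Integer>> records = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            records.add(new AbstractMap.SimpleEntry<>("A", i));
        }
        for (String line : simulateCount(records)) {
            System.out.println(line);
        }
    }
}
```

Running main prints A:1 followed by four A:2 / A:1 pairs, nine lines in total,
which matches the output in your mail. The count never settles above 1 because
the table only ever holds one current value for A.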

Thanks,
Liquan

On Sun, Apr 17, 2016 at 9:59 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi,
>
> I'm slightly confused by KTable.count(..). The javadoc says:
>
>  Count number of records of this stream by the selected key into a new
> instance of {@link KTable}.
>
> So, if I send 5 records with the same key to the input topic, as per below:
>
> final KafkaProducer<String, Integer> producer = new
> KafkaProducer<>(producerProperties, new StringSerializer(), new
> IntegerSerializer());
> for(int i =0;i<5;i++) {
>     producer.send(new ProducerRecord<>("input", "A",i));
> }
> producer.flush();
>
> and then setup my stream like so:
>
> final KStreamBuilder builder = new KStreamBuilder();
> final KTable<String, Integer> table = builder.table(Serdes.String(),
> Serdes.Integer(), "input");
> final KTable<String, Long> count = table.count(new
> KeyValueMapper<String, Integer, String>() {
>     @Override
>     public String apply(final String key, final Integer value) {
>         return key;
>     }
> }, Serdes.String(), Serdes.Integer(),"count");
>
> count.to(Serdes.String(), Serdes.Long(),"count");
>
> And then consume the data from the "count" topic. I thought I should
> eventually get a record where the key is A and the value is 5, i.e., the
> number of times the key A was seen in the input stream. However, this is
> not the case. What I receive on the count topic is:
> A:1
> A:2
> A:1
> A:2
> A:1
> A:2
> A:1
> A:2
> A:1
>
> Is this expected behaviour? Have I misunderstood how count is supposed to
> work?
>
> Also, KTable.count(KeyValueMapper<K,V,K1> selector, String name) causes a
> NullPointerException
>
> Thanks,
> Damian
>



-- 
Liquan Pei
Software Engineer, Confluent Inc
