Hi Matthias, Thanks for the response. I stream output as follows:
longCounts.toStream((wk, v) -> wk.key())
          .to(Serdes.String(), Serdes.Long(), "qps-aggregated");

I want to read the last value from that topic in another application. I've tried this:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "qps-consumer");
// I don't know the real purpose of this setting
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");

KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("qps-aggregated"));
ConsumerRecords<String, Long> records = consumer.poll(1);
for (ConsumerRecord<String, Long> record : records) {
    System.out.printf("Connected! offset = %d, key = %s, value = %s%n",
        record.offset(), record.key(), record.value());
}

I can see that there is data when I check the streamed topic (qps-aggregated) from the command line. However, I cannot get any result from that subscription in my application. What can be the reason?

Kind Regards,
Furkan KAMACI

On Wed, Nov 2, 2016 at 10:58 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> Hi,
>
> First, AUTO_OFFSET_RESET_CONFIG only has an effect the first time you
> start your application. If you start it a second time, it will resume
> from where it left off.
>
> About getting numbers starting from zero: this is expected behavior,
> because Streams **updates** the window computation each time an input
> record is added to the window, so you see each intermediate result.
>
> Furthermore, each time a new window is created, you will see a "1"
> again in the output, as this is the current count of the new window.
> If you want to distinguish windows in the output, you need to look at
> the key. It encodes the original record-key as well as a window ID.
>
> -Matthias
>
> On 11/2/16 12:13 PM, Furkan KAMACI wrote:
> > I use Kafka 0.10.0.1. I count the messages of a topic as follows:
> >
> > ...
> > streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> >     "earliest");
> > ...
> > KStream<String, String> longs =
> >     builder.stream(Serdes.String(), Serdes.String(), "qps-input");
> > ...
> > KTable<Windowed<String>, Long> longCounts =
> >     longs.countByKey(TimeWindows.of("qps", 3600 * 1000),
> >         Serdes.String());
> > ...
> >
> > and then I write the output to another topic. The result is:
> >
> > numbers which start from 1 and increase whenever I add something
> > to qps-input.
> >
> > My questions:
> >
> > 1) Does it really calculate only the last hour, or everything from
> > the beginning, since I've set it to "earliest"?
> >
> > 2) Sometimes it gets reset and the numbers start from 1 again. What
> > can be the reason for that?
> >
> > Kind Regards, Furkan KAMACI
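[Editorial note appended to the archived thread] Because "qps-aggregated" carries every intermediate update of the count, a reader that wants only the "last value" per key has to keep consuming and let newer counts overwrite older ones. The sketch below shows just that bookkeeping in plain Java; the record stream is simulated with an array (in the real application the pairs would come from consumer.poll()), and the key names are made up for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LastValuePerKey {
    // Fold a stream of (key, count) updates down to the latest value per key.
    static Map<String, Long> latest(String[][] updates) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (String[] u : updates) {
            // A later count for the same key simply replaces the earlier one.
            latest.put(u[0], Long.parseLong(u[1]));
        }
        return latest;
    }

    public static void main(String[] args) {
        // Simulated updates as they might arrive from "qps-aggregated":
        // the count for host-a goes 1 -> 2 -> 3 over time.
        String[][] updates = {
            {"host-a", "1"}, {"host-b", "1"}, {"host-a", "2"}, {"host-a", "3"}
        };
        System.out.println(latest(updates)); // prints {host-a=3, host-b=1}
    }
}
```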
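[Editorial note] Matthias's explanation of the restarting counts can also be sketched: each input record increments the count of its (key, window) pair, every increment is emitted downstream, and a record falling into a fresh window starts that window's count at 1 again. A toy simulation under assumed one-hour tumbling windows (timestamps and the "key@windowStart" encoding are illustrative, not the actual Streams wire format):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WindowedCountDemo {
    static final long WINDOW_MS = 3600 * 1000L; // one-hour tumbling windows

    // Count records per (key, window). The map holds the running counts;
    // in Streams, every single update would be visible downstream.
    static Map<String, Long> count(String key, long[] timestamps) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (long ts : timestamps) {
            long windowStart = ts - (ts % WINDOW_MS);
            // The downstream key combines the record key and its window.
            counts.merge(key + "@" + windowStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two records in the first hour, one in the second: the count
        // reaches 2 in the first window and restarts at 1 in the next.
        System.out.println(count("qps", new long[]{10L, 20L, 3_600_005L}));
        // prints {qps@0=2, qps@3600000=1}
    }
}
```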