Two things:
1) You should not apply the window to the first count, but to the base stream, to get correct results.

2) Your windowed aggregation does not just return String type, but Windowed<K> type. Thus, you either need to insert a .map() to transform your data into String type, or you provide a custom serializer when writing data to the output topic (the method .to(...) has multiple overloads).

By default, each topic read/write operation uses the Serdes from the streams config. If your data has a different type, you need to provide appropriate Serdes for those operators.

-Matthias

On 10/18/16 2:01 PM, Furkan KAMACI wrote:
> Hi Matthias,
>
> I've tried this code:
>
>   final Properties streamsConfiguration = new Properties();
>   streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
>   streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>   streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
>   streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>   streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>
>   final KStreamBuilder builder = new KStreamBuilder();
>   final KStream input = builder.stream("myapp-test");
>
>   final KStream<String, Long> searchCounts = input.countByKey("SearchRequests").toStream();
>   searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 1000)).to("outputTopicHourlyCounts");
>
>   final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
>   streams.start();
>
>   Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>
> However, I get an error:
>
>   Exception in thread "StreamThread-1" java.lang.ClassCastException:
>   org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
>
> On the other hand, when I try this code:
>
> https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a
> I get an error too, which indicates that:
>
>   Exception in thread "StreamThread-1"
>   org.apache.kafka.common.errors.SerializationException:
>   Size of data received by LongDeserializer is not 8
>
> Here is the generated topic:
>
>   kafka-console-consumer --zookeeper localhost:2181 --topic myapp-test --from-beginning
>   28952314828122
>   28988681653726
>   29080089383233
>
> I know that I miss something but couldn't find it.
>
> Kind Regards,
> Furkan KAMACI
>
> On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> I see. KGroupedStream will be part of 0.10.1.0 (should be released in the next weeks).
>
> So, instead of
>
>>>> .groupByKey().count()
>
> you need to do
>
>>>> .countByKey()
>
> -Matthias
>
> On 10/18/16 12:05 PM, Furkan KAMACI wrote:
>>>> Hi Matthias,
>>>>
>>>> Thanks for your detailed answer. By the way, I couldn't find "KGroupedStream" at version 0.10.0.1?
>>>>
>>>> Kind Regards,
>>>> Furkan KAMACI
>>>>
>>>> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>> Hi,
>>>>
>>>> You just need to read your stream and apply a (windowed) aggregation on it.
>>>>
>>>> If you use non-windowed aggregation you will get "since the beginning". If you use windowed aggregation, you can specify the window size as 1 hour and get those results.
>>>>
>>>> One comment: it seems that you want to count *all* queries. To make this work, you need to make sure all records use the same key (because Kafka Streams only supports aggregation over keyed streams). Keep in mind that this prohibits parallelization of your aggregation!
>>>>
>>>> As a workaround, you could also do two consecutive aggregations: parallelize the first one, and do not parallelize the second one (i.e., use the first one as a pre-aggregation, similar to a combine step).
>>>>
>>>> Without pre-aggregation, and assuming all records use the same key, something like this (for current trunk):
>>>>
>>>>>>> KStreamBuilder builder = new KStreamBuilder();
>>>>>>> KStream input = builder.stream("yourTopic");
>>>>>>>
>>>>>>> KGroupedStream groupedInput = input.groupByKey();
>>>>>>>
>>>>>>> groupedInput.count("countStore").to("outputTopicCountFromBeginning");
>>>>>>>
>>>>>>> groupedInput.count(TimeWindows.of(3600 * 1000), "windowedCountStore").to("outputTopicHourlyCounts");
>>>>
>>>> For more details, please see the docs and examples:
>>>>
>>>> - http://docs.confluent.io/current/streams/index.html
>>>> - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
>>>>
>>>> -Matthias
>>>>
>>>> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> I could successfully run Kafka in my environment. I want to monitor Queries per Second at my search application with Kafka. Whenever a search request is done, I create a ProducerRecord which holds the current nano time of the system.
>>>>>>>
>>>>>>> I know that I have to use a streaming API for the calculation, i.e. Kafka Streams or Spark Streaming. My choice is to use Kafka Streams.
>>>>>>>
>>>>>>> For the last 1 hour, or since the beginning, I have to calculate the queries per second. How can I make such an aggregation in Kafka Streams?
>>>>>>>
>>>>>>> Kind Regards,
>>>>>>> Furkan KAMACI
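Pulling Matthias's two fixes together, the corrected topology might look roughly like this. This is a sketch only, against the 0.10.x API used in this thread; it reuses the topic names and the countByKey/TimeWindows call shape from the code quoted above, and the exact .to(...) overload should be checked against your version (it cannot run outside a Streams application with a broker):

```java
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, String> input = builder.stream("myapp-test");

// Fix 1: window the base stream directly (no intermediate count first).
// Fix 2: the windowed count is keyed by Windowed<String>, not String, so
// map the key (and the Long value) to plain Strings before writing; then
// the default String Serdes from the streams config can serialize them.
input.countByKey(TimeWindows.of("Hourly", 3600 * 1000))
    .toStream()
    .map((windowedKey, count) -> new KeyValue<>(
        windowedKey.key() + "@" + windowedKey.window().start(),
        count.toString()))
    .to("outputTopicHourlyCounts");

final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
```

The alternative to the .map() step, as noted above, is passing windowed key/value Serdes to one of the .to(...) overloads instead of converting to String.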
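The "Size of data received by LongDeserializer is not 8" error above is about byte layout: a long serialized by Kafka's LongSerializer is exactly 8 raw bytes, while the same number written as text (as the console-consumer output suggests happened) is one byte per decimal digit. A plain-Java illustration with no Kafka dependency (the class and helper names here are hypothetical, for demonstration only):

```java
import java.nio.ByteBuffer;

public class LongBytesDemo {
    // The raw 8-byte big-endian layout a long-deserializer expects.
    static byte[] asLongBytes(long value) {
        return ByteBuffer.allocate(8).putLong(value).array();
    }

    // The same value written as text: one byte per decimal digit.
    static byte[] asStringBytes(long value) {
        return Long.toString(value).getBytes();
    }

    public static void main(String[] args) {
        long t = 28952314828122L; // a value from the console output above
        System.out.println(asLongBytes(t).length);   // 8
        System.out.println(asStringBytes(t).length); // 14 -- hence "is not 8"
    }
}
```

So if a topic was produced with String serialization, reading it back with a Long deserializer fails for any value whose textual form is not exactly 8 bytes long.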
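Once the hourly counts land in the output topic, the queries-per-second figure the original question asks for is just the window's count divided by the window length in seconds. A minimal plain-Java helper (hypothetical, not part of the thread's code):

```java
public class QpsCalc {
    // Queries per second for one window: count / window length in seconds.
    static double qps(long countInWindow, long windowSizeMs) {
        return countInWindow / (windowSizeMs / 1000.0);
    }

    public static void main(String[] args) {
        // e.g. 7200 requests counted in a 1-hour (3600 * 1000 ms) window
        System.out.println(qps(7200L, 3600 * 1000L)); // 2.0
    }
}
```

This division could equally be done inside the topology (e.g., in the .map() that converts the windowed count to a String) so the output topic carries the rate directly.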