Hi Matthias, I've tried this code:
    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    final KStreamBuilder builder = new KStreamBuilder();
    final KStream input = builder.stream("myapp-test");
    final KStream<String, Long> searchCounts = input.countByKey("SearchRequests").toStream();
    searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 1000)).to("outputTopicHourlyCounts");

    final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

However, I get an error:

    Exception in thread "StreamThread-1" java.lang.ClassCastException:
    org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String

On the other hand, when I try this code:
https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a
I get an error too, which indicates that:

    Exception in thread "StreamThread-1"
    org.apache.kafka.common.errors.SerializationException:
    Size of data received by LongDeserializer is not 8

Here is the generated topic:

    kafka-console-consumer --zookeeper localhost:2181 --topic myapp-test --from-beginning
    28952314828122
    28988681653726
    29080089383233

I know that I am missing something but couldn't find it.

Kind Regards,
Furkan KAMACI
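For reference, the two errors have separate causes. countByKey(TimeWindows.of(...)) returns a table keyed by Windowed<String>, so writing it out with the default String key serde fails with exactly the ClassCastException quoted above. The LongDeserializer error is most likely unrelated: the console output shows the timestamps in myapp-test were produced as strings (14 bytes each), while the gist reads the values as 8-byte longs. Below is a minimal sketch of the hourly count on the 0.10.0.x API, assuming the WindowedSerializer/WindowedDeserializer helpers from org.apache.kafka.streams.kstream.internals; the store name "hourly-counts" is illustrative, and all search events are assumed to share a single key:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
    import org.apache.kafka.streams.kstream.internals.WindowedSerializer;

    public class HourlyCountSketch {

        public static void main(final String[] args) {
            final Properties streamsConfiguration = new Properties();
            streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
            streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
            streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            // Serde for the Windowed<String> keys that a windowed count produces;
            // without it, to() falls back to the configured String key serde and
            // throws the ClassCastException quoted above.
            final Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(
                    new WindowedSerializer<>(Serdes.String().serializer()),
                    new WindowedDeserializer<>(Serdes.String().deserializer()));

            final KStreamBuilder builder = new KStreamBuilder();
            final KStream<String, String> input = builder.stream("myapp-test");

            // Count once per key and hour. The String argument of TimeWindows.of()
            // names the window/state store ("hourly-counts" is illustrative); it
            // is not a record filter.
            input.countByKey(TimeWindows.of("hourly-counts", 3600 * 1000L))
                 .to(windowedSerde, Serdes.Long(), "outputTopicHourlyCounts");

            final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }

Dividing each emitted hourly count by 3600 then gives the average queries per second for that window.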
On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> I see. KGroupedStream will be part of 0.10.1.0 (it should be released in
> the next few weeks).
>
> So, instead of
>
> > .groupByKey().count()
>
> you need to do
>
> > .countByKey()
>
> -Matthias
>
> On 10/18/16 12:05 PM, Furkan KAMACI wrote:
> > Hi Matthias,
> >
> > Thanks for your detailed answer. By the way, I couldn't find
> > "KGroupedStream" in version 0.10.0.1.
> >
> > Kind Regards, Furkan KAMACI
> >
> > On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax
> > <matth...@confluent.io> wrote:
> >
> > Hi,
> >
> > You just need to read your stream and apply a (windowed) aggregation
> > on it.
> >
> > If you use a non-windowed aggregation you will get counts "since the
> > beginning". If you use a windowed aggregation you can specify the
> > window size as 1 hour and get those results.
> >
> > One comment: it seems that you want to count *all* queries. To make
> > this work, you need to make sure all records use the same key (because
> > Kafka Streams only supports aggregation over keyed streams). Keep in
> > mind that this prohibits parallelization of your aggregation!
> >
> > As a workaround, you could also do two consecutive aggregations:
> > parallelize the first one, and do not parallelize the second one
> > (i.e., use the first one as a pre-aggregation, similar to a combine
> > step). [A sketch of this two-step approach follows at the end of the
> > thread.]
> >
> > Without pre-aggregation, and assuming all records use the same key,
> > something like this (for current trunk):
> >
> >>>> KStreamBuilder builder = new KStreamBuilder();
> >>>> KStream input = builder.stream("yourTopic");
> >>>>
> >>>> KGroupedStream groupedInput = input.groupByKey();
> >>>>
> >>>> groupedInput.count("countStore").to("outputTopicCountFromBeginning");
> >>>>
> >>>> groupedInput.count(TimeWindows.of(3600 * 1000), "windowedCountStore").to("outputTopicHourlyCounts");
> >
> > For more details, please see the docs and examples:
> >
> > - http://docs.confluent.io/current/streams/index.html
> > - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
> >
> > -Matthias
> >
> > On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> >>>> Hi,
> >>>>
> >>>> I could successfully run Kafka in my environment. I want to monitor
> >>>> queries per second in my search application with Kafka. Whenever a
> >>>> search request is made, I create a ProducerRecord which holds the
> >>>> current nano time of the system.
> >>>>
> >>>> I know that I have to use a streaming API for the calculation,
> >>>> i.e. Kafka Streams or Spark Streaming. My choice is to use Kafka
> >>>> Streams.
> >>>>
> >>>> For the last 1 hour, or since the beginning, I have to calculate
> >>>> the queries per second. How can I make such an aggregation in
> >>>> Kafka Streams?
> >>>>
> >>>> Kind Regards, Furkan KAMACI
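To make the pre-aggregation workaround above concrete, here is a rough sketch against the 0.10.1/trunk API used in the quoted snippet (groupByKey() plus count()). Topic, store, and key names are made up for illustration. Step one counts per original key and runs in parallel; step two re-keys every partial count onto one constant key and combines them. KGroupedTable.reduce() takes an adder and a subtractor, so an updated per-key count replaces its earlier contribution instead of being counted twice:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class TwoStepCountSketch {

        public static void main(final String[] args) {
            final Properties config = new Properties();
            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "two-step-count");
            config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            final KStreamBuilder builder = new KStreamBuilder();
            final KStream<String, String> input = builder.stream("yourTopic");

            // Step 1 (parallelizable): count per original key; this acts as the
            // pre-aggregation / combine step.
            final KTable<String, Long> preCounts = input
                    .groupByKey()
                    .count("preCountStore");

            // Step 2 (single key, not parallelizable): re-key every per-key count
            // onto one constant key and sum the partial counts. The subtractor
            // removes a key's previous count when that key is updated, so the
            // running total stays correct.
            preCounts
                    .groupBy((key, count) -> KeyValue.pair("all", count),
                             Serdes.String(), Serdes.Long())
                    .reduce((total, part) -> total + part,   // adder
                            (total, part) -> total - part,   // subtractor
                            "totalCountStore")
                    .to(Serdes.String(), Serdes.Long(), "outputTopicCountFromBeginning");

            final KafkaStreams streams = new KafkaStreams(builder, config);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }

The windowed variant follows the same two-step shape, with count(TimeWindows.of(...), ...) in the first step; the re-keying in the second step then has to handle Windowed<K> keys.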