Hi Matthias,

I've tried this code:

        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
            Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
            Serdes.String().getClass().getName());

        final KStreamBuilder builder = new KStreamBuilder();
        final KStream input = builder.stream("myapp-test");

        final KStream<String, Long> searchCounts =
            input.countByKey("SearchRequests").toStream();
        searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 1000))
            .to("outputTopicHourlyCounts");

        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

However I get an error:


Exception in thread "StreamThread-1" java.lang.ClassCastException:
org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
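My guess (I'm not sure) is that this only fails at runtime because `input` is declared as a raw `KStream`, so the compiler cannot check the key type, and the `Windowed` key from the second count only meets the default String serde when the app is already running. The same failure mode can be reproduced with plain Java collections:

```java
import java.util.ArrayList;
import java.util.List;

// Raw/erased generics let a wrongly-typed element slip through at
// compile time and blow up only at runtime -- analogous to the
// Windowed key reaching the default String serde.
public class RawTypeCast {
    public static void main(String[] args) {
        List raw = new ArrayList();       // raw type, like `final KStream input`
        raw.add(42);                      // an Integer, not a String
        List<String> strings = raw;       // unchecked assignment, compiles fine
        try {
            String s = strings.get(0);    // ClassCastException here
        } catch (ClassCastException e) {
            System.out.println("runtime ClassCastException, as in the streams app");
        }
    }
}
```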

On the other hand when I try this code:

https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a

I get a different error:

Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException:
Size of data received by LongDeserializer is not 8

Here is the content of the generated topic:

        kafka-console-consumer --zookeeper localhost:2181 --topic myapp-test --from-beginning
        28952314828122
        28988681653726
        29080089383233
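For what it's worth, the size mismatch itself is easy to reproduce with plain Java: if those values were written to the topic as text (which the decimal output above suggests), they are not the fixed 8-byte encoding LongDeserializer expects. A small sketch of the difference:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// The values on the topic look like System.nanoTime() longs written as
// text. LongDeserializer expects exactly 8 bytes, but the decimal string
// "28952314828122" is 14 bytes -- hence the SerializationException.
public class LongEncoding {
    public static void main(String[] args) {
        long nano = 28952314828122L;

        // Written as text (e.g. via a String serde):
        byte[] asText = Long.toString(nano).getBytes(StandardCharsets.UTF_8);

        // Written as the fixed-width encoding a long deserializer expects:
        byte[] asFixed = ByteBuffer.allocate(8).putLong(nano).array();

        System.out.println(asText.length);  // 14
        System.out.println(asFixed.length); // 8
    }
}
```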

I know I'm missing something, but I couldn't find what.

Kind Regards,
Furkan KAMACI

On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

>
> I see. KGroupedStream will be part of 0.10.1.0 (it should be released
> in the next few weeks).
>
> So, instead of
>
> > .groupByKey().count()
>
> you need to do
>
> > .countByKey()
>
>
>
> - -Matthias
>
> On 10/18/16 12:05 PM, Furkan KAMACI wrote:
> > Hi Matthias,
> >
> > Thanks for your detailed answer. By the way I couldn't find
> > "KGroupedStream" at version of 0.10.0.1?
> >
> > Kind Regards, Furkan KAMACI
> >
> > On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax
> > <matth...@confluent.io> wrote:
> >
> > Hi,
> >
> > You just need to read your stream and apply a (windowed)
> > aggregation to it.
> >
> > If you use a non-windowed aggregation, you will get counts "since
> > the beginning". If you use a windowed aggregation, you can specify
> > the window size as 1 hour and get hourly results.
> >
> > One comment: it seems that you want to count *all* queries. To
> > make this work, you need to make sure all records use the same
> > key (because Kafka Streams only supports aggregation over keyed
> > streams). Keep in mind that this prohibits parallelization of
> > your aggregation!
> >
> > As a workaround, you could also do two consecutive aggregations:
> > parallelize the first one but not the second one (i.e., use the
> > first one as a pre-aggregation, similar to a combine step).
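If I understand the combine idea correctly, each parallel task counts its own slice of the records and a single downstream step merges the partial counts. A plain-Java sketch of that pattern (no Kafka API, hypothetical names):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the two-stage "combine" idea: several parallel tasks each
// count their own slice of the records (pre-aggregation), and a single
// downstream step merges the partial counts into the global totals.
public class PreAggregation {

    // Stage 1: runs in parallel, one call per partition/slice.
    static Map<String, Long> partialCount(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        for (String k : keys) {
            counts.merge(k, 1L, Long::sum);
        }
        return counts;
    }

    // Stage 2: single, non-parallel merge of the partial counts.
    static Map<String, Long> merge(List<Map<String, Long>> partials) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> p : partials) {
            p.forEach((k, v) -> total.merge(k, v, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> p1 = partialCount(List.of("query", "query"));
        Map<String, Long> p2 = partialCount(List.of("query"));
        System.out.println(merge(List.of(p1, p2)).get("query")); // 3
    }
}
```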
> >
> > Without pre-aggregation, and assuming all records use the same
> > key, it would look something like this (for current trunk):
> >
> >
> >>>> KStreamBuilder builder = new KStreamBuilder();
> >>>> KStream input = builder.stream("yourTopic");
> >>>>
> >>>> KGroupedStream groupedInput = input.groupByKey();
> >>>>
> >>>> groupedInput.count("countStore")
> >>>>     .to("outputTopicCountFromBeginning");
> >>>>
> >>>> groupedInput.count(TimeWindows.of(3600 * 1000), "windowedCountStore")
> >>>>     .to("outputTopicHourlyCounts");
> >
> >
> > For more details, please see the docs and examples:
> >
> > - http://docs.confluent.io/current/streams/index.html
> > - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
> >
> >
> > -Matthias
> >
> > On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> >>>> Hi,
> >>>>
> >>>> I could successfully run Kafka in my environment. I want to
> >>>> monitor Queries Per Second for my search application with
> >>>> Kafka. Whenever a search request is made, I create a
> >>>> ProducerRecord which holds the current nano time of the system.
> >>>>
> >>>> I know that I have to use a streaming API for the calculation,
> >>>> e.g. Kafka Streams or Spark Streaming. My choice is to use
> >>>> Kafka Streams.
> >>>>
> >>>> For the last hour, or since the beginning, I have to calculate
> >>>> the queries per second. How can I do such an aggregation in
> >>>> Kafka Streams?
> >>>>
> >>>> Kind Regards, Furkan KAMACI
> >>>>
> >>
> >
>
