-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Two things:

1) You should not apply the window to the result of the first count,
but to the base stream, to get correct results.

2) Your windowed aggregation does not return plain String keys, but
keys of type Windowed<K>. Thus, you need to either insert a .map() to
transform your data into String type, or provide a custom serializer
when writing the data to the output topic (the method .to(...) has
multiple overloads).
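
To make both points concrete, here is a minimal plain-Java sketch. It
deliberately does not use the Kafka Streams API; it only mimics the
idea of counting the raw events per one-hour tumbling window (point 1)
and then flattening the resulting (key, window) pair into a plain
String key (point 2). The class name, the "key@windowStart" format and
the sample timestamps are assumptions made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration only; this is not the Kafka Streams API. */
final class HourlyWindowCountSketch {

    // One hour in milliseconds, the window size used in the thread.
    static final long WINDOW_SIZE_MS = 3600L * 1000L;

    /** Start of the tumbling window containing a non-negative timestamp. */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    /** Flattens a (key, window start) pair into a plain String key (hypothetical format). */
    static String flatKey(String key, long windowStartMs) {
        return key + "@" + windowStartMs;
    }

    /** Counts the raw events per (key, hourly window); the window is applied to the base events. */
    static Map<String, Long> countPerWindow(String key, long[] eventTimestampsMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (long ts : eventTimestampsMs) {
            counts.merge(flatKey(key, windowStart(ts)), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Three events fall into the first hour, one into the second, one into the third.
        long[] events = {0L, 1_000L, 3_599_999L, 3_600_000L, 7_200_001L};
        System.out.println(countPerWindow("search", events));
    }
}
```

Windowing the output of an earlier running count instead would count
update records rather than the original requests, which is why the
window belongs on the base stream.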

By default, each topic read/write operation uses the Serdes from the
streams config. If your data has a different type, you need to provide
appropriate Serdes for those operators.
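
As a rough illustration of why a Serde mismatch fails, the plain-Java
sketch below compares two encodings of the same long value. It assumes
that the timestamps in your input topic were written as text (the
console consumer output in your mail suggests this, but I cannot
confirm it from here); that would be one plausible explanation for the
"Size of data received by LongDeserializer is not 8" error. The class
and method names are hypothetical, and the code does not use the Kafka
classes themselves.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Plain-Java illustration only; the names here are made up for this sketch. */
final class SerdeMismatchSketch {

    /** Encodes a long as its decimal text, the way a String serializer would. */
    static byte[] asText(long value) {
        return Long.toString(value).getBytes(StandardCharsets.UTF_8);
    }

    /** Encodes a long as a fixed 8-byte big-endian value. */
    static byte[] asBinary(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Decodes exactly 8 bytes back into a long and rejects any other length. */
    static long decodeBinary(byte[] data) {
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes but got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        long nanoTime = 28952314828122L; // sample value taken from the thread
        System.out.println("text length:   " + asText(nanoTime).length);
        System.out.println("binary length: " + asBinary(nanoTime).length);
    }
}
```

A reader that expects the 8-byte form cannot decode the text form, so
the writer and the reader of a topic have to agree on the Serde.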


- -Matthias

On 10/18/16 2:01 PM, Furkan KAMACI wrote:
> Hi Matthias,
> 
> I've tried this code:
> 
>     final Properties streamsConfiguration = new Properties();
>     streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
>     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>     streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
>     streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>     streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>
>     final KStreamBuilder builder = new KStreamBuilder();
>     final KStream input = builder.stream("myapp-test");
>
>     final KStream<String, Long> searchCounts = input.countByKey("SearchRequests").toStream();
>     searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 1000)).to("outputTopicHourlyCounts");
>
>     final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
>     streams.start();
>
>     Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> 
> However I get an error:
> 
> 
>     Exception in thread "StreamThread-1" java.lang.ClassCastException:
>     org.apache.kafka.streams.kstream.Windowed cannot be cast to
>     java.lang.String
> 
> On the other hand when I try this code:
> 
> https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a
>
>  I get an error too which indicates that:
> 
>     Exception in thread "StreamThread-1"
>     org.apache.kafka.common.errors.SerializationException: Size of
>     data received by LongDeserializer is not 8
> 
> Here is the content of the generated topic:
>
>     kafka-console-consumer --zookeeper localhost:2181 --topic myapp-test --from-beginning
>     28952314828122
>     28988681653726
>     29080089383233
> 
> I know that I am missing something, but I couldn't find it.
> 
> Kind Regards, Furkan KAMACI
> 
> On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax
> <matth...@confluent.io> wrote:
> 
> I see. KGroupedStream will be part of 0.10.1.0 (should be released
> in the next few weeks).
> 
> So, instead of
> 
>>>> .groupByKey().count()
> 
> you need to do
> 
>>>> .countByKey()
> 
> 
> 
> -Matthias
> 
> On 10/18/16 12:05 PM, Furkan KAMACI wrote:
>>>> Hi Matthias,
>>>> 
>>>> Thanks for your detailed answer. By the way, I couldn't find
>>>> "KGroupedStream" in version 0.10.0.1?
>>>> 
>>>> Kind Regards, Furkan KAMACI
>>>> 
>>>> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax 
>>>> <matth...@confluent.io> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> You just need to read your stream and apply a (windowed)
>>>> aggregation on it.
>>>> 
>>>> If you use non-windowed aggregation you will get "since the 
>>>> beginning". If you use windowed aggregation you can specify
>>>> the window size as 1 hour and get those results.
>>>> 
>>>> One comment: it seems that you want to count *all* queries.
>>>> To make this work, you need to make sure all records are
>>>> using the same key (because Kafka Streams only supports
>>>> aggregation over keyed streams). Keep in mind that this
>>>> prohibits parallelization of your aggregation!
>>>> 
>>>> As a workaround, you could also do two consecutive
>>>> aggregations: parallelize the first one and do not
>>>> parallelize the second one (i.e., use the first one as a
>>>> pre-aggregation, similar to a combine step).
>>>> 
>>>> Without pre-aggregation, and assuming all records use the same
>>>> key, it would look something like this (for current trunk):
>>>> 
>>>> 
>>>>>>> KStreamBuilder builder = new KStreamBuilder();
>>>>>>> KStream input = builder.stream("yourTopic");
>>>>>>> 
>>>>>>> KGroupedStream groupedInput = input.groupByKey();
>>>>>>> 
>>>>>>> groupedInput.count("countStore").to("outputTopicCountFromBeginning");
>>>>>>> 
>>>>>>> groupedInput.count(TimeWindows.of(3600 * 1000), "windowedCountStore").to("outputTopicHourlyCounts");
>>>> 
>>>> 
>>>> For more details, please see the docs and examples:
>>>> 
>>>> - http://docs.confluent.io/current/streams/index.html
>>>> - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
>>>> 
>>>> -Matthias
>>>> 
>>>> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
>>>>>>> Hi,
>>>>>>> 
>>>>>>> I could successfully run Kafka in my environment. I
>>>>>>> want to monitor queries per second in my search
>>>>>>> application with Kafka. Whenever a search request is
>>>>>>> made, I create a ProducerRecord which holds the current
>>>>>>> nano time of the system.
>>>>>>> 
>>>>>>> I know that I have to use a streaming API for the
>>>>>>> calculation, i.e. Kafka Streams or Spark Streaming. My
>>>>>>> choice is to use Kafka Streams.
>>>>>>> 
>>>>>>> I have to calculate the queries per second for the last
>>>>>>> hour, or since the beginning. How can I do such an
>>>>>>> aggregation in Kafka Streams?
>>>>>>> 
>>>>>>> Kind Regards, Furkan KAMACI
>>>>>>> 
>>>>> 
>>>> 
>> 
> 
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJYBpDSAAoJECnhiMLycopPguYQAIfe/JwkpDgvePNJceb5s+kr
oQQrQ2ja0A7R4aNmnFBFA5QZ9vbtP25CUCAD4y/FAKDoneGi8vYBf0Ky9l3flh5+
admwq5wQyJesgS+mHTo/iUqHJUbTHTHixyKyvMMwmqJvgbaRkLFFx5GFhgrZZhHo
4jc0s1oebdzMA4dNkrdaM6+M0G9pZmE1ILz26EDPXdxfnBIp8zNK8LxqRubzvzML
gv+wVU8USB2dkRR6WTB56WKlpfSFjAUweyrv9iEJdvfOwsuBStRf5ex7YG5BWbgi
E3yCeKPR0GPy+3zj7Bjjsts5hYA0LZnJZpjGjpSxtd4dl/nH7El+SEEB+aNXv+3f
UuSufV335sSDYteLMWySJBKQAu8AgDIeVnqMQwnaNywhhXVXuoLkoRv/h/x9Fiwk
g26S7+JN4MQKwbHreMDrLSPEQy0oPdgCTtgcjA0BlOb6wzcUNNiETiyYVy2OoT04
bCAge7KW43afmwiY4t7WetLjSvQMOJMq+tRArpVuX0Fk6IfE5LsiStRTQCnlQxHM
ruXSbqPWh3DYU32EL/QwzyiiZhPUmjN5SCehBjmRWEfnEgay2qbXh0Hnft0sk6f0
/SUbl/i11D4hhhPSnNTSQj9qEJT2SD7A7N90FplgQDwfCMWKg76Sfn85qMLiFnRE
FDk0ghehl5ROJhXgs1eN
=OU4j
-----END PGP SIGNATURE-----
