-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

About auto-topic creation: If your broker configuration allows for
this, yes it would work. However, keep in mind, that the topic will be
created with default values (according to broker config) with regard
to number of partitions and replication factor. Those value might not
meet your needs. Therefore, it is highly recommended to not rely on
topic auto-create but to create all topics manually (to specify number
of partitions and replication factor with values that do meet your needs
).

And great that you got window-count working.

- From you email, I am not sure if you are stuck again and if yes, what
your question is?

- -Matthias

On 10/19/16 11:03 AM, Furkan KAMACI wrote:
> I could successfully get the total (not average). As far as I see,
> there is no need to create a topic manually before I run the app.
> Topic is created if there is data and topic name not exists. Here
> is my code:
> 
> KStreamBuilder builder = new KStreamBuilder();
> 
> KStream<String, String> longs = builder.stream(Serdes.String(), 
> Serdes.String(), "mytopic");
> 
> KTable<Windowed<String>, Long> longCounts = 
> longs.countByKey(TimeWindows.of("output-topic", 3600 * 1000), 
> Serdes.String());
> 
> KafkaStreams streams = new KafkaStreams(builder, 
> streamsConfiguration); streams.start();
> 
> Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> 
> On Wed, Oct 19, 2016 at 1:58 AM, Matthias J. Sax
> <matth...@confluent.io> wrote:
> 
> You should create input/intermediate and output topic manually
> before you start you Kafka Streams application.
> 
> 
> -Matthias
> 
> On 10/18/16 3:34 PM, Furkan KAMACI wrote:
>>>> Sorry about concurrent questions. Tried below code, didn't
>>>> get any error but couldn't get created output topic:
>>>> 
>>>> Properties props = new Properties();
>>>> props.put("bootstrap.servers", "localhost:9092");
>>>> props.put("acks", "all"); props.put("retries", 0);
>>>> props.put("batch.size", 16384); props.put("linger.ms", 1); 
>>>> props.put("buffer.memory", 33554432);
>>>> props.put("key.serializer", 
>>>> "org.apache.kafka.common.serialization.StringSerializer"); 
>>>> props.put("value.serializer", 
>>>> "org.apache.kafka.common.serialization.StringSerializer");
>>>> 
>>>> Producer<String, String> producer = new
>>>> KafkaProducer<>(props);
>>>> 
>>>> for (int i = 0; i < 1000; i++) { producer.send(new 
>>>> ProducerRecord<>( "input-topic",
>>>> String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}",
>>>> System.nanoTime() * 1e-9, i)));
>>>> 
>>>> 
>>>> final KStreamBuilder builder = new KStreamBuilder(); final 
>>>> KStream<String, Long> qps = builder.stream(Serdes.String(), 
>>>> Serdes.Long(), "input-topic"); 
>>>> qps.countByKey(TimeWindows.of("Hourly", 3600 * 
>>>> 1000)).mapValues(Object::toString).to("output-topic");
>>>> 
>>>> final KafkaStreams streams = new KafkaStreams(builder, 
>>>> streamsConfiguration); streams.start();
>>>> 
>>>> Runtime.getRuntime().addShutdownHook(new
>>>> Thread(streams::close));
>>>> 
>>>> On Wed, Oct 19, 2016 at 12:14 AM, Matthias J. Sax 
>>>> <matth...@confluent.io> wrote:
>>>> 
>>>> Two things:
>>>> 
>>>> 1) you should not apply the window to the first count, but to
>>>> the base stream to get correct results.
>>>> 
>>>> 2) your windowed aggregation, doew not just return String
>>>> type, but Window<K> type. Thus, you need to either insert a
>>>> .map() to transform you data into String typo, or you provide
>>>> a custom serializer when writing data to output topic
>>>> (method, .to(...) has multiple overloads)
>>>> 
>>>> Per default, each topic read/write operation uses Serdes from
>>>> the streams config. If you data has a different type, you
>>>> need to provide appropriate Serdes for those operators.
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> On 10/18/16 2:01 PM, Furkan KAMACI wrote:
>>>>>>> Hi Matthias,
>>>>>>> 
>>>>>>> I've tried this code:
>>>>>>> 
>>>>>>> *        final Properties streamsConfiguration = new 
>>>>>>> Properties();* * 
>>>>>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>>>>>>
>>>>>>>
>
>>>>>>> 
"myapp");* *
>>>>>>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>>>>>
>>>>>>>
>
>>>>>>> 
"localhost:9092");* *
>>>>>>> streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
>>>>>>>
>>>>>>>
>
>>>>>>> 
"localhost:2181");* *
>>>>>>> streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>>>>>
>>>>>>>
>
>>>>>>> 
Serdes.String().getClass().getName());* *
>>>>>>> streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>>>>
>>>>>>>
>
>>>>>>> 
Serdes.String().getClass().getName());* *        final
>>>>>>> KStreamBuilder builder = new KStreamBuilder();* * final
>>>>>>> KStream input = builder.stream("myapp-test");*
>>>>>>> 
>>>>>>> *        final KStream<String, Long> searchCounts = 
>>>>>>> input.countByKey("SearchRequests").toStream();* * 
>>>>>>> searchCounts.countByKey(TimeWindows.of("Hourly", 3600
>>>>>>> * 1000)).to("outputTopicHourlyCounts");*
>>>>>>> 
>>>>>>> *        final KafkaStreams streams = new 
>>>>>>> KafkaStreams(builder, streamsConfiguration);* * 
>>>>>>> streams.start();*
>>>>>>> 
>>>>>>> *        Runtime.getRuntime().addShutdownHook(new 
>>>>>>> Thread(streams::close));*
>>>>>>> 
>>>>>>> However I get an error:
>>>>>>> 
>>>>>>> 
>>>>>>> *Exception in thread "StreamThread-1" 
>>>>>>> java.lang.ClassCastException: 
>>>>>>> org.apache.kafka.streams.kstream.Windowed cannot be
>>>>>>> cast to java.lang.String*
>>>>>>> 
>>>>>>> On the other hand when I try this code:
>>>>>>> 
>>>>>>> https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c77
60a
>
>>>>>>> 
f3a
>>>>>>> 
>>>>>>> 
>>>>>>> 
> I get an error too which indicates that:
>>>>>>> 
>>>>>>> *Exception in thread "StreamThread-1" 
>>>>>>> org.apache.kafka.common.errors.SerializationException:
>>>>>>> Size of data received by LongDeserializer is not 8 *
>>>>>>> 
>>>>>>> Here is generated topic:
>>>>>>> 
>>>>>>> *kafka-console-consumer --zookeeper localhost:2181
>>>>>>> --topic myapp-test --from-beginning* *
>>>>>>> 28952314828122* * 28988681653726* *
>>>>>>> 29080089383233*
>>>>>>> 
>>>>>>> I know that I miss something but couldn't find it.
>>>>>>> 
>>>>>>> Kind Regards, Furkan KAMACI
>>>>>>> 
>>>>>>> On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax 
>>>>>>> <matth...@confluent.io> wrote:
>>>>>>> 
>>>>>>> I see. KGroupedStream will be part of 0.10.1.0 (should
>>>>>>> be release the next weeks).
>>>>>>> 
>>>>>>> So, instead of
>>>>>>> 
>>>>>>>>>> .groupByKey().count()
>>>>>>> 
>>>>>>> you need to do
>>>>>>> 
>>>>>>>>>> .countByKey()
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> -Matthias
>>>>>>> 
>>>>>>> On 10/18/16 12:05 PM, Furkan KAMACI wrote:
>>>>>>>>>> Hi Matthias,
>>>>>>>>>> 
>>>>>>>>>> Thanks for your detailed answer. By the way I
>>>>>>>>>> couldn't find "KGroupedStream" at version of
>>>>>>>>>> 0.10.0.1?
>>>>>>>>>> 
>>>>>>>>>> Kind Regards, Furkan KAMACI
>>>>>>>>>> 
>>>>>>>>>> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax 
>>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi,
>>>>>>>>>> 
>>>>>>>>>> You just need to read you stream and apply an 
>>>>>>>>>> (windowed) aggregation on it.
>>>>>>>>>> 
>>>>>>>>>> If you use non-windowed aggregation you will get
>>>>>>>>>> "since the beginning". If you use windowed
>>>>>>>>>> aggregation you can specify the window size as 1
>>>>>>>>>> hour and get those results.
>>>>>>>>>> 
>>>>>>>>>> One comment: it seems that you want to count
>>>>>>>>>> *all* queries. To make this work, you need to
>>>>>>>>>> make sure all records are using the same key
>>>>>>>>>> (because Kafka Streams only supports aggregation
>>>>>>>>>> over keyed streams). Keep in mind, that this
>>>>>>>>>> prohibits parallelization of you aggregation!
>>>>>>>>>> 
>>>>>>>>>> As a workaround, you could also do two
>>>>>>>>>> consecutive aggregation, and do parallelize the
>>>>>>>>>> first one, and do not parallelize the second one
>>>>>>>>>> (ie, using the first one as a pre aggregation
>>>>>>>>>> similar to a combine step)
>>>>>>>>>> 
>>>>>>>>>> Without pre aggregation and assuming all records
>>>>>>>>>> use the same key something like this (for current
>>>>>>>>>> trunk):
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>>>>> KStreamBuilder builder = new
>>>>>>>>>>>>> KStreamBuilder(): KStream input =
>>>>>>>>>>>>> builder.stream("yourTopic");
>>>>>>>>>>>>> 
>>>>>>>>>>>>> KGroupedStream groupedInput = 
>>>>>>>>>>>>> input.groupByKey();
>>>>>>>>>>>>> 
>>>>>>>>>>>>> groupedInput.count("countStore").to("outputTopicCountFromB
egi
>
>>>>>>>>>>>>> 
nni
>>>> 
>>>>>>>>>>>>> 
> ng"
>>>>>>> 
>>>>>>>>>>>>> 
>>>> );
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>> groupedInput.count(TimeWindows.of(3600 * 1000),
>>>>>>>>>> "windowedCountStore").to("outputTopicHourlyCounts"):
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 
For more details, please see the docs and examples:
>>>>>>>>>> 
>>>>>>>>>> -
>>>>>>>>>> http://docs.confluent.io/current/streams/index.html
>>>>>>>>>>
>>>>>>>>>> 
- -
>>>>>>>>>> https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-
cp-
>
>>>>>>>>>> 
3.0
>>>> 
>>>>>>>>>> 
> .1/
>>>>>>> 
>>>>>>>>>> 
>>>> ka
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>> fka-streams
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> -Matthias
>>>>>>>>>> 
>>>>>>>>>> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I could successfully run Kafka at my
>>>>>>>>>>>>> environment. I want to monitor Queries per
>>>>>>>>>>>>> Second at my search application with Kafka.
>>>>>>>>>>>>> Whenever a search request is done I create
>>>>>>>>>>>>> a ProducerRecord which holds current nano
>>>>>>>>>>>>> time of the system.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I know that I have to use a streaming API
>>>>>>>>>>>>> for calculation i.e. Kafka Streams or Spark
>>>>>>>>>>>>> Streams. My choice is to use Kafka
>>>>>>>>>>>>> Streams.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> For last 1 hours, or since the beginning, I
>>>>>>>>>>>>> have to calculate the queries per second.
>>>>>>>>>>>>> How can I make such an aggregation at Kafka
>>>>>>>>>>>>> Streams?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Kind Regards, Furkan KAMACI
>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> 
> 
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJYB7zqAAoJECnhiMLycopP24sP/0QsNvVnqOYCpd1m3GI96JrT
Wbn7q5YAUL95JolBd0GmTb1DeCy+c1n5+DS8I8yx2Cr4yr1WoymSKnmoP14H8pMG
JTb4Dc/QZZEUS4pE5JD6/524IcEC7CA/L3qu/Ax2m5BG6hsYheGhP02vMnrThYZh
DqNxjp+7WAB3OZ73X9gOQyuF3qyn+QEyC2ebNRlH5oRowjaEqIixsfzv3OmMCrpL
U+0EkQE4PZzogQ1arp8CxHNozLjVQeca/X4EM9lqfpf2+qEo7MBtjdRSBvRUZMgd
OHssAX3Nldk4y6u+a3EWPcsm1QHfi2qg1MFD42VW02pY8ncCQeQ8OYgLhqvuMjkh
P/7JoSbbRBM62zv92mHruTuUJGOYe6OMwEb4G3Jmi6Yd++2kcZD8kGOLwUqUY+X7
MElpVW7QRrSGwUH9Qc+OffZ8udHNQUNRlZfPJPuMaBL/WZ+krmkd/neq8nsK7UR4
n0l+jNzbn+LNTjYG5Lvp3IzskIaW9D2r/2BaQedKi8BwhvXHIcH9aO1WrH6Nlt0r
omoTK4e/5u4HAGPyy08YKqFd6m5ofDyMf0GJ0oigQ/ifRdZWITNX5W0GSZZ2PI4d
MaRxwz29rr41pM0RP6XFP5nqb+vI23v1kW9b8OnjkEXfMXUaJgyAEQHDrrICS/UM
X0rcIVer8vIjol9JwQyS
=awCG
-----END PGP SIGNATURE-----

Reply via email to