Re: Using processor API via DSL

2019-05-08 Thread Alessandro Tagliapietra
Hi Bruno, no worries. No, that was an old problem; the latest code is in the gist from my last email. Anyway, I've pushed the same code to the master branch. I'm not sure I've handled the JARs correctly, but the code should be there. The gist https://gist.github.com/alex88/43b72e23bda9e156

Re: Using processor API via DSL

2019-05-08 Thread Bruno Cadonna
Hi Alessandro, apologies for the late reply. I tried the code from your repository at https://github.com/alex88/kafka-test/tree/master and ran into a `ClassCastException`. I think this is the bug described in https://issues.apache.org/jira/browse/KAFKA-8317. Should I have tried one

Re: Using processor API via DSL

2019-05-03 Thread Alessandro Tagliapietra
OK, so I'm not sure if I did this correctly. I've upgraded both the server (by replacing the JARs in the Confluent Docker image with those built from the Kafka source) and the client (by using the built JARs as local file dependencies). I've used this as source: https://github.com/apache/kafka/archive/

Re: Using processor API via DSL

2019-05-02 Thread Alessandro Tagliapietra
Hi Bruno, thank you for your help. I'm glad to hear that those are only bugs and not a problem in my implementation. I'm currently using the Confluent Docker images. I've checked their master branch, which seems to use the SNAPSHOT version; however, those images/packages aren't publicly available. Are there

Re: Using processor API via DSL

2019-04-23 Thread Bruno Cadonna
Hi Alessandro, it seems that the behaviour you described regarding the window aggregation is due to bugs. The good news is that the bugs have already been fixed. The relevant bug reports are https://issues.apache.org/jira/browse/KAFKA-7895 and https://issues.apache.org/jira/browse/KAFKA-8204. The fix

Re: Using processor API via DSL

2019-04-20 Thread Alessandro Tagliapietra
Thanks Matthias, one less thing to worry about in the future :) -- Alessandro Tagliapietra On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax wrote: > Just a side note. There is currently work in progress on > https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the > configuration p

Re: Using processor API via DSL

2019-04-20 Thread Matthias J. Sax
Just a side note. There is currently work in progress on https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the configuration problem for Serdes. -Matthias On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote: > Hi Bruno, > thanks a lot for checking the code, regarding the SpecificAvro

Re: Using processor API via DSL

2019-04-19 Thread Alessandro Tagliapietra
Hi Bruno, thanks a lot for checking the code. Regarding the SpecificAvroSerde, I've found that using final Serde valueSpecificAvroSerde = new SpecificAvroSerde<>(); final Map serdeConfig = Collections.singletonMap("schema.registry.url", "http://localhost:8081"); valueSpecificAvroSerde.configure(se

Re: Using processor API via DSL

2019-04-19 Thread Bruno Cadonna
Hi Alessandro, I had a look at your code. Regarding your question about whether you are using the SpecificAvroSerde correctly, take a look at the following documentation: https://docs.confluent.io/current/streams/developer-guide/datatypes.html I haven't had the time yet to take a closer look at your problem

Re: Using processor API via DSL

2019-04-17 Thread Alessandro Tagliapietra
So I've started a new app using `archetype:generate` as described in https://kafka.apache.org/22/documentation/streams/tutorial. I've pushed a sample repo here: https://github.com/alex88/kafka-test. The Avro schemas are a Metric with two fields, timestamp and production, and a MetricList with a list of rec

Re: Using processor API via DSL

2019-04-16 Thread Alessandro Tagliapietra
Hi Bruno, I'm using the Confluent Docker images 5.2.1, so Kafka 2.2. Anyway, I'll try to make a small reproduction repo with all the different cases soon. Thank you -- Alessandro Tagliapietra On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna wrote: > Hi Alessandro, > > What version of Kafka do yo

Re: Using processor API via DSL

2019-04-16 Thread Bruno Cadonna
Hi Alessandro, What version of Kafka do you use? Could you please give a more detailed example for the issues with the two keys you see? Could the following bug be related to the duplicates you see? https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%

Re: Using processor API via DSL

2019-04-15 Thread Alessandro Tagliapietra
Thank you Bruno, I'll look into those. However, averaging is just a simple case I'm trying right now to get an initial windowing flow working. In the future I'll probably still need the actual values for other calculations. We won't have more than 60 elements per window for sure. So far to not

Re: Using processor API via DSL

2019-04-15 Thread Bruno Cadonna
Hi Alessandro, have a look at this Kafka Streams usage pattern for computing averages without using an ArrayList: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average? The advantages of this pattern over the ArrayList appr
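The pattern linked above keeps a running sum and count instead of the raw values. Below is a minimal plain-Java sketch of that idea. It is not the wiki page's code. `SumCount`, `initialize()` and `aggregate()` are hypothetical names, shaped like the initializer and aggregator you would pass to the DSL's `aggregate()`. The final division into an average would typically be a `mapValues()` step in the DSL.

```java
class SumCountAverage {

    /** Immutable running aggregate: sum and count of the values seen so far. */
    static final class SumCount {
        final double sum;
        final long count;

        SumCount(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        /** Average of the values folded in so far; NaN for an empty window. */
        double average() {
            return count == 0 ? Double.NaN : sum / count;
        }
    }

    /** Hypothetical initializer: an empty aggregate for a new window. */
    static SumCount initialize() {
        return new SumCount(0.0, 0L);
    }

    /** Hypothetical aggregator: folds one value into the running aggregate. */
    static SumCount aggregate(String key, double value, SumCount agg) {
        return new SumCount(agg.sum + value, agg.count + 1);
    }

    public static void main(String[] args) {
        SumCount agg = initialize();
        for (double v : new double[] {10.0, 20.0, 30.0}) {
            agg = aggregate("sensor-1", v, agg);
        }
        System.out.println(agg.average()); // prints 20.0
    }
}
```

Because the aggregate only holds two numbers, what has to be serialized into the window store stays the same size no matter how many records fall into a window.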

Re: Using processor API via DSL

2019-04-15 Thread Alessandro Tagliapietra
Sorry, but it turned out to be harder than I thought. To get the custom aggregation working I need an ArrayList of all the values in the window. So far, my aggregate DSL method creates an ArrayList in the initializer and adds each value to the list in the aggregator. Then I think I'll have to provide
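The list-based approach described above can be sketched in plain Java as follows. `initialize()`, `aggregate()` and `average()` are hypothetical names that mirror the DSL's initializer and aggregator; they are not the author's code. The aggregator appends to the list and returns it. Any calculation that needs all the values (an average here, as an example) then runs on the finished list.

```java
import java.util.ArrayList;
import java.util.List;

class ListAggregateSketch {

    /** Hypothetical initializer: every new window starts with an empty list. */
    static List<Double> initialize() {
        return new ArrayList<>();
    }

    /** Hypothetical aggregator: append the record's value and return the list. */
    static List<Double> aggregate(String key, Double value, List<Double> values) {
        values.add(value);
        return values;
    }

    /** Example calculation on the complete list of a window's values. */
    static double average(List<Double> values) {
        if (values.isEmpty()) {
            return Double.NaN;
        }
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.size();
    }

    public static void main(String[] args) {
        List<Double> window = initialize();
        for (double v : new double[] {4.0, 8.0, 6.0}) {
            window = aggregate("sensor-1", v, window);
        }
        System.out.println(window + " -> " + average(window)); // prints [4.0, 8.0, 6.0] -> 6.0
    }
}
```

In Kafka Streams, a list aggregate like this also needs a serde so it can be written to the window state store, and its stored size grows with the number of records per window.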

Re: Using processor API via DSL

2019-04-15 Thread Alessandro Tagliapietra
Thank you Bruno and Matthias, I've modified the transformer to implement the ValueTransformerWithKey interface and everything is working fine. I now have to window the data and manually aggregate each window's data, since I need to compute some averages and sums of differences. So far I'm just having some iss
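The snippet above does not define "sum of differences". The sketch below assumes it means the sum of absolute differences between consecutive values in a window; that reading is a guess. It also assumes the window's values are already collected in timestamp order. Both method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

class WindowStatsSketch {

    /** Average of the window's values; NaN for an empty window. */
    static double average(List<Double> values) {
        if (values.isEmpty()) {
            return Double.NaN;
        }
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.size();
    }

    /**
     * Sum of absolute differences between consecutive values:
     * |v1 - v0| + |v2 - v1| + ...
     * ASSUMPTION: one possible reading of "sum of differences";
     * the list must already be in timestamp order.
     */
    static double sumOfAbsoluteDifferences(List<Double> values) {
        double total = 0.0;
        for (int i = 1; i < values.size(); i++) {
            total += Math.abs(values.get(i) - values.get(i - 1));
        }
        return total;
    }

    public static void main(String[] args) {
        List<Double> window = Arrays.asList(1.0, 4.0, 2.0, 5.0);
        System.out.println(average(window));                  // prints 3.0
        System.out.println(sumOfAbsoluteDifferences(window)); // prints 8.0
    }
}
```

Without `Math.abs`, the consecutive differences telescope to `last - first`, so a signed version would only need the first and last values of each window. The aggregator sees records in arrival order, so out-of-order records would have to be sorted by timestamp before this step.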

Re: Using processor API via DSL

2019-04-14 Thread Bruno Cadonna
Hi Alessandro, the `TransformerSupplier` is internally wrapped with a `ProcessorSupplier`, so your statement that `transform()` is essentially equivalent to adding the Transformer via `Topology#addProcessor()` to your processor topology is correct. If you do not change the key, you should definitely use on

Re: Using processor API via DSL

2019-04-13 Thread Matthias J. Sax
There is also `ValueTransformerWithKey`, which gives you read-only access to the key. -Matthias On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote: > Hi Bruno, > > Thank you for the quick answer. > > I'm actually trying to do that since it seems there is really no way to > have it use `Processor`.

Re: Using processor API via DSL

2019-04-12 Thread Alessandro Tagliapietra
Hi Bruno, Thank you for the quick answer. I'm actually trying to do that since it seems there is really no way to have it use `Processor`. I just wanted (if that would've made any sense) to use the Processor in both DSL and non-DSL pipelines. Anyway, regarding `transformValues()` I don't think I

Re: Using processor API via DSL

2019-04-12 Thread Bruno Cadonna
Hi Alessandro, have you considered using `transform()` (actually, in your case you should use `transformValues()`) instead of `.process()`? `transform()` and `transformValues()` are stateful operations similar to `.process()`, but they return a `KStream`. On a `KStream` you can then apply a windowed a
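To illustrate what a windowed aggregation on that `KStream` computes, here is a plain-Java sketch of tumbling windows for a single key. It is not Kafka Streams code. The `Metric` type (a timestamp in milliseconds plus a `production` value) and both method names are hypothetical. For tumbling windows of size S, a record with non-negative timestamp t falls into the window starting at t - (t % S), aligned to the epoch. The sketch ignores grace periods and late records. It also produces one final value per window. Kafka Streams instead emits a stream of updated results (subject to caching) unless `suppress()` is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TumblingWindowSketch {

    /** Hypothetical record type: timestamp in ms and a measured value. */
    static final class Metric {
        final long timestamp;
        final double production;

        Metric(long timestamp, double production) {
            this.timestamp = timestamp;
            this.production = production;
        }
    }

    /** Start of the tumbling window containing the (non-negative) timestamp. */
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    /** Average production per window start, ordered by window start. */
    static Map<Long, Double> averagePerWindow(List<Metric> metrics, long windowSizeMs) {
        // Per window: acc[0] holds the sum, acc[1] the count.
        Map<Long, double[]> sumCount = new TreeMap<>();
        for (Metric m : metrics) {
            double[] acc = sumCount.computeIfAbsent(
                    windowStart(m.timestamp, windowSizeMs), k -> new double[2]);
            acc[0] += m.production;
            acc[1] += 1;
        }
        Map<Long, Double> result = new TreeMap<>();
        for (Map.Entry<Long, double[]> e : sumCount.entrySet()) {
            result.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Metric> metrics = new ArrayList<>();
        metrics.add(new Metric(5_000L, 10.0));
        metrics.add(new Metric(59_000L, 20.0));
        metrics.add(new Metric(61_000L, 30.0));
        System.out.println(averagePerWindow(metrics, 60_000L)); // prints {0=15.0, 60000=30.0}
    }
}
```

In the DSL, the grouping by window start happens inside `groupByKey().windowedBy(...)`, and the per-window accumulation is the aggregator. The result is keyed by `Windowed<K>` rather than by a bare window start.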