Hi Bruno,
no worries.
No, that was an old problem; the latest code is in the gist from the last
email.
Anyway, I've pushed the master branch with the same code. I'm not sure I've
done the right thing with the jars, but the code should be there.
The gist https://gist.github.com/alex88/43b72e23bda9e156
Hi Alessandro,
Apologies for the late reply.
I tried the code from your repository under
https://github.com/alex88/kafka-test/tree/master and I ran into a
`ClassCastException`. I think this is the bug described here:
https://issues.apache.org/jira/browse/KAFKA-8317 .
Should I have tried one
OK, so I'm not sure if I did this correctly:
I've upgraded both the server (by replacing the JARs in the Confluent
Docker image with those built from the Kafka source) and the client (by using
the built JARs as local file dependencies).
I've used this as source: https://github.com/apache/kafka/archive/
Hi Bruno,
thank you for your help; I'm glad to hear that those are only bugs and not a
problem in my implementation.
I'm currently using the Confluent Docker images. I've checked their master
branch, which seems to use the SNAPSHOT version, but those
images/packages aren't publicly available. Are there
Hi Alessandro,
It seems that the behaviour you described regarding the window aggregation
is due to bugs. The good news is that the bugs have already been fixed.
The relevant bug reports are:
https://issues.apache.org/jira/browse/KAFKA-7895
https://issues.apache.org/jira/browse/KAFKA-8204
The fix
Thanks Matthias, one less thing to worry about in the future :)
--
Alessandro Tagliapietra
On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax
wrote:
> Just a side note. There is currently work in progress on
> https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the
> configuration p
Just a side note. There is currently work in progress on
https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the
configuration problem for Serdes.
-Matthias
On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:
> Hi Bruno,
> thanks a lot for checking the code, regarding the SpecificAvro
Hi Bruno,
thanks a lot for checking the code. Regarding the SpecificAvroSerde, I've
found that using
final Serde valueSpecificAvroSerde = new SpecificAvroSerde<>();
final Map<String, String> serdeConfig =
    Collections.singletonMap("schema.registry.url", "http://localhost:8081");
valueSpecificAvroSerde.configure(serdeConfig, false);
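For reference, a minimal sketch of how such a configured serde is typically
passed to the topology explicitly; the "metrics" topic name, the String key
type, and the Avro-generated Metric value class are assumptions for
illustration:

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Collections;
import java.util.Map;

// Configure the value serde against the schema registry (isKey = false).
final SpecificAvroSerde<Metric> metricSerde = new SpecificAvroSerde<>();
final Map<String, String> serdeConfig =
    Collections.singletonMap("schema.registry.url", "http://localhost:8081");
metricSerde.configure(serdeConfig, false);

// Pass the serde explicitly instead of relying on the default serdes.
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, Metric> metrics =
    builder.stream("metrics", Consumed.with(Serdes.String(), metricSerde));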
Hi Alessandro,
I had a look at your code. Regarding your question whether you use the
SpecificAvroSerde correctly, take a look at the following documentation:
https://docs.confluent.io/current/streams/developer-guide/datatypes.html
I haven't had the time yet to take a closer look at your problem
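For reference, a minimal sketch of the default-serde configuration that page
describes, assuming the Confluent SpecificAvroSerde; the application id and
addresses are placeholders:

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

// Default serdes apply to every operation that does not set serdes explicitly.
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-app");        // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
props.put("schema.registry.url", "http://localhost:8081");  // picked up by the default serde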
So I've started a new app with archetype:generate as in
https://kafka.apache.org/22/documentation/streams/tutorial
I've pushed a sample repo here: https://github.com/alex88/kafka-test
The Avro schemas are a Metric with two fields (timestamp and production) and a
MetricList with a list of rec
Hi Bruno,
I'm using the Confluent Docker images 5.2.1, so Kafka 2.2.
Anyway I'll try to make a small reproduction repo with all the different
cases soon.
Thank you
--
Alessandro Tagliapietra
On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna wrote:
> Hi Alessandro,
>
> What version of Kafka do yo
Hi Alessandro,
What version of Kafka do you use?
Could you please give a more detailed example for the issues with the two
keys you see?
Could the following bug be related to the duplicates you see?
https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%
Thank you Bruno,
I'll look into those; however, the average is just a simple thing I'm trying
right now to get an initial windowing flow working.
In the future I'll probably still need the actual values for other
calculations. We won't have more than 60 elements per window for sure.
So far to not
Hi Alessandro,
Have a look at this Kafka Usage Pattern for computing averages without
using an ArrayList.
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average?
The advantages of this pattern over the ArrayList appr
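For reference, a minimal sketch of that pattern, assuming a
KStream<String, Metric> called metrics; the window size, the CountAndSum
helper and its serde, and the getProduction() accessor are assumptions for
illustration:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import java.time.Duration;

// Aggregate a running (count, sum) per window and derive the average at the
// end, instead of collecting every raw value into an ArrayList.
final KTable<Windowed<String>, Double> averages = metrics
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .aggregate(
        () -> new CountAndSum(0L, 0.0),          // assumed small POJO: count + sum
        (key, metric, agg) -> {
            agg.count += 1;
            agg.sum += metric.getProduction();
            return agg;
        },
        Materialized.with(Serdes.String(), countAndSumSerde))  // assumed serde for CountAndSum
    .mapValues(agg -> agg.sum / agg.count);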
Sorry, but it seemed harder than I thought.
To get the custom aggregation working I need an ArrayList of all
the values in the window; so far my aggregate DSL call creates an
ArrayList in the initializer and adds each value to the list in the
aggregator.
Then I think I'll have to provide
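For what it's worth, a minimal sketch of the approach described above,
assuming a KStream<String, Metric> called metrics; the window size and the
listSerde for ArrayList<Metric> are assumptions:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import java.time.Duration;
import java.util.ArrayList;

// Collect the raw Metric values of each window into a list.
final KTable<Windowed<String>, ArrayList<Metric>> windowedValues = metrics
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .aggregate(
        () -> new ArrayList<Metric>(),   // initializer: an empty list per window
        (key, metric, list) -> {         // aggregator: append each incoming value
            list.add(metric);
            return list;
        },
        Materialized.with(Serdes.String(), listSerde));  // assumed serde for ArrayList<Metric>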
Thank you Bruno and Matthias,
I've modified the transformer to implement the ValueTransformerWithKey
interface and everything is working fine.
I now have to window the data and manually aggregate each window's data, since
I have to compute some averages and sums of differences.
So far I'm just having some iss
Hi Alessandro,
the `TransformerSupplier` is internally wrapped with a `ProcessorSupplier`,
so the statement
"`transform` is essentially equivalent to adding the Transformer via
Topology#addProcessor() to your processor topology"
is correct.
If you do not change the key, you should definitely use on
There is also `ValueTransformerWithKey`, which gives you read-only access
to the key.
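For example, a minimal sketch of such a transformer; the Metric type and the
pass-through logic are placeholders:

import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;

public class MetricValueTransformer
        implements ValueTransformerWithKey<String, Metric, Metric> {

    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        // Keep the context around for state stores, punctuation, etc.
        this.context = context;
    }

    @Override
    public Metric transform(final String readOnlyKey, final Metric value) {
        // The key can be read here but must not be modified.
        return value;  // placeholder: pass the value through unchanged
    }

    @Override
    public void close() {}
}

// Usage: metrics.transformValues(MetricValueTransformer::new)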
-Matthias
On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> Hi Bruno,
>
> Thank you for the quick answer.
>
> I'm actually trying to do that since it seems there is really no way to
> have it use `Processor`.
Hi Bruno,
Thank you for the quick answer.
I'm actually trying to do that since it seems there is really no way to
have it use `Processor`.
I just wanted (if that would've made any sense) to use the Processor in
both DSL and non-DSL pipelines.
Anyway, regarding `transformValues()` I don't think I
Hi Alessandro,
Have you considered using `transform()` (actually in your case you should
use `transformValues()`) instead of `.process()`? `transform()` and
`transformValues()` are stateful operations similar to `.process()`, but they
return a `KStream`. On a `KStream` you can then apply a windowed
aggregation
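For example, a minimal sketch of that chaining, reusing the hypothetical
MetricValueTransformer sketched above; the topic name, serdes, and window
size are placeholders:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.TimeWindows;
import java.time.Duration;

final StreamsBuilder builder = new StreamsBuilder();
builder.stream("metrics", Consumed.with(Serdes.String(), metricSerde))  // assumed value serde
    .transformValues(MetricValueTransformer::new)    // stateful per-record value transformation
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .count();                                        // any windowed aggregation would go here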