Hi Bruno,

I'm using the Confluent Docker images 5.2.1, so Kafka 2.2. Anyway, I'll try to
make a small reproduction repo with all the different cases soon.
Thank you

--
Alessandro Tagliapietra

On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Alessandro,
>
> What version of Kafka do you use?
>
> Could you please give a more detailed example for the issues with the two
> keys you see?
>
> Could the following bug be related to the duplicates you see?
>
> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
>
> How do you restart the processor?
>
> Best,
> Bruno
>
> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Thank you Bruno,
> >
> > I'll look into those, however average is just a simple thing I'm trying
> > right now just to get an initial windowing flow working.
> > In the future I'll probably still need the actual values for other
> > calculations. We won't have more than 60 elements per window for sure.
> >
> > So far, to not manually serialize/deserialize the ArrayList, I've
> > created an Avro model with an array field containing the values.
> > I had issues with suppress as explained here
> >
> > https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
> >
> > but I got that working.
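(For anyone finding this thread later: a minimal sketch of a windowed aggregate with explicit serdes in `Materialized` plus `suppress`, along the lines discussed above. The topic names, the `SensorData`/`SensorAgg` Avro classes, and the `sensorDataSerde`/`sensorAggSerde` variables are assumptions for illustration, not code from this thread. Passing the serdes explicitly in `Materialized` is what worked around the NullPointerException from the Stack Overflow post.)

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

// ...

StreamsBuilder builder = new StreamsBuilder();

builder.stream("sensor-data", Consumed.with(Serdes.String(), sensorDataSerde))
    .groupByKey()
    // 10-second windows; an explicit (zero) grace period so windows close
    // as soon as stream time passes the window end
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ZERO))
    .aggregate(
        SensorAgg::new,                      // initializer
        (key, value, agg) -> agg.add(value), // aggregator (hypothetical add())
        // explicit serdes here instead of relying on the configured defaults
        Materialized.<String, SensorAgg, WindowStore<Bytes, byte[]>>as("sensor-agg-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(sensorAggSerde))
    // emit only one final result per window, once it closes
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream((windowedKey, agg) -> windowedKey.key()) // unwrap the windowed key
    .to("sensor-agg", Produced.with(Serdes.String(), sensorAggSerde));
```

Note that `suppress(untilWindowCloses(...))` only emits a window once stream time (roughly, the highest record timestamp seen so far) passes the window end plus grace, so a window's result can appear to wait until newer records arrive.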
> > So far everything seems to be working, except a couple of things:
> > - if I generate data with 1 key, I correctly get a value every 10
> > seconds; if I later start generating data with another key (while key 1
> > is still generating), the windowing emits a value only after the
> > timestamp of key 2 reaches the last generated window
> > - while generating data, if I restart the processor as soon as it
> > starts, it sometimes generates 2 aggregates for the same window even if
> > I'm using the suppress
> >
> > Anyway, I'll look into your link and try to find out the cause of these
> > issues, probably starting from scratch with a simpler example
> >
> > Thank you for your help!
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <br...@confluent.io>
> > wrote:
> >
> > > Hi Alessandro,
> > >
> > > Have a look at this Kafka Usage Pattern for computing averages without
> > > using an ArrayList:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> > >
> > > The advantage of this pattern over the ArrayList approach is the
> > > reduced space needed to compute the aggregate. Note that you will
> > > still need to implement a SerDe. However, the SerDe should be a bit
> > > easier to implement than a SerDe for an ArrayList.
> > >
> > > Hope that helps.
> > >
> > > Best,
> > > Bruno
> > >
> > > On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
> > > tagliapietra.alessan...@gmail.com> wrote:
> > >
> > > > Sorry, but it seemed harder than I thought:
> > > > to have the custom aggregation working I need to get an ArrayList of
> > > > all the values in the window. So far my aggregate DSL method creates
> > > > an ArrayList in the initializer and adds each value to the list in
> > > > the aggregator.
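(The count-and-sum idea behind the usage pattern linked above can be seen without any Kafka code. A plain accumulator like the following sketch — the class name is mine, not from the wiki page — keeps just two numbers per window instead of an ArrayList of every value:)

```java
// Count-and-sum accumulator: the state behind a windowed average in the
// usage pattern linked above. Only a count and a running sum are stored,
// so the per-window state is constant-size regardless of how many
// records the window contains.
class CountAndSum {
    private long count = 0;
    private double sum = 0.0;

    // the "aggregator" step: called once per record in the window
    CountAndSum add(double value) {
        count++;
        sum += value;
        return this;
    }

    // the final mapping step: turn the accumulated state into the average
    double average() {
        return count == 0 ? 0.0 : sum / count;
    }
}
```

A SerDe for this only has to encode one long and one double, which is why it is easier to write than a SerDe for an ArrayList.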
> > > > Then I think I'll have to provide a serde to change the output type
> > > > of that method.
> > > > I was looking at
> > > >
> > > > https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> > > >
> > > > but that seems more geared towards a list of longs and already uses
> > > > a long serde.
> > > > I'm currently trying to implement another Avro model that has a
> > > > field of type array so I can use the regular Avro serializer to
> > > > implement this.
> > > > Should I create my own serdes instead, or is this the right way?
> > > >
> > > > Thank you in advance
> > > >
> > > > --
> > > > Alessandro Tagliapietra
> > > >
> > > > On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> > > > tagliapietra.alessan...@gmail.com> wrote:
> > > >
> > > > > Thank you Bruno and Matthias,
> > > > >
> > > > > I've modified the transformer to implement the
> > > > > ValueTransformerWithKey interface and everything is working fine.
> > > > > I now have to window the data and manually aggregate each window's
> > > > > data, since I have to do some averages and sums of differences.
> > > > > So far I'm just having some issues with message types since I'm
> > > > > changing the data type when aggregating the window, but I think
> > > > > it's an easy problem.
> > > > >
> > > > > Thank you again
> > > > > Best
> > > > >
> > > > > --
> > > > > Alessandro Tagliapietra
> > > > >
> > > > > On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <br...@confluent.io>
> > > > > wrote:
> > > > >
> > > > >> Hi Alessandro,
> > > > >>
> > > > >> the `TransformerSupplier` is internally wrapped with a
> > > > >> `ProcessorSupplier`, so the statement
> > > > >>
> > > > >> "`transform` is essentially equivalent to adding the Transformer
> > > > >> via Topology#addProcessor() to your processor topology"
> > > > >>
> > > > >> is correct.
> > > > >>
> > > > >> If you do not change the key, you should definitely use one of
> > > > >> the overloads of `transformValues` to avoid internal data
> > > > >> redistribution. In your case the overload with
> > > > >> `ValueTransformerWithKeySupplier` as suggested by Matthias would
> > > > >> fit.
> > > > >>
> > > > >> Best,
> > > > >> Bruno
> > > > >>
> > > > >> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
> > > > >> matth...@confluent.io> wrote:
> > > > >>
> > > > >> > There is also `ValueTransformerWithKey` that gives you
> > > > >> > read-only access to the key.
> > > > >> >
> > > > >> > -Matthias
> > > > >> >
> > > > >> > On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> > > > >> > > Hi Bruno,
> > > > >> > >
> > > > >> > > Thank you for the quick answer.
> > > > >> > >
> > > > >> > > I'm actually trying to do that since it seems there is really
> > > > >> > > no way to have it use `Processor<K, V>`.
> > > > >> > > I just wanted (if that would've made any sense) to use the
> > > > >> > > Processor in both DSL and non-DSL pipelines.
> > > > >> > >
> > > > >> > > Anyway, regarding `transformValues()` I don't think I can use
> > > > >> > > it, as I need the message key since that is the
> > > > >> > > discriminating value for the filter (I want to exclude old
> > > > >> > > values per sensor ID, so per message key).
> > > > >> > >
> > > > >> > > Right now I have this
> > > > >> > >
> > > > >> > > https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> > > > >> > >
> > > > >> > > and I'm using it with `transform()`.
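(A sketch of what a timestamp filter like the one in the gist could look like as a `ValueTransformerWithKey`, following Bruno's and Matthias's suggestion — the key stays read-only, so `transformValues()` applies and no repartitioning is needed. The `SensorData` type, the getter names, and the store name are my assumptions, not the actual gist code:)

```java
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class TimestampFilterTransformer
        implements ValueTransformerWithKey<String, SensorData, SensorData> {

    private KeyValueStore<String, Long> lastSeen;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        lastSeen = (KeyValueStore<String, Long>) context.getStateStore("last-seen-store");
    }

    @Override
    public SensorData transform(String sensorId, SensorData value) {
        Long previous = lastSeen.get(sensorId);
        if (previous != null && value.getTimestamp() <= previous) {
            // stale record: transformValues() cannot drop records, so emit
            // null and remove it downstream with .filter((k, v) -> v != null)
            return null;
        }
        lastSeen.put(sensorId, value.getTimestamp());
        return value;
    }

    @Override
    public void close() {}
}

// Wiring (the store must be registered on the builder under the same name):
// stream.transformValues(TimestampFilterTransformer::new, "last-seen-store")
//       .filter((k, v) -> v != null)
```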
> > > > >> > >
> > > > >> > > One thing I've found confusing is this
> > > > >> > >
> > > > >> > > https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> > > > >> > >
> > > > >> > >> transform is essentially equivalent to adding the
> > > > >> > >> Transformer via Topology#addProcessor() to your processor
> > > > >> > >> topology
> > > > >> > >> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
> > > > >> > >
> > > > >> > > is it? Doesn't `transform` need a TransformerSupplier while
> > > > >> > > `addProcessor` uses a ProcessorSupplier?
> > > > >> > >
> > > > >> > > Thank you again for your help
> > > > >> > >
> > > > >> > > --
> > > > >> > > Alessandro Tagliapietra
> > > > >> > >
> > > > >> > > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
> > > > >> > > br...@confluent.io> wrote:
> > > > >> > >
> > > > >> > >> Hi Alessandro,
> > > > >> > >>
> > > > >> > >> Have you considered using `transform()` (actually in your
> > > > >> > >> case you should use `transformValues()`) instead of
> > > > >> > >> `.process()`? `transform()` and `transformValues()` are
> > > > >> > >> stateful operations similar to `.process()`, but they
> > > > >> > >> return a `KStream`. On a `KStream` you can then apply a
> > > > >> > >> windowed aggregation.
> > > > >> > >>
> > > > >> > >> Hope that helps.
> > > > >> > >>
> > > > >> > >> Best,
> > > > >> > >> Bruno
> > > > >> > >>
> > > > >> > >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
> > > > >> > >> tagliapietra.alessan...@gmail.com> wrote:
> > > > >> > >>
> > > > >> > >>> Hi there,
> > > > >> > >>>
> > > > >> > >>> I'm just starting with Kafka and I'm trying to create a
> > > > >> > >>> stream processor that, in multiple stages:
> > > > >> > >>> - filters messages using a KV store so that only messages
> > > > >> > >>> with a higher timestamp get processed
> > > > >> > >>> - aggregates the message metrics by minute, giving e.g.
> > > > >> > >>> the avg of those metrics in that minute
> > > > >> > >>>
> > > > >> > >>> The message is simple: the key is the sensor ID and the
> > > > >> > >>> value is e.g. { timestamp: UNIX_TIMESTAMP, speed: INT }.
> > > > >> > >>>
> > > > >> > >>> I've started by creating a processor to use the KV store
> > > > >> > >>> and filter old messages:
> > > > >> > >>>
> > > > >> > >>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> > > > >> > >>>
> > > > >> > >>> Then I was trying to implement windowing. I saw very nice
> > > > >> > >>> windowing examples for the DSL but none for the Processor
> > > > >> > >>> API (only a small reference to the windowed store). Can
> > > > >> > >>> someone point me in the right direction?
> > > > >> > >>>
> > > > >> > >>> Now, since I wasn't able to find any example, I tried to
> > > > >> > >>> use the DSL but haven't found a way to use my processor
> > > > >> > >>> with it. I saw this
> > > > >> > >>>
> > > > >> > >>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> > > > >> > >>>
> > > > >> > >>> but it explains mostly transformers, not processors. I
> > > > >> > >>> also saw after that the example usage of the processor,
> > > > >> > >>> but `.process(...)` returns void, so I cannot get a
> > > > >> > >>> KStream from a processor?
> > > > >> > >>>
> > > > >> > >>> Thank you all in advance
> > > > >> > >>>
> > > > >> > >>> --
> > > > >> > >>> Alessandro Tagliapietra
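(On the windowed-store question for the Processor API raised above: there really are few examples, but the wiring looks roughly like the sketch below. All names are illustrative, and `MyWindowingProcessor` stands for a hypothetical `Processor` implementation that retrieves the store in `init()` via `context.getStateStore("win-store")`:)

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

// ...

// a persistent windowed store: 1-minute windows kept for 1 hour
StoreBuilder<WindowStore<String, Double>> storeBuilder =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore(
            "win-store",
            Duration.ofHours(1),   // retention
            Duration.ofMinutes(1), // window size
            false),                // don't retain duplicates
        Serdes.String(),
        Serdes.Double());

Topology topology = new Topology();
topology.addSource("source", "sensor-data")
        .addProcessor("windowing", MyWindowingProcessor::new, "source")
        .addStateStore(storeBuilder, "windowing")
        .addSink("sink", "sensor-agg", "windowing");

// Inside MyWindowingProcessor.process(), windowStore.put(key, value, windowStart)
// writes into the window starting at that timestamp, and
// windowStore.fetch(key, fromTime, toTime) reads the windows back.
```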