Sorry but it seemed harder than I thought, to have the custom aggregation working I need to get an ArrayList of all the values in the window, so far my aggregate DSL method creates an ArrayList on the initializer and adds each value to the list in the aggregator. Then I think I'll have to provide a serder to change the output type of that method. I was looking at https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api but that seems more towards a list of longs and already uses longSerde. I'm currently trying to implement another avro model that has a field of type array so I can use the regular avro serializer to implement this. Should I create my own serdes instead or is this the right way?
Thank you in advance -- Alessandro Tagliapietra On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra < tagliapietra.alessan...@gmail.com> wrote: > Thank you Bruno and Matthias, > > I've modified the transformer to implement the ValueTransformerWithKey > interface and everything is working fine. > I've now to window the data and manually aggregate each window data since > I've to do some averages and sum of differences. > So far I've just having some issues with message types since I'm changing > the data type when aggregating the window but I think it's an easy problem. > > Thank you again > Best > > -- > Alessandro Tagliapietra > > On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <br...@confluent.io> wrote: > >> Hi Alessandro, >> >> the `TransformSupplier` is internally wrapped with a `ProcessorSupplier`, >> so the statement >> >> `transform` is essentially equivalent to adding the Transformer via >> Topology#addProcessor() to your processor topology >> >> is correct. >> >> If you do not change the key, you should definitely use one of the >> overloads of `transformValues` to avoid internal data redistribution. In >> your case the overload with `ValueTransformerWithKeySupplier` as suggested >> by Matthias would fit. >> >> Best, >> Bruno >> >> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <matth...@confluent.io> >> wrote: >> >> > There is also `ValueTransformerWithKey` that gives you read-only acess >> > to the key. >> > >> > -Matthias >> > >> > On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote: >> > > Hi Bruno, >> > > >> > > Thank you for the quick answer. >> > > >> > > I'm actually trying to do that since it seems there is really no way >> to >> > > have it use `Processor<K, V>`. >> > > I just wanted (if that would've made any sense) to use the Processor >> in >> > > both DSL and non-DSL pipelines. >> > > >> > > Anyway, regarding `transformValues()` I don't think I can use it as I >> > need >> > > the message key since that is the discriminating value for the filter >> (I >> > > want to exclude old values per sensor ID so per message key) >> > > >> > > Right now I've this >> > > >> > >> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java >> > > and >> > > i'm using it with `transform()` . >> > > >> > > One thing I've found confusing is this >> > > >> > >> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process >> > > >> > > transform is essentially equivalent to adding the Transformer via >> > >> Topology#addProcessor() to yourprocessor topology >> > >> < >> > >> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology >> > > >> > >> . >> > > >> > > >> > > is it? Doesn't `transform` need a TransformSupplier while >> `addProcessor` >> > > uses a ProcessorSupplier? >> > > >> > > Thank you again for your help >> > > >> > > -- >> > > Alessandro Tagliapietra >> > > >> > > >> > > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <br...@confluent.io> >> > wrote: >> > > >> > >> Hi Alessandro, >> > >> >> > >> Have you considered using `transform()` (actually in your case you >> > should >> > >> use `transformValues()`) instead of `.process()`? `transform()` and >> > >> `transformValues()` are stateful operations similar to `.process` but >> > they >> > >> return a `KStream`. On a `KStream` you can then apply a windowed >> > >> aggregation. >> > >> >> > >> Hope that helps. >> > >> >> > >> Best, >> > >> Bruno >> > >> >> > >> >> > >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra < >> > >> tagliapietra.alessan...@gmail.com> wrote: >> > >> >> > >>> Hi there, >> > >>> >> > >>> I'm just starting with Kafka and I'm trying to create a stream >> > processor >> > >>> that in multiple stages: >> > >>> - filters messages using a kv store so that only messages with >> higher >> > >>> timestamp gets processed >> > >>> - aggregates the message metrics by minute giving e.g. the avg of >> > those >> > >>> metrics in that minute >> > >>> >> > >>> The message is simple, the key is the sensor ID and the value is >> e.g. { >> > >>> timestamp: UNIX_TIMESTAMP, speed: INT }. >> > >>> >> > >>> I've started by creating a processor to use the kv store and filter >> old >> > >>> messages: >> > >>> >> > >>> >> > >>> >> > >> >> > >> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java >> > >>> >> > >>> Then I was trying to implement windowing, I saw very nice windowing >> > >>> examples for the DSL but none for the Processor API (only a small >> > >> reference >> > >>> to the windowed store), can someone point me in the right direction? >> > >>> >> > >>> Now, since I wasn't able to find any example I tried to use the DSL >> but >> > >>> haven't found a way to use my processor with it, I saw this >> > >>> >> > >>> >> > >> >> > >> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration >> > >>> but >> > >>> it explains mostly transformers not processors. I also saw after >> that >> > the >> > >>> example usage of the processor but `.process(...)` returns void, so >> I >> > >>> cannot have a KStream from a processor? >> > >>> >> > >>> Thank you all in advance >> > >>> >> > >>> -- >> > >>> Alessandro Tagliapietra >> > >>> >> > >> >> > > >> > >> > >> >