Hi Alessandro,

the `TransformerSupplier` is internally wrapped with a `ProcessorSupplier`, so the statement

`transform` is essentially equivalent to adding the Transformer via Topology#addProcessor() to your processor topology

is correct.

If you do not change the key, you should definitely use one of the overloads of `transformValues` to avoid internal data redistribution. In your case the overload with `ValueTransformerWithKeySupplier`, as suggested by Matthias, would fit.

Best,
Bruno

On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <matth...@confluent.io> wrote:

> There is also `ValueTransformerWithKey` that gives you read-only access
> to the key.
>
> -Matthias
>
> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> > Hi Bruno,
> >
> > Thank you for the quick answer.
> >
> > I'm actually trying to do that since it seems there is really no way to
> > have it use `Processor<K, V>`.
> > I just wanted (if that would've made any sense) to use the Processor in
> > both DSL and non-DSL pipelines.
> >
> > Anyway, regarding `transformValues()` I don't think I can use it as I need
> > the message key since that is the discriminating value for the filter (I
> > want to exclude old values per sensor ID so per message key)
> >
> > Right now I've this
> > https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> > and I'm using it with `transform()`.
> >
> > One thing I've found confusing is this
> > https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> >
> >> transform is essentially equivalent to adding the Transformer via
> >> Topology#addProcessor() to your processor topology
> >> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
> >
> > is it? Doesn't `transform` need a TransformerSupplier while `addProcessor`
> > uses a ProcessorSupplier?
> >
> > Thank you again for your help
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <br...@confluent.io> wrote:
> >
> >> Hi Alessandro,
> >>
> >> Have you considered using `transform()` (actually in your case you should
> >> use `transformValues()`) instead of `.process()`? `transform()` and
> >> `transformValues()` are stateful operations similar to `.process` but they
> >> return a `KStream`. On a `KStream` you can then apply a windowed
> >> aggregation.
> >>
> >> Hope that helps.
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
> >> tagliapietra.alessan...@gmail.com> wrote:
> >>
> >>> Hi there,
> >>>
> >>> I'm just starting with Kafka and I'm trying to create a stream processor
> >>> that works in multiple stages:
> >>> - filters messages using a kv store so that only messages with a higher
> >>> timestamp get processed
> >>> - aggregates the message metrics by minute giving e.g. the avg of those
> >>> metrics in that minute
> >>>
> >>> The message is simple, the key is the sensor ID and the value is e.g. {
> >>> timestamp: UNIX_TIMESTAMP, speed: INT }.
> >>>
> >>> I've started by creating a processor to use the kv store and filter old
> >>> messages:
> >>>
> >>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> >>>
> >>> Then I was trying to implement windowing, I saw very nice windowing
> >>> examples for the DSL but none for the Processor API (only a small reference
> >>> to the windowed store), can someone point me in the right direction?
> >>>
> >>> Now, since I wasn't able to find any example I tried to use the DSL but
> >>> haven't found a way to use my processor with it, I saw this
> >>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> >>> but it explains mostly transformers not processors. I also saw after that the
> >>> example usage of the processor but `.process(...)` returns void, so I
> >>> cannot have a KStream from a processor?
> >>>
> >>> Thank you all in advance
> >>>
> >>> --
> >>> Alessandro Tagliapietra
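
For reference, the per-key filter discussed in this thread (keep a reading only if its timestamp is higher than the last one seen for the same sensor ID) can be sketched without any Kafka dependency. This is a rough illustration, not the code from the gist: `Reading` and `TimestampFilterSketch` are hypothetical names, a `HashMap` stands in for the key-value state store, and the `transform(readOnlyKey, value)` method only mimics the shape of a `ValueTransformerWithKey`, which reads the key but never changes it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical value type mirroring { timestamp, speed } from the thread. */
final class Reading {
    final long timestamp;
    final int speed;

    Reading(long timestamp, int speed) {
        this.timestamp = timestamp;
        this.speed = speed;
    }
}

/**
 * Plain-Java stand-in for a ValueTransformerWithKey: the key is only read,
 * never modified. A HashMap plays the role of the key-value state store
 * (assumption: a real implementation would fetch the store in init()).
 */
final class TimestampFilterSketch {
    // Last accepted timestamp per sensor ID.
    private final Map<String, Long> lastSeen = new HashMap<>();

    /** Returns the reading if it is strictly newer than the last accepted one for this key, else null. */
    Reading transform(String readOnlyKey, Reading value) {
        Long previous = lastSeen.get(readOnlyKey);
        if (previous != null && value.timestamp <= previous) {
            return null; // stale or duplicate reading for this sensor
        }
        lastSeen.put(readOnlyKey, value.timestamp);
        return value;
    }
}
```

As far as I know, `transformValues` forwards the returned value even when it is null, so in an actual topology a `filter` that drops null values would probably follow this step before the windowed aggregation.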