There is also `ValueTransformerWithKey`, which gives you read-only access to the key.
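For illustration, a rough sketch of how the timestamp filter could look with `transformValues()` and a `ValueTransformerWithKey` (the `SensorValue` class, the store and topic names, and the omitted serde setup are placeholders, not taken from your gist): the transformer sees the key read-only, compares the record's timestamp against the last one stored for that sensor, and the resulting `KStream` can feed a windowed aggregation afterwards.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class TimestampFilterExample {

    static final String STORE_NAME = "latest-timestamp-store";

    // Placeholder value type standing in for the deserialized sensor message.
    public static class SensorValue {
        private final long timestamp;
        private final int speed;

        public SensorValue(final long timestamp, final int speed) {
            this.timestamp = timestamp;
            this.speed = speed;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public int getSpeed() {
            return speed;
        }
    }

    // Drops values whose timestamp is not newer than the last one seen for the same key.
    public static class TimestampFilter
            implements ValueTransformerWithKey<String, SensorValue, SensorValue> {

        private KeyValueStore<String, Long> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            store = (KeyValueStore<String, Long>) context.getStateStore(STORE_NAME);
        }

        @Override
        public SensorValue transform(final String readOnlyKey, final SensorValue value) {
            final Long lastSeen = store.get(readOnlyKey);
            if (lastSeen != null && value.getTimestamp() <= lastSeen) {
                return null; // old value for this sensor, filtered out downstream
            }
            store.put(readOnlyKey, value.getTimestamp());
            return value;
        }

        @Override
        public void close() {
        }
    }

    public static KStream<String, SensorValue> filteredStream(final StreamsBuilder builder) {
        // Register the key-value store the transformer uses.
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(STORE_NAME),
                        Serdes.String(),
                        Serdes.Long()));

        // "sensor-data" and the value serde configuration are placeholders.
        return builder.<String, SensorValue>stream("sensor-data")
                .transformValues(() -> new TimestampFilter(), STORE_NAME)
                // keep it explicit that null values are dropped before any aggregation
                .filter((key, value) -> value != null);
    }
}

The stream returned by filteredStream() is a regular `KStream`, so you can chain `groupByKey()` and a windowed aggregation on it as usual.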
-Matthias

On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> Hi Bruno,
>
> Thank you for the quick answer.
>
> I'm actually trying to do that, since it seems there is really no way to
> have it use `Processor<K, V>`. I just wanted (if that would've made any
> sense) to use the Processor in both DSL and non-DSL pipelines.
>
> Anyway, regarding `transformValues()`, I don't think I can use it, as I need
> the message key, since that is the discriminating value for the filter (I
> want to exclude old values per sensor ID, so per message key).
>
> Right now I have this
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> and I'm using it with `transform()`.
>
> One thing I've found confusing is this
> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
>
>> transform is essentially equivalent to adding the Transformer via
>> Topology#addProcessor() to your processor topology
>> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
>
> Is it? Doesn't `transform` need a TransformerSupplier while `addProcessor`
> uses a ProcessorSupplier?
>
> Thank you again for your help
>
> --
> Alessandro Tagliapietra
>
>
> On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <br...@confluent.io> wrote:
>
>> Hi Alessandro,
>>
>> Have you considered using `transform()` (actually in your case you should
>> use `transformValues()`) instead of `.process()`? `transform()` and
>> `transformValues()` are stateful operations similar to `.process()`, but they
>> return a `KStream`. On a `KStream` you can then apply a windowed
>> aggregation.
>>
>> Hope that helps.
>>
>> Best,
>> Bruno
>>
>>
>> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>>
>>> Hi there,
>>>
>>> I'm just starting with Kafka and I'm trying to create a stream processor
>>> that, in multiple stages:
>>> - filters messages using a kv store so that only messages with a higher
>>> timestamp get processed
>>> - aggregates the message metrics by minute, giving e.g. the avg of those
>>> metrics in that minute
>>>
>>> The message is simple: the key is the sensor ID and the value is e.g.
>>> { timestamp: UNIX_TIMESTAMP, speed: INT }.
>>>
>>> I've started by creating a processor to use the kv store and filter old
>>> messages:
>>>
>>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
>>>
>>> Then I was trying to implement windowing. I saw very nice windowing
>>> examples for the DSL but none for the Processor API (only a small reference
>>> to the windowed store), can someone point me in the right direction?
>>>
>>> Now, since I wasn't able to find any example, I tried to use the DSL but
>>> haven't found a way to use my processor with it. I saw this
>>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
>>> but it explains mostly transformers, not processors. I also saw after that the
>>> example usage of the processor, but `.process(...)` returns void, so I
>>> cannot have a KStream from a processor?
>>>
>>> Thank you all in advance
>>>
>>> --
>>> Alessandro Tagliapietra