Hi Bruno,

Thank you for the quick answer.

I'm actually trying to do that since it seems there is really no way to
have it use `Processor<K, V>`.
I just wanted (if that would've made any sense) to use the Processor in
both DSL and non-DSL pipelines.

Anyway, regarding `transformValues()`, I don't think I can use it: I need
the message key, since that is the discriminating value for the filter (I
want to exclude old values per sensor ID, i.e. per message key).
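
To make the requirement concrete, here is a minimal sketch of the per-key
check in plain Java. It is not the code from my gist: the class name
`LatestTimestampFilter` and the method `accept` are made up for illustration,
and a `HashMap` stands in for the kv store, so this only shows the logic and
not the Kafka Streams wiring.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the filtering logic; in the real transformer the
// map would be a Kafka Streams key-value store, not an in-memory HashMap.
class LatestTimestampFilter {
    // Last accepted timestamp per sensor ID (the message key).
    private final Map<String, Long> lastSeen = new HashMap<>();

    // Returns true and records the timestamp if it is strictly higher than
    // the last one accepted for this key; returns false otherwise.
    boolean accept(String sensorId, long timestamp) {
        Long previous = lastSeen.get(sensorId);
        if (previous != null && timestamp <= previous) {
            return false; // old or repeated value for this sensor, drop it
        }
        lastSeen.put(sensorId, timestamp);
        return true;
    }
}
```

Since the lookup is keyed by sensor ID, the check cannot be done from the
value alone, which is why I think I need the key.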

Right now I have this
https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
and I'm using it with `transform()`.

One thing I've found confusing is this
https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process

> transform is essentially equivalent to adding the Transformer via
> Topology#addProcessor() to your processor topology
> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.


Is it? Doesn't `transform` need a `TransformerSupplier` while `addProcessor`
uses a `ProcessorSupplier`?

Thank you again for your help

--
Alessandro Tagliapietra


On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Alessandro,
>
> Have you considered using `transform()` (actually in your case you should
> use `transformValues()`) instead of `.process()`? `transform()` and
> `transformValues()` are stateful operations similar to `.process` but they
> return a `KStream`. On a `KStream` you can then apply a windowed
> aggregation.
>
> Hope that helps.
>
> Best,
> Bruno
>
>
> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Hi there,
> >
> > I'm just starting with Kafka and I'm trying to create a stream processor
> > that works in multiple stages:
> >  - filters messages using a kv store so that only messages with a higher
> > timestamp get processed
> >  - aggregates the message metrics by minute giving e.g. the avg of those
> > metrics in that minute
> >
> > The message is simple, the key is the sensor ID and the value is e.g. {
> > timestamp: UNIX_TIMESTAMP, speed: INT }.
> >
> > I've started by creating a processor to use the kv store and filter old
> > messages:
> >
> > https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> >
> > Then I tried to implement windowing. I saw very nice windowing
> > examples for the DSL but none for the Processor API (only a small
> > reference to the windowed store). Can someone point me in the right
> > direction?
> >
> > Since I wasn't able to find any example, I tried to use the DSL but
> > haven't found a way to use my processor with it. I saw this
> >
> > https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> >
> > but it mostly explains transformers, not processors. After that I also
> > saw the example usage of the processor, but `.process(...)` returns
> > void, so I cannot get a KStream from a processor?
> >
> > Thank you all in advance
> >
> > --
> > Alessandro Tagliapietra
> >
>
