Hi Alessandro,

The `TransformerSupplier` is internally wrapped with a `ProcessorSupplier`,
so the statement

`transform` is essentially equivalent to adding the Transformer via
Topology#addProcessor() to your processor topology

is correct.
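
To illustrate the idea, here is a rough sketch of such a wrapper. It is
plain Java with simplified stand-in interfaces (all the `Sketch...` names and
`TransformerAsProcessorSupplier` are made up for this mail), so it is not the
actual Kafka Streams code, only the shape of the adapter:

```java
import java.util.List;

// Simplified stand-ins, NOT the real Kafka Streams interfaces.
interface SketchTransformer<K, V, R> {
    // Returns the record to forward downstream, or null to forward nothing.
    R transform(K key, V value);
}

interface SketchTransformerSupplier<K, V, R> {
    SketchTransformer<K, V, R> get();
}

interface SketchProcessor<K, V> {
    void process(K key, V value);
}

interface SketchProcessorSupplier<K, V> {
    SketchProcessor<K, V> get();
}

// Adapter: exposes a transformer supplier as a processor supplier.
final class TransformerAsProcessorSupplier<K, V, R> implements SketchProcessorSupplier<K, V> {
    private final SketchTransformerSupplier<K, V, R> transformerSupplier;
    // Stands in for forwarding to the child nodes of the topology.
    private final List<R> downstream;

    TransformerAsProcessorSupplier(SketchTransformerSupplier<K, V, R> transformerSupplier,
                                   List<R> downstream) {
        this.transformerSupplier = transformerSupplier;
        this.downstream = downstream;
    }

    @Override
    public SketchProcessor<K, V> get() {
        // One transformer instance per processor instance.
        SketchTransformer<K, V, R> transformer = transformerSupplier.get();
        return (key, value) -> {
            R result = transformer.transform(key, value);
            if (result != null) {
                downstream.add(result);
            }
        };
    }
}
```

The processor obtained from the adapter simply delegates to the transformer
and forwards whatever it returns, which is why the two ways of adding the
logic end up equivalent.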

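If you do not change the key, you should definitely use one of the
overloads of `transformValues`. Because `transform` may change the key, a
grouping or aggregation downstream of it triggers a repartitioning of the
data, whereas `transformValues` avoids that. In your case the overload with
`ValueTransformerWithKeySupplier`, as suggested by Matthias, would fit.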
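
As a rough sketch of the filter logic with read-only access to the key:
the following is plain Java without any Kafka dependency. `SensorReading`
and `TimestampFilterSketch` are hypothetical names, the `HashMap` only stands
in for your key-value state store, and the method merely mirrors the shape of
`transform(readOnlyKey, value)`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value type based on the message described in this thread.
final class SensorReading {
    final long timestamp;
    final int speed;

    SensorReading(long timestamp, int speed) {
        this.timestamp = timestamp;
        this.speed = speed;
    }
}

// Plain-Java sketch of the per-sensor timestamp filter.
final class TimestampFilterSketch {
    // Stand-in for the state store: sensor ID -> latest timestamp seen.
    private final Map<String, Long> lastSeen = new HashMap<>();

    // Returns the reading if it is newer than the last one seen for this
    // sensor, otherwise null. The key is only read, never modified.
    SensorReading transform(String readOnlySensorId, SensorReading value) {
        Long previous = lastSeen.get(readOnlySensorId);
        if (previous != null && value.timestamp <= previous) {
            return null; // old or duplicate reading for this sensor
        }
        lastSeen.put(readOnlySensorId, value.timestamp);
        return value;
    }
}
```

Note that, as far as I know, `transformValues` still forwards a record when
the transformer returns `null` (with a `null` value), so you would probably
want a `filter((key, value) -> value != null)` right after it.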

Best,
Bruno

On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> There is also `ValueTransformerWithKey` that gives you read-only access
> to the key.
>
> -Matthias
>
> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> > Hi Bruno,
> >
> > Thank you for the quick answer.
> >
> > I'm actually trying to do that since it seems there is really no way to
> > have it use `Processor<K, V>`.
> > I just wanted (if that would've made any sense) to use the Processor in
> > both DSL and non-DSL pipelines.
> >
> > Anyway, regarding `transformValues()` I don't think I can use it as I need
> > the message key since that is the discriminating value for the filter (I
> > want to exclude old values per sensor ID so per message key)
> >
> > Right now I have this
> > https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> > and I'm using it with `transform()`.
> >
> > One thing I've found confusing is this
> > https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> >
> >> transform is essentially equivalent to adding the Transformer via
> >> Topology#addProcessor() to your processor topology
> >> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
> >
> >
> > Is it? Doesn't `transform` need a `TransformerSupplier` while `addProcessor`
> > uses a `ProcessorSupplier`?
> >
> > Thank you again for your help
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <br...@confluent.io> wrote:
> >
> >> Hi Alessandro,
> >>
> >> Have you considered using `transform()` (actually in your case you should
> >> use `transformValues()`) instead of `.process()`? `transform()` and
> >> `transformValues()` are stateful operations similar to `.process` but they
> >> return a `KStream`. On a `KStream` you can then apply a windowed
> >> aggregation.
> >>
> >> Hope that helps.
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
> >> tagliapietra.alessan...@gmail.com> wrote:
> >>
> >>> Hi there,
> >>>
> >>> I'm just starting with Kafka and I'm trying to create a stream processor
> >>> that in multiple stages:
> >>>  - filters messages using a kv store so that only messages with a higher
> >>>    timestamp get processed
> >>>  - aggregates the message metrics by minute, giving e.g. the avg of those
> >>>    metrics in that minute
> >>>
> >>> The message is simple, the key is the sensor ID and the value is e.g. {
> >>> timestamp: UNIX_TIMESTAMP, speed: INT }.
> >>>
> >>> I've started by creating a processor to use the kv store and filter old
> >>> messages:
> >>>
> >>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> >>>
> >>> Then I was trying to implement windowing. I saw very nice windowing
> >>> examples for the DSL but none for the Processor API (only a small reference
> >>> to the windowed store). Can someone point me in the right direction?
> >>>
> >>> Now, since I wasn't able to find any example, I tried to use the DSL but
> >>> haven't found a way to use my processor with it. I saw this
> >>>
> >>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> >>>
> >>> but it explains mostly transformers, not processors. After that I also saw
> >>> the example usage of the processor, but `.process(...)` returns void, so I
> >>> cannot have a KStream from a processor?
> >>>
> >>> Thank you all in advance
> >>>
> >>> --
> >>> Alessandro Tagliapietra
> >>>
> >>
> >
>
>
