Thank you Bruno,

I'll look into those, however average is just a simple thing I'm trying
right now just to get an initial windowing flow working.
In the future I'll probably still need the actual values for other
calculations. We won't have more than 60 elements per window for sure.

So far to not manually serialize/deserialize the array list I've created an
Avro model with an array field containing the values.
I had issues with suppress as explained here

https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198

but I got that working.
So far everything seems to be working, except a couple things:
 - if I generate data with 1 key, I correctly get a value each 10 seconds,
if I later start generating data with another key (while key 1 is still
generating) the windowing emits a value only after the timestamp of key 2
reaches the last generated window
 - while generating data, if I restart the processor as soon as it starts
it sometimes generates 2 aggregates for the same window even if I'm using
the suppress

Anyway, I'll look into your link and try to find out the cause of these
issues, probably starting from scratch with a simpler example

Thank you for your help!

--
Alessandro Tagliapietra

On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Alessandro,
>
> Have a look at this Kafka Usage Pattern for computing averages without
> using an ArrayList.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> ?
>
> The advantages of this pattern over the ArrayList approach is the reduced
> space needed to compute the aggregate. Note that you will still need to
> implement a SerDe. However, the SerDe should be a bit easier to implement
> than a SerDe for an ArrayList.
>
> Hope that helps.
>
> Best,
> Bruno
>
> On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Sorry but it seemed harder than I thought,
> >
> > to have the custom aggregation working I need to get an ArrayList of all
> > the values in the window, so far my aggregate DSL method creates an
> > ArrayList on the initializer and adds each value to the list in the
> > aggregator.
> > Then I think I'll have to provide a serder to change the output type of
> > that method.
> > I was looking at
> >
> >
> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> > but
> > that seems more towards a list of longs and already uses longSerde.
> > I'm currently trying to implement another avro model that has a field of
> > type array so I can use the regular avro serializer to implement this.
> > Should I create my own serdes instead or is this the right way?
> >
> > Thank you in advance
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> > tagliapietra.alessan...@gmail.com> wrote:
> >
> > > Thank you Bruno and Matthias,
> > >
> > > I've modified the transformer to implement the ValueTransformerWithKey
> > > interface and everything is working fine.
> > > I've now to window the data and manually aggregate each window data
> since
> > > I've to do some averages and sum of differences.
> > > So far I've just having some issues with message types since I'm
> changing
> > > the data type when aggregating the window but I think it's an easy
> > problem.
> > >
> > > Thank you again
> > > Best
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > > On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <br...@confluent.io>
> > wrote:
> > >
> > >> Hi Alessandro,
> > >>
> > >> the `TransformSupplier` is internally wrapped with a
> > `ProcessorSupplier`,
> > >> so the statement
> > >>
> > >> `transform` is essentially equivalent to adding the Transformer via
> > >> Topology#addProcessor() to your processor topology
> > >>
> > >> is correct.
> > >>
> > >> If you do not change the key, you should definitely use one of the
> > >> overloads of `transformValues` to avoid internal data redistribution.
> In
> > >> your case the overload with `ValueTransformerWithKeySupplier` as
> > suggested
> > >> by Matthias would fit.
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
> matth...@confluent.io
> > >
> > >> wrote:
> > >>
> > >> > There is also `ValueTransformerWithKey` that gives you read-only
> acess
> > >> > to the key.
> > >> >
> > >> > -Matthias
> > >> >
> > >> > On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> > >> > > Hi Bruno,
> > >> > >
> > >> > > Thank you for the quick answer.
> > >> > >
> > >> > > I'm actually trying to do that since it seems there is really no
> way
> > >> to
> > >> > > have it use `Processor<K, V>`.
> > >> > > I just wanted (if that would've made any sense) to use the
> Processor
> > >> in
> > >> > > both DSL and non-DSL pipelines.
> > >> > >
> > >> > > Anyway, regarding `transformValues()` I don't think I can use it
> as
> > I
> > >> > need
> > >> > > the message key since that is the discriminating value for the
> > filter
> > >> (I
> > >> > > want to exclude old values per sensor ID so per message key)
> > >> > >
> > >> > > Right now I've this
> > >> > >
> > >> >
> > >>
> >
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> > >> > > and
> > >> > > i'm using it with `transform()` .
> > >> > >
> > >> > > One thing I've found confusing is this
> > >> > >
> > >> >
> > >>
> >
> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> > >> > >
> > >> > > transform is essentially equivalent to adding the Transformer via
> > >> > >> Topology#addProcessor() to yourprocessor topology
> > >> > >> <
> > >> >
> > >>
> >
> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology
> > >> > >
> > >> > >> .
> > >> > >
> > >> > >
> > >> > > is it? Doesn't `transform` need a TransformSupplier while
> > >> `addProcessor`
> > >> > > uses a ProcessorSupplier?
> > >> > >
> > >> > > Thank you again for your help
> > >> > >
> > >> > > --
> > >> > > Alessandro Tagliapietra
> > >> > >
> > >> > >
> > >> > > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <br...@confluent.io
> >
> > >> > wrote:
> > >> > >
> > >> > >> Hi Alessandro,
> > >> > >>
> > >> > >> Have you considered using `transform()` (actually in your case
> you
> > >> > should
> > >> > >> use `transformValues()`) instead of `.process()`? `transform()`
> and
> > >> > >> `transformValues()` are stateful operations similar to `.process`
> > but
> > >> > they
> > >> > >> return a `KStream`. On a `KStream` you can then apply a windowed
> > >> > >> aggregation.
> > >> > >>
> > >> > >> Hope that helps.
> > >> > >>
> > >> > >> Best,
> > >> > >> Bruno
> > >> > >>
> > >> > >>
> > >> > >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
> > >> > >> tagliapietra.alessan...@gmail.com> wrote:
> > >> > >>
> > >> > >>> Hi there,
> > >> > >>>
> > >> > >>> I'm just starting with Kafka and I'm trying to create a stream
> > >> > processor
> > >> > >>> that in multiple stages:
> > >> > >>>  - filters messages using a kv store so that only messages with
> > >> higher
> > >> > >>> timestamp gets processed
> > >> > >>>  - aggregates the message metrics by minute giving e.g. the avg
> of
> > >> > those
> > >> > >>> metrics in that minute
> > >> > >>>
> > >> > >>> The message is simple, the key is the sensor ID and the value is
> > >> e.g. {
> > >> > >>> timestamp: UNIX_TIMESTAMP, speed: INT }.
> > >> > >>>
> > >> > >>> I've started by creating a processor to use the kv store and
> > filter
> > >> old
> > >> > >>> messages:
> > >> > >>>
> > >> > >>>
> > >> > >>>
> > >> > >>
> > >> >
> > >>
> >
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> > >> > >>>
> > >> > >>> Then I was trying to implement windowing, I saw very nice
> > windowing
> > >> > >>> examples for the DSL but none for the Processor API (only a
> small
> > >> > >> reference
> > >> > >>> to the windowed store), can someone point me in the right
> > direction?
> > >> > >>>
> > >> > >>> Now, since I wasn't able to find any example I tried to use the
> > DSL
> > >> but
> > >> > >>> haven't found a way to use my processor with it, I saw this
> > >> > >>>
> > >> > >>>
> > >> > >>
> > >> >
> > >>
> >
> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> > >> > >>> but
> > >> > >>> it explains mostly transformers not processors. I also saw after
> > >> that
> > >> > the
> > >> > >>> example usage of the processor but `.process(...)` returns void,
> > so
> > >> I
> > >> > >>> cannot have a KStream from a processor?
> > >> > >>>
> > >> > >>> Thank you all in advance
> > >> > >>>
> > >> > >>> --
> > >> > >>> Alessandro Tagliapietra
> > >> > >>>
> > >> > >>
> > >> > >
> > >> >
> > >> >
> > >>
> > >
> >
>

Reply via email to