Hi Bruno,
Thanks a lot for checking the code. Regarding the SpecificAvroSerde, I've
found that using

final Serde<InputList> valueSpecificAvroSerde = new SpecificAvroSerde<>();
final Map<String, String> serdeConfig =
    Collections.singletonMap("schema.registry.url", "http://localhost:8081");
valueSpecificAvroSerde.configure(serdeConfig, false);

and then in aggregate()

Materialized.with(Serdes.String(), valueSpecificAvroSerde)

fixed the issue.

Thanks in advance for the windowing help, it's much appreciated.
In the meantime I'll try to make some progress on the rest.
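
To make clear what I expect from the filter plus the window, here is a rough
plain-Java sketch with no Kafka involved. WindowSketch, accept, sum and
windowStart are made-up names for illustration only, they are not in the repo
or in the Streams API. It assumes 10 second tumbling windows aligned to
multiples of 10000 ms, drops any record whose timestamp is not newer than the
last one accepted for that key, and sums production per window.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: mimics the intended filter + 10 s tumbling sum without Kafka.
public class WindowSketch {
    static final long WINDOW_MS = 10_000L;

    // sensor ID -> last accepted timestamp (the role the state store plays in the filter)
    private final Map<String, Long> lastSeen = new HashMap<>();
    // sensor ID -> (window start -> summed production)
    private final Map<String, TreeMap<Long, Integer>> sums = new HashMap<>();

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Returns true if the record was accepted, false if it was dropped as out of order.
    boolean accept(String sensorId, long timestampMs, int production) {
        Long last = lastSeen.get(sensorId);
        if (last != null && timestampMs <= last) {
            return false;
        }
        lastSeen.put(sensorId, timestampMs);
        sums.computeIfAbsent(sensorId, k -> new TreeMap<>())
            .merge(windowStart(timestampMs), production, Integer::sum);
        return true;
    }

    // Sum accumulated so far for one sensor and window (0 if nothing was accepted).
    int sum(String sensorId, long windowStartMs) {
        TreeMap<Long, Integer> perWindow = sums.get(sensorId);
        if (perWindow == null) {
            return 0;
        }
        return perWindow.getOrDefault(windowStartMs, 0);
    }

    public static void main(String[] args) {
        WindowSketch sketch = new WindowSketch();
        for (long ts = 160_000L; ts < 170_000L; ts += 1_000L) {
            sketch.accept("S1", ts, 1);
        }
        sketch.accept("S1", 165_000L, 1); // not newer than the last accepted record, dropped
        System.out.println("S1 window 160000 sum = " + sketch.sum("S1", 160_000L));
    }
}
```

With the ten messages from 160000 to 169000 this gives a single sum of 10 for
window 160000, which is what I'd like the stream to emit exactly once even if
the processor is restarted halfway through the window.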

Have a great weekend

--
Alessandro Tagliapietra


On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Alessandro,
>
> I had a look at your code. Regarding your question whether you use the
> SpecificAvroSerde correctly, take a look at the following documentation:
>
> https://docs.confluent.io/current/streams/developer-guide/datatypes.html
>
> I haven't had the time yet to take a closer look at your problems with the
> aggregation. I will have a look next week.
>
> Have a nice weekend,
> Bruno
>
> On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > So I've started with a new app with the archetype:generate as in
> > https://kafka.apache.org/22/documentation/streams/tutorial
> >
> > I've pushed a sample repo here: https://github.com/alex88/kafka-test
> > The avro schemas are a Metric with 2 fields: timestamp and production
> and a
> > MetricList with a list of records (Metric) to be able to manually do the
> > aggregation.
> > Right now the aggregation is simple just for the purpose of the sample
> repo
> > and to easily see if we're getting wrong values.
> >
> > What I wanted to achieve is:
> >  - have a custom generator that generates 1 message per second with
> > production = 1 with 1 or more separate message keys which in my case are
> > the sensor IDs generating the data
> >  - a filter that removes out of order messages by having a state that
> > stores key (sensorID) -> last timestamp
> >  - a window operation that for this example just sums the values in each
> 10
> > seconds windows
> >
> > To show where I'm having issues I've set up multiple branches for the
> repo:
> >  - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>* is
> the
> > one I had initially "Failed to flush state store
> > KSTREAM-AGGREGATE-STATE-STORE-0000000003" that I tried to solve using
> >
> >
> https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
> >  - *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>* is
> the
> > one after I've tried to solve above problem with the materializer (maybe
> > the SpecificAvroSerde is wrong?)
> >  - *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>* after
> > fixing issue-02 (by using groupByKey(Grouped.with(Serdes.String(), new
> > SpecificAvroSerde<>()))) everything seems to be working, if you let both
> > the producer and stream running, you'll see that the stream receives 10
> > messages (with the timestamp incrementing 1 second for each message) like
> > this:
> >
> > S1 with filtered metric{"timestamp": 160000, "production": 1}
> > S1 with filtered metric{"timestamp": 161000, "production": 1}
> > S1 with filtered metric{"timestamp": 162000, "production": 1}
> > S1 with filtered metric{"timestamp": 163000, "production": 1}
> > S1 with filtered metric{"timestamp": 164000, "production": 1}
> > S1 with filtered metric{"timestamp": 165000, "production": 1}
> > S1 with filtered metric{"timestamp": 166000, "production": 1}
> > S1 with filtered metric{"timestamp": 167000, "production": 1}
> > S1 with filtered metric{"timestamp": 168000, "production": 1}
> > S1 with filtered metric{"timestamp": 169000, "production": 1}
> >
> > and at the 10 seconds interval something like:
> >
> > S1 with computed metric {"timestamp": 160000, "production": 10}
> > S1 with computed metric {"timestamp": 170000, "production": 10}
> > S1 with computed metric {"timestamp": 180000, "production": 10}
> >
> > and so on...
> > Now there are two problems, after stopping and restarting the stream
> > processor (by sending SIGINT via IntelliJ since I start the class main
> with
> > it) it happens:
> >  - sometimes the aggregated count is wrong, if I have it start windowing
> > for 7 seconds (e.g. seconds 11-17), restart the stream, after restart it
> > might just emit a value for the new 3 missing seconds (seconds 18-20) and
> > the aggregated value is 3 not 10
> >  - sometimes the window outputs twice, in the example where I restart the
> > stream processor I might get as output
> >
> > S1 with filtered metric{"timestamp": 154000, "production": 1}
> > S1 with computed metric {"timestamp": 160000, "production": 5}
> > S1 with filtered metric{"timestamp": 155000, "production": 1}
> > S1 with filtered metric{"timestamp": 156000, "production": 1}
> > S1 with filtered metric{"timestamp": 157000, "production": 1}
> > S1 with filtered metric{"timestamp": 158000, "production": 1}
> > S1 with filtered metric{"timestamp": 159000, "production": 1}
> > S1 with filtered metric{"timestamp": 160000, "production": 1}
> > S1 with filtered metric{"timestamp": 161000, "production": 1}
> > S1 with computed metric {"timestamp": 160000, "production": 10}
> > S1 with filtered metric{"timestamp": 162000, "production": 1}
> >
> > as you can see, window for timestamp 160000 is duplicated
> >
> > Is this because the window state isn't persisted across restarts?
> > My ultimate goal is to have the window part emit only once and resume
> > processing across restarts, while avoiding processing out of order data
> > (that's the purpose of the TimestampIncrementalFilter)
> >
> > Thank you in advance
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
> > tagliapietra.alessan...@gmail.com> wrote:
> >
> > > Hi Bruno,
> > >
> > > I'm using the confluent docker images 5.2.1, so kafka 2.2.
> > > Anyway I'll try to make a small reproduction repo with all the
> different
> > > cases soon.
> > >
> > > Thank you
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > >
> > > On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <br...@confluent.io>
> > wrote:
> > >
> > >> Hi Alessandro,
> > >>
> > >> What version of Kafka do you use?
> > >>
> > >> Could you please give a more detailed example for the issues with the
> > two
> > >> keys you see?
> > >>
> > >> Could the following bug be related to the duplicates you see?
> > >>
> > >>
> > >>
> >
> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
> > >>
> > >> How do you restart the processor?
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
> > >> tagliapietra.alessan...@gmail.com> wrote:
> > >>
> > >> > Thank you Bruno,
> > >> >
> > >> > I'll look into those, however average is just a simple thing I'm
> > trying
> > >> > right now just to get an initial windowing flow working.
> > >> > In the future I'll probably still need the actual values for other
> > >> > calculations. We won't have more than 60 elements per window for
> sure.
> > >> >
> > >> > So far to not manually serialize/deserialize the array list I've
> > >> created an
> > >> > Avro model with an array field containing the values.
> > >> > I had issues with suppress as explained here
> > >> >
> > >> >
> > >> >
> > >>
> >
> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
> > >> >
> > >> > but I got that working.
> > >> > So far everything seems to be working, except a couple things:
> > >> >  - if I generate data with 1 key, I correctly get a value each 10
> > >> seconds,
> > >> > if I later start generating data with another key (while key 1 is
> > still
> > >> > generating) the windowing emits a value only after the timestamp of
> > key
> > >> 2
> > >> > reaches the last generated window
> > >> >  - while generating data, if I restart the processor as soon as it
> > >> starts
> > >> > it sometimes generates 2 aggregates for the same window even if I'm
> > >> using
> > >> > the suppress
> > >> >
> > >> > Anyway, I'll look into your link and try to find out the cause of
> > these
> > >> > issues, probably starting from scratch with a simpler example
> > >> >
> > >> > Thank you for your help!
> > >> >
> > >> > --
> > >> > Alessandro Tagliapietra
> > >> >
> > >> > On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <br...@confluent.io>
> > >> wrote:
> > >> >
> > >> > > Hi Alessandro,
> > >> > >
> > >> > > Have a look at this Kafka Usage Pattern for computing averages
> > without
> > >> > > using an ArrayList.
> > >> > >
> > >> > >
> > >> > >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> > >> > > ?
> > >> > >
> > >> > > The advantage of this pattern over the ArrayList approach is the
> > >> reduced
> > >> > > space needed to compute the aggregate. Note that you will still
> need
> > >> to
> > >> > > implement a SerDe. However, the SerDe should be a bit easier to
> > >> implement
> > >> > > than a SerDe for an ArrayList.
> > >> > >
> > >> > > Hope that helps.
> > >> > >
> > >> > > Best,
> > >> > > Bruno
> > >> > >
> > >> > > On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
> > >> > > tagliapietra.alessan...@gmail.com> wrote:
> > >> > >
> > >> > > > Sorry but it seemed harder than I thought,
> > >> > > >
> > >> > > > to have the custom aggregation working I need to get an
> ArrayList
> > of
> > >> > all
> > >> > > > the values in the window, so far my aggregate DSL method creates
> > an
> > >> > > > ArrayList on the initializer and adds each value to the list in
> > the
> > >> > > > aggregator.
> > >> > > > Then I think I'll have to provide a serde to change the output
> > >> type of
> > >> > > > that method.
> > >> > > > I was looking at
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> > >> > > > but
> > >> > > > that seems more towards a list of longs and already uses
> > longSerde.
> > >> > > > I'm currently trying to implement another avro model that has a
> > >> field
> > >> > of
> > >> > > > type array so I can use the regular avro serializer to implement
> > >> this.
> > >> > > > Should I create my own serdes instead or is this the right way?
> > >> > > >
> > >> > > > Thank you in advance
> > >> > > >
> > >> > > > --
> > >> > > > Alessandro Tagliapietra
> > >> > > >
> > >> > > > On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> > >> > > > tagliapietra.alessan...@gmail.com> wrote:
> > >> > > >
> > >> > > > > Thank you Bruno and Matthias,
> > >> > > > >
> > >> > > > > I've modified the transformer to implement the
> > >> > ValueTransformerWithKey
> > >> > > > > interface and everything is working fine.
> > >> > > > > I've now to window the data and manually aggregate each window
> > >> data
> > >> > > since
> > >> > > > > I've to do some averages and sum of differences.
> > >> > > > > So far I'm just having some issues with message types since
> I'm
> > >> > > changing
> > >> > > > > the data type when aggregating the window but I think it's an
> > easy
> > >> > > > problem.
> > >> > > > >
> > >> > > > > Thank you again
> > >> > > > > Best
> > >> > > > >
> > >> > > > > --
> > >> > > > > Alessandro Tagliapietra
> > >> > > > >
> > >> > > > > On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <
> > >> br...@confluent.io>
> > >> > > > wrote:
> > >> > > > >
> > >> > > > >> Hi Alessandro,
> > >> > > > >>
> > >> > > > >> the `TransformSupplier` is internally wrapped with a
> > >> > > > `ProcessorSupplier`,
> > >> > > > >> so the statement
> > >> > > > >>
> > >> > > > >> `transform` is essentially equivalent to adding the
> Transformer
> > >> via
> > >> > > > >> Topology#addProcessor() to your processor topology
> > >> > > > >>
> > >> > > > >> is correct.
> > >> > > > >>
> > >> > > > >> If you do not change the key, you should definitely use one
> of
> > >> the
> > >> > > > >> overloads of `transformValues` to avoid internal data
> > >> > redistribution.
> > >> > > In
> > >> > > > >> your case the overload with `ValueTransformerWithKeySupplier`
> > as
> > >> > > > suggested
> > >> > > > >> by Matthias would fit.
> > >> > > > >>
> > >> > > > >> Best,
> > >> > > > >> Bruno
> > >> > > > >>
> > >> > > > >> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
> > >> > > matth...@confluent.io
> > >> > > > >
> > >> > > > >> wrote:
> > >> > > > >>
> > >> > > > >> > There is also `ValueTransformerWithKey` that gives you
> > >> read-only
> > >> > > acess
> > >> > > > >> > to the key.
> > >> > > > >> >
> > >> > > > >> > -Matthias
> > >> > > > >> >
> > >> > > > >> > On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> > >> > > > >> > > Hi Bruno,
> > >> > > > >> > >
> > >> > > > >> > > Thank you for the quick answer.
> > >> > > > >> > >
> > >> > > > >> > > I'm actually trying to do that since it seems there is
> > >> really no
> > >> > > way
> > >> > > > >> to
> > >> > > > >> > > have it use `Processor<K, V>`.
> > >> > > > >> > > I just wanted (if that would've made any sense) to use
> the
> > >> > > Processor
> > >> > > > >> in
> > >> > > > >> > > both DSL and non-DSL pipelines.
> > >> > > > >> > >
> > >> > > > >> > > Anyway, regarding `transformValues()` I don't think I can
> > >> use it
> > >> > > as
> > >> > > > I
> > >> > > > >> > need
> > >> > > > >> > > the message key since that is the discriminating value
> for
> > >> the
> > >> > > > filter
> > >> > > > >> (I
> > >> > > > >> > > want to exclude old values per sensor ID so per message
> > key)
> > >> > > > >> > >
> > >> > > > >> > > Right now I've this
> > >> > > > >> > >
> > >> > > > >> >
> > >> > > > >>
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> > >> > > > >> > > and
> > >> > > > >> > > I'm using it with `transform()`.
> > >> > > > >> > >
> > >> > > > >> > > One thing I've found confusing is this
> > >> > > > >> > >
> > >> > > > >> >
> > >> > > > >>
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> > >> > > > >> > >
> > >> > > > >> > > transform is essentially equivalent to adding the
> > Transformer
> > >> > via
> > >> > > > >> > >> Topology#addProcessor() to your processor topology
> > >> > > > >> > >> <
> > >> > > > >> >
> > >> > > > >>
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology
> > >> > > > >> > >
> > >> > > > >> > >> .
> > >> > > > >> > >
> > >> > > > >> > >
> > >> > > > >> > > is it? Doesn't `transform` need a TransformSupplier while
> > >> > > > >> `addProcessor`
> > >> > > > >> > > uses a ProcessorSupplier?
> > >> > > > >> > >
> > >> > > > >> > > Thank you again for your help
> > >> > > > >> > >
> > >> > > > >> > > --
> > >> > > > >> > > Alessandro Tagliapietra
> > >> > > > >> > >
> > >> > > > >> > >
> > >> > > > >> > > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
> > >> > br...@confluent.io
> > >> > > >
> > >> > > > >> > wrote:
> > >> > > > >> > >
> > >> > > > >> > >> Hi Alessandro,
> > >> > > > >> > >>
> > >> > > > >> > >> Have you considered using `transform()` (actually in
> your
> > >> case
> > >> > > you
> > >> > > > >> > should
> > >> > > > >> > >> use `transformValues()`) instead of `.process()`?
> > >> `transform()`
> > >> > > and
> > >> > > > >> > >> `transformValues()` are stateful operations similar to
> > >> > `.process`
> > >> > > > but
> > >> > > > >> > they
> > >> > > > >> > >> return a `KStream`. On a `KStream` you can then apply a
> > >> > windowed
> > >> > > > >> > >> aggregation.
> > >> > > > >> > >>
> > >> > > > >> > >> Hope that helps.
> > >> > > > >> > >>
> > >> > > > >> > >> Best,
> > >> > > > >> > >> Bruno
> > >> > > > >> > >>
> > >> > > > >> > >>
> > >> > > > >> > >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra
> <
> > >> > > > >> > >> tagliapietra.alessan...@gmail.com> wrote:
> > >> > > > >> > >>
> > >> > > > >> > >>> Hi there,
> > >> > > > >> > >>>
> > >> > > > >> > >>> I'm just starting with Kafka and I'm trying to create a
> > >> stream
> > >> > > > >> > processor
> > >> > > > >> > >>> that in multiple stages:
> > >> > > > >> > >>>  - filters messages using a kv store so that only
> > messages
> > >> > with
> > >> > > > >> higher
> > >> > > > >> > >>> timestamp gets processed
> > >> > > > >> > >>>  - aggregates the message metrics by minute giving e.g.
> > the
> > >> > avg
> > >> > > of
> > >> > > > >> > those
> > >> > > > >> > >>> metrics in that minute
> > >> > > > >> > >>>
> > >> > > > >> > >>> The message is simple, the key is the sensor ID and the
> > >> value
> > >> > is
> > >> > > > >> e.g. {
> > >> > > > >> > >>> timestamp: UNIX_TIMESTAMP, speed: INT }.
> > >> > > > >> > >>>
> > >> > > > >> > >>> I've started by creating a processor to use the kv
> store
> > >> and
> > >> > > > filter
> > >> > > > >> old
> > >> > > > >> > >>> messages:
> > >> > > > >> > >>>
> > >> > > > >> > >>>
> > >> > > > >> > >>>
> > >> > > > >> > >>
> > >> > > > >> >
> > >> > > > >>
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> > >> > > > >> > >>>
> > >> > > > >> > >>> Then I was trying to implement windowing, I saw very
> nice
> > >> > > > windowing
> > >> > > > >> > >>> examples for the DSL but none for the Processor API
> > (only a
> > >> > > small
> > >> > > > >> > >> reference
> > >> > > > >> > >>> to the windowed store), can someone point me in the
> right
> > >> > > > direction?
> > >> > > > >> > >>>
> > >> > > > >> > >>> Now, since I wasn't able to find any example I tried to
> > use
> > >> > the
> > >> > > > DSL
> > >> > > > >> but
> > >> > > > >> > >>> haven't found a way to use my processor with it, I saw
> > this
> > >> > > > >> > >>>
> > >> > > > >> > >>>
> > >> > > > >> > >>
> > >> > > > >> >
> > >> > > > >>
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> > >> > > > >> > >>> but
> > >> > > > >> > >>> it explains mostly transformers not processors. I also
> > saw
> > >> > after
> > >> > > > >> that
> > >> > > > >> > the
> > >> > > > >> > >>> example usage of the processor but `.process(...)`
> > returns
> > >> > void,
> > >> > > > so
> > >> > > > >> I
> > >> > > > >> > >>> cannot have a KStream from a processor?
> > >> > > > >> > >>>
> > >> > > > >> > >>> Thank you all in advance
> > >> > > > >> > >>>
> > >> > > > >> > >>> --
> > >> > > > >> > >>> Alessandro Tagliapietra
> > >> > > > >> > >>>
> > >> > > > >> > >>
> > >> > > > >> > >
> > >> > > > >> >
> > >> > > > >> >
> > >> > > > >>
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> >
>
