Hi Bruno,

thank you for your help, glad to hear that those are only bugs and not a
problem on my implementation,
I'm currently using confluent docker images, I've checked their master
branch which seems to use the SNAPSHOT version however those
images/packages aren't publicly available. Are there any snapshot builds
available?
In the meantime I'm trying to create a custom docker image from kafka
source.

Thanks

--
Alessandro Tagliapietra

On Tue, Apr 23, 2019 at 8:52 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Alessandro,
>
> It seems that the behaviour you described regarding the window aggregation
> is due to bugs. The good news is that the bugs have been already fixed.
>
> The relevant bug reports are
> https://issues.apache.org/jira/browse/KAFKA-7895
> https://issues.apache.org/jira/browse/KAFKA-8204
>
> The fixes for both bugs have been already merged to the 2.2 branch.
>
> Could you please build from the 2.2 branch and confirm that the fixes solve
> your problem?
>
> Best,
> Bruno
>
>
> On Sat, Apr 20, 2019 at 2:16 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Thanks Matthias, one less thing to worry about in the future :)
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Just a side note. There is currently work in progress on
> > > https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the
> > > configuration problem for Serdes.
> > >
> > > -Matthias
> > >
> > > On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:
> > > > Hi Bruno,
> > > > thanks a lot for checking the code, regarding the SpecificAvroSerde
> > I've
> > > > found that using
> > > >
> > > > final Serde<InputList> valueSpecificAvroSerde = new
> > > SpecificAvroSerde<>();
> > > > final Map<String, String> serdeConfig =
> > > > Collections.singletonMap("schema.registry.url", "
> http://localhost:8081
> > > ");
> > > > valueSpecificAvroSerde.configure(serdeConfig, false);
> > > >
> > > > and then in aggregate()
> > > >
> > > > Materialized.with(Serdes.String(), valueSpecificAvroSerde)
> > > >
> > > > fixed the issue.
> > > >
> > > > Thanks in advance for the windowing help, very appreciated.
> > > > In the meantime I'll try to make some progress on the rest.
> > > >
> > > > Have a great weekend
> > > >
> > > > --
> > > > Alessandro Tagliapietra
> > > >
> > > >
> > > > On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna <br...@confluent.io>
> > > wrote:
> > > >
> > > >> Hi Alessandro,
> > > >>
> > > >> I had a look at your code. Regarding your question whether you use
> the
> > > >> SpecificAvroSerde correctly, take a look at the following
> > documentation:
> > > >>
> > > >>
> > >
> https://docs.confluent.io/current/streams/developer-guide/datatypes.html
> > > >>
> > > >> I haven't had the time yet to take a closer look at your problems
> with
> > > the
> > > >> aggregation. I will have a look next week.
> > > >>
> > > >> Have a nice weekend,
> > > >> Bruno
> > > >>
> > > >> On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <
> > > >> tagliapietra.alessan...@gmail.com> wrote:
> > > >>
> > > >>> So I've started with a new app with the archetype:generate as in
> > > >>> https://kafka.apache.org/22/documentation/streams/tutorial
> > > >>>
> > > >>> I've pushed a sample repo here:
> https://github.com/alex88/kafka-test
> > > >>> The avro schemas are a Metric with 2 fields: timestamp and
> production
> > > >> and a
> > > >>> MetricList with a list of records (Metric) to be able to manually
> do
> > > the
> > > >>> aggregation.
> > > >>> Right now the aggregation is simple just for the purpose of the
> > sample
> > > >> repo
> > > >>> and to easily see if we're getting wrong values.
> > > >>>
> > > >>> What I wanted to achieve is:
> > > >>>  - have a custom generator that generates 1 message per second with
> > > >>> production = 1 with 1 ore more separate message keys which in my
> case
> > > are
> > > >>> the sensor IDs generating the data
> > > >>>  - a filter that removes out of order messages by having a state
> that
> > > >>> stores key (sensorID) -> last timestamp
> > > >>>  - a window operation that for this example just sums the values in
> > > each
> > > >> 10
> > > >>> seconds windows
> > > >>>
> > > >>> To show where I'm having issues I've setup multiple branches for
> the
> > > >> repo:
> > > >>>  - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>*
> > is
> > > >> the
> > > >>> one I had initially "Failed to flush state store
> > > >>> KSTREAM-AGGREGATE-STATE-STORE-0000000003" that I tried to solve
> using
> > > >>>
> > > >>>
> > > >>
> > >
> >
> https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
> > > >>>  - *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>*
> > is
> > > >> the
> > > >>> one after I've tried to solve above problem with the materializer
> > > (maybe
> > > >>> the SpecificAvroSerde is wrong?)
> > > >>>  - *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>*
> > > after
> > > >>> fixing issue-02 (by using groupByKey(Grouped.with(Serdes.String(),
> > new
> > > >>> SpecificAvroSerde<>()))) everything seems to be working, if you let
> > > both
> > > >>> the producer and stream running, you'll see that the stream
> receives
> > 10
> > > >>> messages (with the timestamp incrementing 1 second for each
> message)
> > > like
> > > >>> this:
> > > >>>
> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 163000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 164000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 165000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 166000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 167000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 168000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 169000, "production": 1}
> > > >>>
> > > >>> and at the 10 seconds interval something like:
> > > >>>
> > > >>> S1 with computed metric {"timestamp": 160000, "production": 10}
> > > >>> S1 with computed metric {"timestamp": 170000, "production": 10}
> > > >>> S1 with computed metric {"timestamp": 180000, "production": 10}
> > > >>>
> > > >>> and so on...
> > > >>> Now there are two problems, after stopping and restarting the
> stream
> > > >>> processor (by sending SIGINT via IntelliJ since I start the class
> > main
> > > >> with
> > > >>> it) it happens:
> > > >>>  - sometimes the aggregated count is wrong, if I have it start
> > > windowing
> > > >>> for 7 seconds (e.g. seconds 11-17), restart the stream, after
> restart
> > > it
> > > >>> might just emit a value for the new 3 missing seconds (seconds
> 18-20)
> > > and
> > > >>> the aggregated value is 3 not 10
> > > >>>  - sometimes the window outputs twice, in the example where I
> restart
> > > the
> > > >>> stream processor I might get as output
> > > >>>
> > > >>> S1 with filtered metric{"timestamp": 154000, "production": 1}
> > > >>> S1 with computed metric {"timestamp": 160000, "production": 5}
> > > >>> S1 with filtered metric{"timestamp": 155000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 156000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 157000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 158000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 159000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
> > > >>> S1 with computed metric {"timestamp": 160000, "production": 10}
> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
> > > >>>
> > > >>> as you can see, window for timestamp 160000 is duplicated
> > > >>>
> > > >>> Is this because the window state isn't persisted across restarts?
> > > >>> My ultimate goal is to have the window part emit only once and
> resume
> > > >>> processing across restarts, while avoiding processing out of order
> > data
> > > >>> (that's the purpose of the TimestampIncrementalFilter)
> > > >>>
> > > >>> Thank you in advance
> > > >>>
> > > >>> --
> > > >>> Alessandro Tagliapietra
> > > >>>
> > > >>>
> > > >>> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
> > > >>> tagliapietra.alessan...@gmail.com> wrote:
> > > >>>
> > > >>>> Hi Bruno,
> > > >>>>
> > > >>>> I'm using the confluent docker images 5.2.1, so kafka 2.2.
> > > >>>> Anyway I'll try to make a small reproduction repo with all the
> > > >> different
> > > >>>> cases soon.
> > > >>>>
> > > >>>> Thank you
> > > >>>>
> > > >>>> --
> > > >>>> Alessandro Tagliapietra
> > > >>>>
> > > >>>>
> > > >>>> On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <br...@confluent.io
> >
> > > >>> wrote:
> > > >>>>
> > > >>>>> Hi Alessandro,
> > > >>>>>
> > > >>>>> What version of Kafka do you use?
> > > >>>>>
> > > >>>>> Could you please give a more detailed example for the issues with
> > the
> > > >>> two
> > > >>>>> keys you see?
> > > >>>>>
> > > >>>>> Could the following bug be related to the duplicates you see?
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>
> > > >>
> > >
> >
> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
> > > >>>>>
> > > >>>>> How do you restart the processor?
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Bruno
> > > >>>>>
> > > >>>>> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
> > > >>>>> tagliapietra.alessan...@gmail.com> wrote:
> > > >>>>>
> > > >>>>>> Thank you Bruno,
> > > >>>>>>
> > > >>>>>> I'll look into those, however average is just a simple thing I'm
> > > >>> trying
> > > >>>>>> right now just to get an initial windowing flow working.
> > > >>>>>> In the future I'll probably still need the actual values for
> other
> > > >>>>>> calculations. We won't have more than 60 elements per window for
> > > >> sure.
> > > >>>>>>
> > > >>>>>> So far to not manually serialize/deserialize the array list I've
> > > >>>>> created an
> > > >>>>>> Avro model with an array field containing the values.
> > > >>>>>> I had issues with suppress as explained here
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>
> > > >>
> > >
> >
> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
> > > >>>>>>
> > > >>>>>> but I got that working.
> > > >>>>>> So far everything seems to be working, except a couple things:
> > > >>>>>>  - if I generate data with 1 key, I correctly get a value each
> 10
> > > >>>>> seconds,
> > > >>>>>> if I later start generating data with another key (while key 1
> is
> > > >>> still
> > > >>>>>> generating) the windowing emits a value only after the timestamp
> > of
> > > >>> key
> > > >>>>> 2
> > > >>>>>> reaches the last generated window
> > > >>>>>>  - while generating data, if I restart the processor as soon as
> it
> > > >>>>> starts
> > > >>>>>> it sometimes generates 2 aggregates for the same window even if
> > I'm
> > > >>>>> using
> > > >>>>>> the suppress
> > > >>>>>>
> > > >>>>>> Anyway, I'll look into your link and try to find out the cause
> of
> > > >>> these
> > > >>>>>> issues, probably starting from scratch with a simpler example
> > > >>>>>>
> > > >>>>>> Thank you for your help!
> > > >>>>>>
> > > >>>>>> --
> > > >>>>>> Alessandro Tagliapietra
> > > >>>>>>
> > > >>>>>> On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <
> > br...@confluent.io>
> > > >>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Alessandro,
> > > >>>>>>>
> > > >>>>>>> Have a look at this Kafka Usage Pattern for computing averages
> > > >>> without
> > > >>>>>>> using an ArrayList.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> > > >>>>>>> ?
> > > >>>>>>>
> > > >>>>>>> The advantages of this pattern over the ArrayList approach is
> the
> > > >>>>> reduced
> > > >>>>>>> space needed to compute the aggregate. Note that you will still
> > > >> need
> > > >>>>> to
> > > >>>>>>> implement a SerDe. However, the SerDe should be a bit easier to
> > > >>>>> implement
> > > >>>>>>> than a SerDe for an ArrayList.
> > > >>>>>>>
> > > >>>>>>> Hope that helps.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Bruno
> > > >>>>>>>
> > > >>>>>>> On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
> > > >>>>>>> tagliapietra.alessan...@gmail.com> wrote:
> > > >>>>>>>
> > > >>>>>>>> Sorry but it seemed harder than I thought,
> > > >>>>>>>>
> > > >>>>>>>> to have the custom aggregation working I need to get an
> > > >> ArrayList
> > > >>> of
> > > >>>>>> all
> > > >>>>>>>> the values in the window, so far my aggregate DSL method
> creates
> > > >>> an
> > > >>>>>>>> ArrayList on the initializer and adds each value to the list
> in
> > > >>> the
> > > >>>>>>>> aggregator.
> > > >>>>>>>> Then I think I'll have to provide a serder to change the
> output
> > > >>>>> type of
> > > >>>>>>>> that method.
> > > >>>>>>>> I was looking at
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>
> > > >>
> > >
> >
> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> > > >>>>>>>> but
> > > >>>>>>>> that seems more towards a list of longs and already uses
> > > >>> longSerde.
> > > >>>>>>>> I'm currently trying to implement another avro model that has
> a
> > > >>>>> field
> > > >>>>>> of
> > > >>>>>>>> type array so I can use the regular avro serializer to
> implement
> > > >>>>> this.
> > > >>>>>>>> Should I create my own serdes instead or is this the right
> way?
> > > >>>>>>>>
> > > >>>>>>>> Thank you in advance
> > > >>>>>>>>
> > > >>>>>>>> --
> > > >>>>>>>> Alessandro Tagliapietra
> > > >>>>>>>>
> > > >>>>>>>> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> > > >>>>>>>> tagliapietra.alessan...@gmail.com> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Thank you Bruno and Matthias,
> > > >>>>>>>>>
> > > >>>>>>>>> I've modified the transformer to implement the
> > > >>>>>> ValueTransformerWithKey
> > > >>>>>>>>> interface and everything is working fine.
> > > >>>>>>>>> I've now to window the data and manually aggregate each
> window
> > > >>>>> data
> > > >>>>>>> since
> > > >>>>>>>>> I've to do some averages and sum of differences.
> > > >>>>>>>>> So far I've just having some issues with message types since
> > > >> I'm
> > > >>>>>>> changing
> > > >>>>>>>>> the data type when aggregating the window but I think it's an
> > > >>> easy
> > > >>>>>>>> problem.
> > > >>>>>>>>>
> > > >>>>>>>>> Thank you again
> > > >>>>>>>>> Best
> > > >>>>>>>>>
> > > >>>>>>>>> --
> > > >>>>>>>>> Alessandro Tagliapietra
> > > >>>>>>>>>
> > > >>>>>>>>> On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <
> > > >>>>> br...@confluent.io>
> > > >>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hi Alessandro,
> > > >>>>>>>>>>
> > > >>>>>>>>>> the `TransformSupplier` is internally wrapped with a
> > > >>>>>>>> `ProcessorSupplier`,
> > > >>>>>>>>>> so the statement
> > > >>>>>>>>>>
> > > >>>>>>>>>> `transform` is essentially equivalent to adding the
> > > >> Transformer
> > > >>>>> via
> > > >>>>>>>>>> Topology#addProcessor() to your processor topology
> > > >>>>>>>>>>
> > > >>>>>>>>>> is correct.
> > > >>>>>>>>>>
> > > >>>>>>>>>> If you do not change the key, you should definitely use one
> > > >> of
> > > >>>>> the
> > > >>>>>>>>>> overloads of `transformValues` to avoid internal data
> > > >>>>>> redistribution.
> > > >>>>>>> In
> > > >>>>>>>>>> your case the overload with
> `ValueTransformerWithKeySupplier`
> > > >>> as
> > > >>>>>>>> suggested
> > > >>>>>>>>>> by Matthias would fit.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best,
> > > >>>>>>>>>> Bruno
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
> > > >>>>>>> matth...@confluent.io
> > > >>>>>>>>>
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> There is also `ValueTransformerWithKey` that gives you
> > > >>>>> read-only
> > > >>>>>>> acess
> > > >>>>>>>>>>> to the key.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> -Matthias
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> > > >>>>>>>>>>>> Hi Bruno,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thank you for the quick answer.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I'm actually trying to do that since it seems there is
> > > >>>>> really no
> > > >>>>>>> way
> > > >>>>>>>>>> to
> > > >>>>>>>>>>>> have it use `Processor<K, V>`.
> > > >>>>>>>>>>>> I just wanted (if that would've made any sense) to use
> > > >> the
> > > >>>>>>> Processor
> > > >>>>>>>>>> in
> > > >>>>>>>>>>>> both DSL and non-DSL pipelines.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Anyway, regarding `transformValues()` I don't think I can
> > > >>>>> use it
> > > >>>>>>> as
> > > >>>>>>>> I
> > > >>>>>>>>>>> need
> > > >>>>>>>>>>>> the message key since that is the discriminating value
> > > >> for
> > > >>>>> the
> > > >>>>>>>> filter
> > > >>>>>>>>>> (I
> > > >>>>>>>>>>>> want to exclude old values per sensor ID so per message
> > > >>> key)
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Right now I've this
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>
> > > >>
> > >
> >
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> > > >>>>>>>>>>>> and
> > > >>>>>>>>>>>> i'm using it with `transform()` .
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> One thing I've found confusing is this
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>
> > > >>
> > >
> >
> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> transform is essentially equivalent to adding the
> > > >>> Transformer
> > > >>>>>> via
> > > >>>>>>>>>>>>> Topology#addProcessor() to yourprocessor topology
> > > >>>>>>>>>>>>> <
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>
> > > >>
> > >
> >
> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> .
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> is it? Doesn't `transform` need a TransformSupplier while
> > > >>>>>>>>>> `addProcessor`
> > > >>>>>>>>>>>> uses a ProcessorSupplier?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thank you again for your help
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> --
> > > >>>>>>>>>>>> Alessandro Tagliapietra
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
> > > >>>>>> br...@confluent.io
> > > >>>>>>>>
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi Alessandro,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Have you considered using `transform()` (actually in
> > > >> your
> > > >>>>> case
> > > >>>>>>> you
> > > >>>>>>>>>>> should
> > > >>>>>>>>>>>>> use `transformValues()`) instead of `.process()`?
> > > >>>>> `transform()`
> > > >>>>>>> and
> > > >>>>>>>>>>>>> `transformValues()` are stateful operations similar to
> > > >>>>>> `.process`
> > > >>>>>>>> but
> > > >>>>>>>>>>> they
> > > >>>>>>>>>>>>> return a `KStream`. On a `KStream` you can then apply a
> > > >>>>>> windowed
> > > >>>>>>>>>>>>> aggregation.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hope that helps.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>> Bruno
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra
> > > >> <
> > > >>>>>>>>>>>>> tagliapietra.alessan...@gmail.com> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hi there,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I'm just starting with Kafka and I'm trying to create a
> > > >>>>> stream
> > > >>>>>>>>>>> processor
> > > >>>>>>>>>>>>>> that in multiple stages:
> > > >>>>>>>>>>>>>>  - filters messages using a kv store so that only
> > > >>> messages
> > > >>>>>> with
> > > >>>>>>>>>> higher
> > > >>>>>>>>>>>>>> timestamp gets processed
> > > >>>>>>>>>>>>>>  - aggregates the message metrics by minute giving e.g.
> > > >>> the
> > > >>>>>> avg
> > > >>>>>>> of
> > > >>>>>>>>>>> those
> > > >>>>>>>>>>>>>> metrics in that minute
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> The message is simple, the key is the sensor ID and the
> > > >>>>> value
> > > >>>>>> is
> > > >>>>>>>>>> e.g. {
> > > >>>>>>>>>>>>>> timestamp: UNIX_TIMESTAMP, speed: INT }.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I've started by creating a processor to use the kv
> > > >> store
> > > >>>>> and
> > > >>>>>>>> filter
> > > >>>>>>>>>> old
> > > >>>>>>>>>>>>>> messages:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>
> > > >>
> > >
> >
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Then I was trying to implement windowing, I saw very
> > > >> nice
> > > >>>>>>>> windowing
> > > >>>>>>>>>>>>>> examples for the DSL but none for the Processor API
> > > >>> (only a
> > > >>>>>>> small
> > > >>>>>>>>>>>>> reference
> > > >>>>>>>>>>>>>> to the windowed store), can someone point me in the
> > > >> right
> > > >>>>>>>> direction?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Now, since I wasn't able to find any example I tried to
> > > >>> use
> > > >>>>>> the
> > > >>>>>>>> DSL
> > > >>>>>>>>>> but
> > > >>>>>>>>>>>>>> haven't found a way to use my processor with it, I saw
> > > >>> this
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>
> > > >>
> > >
> >
> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> > > >>>>>>>>>>>>>> but
> > > >>>>>>>>>>>>>> it explains mostly transformers not processors. I also
> > > >>> saw
> > > >>>>>> after
> > > >>>>>>>>>> that
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>>> example usage of the processor but `.process(...)`
> > > >>> returns
> > > >>>>>> void,
> > > >>>>>>>> so
> > > >>>>>>>>>> I
> > > >>>>>>>>>>>>>> cannot have a KStream from a processor?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thank you all in advance
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> --
> > > >>>>>>>>>>>>>> Alessandro Tagliapietra
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
>

Reply via email to