Hi Alessandro,

Apologies for the late reply.
I tried the code from your repository under
https://github.com/alex88/kafka-test/tree/master and I ran into a
`ClassCastException`. I think this is the bug described in
https://issues.apache.org/jira/browse/KAFKA-8317
Should I have tried one of the other branches?

Best regards,
Bruno

On Fri, May 3, 2019 at 9:33 AM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Ok, so I'm not sure if I did this correctly.
>
> I've upgraded both the server (by replacing the JARs in the confluent
> docker image with those built from kafka source) and the client (by using
> the built JARs as local file dependencies).
> I've used this as source: https://github.com/apache/kafka/archive/2.2.zip
> When the server runs it prints:
>
> INFO Kafka version: 2.2.1-SNAPSHOT
> (org.apache.kafka.common.utils.AppInfoParser)
>
> and regarding the client, I don't see any kafka jars in the "External
> libraries" of the IntelliJ project tab, so I think it's using the local
> JARs (2.2.1-SNAPSHOT).
>
> The problem is that the window isn't keeping the old values and still
> emits values for partially processed intervals.
>
> Just to summarize:
> https://gist.github.com/alex88/43b72e23bda9e15657b008855e1904db
>
> - the consumer emits one message per second with production = 1
> - the windowing stream should emit one message every 10 seconds with the
> sum of the productions (so production = 10)
>
> If I restart the stream processor, it emits windows with partial data
> (production < 10), as you can see from the logs.
> I've checked the JAR file and it seems to include the changes from
> https://github.com/apache/kafka/pull/6623 (it has the newly added
> FixedOrderMap class).
>
> Even after removing the suppress() the error persists (look at
> consumer_nosuppress); here it seems to lose track of the contents of the
> window:
>
> S1 with computed metric {"timestamp": 50000, "production": 10}
> S1 with computed metric {"timestamp": 60000, "production": 1}
> S1 with computed metric {"timestamp": 60000, "production": 2}
> S1 with computed metric {"timestamp": 60000, "production": 3}
> S1 with computed metric {"timestamp": 60000, "production": 4}
> -- RESTART --
> S1 with computed metric {"timestamp": 60000, "production": 1}
> S1 with computed metric {"timestamp": 60000, "production": 2}
> S1 with computed metric {"timestamp": 60000, "production": 3}
> S1 with computed metric {"timestamp": 60000, "production": 4}
> S1 with computed metric {"timestamp": 60000, "production": 5}
> S1 with computed metric {"timestamp": 60000, "production": 6}
> S1 with computed metric {"timestamp": 70000, "production": 1}
>
> i.e. after the restart, the sum restarts within the 60000 window.
>
> Is something wrong with my implementation?
>
> --
> Alessandro Tagliapietra
>
> On Thu, May 2, 2019 at 7:58 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Hi Bruno,
> >
> > thank you for your help, glad to hear that those are only bugs and not
> > a problem in my implementation.
> > I'm currently using the confluent docker images. I've checked their
> > master branch, which seems to use the SNAPSHOT version, however those
> > images/packages aren't publicly available. Are there any snapshot
> > builds available?
> > In the meantime I'm trying to build a custom docker image from kafka
> > source.
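> >
> > For completeness, the windowing part of my topology is roughly this
> > (a simplified sketch: the topic name, the serdes and the Metric getter
> > are assumptions, the real code is in the gist):
> >
> > KStream<String, Metric> metrics = builder.stream("sensor-data",
> >     Consumed.with(Serdes.String(), metricSerde));
> > metrics
> >     .groupByKey(Grouped.with(Serdes.String(), metricSerde))
> >     .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
> >     .aggregate(
> >         () -> 0,                                       // initializer
> >         (sensorId, metric, total) ->                   // sum per window
> >             total + metric.getProduction(),
> >         Materialized.with(Serdes.String(), Serdes.Integer()))
> >     .suppress(Suppressed.untilWindowCloses(
> >         Suppressed.BufferConfig.unbounded()))
> >     .toStream()
> >     .foreach((windowedId, total) ->
> >         System.out.println(windowedId + " -> " + total));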
> >
> > Thanks
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Tue, Apr 23, 2019 at 8:52 AM Bruno Cadonna <br...@confluent.io>
> > wrote:
> >
> >> Hi Alessandro,
> >>
> >> It seems that the behaviour you described regarding the window
> >> aggregation is due to bugs. The good news is that the bugs have
> >> already been fixed.
> >>
> >> The relevant bug reports are
> >> https://issues.apache.org/jira/browse/KAFKA-7895
> >> https://issues.apache.org/jira/browse/KAFKA-8204
> >>
> >> The fixes for both bugs have already been merged to the 2.2 branch.
> >>
> >> Could you please build from the 2.2 branch and confirm that the
> >> fixes solve your problem?
> >>
> >> Best,
> >> Bruno
> >>
> >> On Sat, Apr 20, 2019 at 2:16 PM Alessandro Tagliapietra <
> >> tagliapietra.alessan...@gmail.com> wrote:
> >>
> >> > Thanks Matthias, one less thing to worry about in the future :)
> >> >
> >> > --
> >> > Alessandro Tagliapietra
> >> >
> >> > On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax <
> >> > matth...@confluent.io> wrote:
> >> >
> >> > > Just a side note. There is currently work in progress on
> >> > > https://issues.apache.org/jira/browse/KAFKA-3729 that should fix
> >> > > the configuration problem for Serdes.
> >> > >
> >> > > -Matthias
> >> > >
> >> > > On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:
> >> > > > Hi Bruno,
> >> > > > thanks a lot for checking the code. Regarding the
> >> > > > SpecificAvroSerde, I've found that using
> >> > > >
> >> > > > final Serde<InputList> valueSpecificAvroSerde =
> >> > > >     new SpecificAvroSerde<>();
> >> > > > final Map<String, String> serdeConfig =
> >> > > >     Collections.singletonMap("schema.registry.url",
> >> > > >         "http://localhost:8081");
> >> > > > valueSpecificAvroSerde.configure(serdeConfig, false);
> >> > > >
> >> > > > and then in aggregate()
> >> > > >
> >> > > > Materialized.with(Serdes.String(), valueSpecificAvroSerde)
> >> > > >
> >> > > > fixed the issue.
> >> > > >
> >> > > > Thanks in advance for the windowing help, very appreciated.
> >> > > > In the meantime I'll try to make some progress on the rest.
> >> > > >
> >> > > > Have a great weekend
> >> > > >
> >> > > > --
> >> > > > Alessandro Tagliapietra
> >> > > >
> >> > > > On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna <br...@confluent.io>
> >> > > > wrote:
> >> > > >
> >> > > >> Hi Alessandro,
> >> > > >>
> >> > > >> I had a look at your code. Regarding your question whether you
> >> > > >> use the SpecificAvroSerde correctly, take a look at the
> >> > > >> following documentation:
> >> > > >>
> >> > > >> https://docs.confluent.io/current/streams/developer-guide/datatypes.html
> >> > > >>
> >> > > >> I haven't had the time yet to take a closer look at your
> >> > > >> problems with the aggregation. I will have a look next week.
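> >> > > >>
> >> > > >> In short, the configuration pattern from that page looks
> >> > > >> roughly like this (a sketch; the registry URL and the value
> >> > > >> type are placeholders):
> >> > > >>
> >> > > >> final Serde<Metric> metricSerde = new SpecificAvroSerde<>();
> >> > > >> metricSerde.configure(
> >> > > >>     Collections.singletonMap("schema.registry.url",
> >> > > >>         "http://localhost:8081"),
> >> > > >>     false); // isKey = false, i.e. configured as a value serde
> >> > > >>
> >> > > >> The configured serde then needs to be passed to every operation
> >> > > >> that (de)serializes that type, e.g. Consumed.with(...),
> >> > > >> Grouped.with(...) and Materialized.with(...), unless it is set
> >> > > >> as the default serde in the streams config.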
> >> > > >>
> >> > > >> Have a nice weekend,
> >> > > >> Bruno
> >> > > >>
> >> > > >> On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <
> >> > > >> tagliapietra.alessan...@gmail.com> wrote:
> >> > > >>
> >> > > >>> So I've started a new app with archetype:generate as in
> >> > > >>> https://kafka.apache.org/22/documentation/streams/tutorial
> >> > > >>>
> >> > > >>> I've pushed a sample repo here:
> >> > > >>> https://github.com/alex88/kafka-test
> >> > > >>> The avro schemas are a Metric with 2 fields, timestamp and
> >> > > >>> production, and a MetricList with a list of records (Metric),
> >> > > >>> to be able to do the aggregation manually.
> >> > > >>> Right now the aggregation is kept simple, just for the purpose
> >> > > >>> of the sample repo and to easily see whether we're getting
> >> > > >>> wrong values.
> >> > > >>>
> >> > > >>> What I wanted to achieve is:
> >> > > >>> - a custom generator that generates 1 message per second with
> >> > > >>> production = 1, with 1 or more separate message keys, which in
> >> > > >>> my case are the sensor IDs generating the data
> >> > > >>> - a filter that removes out-of-order messages by having a
> >> > > >>> state that stores key (sensorID) -> last timestamp
> >> > > >>> - a window operation that for this example just sums the
> >> > > >>> values in each 10-second window
> >> > > >>>
> >> > > >>> To show where I'm having issues I've set up multiple branches
> >> > > >>> for the repo:
> >> > > >>> - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>*
> >> > > >>> is the one I had initially ("Failed to flush state store
> >> > > >>> KSTREAM-AGGREGATE-STATE-STORE-0000000003") that I tried to
> >> > > >>> solve using
> >> > > >>> https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
> >> > > >>> - *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>*
> >> > > >>> is the one after I tried to solve the above problem with the
> >> > > >>> materializer (maybe the SpecificAvroSerde is wrong?)
> >> > > >>> - *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>*
> >> > > >>> after fixing issue-02 (by using
> >> > > >>> groupByKey(Grouped.with(Serdes.String(), new SpecificAvroSerde<>())))
> >> > > >>> everything seems to be working; if you leave both the producer
> >> > > >>> and the stream running, you'll see that the stream receives 10
> >> > > >>> messages (with the timestamp incrementing by 1 second for each
> >> > > >>> message) like this:
> >> > > >>>
> >> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 163000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 164000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 165000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 166000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 167000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 168000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 169000, "production": 1}
> >> > > >>>
> >> > > >>> and at each 10-second interval something like:
> >> > > >>>
> >> > > >>> S1 with computed metric {"timestamp": 160000, "production": 10}
> >> > > >>> S1 with computed metric {"timestamp": 170000, "production": 10}
> >> > > >>> S1 with computed metric {"timestamp": 180000, "production": 10}
> >> > > >>>
> >> > > >>> and so on...
> >> > > >>> Now there are two problems after stopping and restarting the
> >> > > >>> stream processor (by sending SIGINT via IntelliJ, since I start
> >> > > >>> the class's main with it):
> >> > > >>> - sometimes the aggregated count is wrong: if it has windowed
> >> > > >>> for 7 seconds (e.g. seconds 11-17) before the restart, after
> >> > > >>> the restart it might just emit a value for the 3 missing
> >> > > >>> seconds (seconds 18-20), and the aggregated value is 3, not 10
> >> > > >>> - sometimes the window outputs twice: in the example where I
> >> > > >>> restart the stream processor I might get as output
> >> > > >>>
> >> > > >>> S1 with filtered metric{"timestamp": 154000, "production": 1}
> >> > > >>> S1 with computed metric {"timestamp": 160000, "production": 5}
> >> > > >>> S1 with filtered metric{"timestamp": 155000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 156000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 157000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 158000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 159000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
> >> > > >>> S1 with computed metric {"timestamp": 160000, "production": 10}
> >> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
> >> > > >>>
> >> > > >>> as you can see, the window for timestamp 160000 is duplicated.
> >> > > >>>
> >> > > >>> Is this because the window state isn't persisted across
> >> > > >>> restarts?
> >> > > >>> My ultimate goal is to have the window part emit only once
> >> > > >>> and resume processing across restarts, while avoiding
> >> > > >>> processing out-of-order data (that's the purpose of the
> >> > > >>> TimestampIncrementalFilter).
> >> > > >>>
> >> > > >>> Thank you in advance
> >> > > >>>
> >> > > >>> --
> >> > > >>> Alessandro Tagliapietra
> >> > > >>>
> >> > > >>> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
> >> > > >>> tagliapietra.alessan...@gmail.com> wrote:
> >> > > >>>
> >> > > >>>> Hi Bruno,
> >> > > >>>>
> >> > > >>>> I'm using the confluent docker images 5.2.1, so kafka 2.2.
> >> > > >>>> Anyway, I'll try to make a small reproduction repo with all
> >> > > >>>> the different cases soon.
> >> > > >>>>
> >> > > >>>> Thank you
> >> > > >>>>
> >> > > >>>> --
> >> > > >>>> Alessandro Tagliapietra
> >> > > >>>>
> >> > > >>>> On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <
> >> > > >>>> br...@confluent.io> wrote:
> >> > > >>>>
> >> > > >>>>> Hi Alessandro,
> >> > > >>>>>
> >> > > >>>>> What version of Kafka do you use?
> >> > > >>>>>
> >> > > >>>>> Could you please give a more detailed example for the issues
> >> > > >>>>> with the two keys you see?
> >> > > >>>>>
> >> > > >>>>> Could the following bug be related to the duplicates you see?
> >> > > >>>>>
> >> > > >>>>> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
> >> > > >>>>>
> >> > > >>>>> How do you restart the processor?
> >> > > >>>>>
> >> > > >>>>> Best,
> >> > > >>>>> Bruno
> >> > > >>>>>
> >> > > >>>>> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
> >> > > >>>>> tagliapietra.alessan...@gmail.com> wrote:
> >> > > >>>>>
> >> > > >>>>>> Thank you Bruno,
> >> > > >>>>>>
> >> > > >>>>>> I'll look into those, however the average is just a simple
> >> > > >>>>>> thing I'm trying right now to get an initial windowing flow
> >> > > >>>>>> working.
> >> > > >>>>>> In the future I'll probably still need the actual values for
> >> > > >>>>>> other calculations. We won't have more than 60 elements per
> >> > > >>>>>> window for sure.
> >> > > >>>>>>
> >> > > >>>>>> So far, to avoid manually serializing/deserializing the
> >> > > >>>>>> array list, I've created an Avro model with an array field
> >> > > >>>>>> containing the values.
> >> > > >>>>>> I had issues with suppress as explained here
> >> > > >>>>>>
> >> > > >>>>>> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
> >> > > >>>>>>
> >> > > >>>>>> but I got that working.
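> >> > > >>>>>>
> >> > > >>>>>> For reference, the working version looks roughly like this
> >> > > >>>>>> (a sketch; the window setup is elided, and "aggregator" and
> >> > > >>>>>> "metricListSerde" stand for the aggregator lambda and the
> >> > > >>>>>> configured Avro serde):
> >> > > >>>>>>
> >> > > >>>>>> .aggregate(MetricList::new, aggregator,
> >> > > >>>>>>     Materialized.with(Serdes.String(), metricListSerde))
> >> > > >>>>>> .suppress(Suppressed.untilWindowCloses(
> >> > > >>>>>>     Suppressed.BufferConfig.unbounded()))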
> >> > > >>>>>>
> >> > > >>>>>> So far everything seems to be working, except for a couple
> >> > > >>>>>> of things:
> >> > > >>>>>> - if I generate data with 1 key, I correctly get a value
> >> > > >>>>>> every 10 seconds; if I later start generating data with
> >> > > >>>>>> another key (while key 1 is still generating), the windowing
> >> > > >>>>>> emits a value for key 2 only after its timestamp reaches the
> >> > > >>>>>> last generated window
> >> > > >>>>>> - while generating data, if I restart the processor as soon
> >> > > >>>>>> as it starts, it sometimes generates 2 aggregates for the
> >> > > >>>>>> same window even though I'm using the suppress
> >> > > >>>>>>
> >> > > >>>>>> Anyway, I'll look into your link and try to find out the
> >> > > >>>>>> cause of these issues, probably starting from scratch with a
> >> > > >>>>>> simpler example.
> >> > > >>>>>>
> >> > > >>>>>> Thank you for your help!
> >> > > >>>>>>
> >> > > >>>>>> --
> >> > > >>>>>> Alessandro Tagliapietra
> >> > > >>>>>>
> >> > > >>>>>> On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <
> >> > > >>>>>> br...@confluent.io> wrote:
> >> > > >>>>>>
> >> > > >>>>>>> Hi Alessandro,
> >> > > >>>>>>>
> >> > > >>>>>>> Have a look at this Kafka usage pattern for computing
> >> > > >>>>>>> averages without using an ArrayList:
> >> > > >>>>>>>
> >> > > >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> >> > > >>>>>>>
> >> > > >>>>>>> The advantage of this pattern over the ArrayList approach
> >> > > >>>>>>> is the reduced space needed to compute the aggregate. Note
> >> > > >>>>>>> that you will still need to implement a SerDe. However, the
> >> > > >>>>>>> SerDe should be a bit easier to implement than a SerDe for
> >> > > >>>>>>> an ArrayList.
> >> > > >>>>>>>
> >> > > >>>>>>> Hope that helps.
> >> > > >>>>>>>
> >> > > >>>>>>> Best,
> >> > > >>>>>>> Bruno
> >> > > >>>>>>>
> >> > > >>>>>>> On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
> >> > > >>>>>>> tagliapietra.alessan...@gmail.com> wrote:
> >> > > >>>>>>>
> >> > > >>>>>>>> Sorry, it turned out to be harder than I thought.
> >> > > >>>>>>>>
> >> > > >>>>>>>> To have the custom aggregation working I need an ArrayList
> >> > > >>>>>>>> of all the values in the window; so far my aggregate DSL
> >> > > >>>>>>>> method creates an ArrayList in the initializer and adds
> >> > > >>>>>>>> each value to the list in the aggregator.
> >> > > >>>>>>>> Then I think I'll have to provide a serde to change the
> >> > > >>>>>>>> output type of that method.
> >> > > >>>>>>>> I was looking at
> >> > > >>>>>>>>
> >> > > >>>>>>>> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> >> > > >>>>>>>>
> >> > > >>>>>>>> but that seems more geared towards a list of longs and
> >> > > >>>>>>>> already uses longSerde.
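> >> > > >>>>>>>>
> >> > > >>>>>>>> Concretely, what I have so far is roughly this sketch
> >> > > >>>>>>>> (window size and types simplified):
> >> > > >>>>>>>>
> >> > > >>>>>>>> .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
> >> > > >>>>>>>> .aggregate(
> >> > > >>>>>>>>     () -> new ArrayList<Metric>(),   // initializer
> >> > > >>>>>>>>     (key, value, list) -> {          // aggregator
> >> > > >>>>>>>>         list.add(value);
> >> > > >>>>>>>>         return list;
> >> > > >>>>>>>>     })
> >> > > >>>>>>>>
> >> > > >>>>>>>> and it's the ArrayList<Metric> result that needs its own
> >> > > >>>>>>>> serde.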
> >> > > >>>>>>>>
> >> > > >>>>>>>> I'm currently trying to implement another avro model that
> >> > > >>>>>>>> has a field of type array so I can use the regular avro
> >> > > >>>>>>>> serializer to implement this.
> >> > > >>>>>>>> Should I create my own serdes instead, or is this the
> >> > > >>>>>>>> right way?
> >> > > >>>>>>>>
> >> > > >>>>>>>> Thank you in advance
> >> > > >>>>>>>>
> >> > > >>>>>>>> --
> >> > > >>>>>>>> Alessandro Tagliapietra
> >> > > >>>>>>>>
> >> > > >>>>>>>> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> >> > > >>>>>>>> tagliapietra.alessan...@gmail.com> wrote:
> >> > > >>>>>>>>
> >> > > >>>>>>>>> Thank you Bruno and Matthias,
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> I've modified the transformer to implement the
> >> > > >>>>>>>>> ValueTransformerWithKey interface and everything is
> >> > > >>>>>>>>> working fine.
> >> > > >>>>>>>>> I now have to window the data and manually aggregate each
> >> > > >>>>>>>>> window's data, since I have to compute some averages and
> >> > > >>>>>>>>> sums of differences.
> >> > > >>>>>>>>> So far I'm just having some issues with message types,
> >> > > >>>>>>>>> since I'm changing the data type when aggregating the
> >> > > >>>>>>>>> window, but I think it's an easy problem.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> Thank you again
> >> > > >>>>>>>>> Best
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> --
> >> > > >>>>>>>>> Alessandro Tagliapietra
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <
> >> > > >>>>>>>>> br...@confluent.io> wrote:
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>> Hi Alessandro,
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> the `TransformSupplier` is internally wrapped with a
> >> > > >>>>>>>>>> `ProcessorSupplier`, so the statement
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> `transform` is essentially equivalent to adding the
> >> > > >>>>>>>>>> Transformer via Topology#addProcessor() to your
> >> > > >>>>>>>>>> processor topology
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> is correct.
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> If you do not change the key, you should definitely use
> >> > > >>>>>>>>>> one of the overloads of `transformValues` to avoid
> >> > > >>>>>>>>>> internal data redistribution. In your case the overload
> >> > > >>>>>>>>>> with `ValueTransformerWithKeySupplier`, as suggested by
> >> > > >>>>>>>>>> Matthias, would fit.
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> Best,
> >> > > >>>>>>>>>> Bruno
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
> >> > > >>>>>>>>>> matth...@confluent.io> wrote:
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>>> There is also `ValueTransformerWithKey` that gives you
> >> > > >>>>>>>>>>> read-only access to the key.
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>> -Matthias
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> >> > > >>>>>>>>>>>> Hi Bruno,
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Thank you for the quick answer.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> I'm actually trying to do that, since it seems there
> >> > > >>>>>>>>>>>> is really no way to have it use `Processor<K, V>`.
> >> > > >>>>>>>>>>>> I just wanted (if that would've made any sense) to
> >> > > >>>>>>>>>>>> use the Processor in both DSL and non-DSL pipelines.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Anyway, regarding `transformValues()`, I don't think
> >> > > >>>>>>>>>>>> I can use it, as I need the message key since that is
> >> > > >>>>>>>>>>>> the discriminating value for the filter (I want to
> >> > > >>>>>>>>>>>> exclude old values per sensor ID, so per message key).
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Right now I have this
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> and I'm using it with `transform()`.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> One thing I've found confusing is this
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> transform is essentially equivalent to adding the
> >> > > >>>>>>>>>>>>> Transformer via Topology#addProcessor() to your
> >> > > >>>>>>>>>>>>> processor topology
> >> > > >>>>>>>>>>>>> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Is it? Doesn't `transform` need a TransformSupplier
> >> > > >>>>>>>>>>>> while `addProcessor` uses a ProcessorSupplier?
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Thank you again for your help
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> --
> >> > > >>>>>>>>>>>> Alessandro Tagliapietra
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
> >> > > >>>>>>>>>>>> br...@confluent.io> wrote:
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Hi Alessandro,
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Have you considered using `transform()` (actually, in
> >> > > >>>>>>>>>>>>> your case you should use `transformValues()`) instead
> >> > > >>>>>>>>>>>>> of `.process()`? `transform()` and `transformValues()`
> >> > > >>>>>>>>>>>>> are stateful operations similar to `.process()`, but
> >> > > >>>>>>>>>>>>> they return a `KStream`. On a `KStream` you can then
> >> > > >>>>>>>>>>>>> apply a windowed aggregation.
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Hope that helps.
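> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> For example, roughly (just a sketch; the store and
> >> > > >>>>>>>>>>>>> class names are made up):
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> builder.addStateStore(Stores.keyValueStoreBuilder(
> >> > > >>>>>>>>>>>>>     Stores.persistentKeyValueStore("last-ts-store"),
> >> > > >>>>>>>>>>>>>     Serdes.String(), Serdes.Long()));
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> builder.<String, Metric>stream("sensor-data")
> >> > > >>>>>>>>>>>>>     // returns null for out-of-order values
> >> > > >>>>>>>>>>>>>     .transformValues(TimestampFilterTransformer::new,
> >> > > >>>>>>>>>>>>>         "last-ts-store")
> >> > > >>>>>>>>>>>>>     .filter((key, value) -> value != null)
> >> > > >>>>>>>>>>>>>     .groupByKey()
> >> > > >>>>>>>>>>>>>     // ...then windowedBy()/aggregate() as usual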
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Best,
> >> > > >>>>>>>>>>>>> Bruno
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> On Fri, Apr 12, 2019 at 4:31 PM Alessandro
> >> > > >>>>>>>>>>>>> Tagliapietra <tagliapietra.alessan...@gmail.com>
> >> > > >>>>>>>>>>>>> wrote:
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> Hi there,
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> I'm just starting with Kafka and I'm trying to
> >> > > >>>>>>>>>>>>>> create a stream processor that, in multiple stages:
> >> > > >>>>>>>>>>>>>> - filters messages using a kv store, so that only
> >> > > >>>>>>>>>>>>>> messages with a higher timestamp get processed
> >> > > >>>>>>>>>>>>>> - aggregates the message metrics by minute, giving
> >> > > >>>>>>>>>>>>>> e.g. the avg of those metrics in that minute
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> The message is simple: the key is the sensor ID and
> >> > > >>>>>>>>>>>>>> the value is e.g. { timestamp: UNIX_TIMESTAMP,
> >> > > >>>>>>>>>>>>>> speed: INT }.
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> I've started by creating a processor that uses the
> >> > > >>>>>>>>>>>>>> kv store to filter old messages:
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> Then I was trying to implement windowing. I saw very
> >> > > >>>>>>>>>>>>>> nice windowing examples for the DSL but none for the
> >> > > >>>>>>>>>>>>>> Processor API (only a small reference to the
> >> > > >>>>>>>>>>>>>> windowed store); can someone point me in the right
> >> > > >>>>>>>>>>>>>> direction?
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> Now, since I wasn't able to find any example, I
> >> > > >>>>>>>>>>>>>> tried to use the DSL, but I haven't found a way to
> >> > > >>>>>>>>>>>>>> use my processor with it. I saw this
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> but it mostly explains transformers, not processors.
> >> > > >>>>>>>>>>>>>> I also saw the example usage of the processor after
> >> > > >>>>>>>>>>>>>> that, but `.process(...)` returns void, so I cannot
> >> > > >>>>>>>>>>>>>> get a KStream from a processor?
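> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> In other words, what I have is roughly (simplified,
> >> > > >>>>>>>>>>>>>> names shortened):
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> KStream<String, SensorData> input =
> >> > > >>>>>>>>>>>>>>     builder.stream("sensor-data");
> >> > > >>>>>>>>>>>>>> // returns void...
> >> > > >>>>>>>>>>>>>> input.process(TimestampFilter::new, "timestamp-store");
> >> > > >>>>>>>>>>>>>> // ...so there's nothing left to window/aggregate on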
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> Thank you all in advance
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> --
> >> > > >>>>>>>>>>>>>> Alessandro Tagliapietra