Hi Bruno, thank you for your help; I'm glad to hear that those are just bugs and not a problem in my implementation. I'm currently using the Confluent Docker images, and I've checked their master branch, which seems to use the SNAPSHOT version; however, those images/packages aren't publicly available. Are there any snapshot builds available? In the meantime I'm trying to build a custom Docker image from the Kafka source.
Thanks

--
Alessandro Tagliapietra

On Tue, Apr 23, 2019 at 8:52 AM Bruno Cadonna <br...@confluent.io> wrote:
> Hi Alessandro,
>
> It seems that the behaviour you described regarding the window aggregation is due to bugs. The good news is that the bugs have already been fixed.
>
> The relevant bug reports are:
> https://issues.apache.org/jira/browse/KAFKA-7895
> https://issues.apache.org/jira/browse/KAFKA-8204
>
> The fixes for both bugs have already been merged to the 2.2 branch.
>
> Could you please build from the 2.2 branch and confirm that the fixes solve your problem?
>
> Best,
> Bruno
>
> On Sat, Apr 20, 2019 at 2:16 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
>> Thanks Matthias, one less thing to worry about in the future :)
>>
>> --
>> Alessandro Tagliapietra
>>
>> On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>> Just a side note: there is currently work in progress on https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the configuration problem for Serdes.
>>>
>>> -Matthias
>>>
>>> On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:
>>>> Hi Bruno,
>>>> thanks a lot for checking the code. Regarding the SpecificAvroSerde, I've found that using
>>>>
>>>> final Serde<InputList> valueSpecificAvroSerde = new SpecificAvroSerde<>();
>>>> final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", "http://localhost:8081");
>>>> valueSpecificAvroSerde.configure(serdeConfig, false);
>>>>
>>>> and then in aggregate()
>>>>
>>>> Materialized.with(Serdes.String(), valueSpecificAvroSerde)
>>>>
>>>> fixed the issue.
>>>>
>>>> Thanks in advance for the windowing help, very appreciated.
>>>> In the meantime I'll try to make some progress on the rest.
>>>>
>>>> Have a great weekend
>>>>
>>>> --
>>>> Alessandro Tagliapietra
>>>>
>>>> On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna <br...@confluent.io> wrote:
>>>>> Hi Alessandro,
>>>>>
>>>>> I had a look at your code. Regarding your question about whether you use the SpecificAvroSerde correctly, take a look at the following documentation:
>>>>>
>>>>> https://docs.confluent.io/current/streams/developer-guide/datatypes.html
>>>>>
>>>>> I haven't had the time yet to take a closer look at your problems with the aggregation. I will have a look next week.
>>>>>
>>>>> Have a nice weekend,
>>>>> Bruno
>>>>>
>>>>> On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
>>>>>> So I've started a new app with archetype:generate, as in https://kafka.apache.org/22/documentation/streams/tutorial
>>>>>>
>>>>>> I've pushed a sample repo here: https://github.com/alex88/kafka-test
>>>>>> The Avro schemas are a Metric with 2 fields (timestamp and production) and a MetricList with a list of records (Metric), to be able to manually do the aggregation.
>>>>>> Right now the aggregation is kept simple, just for the purpose of the sample repo and to easily see if we're getting wrong values.
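For reference, the serde fix from the Apr 19 message above, as a minimal self-contained sketch. MetricList stands for the Avro-generated class from the sample repo, and the registry URL is the localhost:8081 one used in the thread:

    import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

    import java.util.Collections;
    import java.util.Map;

    public class SerdeSetup {

        // A serde created with `new` is NOT configured automatically; only the
        // serdes registered via default.key.serde/default.value.serde are.
        // Hence the explicit configure() call with the schema registry URL.
        public static SpecificAvroSerde<MetricList> metricListSerde() {
            final SpecificAvroSerde<MetricList> serde = new SpecificAvroSerde<>();
            final Map<String, String> serdeConfig =
                Collections.singletonMap("schema.registry.url", "http://localhost:8081");
            serde.configure(serdeConfig, false); // false = value serde, not key serde
            return serde;
        }
    }

Passing the configured serde explicitly, e.g. via Materialized.with(Serdes.String(), metricListSerde()) in aggregate() or Grouped.with(Serdes.String(), metricListSerde()) in groupByKey(), is what avoids the flush-time ClassCastException described in issue-01/issue-02.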
>>>>>> What I wanted to achieve is:
>>>>>> - have a custom generator that generates 1 message per second with production = 1, with 1 or more separate message keys, which in my case are the sensor IDs generating the data
>>>>>> - a filter that removes out-of-order messages by keeping a state that stores key (sensor ID) -> last timestamp
>>>>>> - a window operation that, for this example, just sums the values in each 10-second window
>>>>>>
>>>>>> To show where I'm having issues I've set up multiple branches in the repo:
>>>>>> - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>* is the one I had initially, "Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000003", which I tried to solve using https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
>>>>>> - *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>* is the one after I tried to solve the above problem with the materializer (maybe the SpecificAvroSerde is wrong?)
>>>>>> - *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>* is after fixing issue-02 (by using groupByKey(Grouped.with(Serdes.String(), new SpecificAvroSerde<>()))); everything seems to be working. If you let both the producer and the stream run, you'll see that the stream receives 10 messages (with the timestamp incrementing 1 second for each message) like this:
>>>>>>
>>>>>> S1 with filtered metric {"timestamp": 160000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 161000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 162000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 163000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 164000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 165000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 166000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 167000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 168000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 169000, "production": 1}
>>>>>>
>>>>>> and at the 10-second interval something like:
>>>>>>
>>>>>> S1 with computed metric {"timestamp": 160000, "production": 10}
>>>>>> S1 with computed metric {"timestamp": 170000, "production": 10}
>>>>>> S1 with computed metric {"timestamp": 180000, "production": 10}
>>>>>>
>>>>>> and so on...
>>>>>> Now there are two problems after stopping and restarting the stream processor (by sending SIGINT via IntelliJ, since I start the class main with it):
>>>>>> - sometimes the aggregated count is wrong: if it has windowed for 7 seconds (e.g. seconds 11-17) before I restart the stream, after the restart it might just emit a value for the 3 missing seconds (seconds 18-20), and the aggregated value is 3, not 10
>>>>>> - sometimes the window outputs twice; in the example where I restart the stream processor I might get as output:
>>>>>>
>>>>>> S1 with filtered metric {"timestamp": 154000, "production": 1}
>>>>>> S1 with computed metric {"timestamp": 160000, "production": 5}
>>>>>> S1 with filtered metric {"timestamp": 155000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 156000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 157000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 158000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 159000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 160000, "production": 1}
>>>>>> S1 with filtered metric {"timestamp": 161000, "production": 1}
>>>>>> S1 with computed metric {"timestamp": 160000, "production": 10}
>>>>>> S1 with filtered metric {"timestamp": 162000, "production": 1}
>>>>>>
>>>>>> As you can see, the window for timestamp 160000 is emitted twice.
>>>>>>
>>>>>> Is this because the window state isn't persisted across restarts?
>>>>>> My ultimate goal is to have the window part emit only once and resume processing across restarts, while avoiding processing out-of-order data (that's the purpose of the TimestampIncrementalFilter).
>>>>>>
>>>>>> Thank you in advance
>>>>>>
>>>>>> --
>>>>>> Alessandro Tagliapietra
>>>>>>
>>>>>> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
>>>>>>> Hi Bruno,
>>>>>>>
>>>>>>> I'm using the Confluent Docker images 5.2.1, so Kafka 2.2.
>>>>>>> Anyway, I'll try to make a small reproduction repo with all the different cases soon.
>>>>>>>
>>>>>>> Thank you
>>>>>>>
>>>>>>> --
>>>>>>> Alessandro Tagliapietra
>>>>>>>
>>>>>>> On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <br...@confluent.io> wrote:
>>>>>>>> Hi Alessandro,
>>>>>>>>
>>>>>>>> What version of Kafka do you use?
>>>>>>>>
>>>>>>>> Could you please give a more detailed example for the issues with the two keys you see?
>>>>>>>>
>>>>>>>> Could the following bug be related to the duplicates you see?
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
>>>>>>>>
>>>>>>>> How do you restart the processor?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Bruno
>>>>>>>>
>>>>>>>> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
>>>>>>>>> Thank you Bruno,
>>>>>>>>>
>>>>>>>>> I'll look into those; however, the average is just a simple thing I'm trying right now to get an initial windowing flow working.
>>>>>>>>> In the future I'll probably still need the actual values for other calculations. We won't have more than 60 elements per window for sure.
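Since the emit-once goal keeps coming up above, here is a rough sketch of the 10-second windowed sum with suppress(), assuming a KStream<String, Metric> named input and a configured metricSerde as in the earlier sketch; note that it only behaves correctly once the KAFKA-7895/KAFKA-8204 fixes are in place:

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    // Sum `production` per sensor in 10-second windows and emit one final
    // result per window: suppress() buffers updates until the window closes,
    // i.e. until stream time passes window end + grace period.
    KTable<Windowed<String>, Long> sums = input
        .groupByKey(Grouped.with(Serdes.String(), metricSerde))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ZERO))
        .aggregate(
            () -> 0L,
            (sensorId, metric, sum) -> sum + metric.getProduction(),
            Materialized.with(Serdes.String(), Serdes.Long()))
        .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));

Stream time only advances when new records arrive, which also matches the two-key observation a few lines further down: the final result for key 1's window is released only once some later record pushes stream time past the window end.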
>>>>>>>>> So far, to avoid manually serializing/deserializing the ArrayList, I've created an Avro model with an array field containing the values.
>>>>>>>>> I had issues with suppress, as explained here:
>>>>>>>>>
>>>>>>>>> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
>>>>>>>>>
>>>>>>>>> but I got that working.
>>>>>>>>> So far everything seems to be working, except for a couple of things:
>>>>>>>>> - if I generate data with 1 key, I correctly get a value every 10 seconds; if I later start generating data with another key (while key 1 is still generating), the windowing emits a value for key 2 only after the timestamp of key 2 reaches the last generated window
>>>>>>>>> - while generating data, if I restart the processor as soon as it starts, it sometimes generates 2 aggregates for the same window, even though I'm using the suppress
>>>>>>>>>
>>>>>>>>> Anyway, I'll look into your link and try to find out the cause of these issues, probably starting from scratch with a simpler example.
>>>>>>>>>
>>>>>>>>> Thank you for your help!
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>
>>>>>>>>> On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <br...@confluent.io> wrote:
>>>>>>>>>> Hi Alessandro,
>>>>>>>>>>
>>>>>>>>>> Have a look at this Kafka usage pattern for computing averages without using an ArrayList:
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
>>>>>>>>>>
>>>>>>>>>> The advantage of this pattern over the ArrayList approach is the reduced space needed to compute the aggregate. Note that you will still need to implement a SerDe. However, the SerDe should be a bit easier to implement than a SerDe for an ArrayList.
>>>>>>>>>>
>>>>>>>>>> Hope that helps.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Bruno
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
>>>>>>>>>>> Sorry, but it turned out to be harder than I thought.
>>>>>>>>>>>
>>>>>>>>>>> To have the custom aggregation working I need an ArrayList of all the values in the window; so far my aggregate DSL method creates an ArrayList in the initializer and adds each value to the list in the aggregator.
>>>>>>>>>>> Then I think I'll have to provide a serde to change the output type of that method.
>>>>>>>>>>> I was looking at
>>>>>>>>>>>
>>>>>>>>>>> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
>>>>>>>>>>>
>>>>>>>>>>> but that seems geared more towards a list of longs and already uses a long serde.
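The usage pattern Bruno links above boils down to aggregating a running (count, sum) pair and deriving the average afterwards, instead of keeping every value. A condensed sketch, with CountAndSum standing in for whatever record type you give it (a two-field Avro record would also work and brings its serde along) and countAndSumSerde for the serde you would still have to provide:

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    // O(1) state per window (one count, one sum) instead of an ArrayList
    // holding every value seen in the window.
    public class CountAndSum {
        public long count;
        public double sum;
    }

    KTable<Windowed<String>, Double> averages = input
        .groupByKey(Grouped.with(Serdes.String(), metricSerde))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        .aggregate(
            CountAndSum::new,
            (sensorId, metric, agg) -> {
                agg.count += 1;                    // one more value in this window
                agg.sum += metric.getProduction(); // running sum
                return agg;
            },
            Materialized.with(Serdes.String(), countAndSumSerde))
        .mapValues(agg -> agg.sum / agg.count);    // derive the average at the end

This does not cover the "I still need the actual values" case, but for counts, sums and averages it keeps the store small and the serde trivial.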
>>>>>>>>>>> I'm currently trying to implement another Avro model that has a field of type array, so I can use the regular Avro serializer to implement this.
>>>>>>>>>>> Should I create my own serdes instead, or is this the right way?
>>>>>>>>>>>
>>>>>>>>>>> Thank you in advance
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
>>>>>>>>>>>> Thank you Bruno and Matthias,
>>>>>>>>>>>>
>>>>>>>>>>>> I've modified the transformer to implement the ValueTransformerWithKey interface and everything is working fine.
>>>>>>>>>>>> I now have to window the data and manually aggregate each window's data, since I have to compute some averages and sums of differences.
>>>>>>>>>>>> So far I'm just having some issues with message types, since I'm changing the data type when aggregating the window, but I think it's an easy problem.
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you again
>>>>>>>>>>>> Best
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <br...@confluent.io> wrote:
>>>>>>>>>>>>> Hi Alessandro,
>>>>>>>>>>>>>
>>>>>>>>>>>>> the `TransformerSupplier` is internally wrapped with a `ProcessorSupplier`, so the statement
>>>>>>>>>>>>>
>>>>>>>>>>>>> `transform` is essentially equivalent to adding the Transformer via Topology#addProcessor() to your processor topology
>>>>>>>>>>>>>
>>>>>>>>>>>>> is correct.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you do not change the key, you should definitely use one of the overloads of `transformValues` to avoid internal data redistribution. In your case the overload with `ValueTransformerWithKeySupplier`, as suggested by Matthias, would fit.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>>>> There is also `ValueTransformerWithKey`, which gives you read-only access to the key.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
>>>>>>>>>>>>>>> Hi Bruno,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thank you for the quick answer.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm actually trying to do that, since it seems there is really no way to have it use `Processor<K, V>`.
>>>>>>>>>>>>>>> I just wanted (if that would've made any sense) to use the Processor in both DSL and non-DSL pipelines.
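For reference, the key-aware filter discussed here maps onto ValueTransformerWithKey roughly as follows; this is a sketch in the spirit of the linked gist, not a copy of it, and the store name and types are illustrative:

    import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Passes a value through only if its timestamp is newer than the last one
    // seen for the same key; returns null otherwise, so out-of-order values
    // can be dropped with a filter() downstream.
    public class TimestampFilterTransformer
            implements ValueTransformerWithKey<String, Metric, Metric> {

        private KeyValueStore<String, Long> lastTimestamps;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            lastTimestamps =
                (KeyValueStore<String, Long>) context.getStateStore("last-timestamps");
        }

        @Override
        public Metric transform(final String sensorId, final Metric metric) {
            final Long last = lastTimestamps.get(sensorId);
            if (last != null && metric.getTimestamp() <= last) {
                return null; // out of order for this sensor: mark for dropping
            }
            lastTimestamps.put(sensorId, metric.getTimestamp());
            return metric;
        }

        @Override
        public void close() {}
    }

Used as stream.transformValues(TimestampFilterTransformer::new, "last-timestamps").filter((k, v) -> v != null), it never touches the key, so no repartitioning is triggered.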
>>>>>>>>>>>>>>> Anyway, regarding `transformValues()`, I don't think I can use it, as I need the message key, since that is the discriminating value for the filter (I want to exclude old values per sensor ID, so per message key).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Right now I have this
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> and I'm using it with `transform()`.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> One thing I've found confusing is this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> transform is essentially equivalent to adding the Transformer via Topology#addProcessor() to your processor topology <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Is it? Doesn't `transform` need a TransformerSupplier, while `addProcessor` uses a ProcessorSupplier?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thank you again for your help
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <br...@confluent.io> wrote:
>>>>>>>>>>>>>>>> Hi Alessandro,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Have you considered using `transform()` (actually, in your case you should use `transformValues()`) instead of `.process()`? `transform()` and `transformValues()` are stateful operations similar to `.process()`, but they return a `KStream`. On a `KStream` you can then apply a windowed aggregation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
>>>>>>>>>>>>>>>>> Hi there,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm just starting with Kafka and I'm trying to create a stream processor that, in multiple stages:
>>>>>>>>>>>>>>>>> - filters messages using a KV store so that only messages with a higher timestamp get processed
>>>>>>>>>>>>>>>>> - aggregates the message metrics by minute, giving e.g. the avg of those metrics in that minute
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The message is simple: the key is the sensor ID and the value is e.g. { timestamp: UNIX_TIMESTAMP, speed: INT }.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I've started by creating a processor that uses the KV store to filter old messages:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Then I was trying to implement windowing. I saw very nice windowing examples for the DSL but none for the Processor API (only a small reference to the windowed store); can someone point me in the right direction?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Now, since I wasn't able to find any example, I tried to use the DSL but haven't found a way to use my processor with it. I saw this
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> but it mostly explains transformers, not processors. I also saw, after that, the example usage of the processor, but `.process(...)` returns void, so I cannot get a KStream from a processor?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thank you all in advance
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Alessandro Tagliapietra
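To round off the thread's original question: .process() is indeed terminal, but transformValues() plugs the same kind of stateful logic into the DSL and hands back a KStream that can be windowed. A rough end-to-end sketch under the thread's assumptions (topic name illustrative; TimestampFilterTransformer and metricSerde as sketched earlier; count() stands in for the real aggregation):

    import java.time.Duration;
    import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class FilterAndWindowTopology {

        public static Topology build(final SpecificAvroSerde<Metric> metricSerde) {
            final StreamsBuilder builder = new StreamsBuilder();

            // The filter's store is registered with the builder and then named in
            // transformValues(); this is the DSL counterpart of connecting a store
            // to a processor added with Topology#addProcessor().
            final StoreBuilder<KeyValueStore<String, Long>> lastTimestamps =
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("last-timestamps"),
                    Serdes.String(),
                    Serdes.Long());
            builder.addStateStore(lastTimestamps);

            builder.stream("sensor-data", Consumed.with(Serdes.String(), metricSerde))
                .transformValues(TimestampFilterTransformer::new, "last-timestamps")
                .filter((sensorId, metric) -> metric != null) // drop out-of-order values
                .groupByKey(Grouped.with(Serdes.String(), metricSerde))
                .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
                .count(Materialized.with(Serdes.String(), Serdes.Long()))
                .toStream()
                .foreach((window, count) -> System.out.println(window + " -> " + count));

            return builder.build();
        }
    }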