Thanks Matthias, one less thing to worry about in the future :)

--
Alessandro Tagliapietra
On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax <matth...@confluent.io> wrote:

Just a side note. There is currently work in progress on
https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the
configuration problem for Serdes.

-Matthias

On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:

Hi Bruno,

thanks a lot for checking the code. Regarding the SpecificAvroSerde, I've found that using

    final Serde<InputList> valueSpecificAvroSerde = new SpecificAvroSerde<>();
    final Map<String, String> serdeConfig =
        Collections.singletonMap("schema.registry.url", "http://localhost:8081");
    valueSpecificAvroSerde.configure(serdeConfig, false);

and then in aggregate()

    Materialized.with(Serdes.String(), valueSpecificAvroSerde)

fixed the issue.

Thanks in advance for the windowing help, much appreciated. In the meantime I'll try to make some progress on the rest.

Have a great weekend

--
Alessandro Tagliapietra
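For reference, a minimal sketch of how the two fragments above fit together in a topology. This is only an illustration, not the code from the repo: the topic name "sensor-data", the Metric/InputList Avro classes, and the placeholder aggregator body are assumptions.

    import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Materialized;

    import java.util.Collections;
    import java.util.Map;

    StreamsBuilder builder = new StreamsBuilder();

    // Configure each serde explicitly instead of relying on the default
    // serde configuration; "false" marks a serde as a value serde.
    final Map<String, String> serdeConfig =
            Collections.singletonMap("schema.registry.url", "http://localhost:8081");
    final Serde<Metric> metricSerde = new SpecificAvroSerde<>();
    metricSerde.configure(serdeConfig, false);
    final Serde<InputList> valueSpecificAvroSerde = new SpecificAvroSerde<>();
    valueSpecificAvroSerde.configure(serdeConfig, false);

    // Passing the configured serdes through Grouped and Materialized keeps
    // the repartition topic and the state store from falling back to an
    // unconfigured default serde:
    builder.<String, Metric>stream("sensor-data")
            .groupByKey(Grouped.with(Serdes.String(), metricSerde))
            .aggregate(
                    InputList::new,
                    (sensorId, metric, list) -> list /* add metric to the list here */,
                    Materialized.with(Serdes.String(), valueSpecificAvroSerde));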
On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna <br...@confluent.io> wrote:

Hi Alessandro,

I had a look at your code. Regarding your question whether you use the SpecificAvroSerde correctly, take a look at the following documentation:

https://docs.confluent.io/current/streams/developer-guide/datatypes.html

I haven't had the time yet to take a closer look at your problems with the aggregation. I will have a look next week.

Have a nice weekend,
Bruno

On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

So I've started with a new app generated with archetype:generate as in https://kafka.apache.org/22/documentation/streams/tutorial

I've pushed a sample repo here: https://github.com/alex88/kafka-test
The Avro schemas are a Metric with 2 fields (timestamp and production) and a MetricList with a list of records (Metric), to be able to do the aggregation manually. Right now the aggregation is kept simple, just for the purpose of the sample repo and to easily see if we're getting wrong values.

What I wanted to achieve is:
- a custom generator that produces 1 message per second with production = 1 for 1 or more separate message keys, which in my case are the sensor IDs generating the data
- a filter that removes out-of-order messages by keeping a state that stores key (sensor ID) -> last timestamp
- a window operation that, for this example, just sums the values in each 10-second window

To show where I'm having issues I've set up multiple branches in the repo:
- *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>* is the one I had initially ("Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000003"), which I tried to solve using https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
- *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>* is the one after I tried to solve the above problem with the materializer (maybe my use of the SpecificAvroSerde is wrong?)
- *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>* after fixing issue-02 (by using groupByKey(Grouped.with(Serdes.String(), new SpecificAvroSerde<>()))) everything seems to be working. If you let both the producer and the stream run, you'll see that the stream receives 10 messages (with the timestamp incrementing 1 second for each message) like this:

    S1 with filtered metric{"timestamp": 160000, "production": 1}
    S1 with filtered metric{"timestamp": 161000, "production": 1}
    S1 with filtered metric{"timestamp": 162000, "production": 1}
    S1 with filtered metric{"timestamp": 163000, "production": 1}
    S1 with filtered metric{"timestamp": 164000, "production": 1}
    S1 with filtered metric{"timestamp": 165000, "production": 1}
    S1 with filtered metric{"timestamp": 166000, "production": 1}
    S1 with filtered metric{"timestamp": 167000, "production": 1}
    S1 with filtered metric{"timestamp": 168000, "production": 1}
    S1 with filtered metric{"timestamp": 169000, "production": 1}

and at the 10-second interval something like:

    S1 with computed metric {"timestamp": 160000, "production": 10}
    S1 with computed metric {"timestamp": 170000, "production": 10}
    S1 with computed metric {"timestamp": 180000, "production": 10}

and so on.

Now there are two problems after stopping and restarting the stream processor (by sending SIGINT via IntelliJ, since I start the class main with it):
- sometimes the aggregated count is wrong: if the stream has windowed 7 seconds of data (e.g. seconds 11-17) before the restart, after the restart it might just emit a value for the 3 missing seconds (seconds 18-20), and the aggregated value is 3, not 10
- sometimes the window outputs twice: in the case where I restart the stream processor I might get as output

    S1 with filtered metric{"timestamp": 154000, "production": 1}
    S1 with computed metric {"timestamp": 160000, "production": 5}
    S1 with filtered metric{"timestamp": 155000, "production": 1}
    S1 with filtered metric{"timestamp": 156000, "production": 1}
    S1 with filtered metric{"timestamp": 157000, "production": 1}
    S1 with filtered metric{"timestamp": 158000, "production": 1}
    S1 with filtered metric{"timestamp": 159000, "production": 1}
    S1 with filtered metric{"timestamp": 160000, "production": 1}
    S1 with filtered metric{"timestamp": 161000, "production": 1}
    S1 with computed metric {"timestamp": 160000, "production": 10}
    S1 with filtered metric{"timestamp": 162000, "production": 1}

As you can see, the window for timestamp 160000 is emitted twice.

Is this because the window state isn't persisted across restarts? My ultimate goal is to have the window part emit only once and to resume processing across restarts, while avoiding processing out-of-order data (that's the purpose of the TimestampIncrementalFilter).

Thank you in advance

--
Alessandro Tagliapietra

On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

Hi Bruno,

I'm using the confluent docker images 5.2.1, so Kafka 2.2. Anyway, I'll try to make a small reproduction repo with all the different cases soon.

Thank you

--
Alessandro Tagliapietra
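For context, a sketch of what a 10-second windowed sum that emits only once per window can look like, with a suppress() step added to address the duplicate emission described above (suppress is available since Kafka 2.1). The topic name, Metric class, getProduction() getter, and metricSerde are assumptions, not code from the repo:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.*;

    import java.time.Duration;

    StreamsBuilder builder = new StreamsBuilder();

    // Sum "production" per sensor over 10-second windows and emit a single
    // final result per window. grace(ZERO) closes each window as soon as
    // stream time passes its end, so suppress() can release it promptly;
    // late records are then dropped instead of triggering a second emission.
    builder.<String, Metric>stream("sensor-data")
            .groupByKey(Grouped.with(Serdes.String(), metricSerde))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ZERO))
            .aggregate(
                    () -> 0,
                    (sensorId, metric, sum) -> sum + metric.getProduction(),
                    Materialized.with(Serdes.String(), Serdes.Integer()))
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .foreach((windowedKey, sum) ->
                    System.out.println(windowedKey.key() + " -> " + sum));

Note that the suppression buffer and the window store are backed by changelog topics, so with a stable application.id the state should survive a restart rather than being rebuilt from scratch.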
On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <br...@confluent.io> wrote:

Hi Alessandro,

What version of Kafka do you use?

Could you please give a more detailed example for the issues with the two keys you see?

Could the following bug be related to the duplicates you see?

https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22

How do you restart the processor?

Best,
Bruno

On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

Thank you Bruno,

I'll look into those. However, the average is just a simple thing I'm trying right now to get an initial windowing flow working; in the future I'll probably still need the actual values for other calculations. We won't have more than 60 elements per window for sure.

So far, to avoid manually serializing/deserializing the ArrayList, I've created an Avro model with an array field containing the values. I had issues with suppress as explained here

https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198

but I got that working. So far everything seems to be working, except a couple of things:
- if I generate data with 1 key, I correctly get a value every 10 seconds; if I later start generating data with another key (while key 1 is still generating), the windowing emits a value for key 2 only after its timestamp reaches the last generated window
- while generating data, if I restart the processor as soon as it starts, it sometimes generates 2 aggregates for the same window even though I'm using the suppress

Anyway, I'll look into your link and try to find out the cause of these issues, probably starting from scratch with a simpler example.

Thank you for your help!

--
Alessandro Tagliapietra

On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <br...@confluent.io> wrote:

Hi Alessandro,

Have a look at this Kafka Usage Pattern for computing averages without using an ArrayList:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average

The advantage of this pattern over the ArrayList approach is the reduced space needed to compute the aggregate. Note that you will still need to implement a SerDe. However, the SerDe should be a bit easier to implement than a SerDe for an ArrayList.

Hope that helps.

Best,
Bruno
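A rough sketch of the sum/count pattern from that wiki page, applied to this thread's data: keep a running (count, sum) pair per window instead of all values, then map to the average. The CountAndSum holder, metricSerde, countAndSumSerde, and the getSpeed() getter are assumptions for illustration:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.*;

    import java.time.Duration;

    // Running aggregate holder; still needs a small custom serde
    // (countAndSumSerde below), but it's simpler than one for an ArrayList.
    public static class CountAndSum {
        public long count;
        public double sum;
    }

    // Fragment: windowed average per sensor, metrics is a KStream<String, Metric>.
    KTable<Windowed<String>, Double> averages = metrics
            .groupByKey(Grouped.with(Serdes.String(), metricSerde))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
            .aggregate(
                    CountAndSum::new,
                    (sensorId, metric, agg) -> {
                        agg.count += 1;           // one more value in this window
                        agg.sum += metric.getSpeed();
                        return agg;
                    },
                    Materialized.with(Serdes.String(), countAndSumSerde))
            .mapValues(agg -> agg.sum / agg.count); // final average per window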
On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

Sorry, but it seemed harder than I thought.

To have the custom aggregation working I need to get an ArrayList of all the values in the window; so far my aggregate DSL method creates an ArrayList in the initializer and adds each value to the list in the aggregator. Then I think I'll have to provide a serde to change the output type of that method. I was looking at

https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api

but that seems geared more towards a list of longs and already uses longSerde. I'm currently trying to implement another Avro model that has a field of type array so I can use the regular Avro serializer for this. Should I create my own serdes instead, or is this the right way?

Thank you in advance

--
Alessandro Tagliapietra

On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

Thank you Bruno and Matthias,

I've modified the transformer to implement the ValueTransformerWithKey interface and everything is working fine. I now have to window the data and manually aggregate each window's data, since I have to do some averages and sums of differences. So far I'm just having some issues with message types, since I'm changing the data type when aggregating the window, but I think it's an easy problem.

Thank you again
Best

--
Alessandro Tagliapietra

On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <br...@confluent.io> wrote:

Hi Alessandro,

the `TransformSupplier` is internally wrapped with a `ProcessorSupplier`, so the statement

"`transform` is essentially equivalent to adding the Transformer via Topology#addProcessor() to your processor topology"

is correct.

If you do not change the key, you should definitely use one of the overloads of `transformValues` to avoid internal data redistribution. In your case the overload with `ValueTransformerWithKeySupplier` as suggested by Matthias would fit.

Best,
Bruno

On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <matth...@confluent.io> wrote:

There is also `ValueTransformerWithKey` that gives you read-only access to the key.

-Matthias
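A rough sketch of what such a ValueTransformerWithKey-based filter can look like (the actual code is in the gist linked below; the store name "last-seen-store", the Metric getters, and the class name are assumptions here). Since a value transformer cannot drop records outright, it returns null for stale values and a downstream filter discards them:

    import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Drops values whose timestamp is not newer than the last one seen for
    // the key, by emitting null so a downstream filter can discard them.
    public class TimestampIncrementalFilter
            implements ValueTransformerWithKey<String, Metric, Metric> {

        private KeyValueStore<String, Long> lastSeen;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            lastSeen = (KeyValueStore<String, Long>) context.getStateStore("last-seen-store");
        }

        @Override
        public Metric transform(String sensorId, Metric metric) {
            Long last = lastSeen.get(sensorId);
            if (last != null && metric.getTimestamp() <= last) {
                return null; // stale: filtered out downstream
            }
            lastSeen.put(sensorId, metric.getTimestamp());
            return metric;
        }

        @Override
        public void close() {}
    }

The state store itself has to be registered on the topology and attached by name in transformValues(); a sketch of that wiring follows Bruno's reply below.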
On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:

Hi Bruno,

Thank you for the quick answer.

I'm actually trying to do that, since it seems there is really no way to have it use `Processor<K, V>`. I just wanted (if that would've made any sense) to use the Processor in both DSL and non-DSL pipelines.

Anyway, regarding `transformValues()`, I don't think I can use it, as I need the message key: that is the discriminating value for the filter (I want to exclude old values per sensor ID, i.e. per message key).

Right now I have this

https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java

and I'm using it with `transform()`.

One thing I've found confusing is this

https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process

"transform is essentially equivalent to adding the Transformer via Topology#addProcessor() to your processor topology <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>."

Is it? Doesn't `transform` need a TransformSupplier while `addProcessor` uses a ProcessorSupplier?

Thank you again for your help

--
Alessandro Tagliapietra

On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <br...@confluent.io> wrote:

Hi Alessandro,

Have you considered using `transform()` (actually, in your case you should use `transformValues()`) instead of `process()`? `transform()` and `transformValues()` are stateful operations similar to `process()`, but they return a `KStream`. On a `KStream` you can then apply a windowed aggregation.

Hope that helps.

Best,
Bruno
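To illustrate Bruno's point, a sketch of how a stateful value transformer (such as the one sketched earlier in this thread) slots into a DSL pipeline and still leaves a KStream to window afterwards. The store name, topic name, and serdes are assumptions:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    StreamsBuilder builder = new StreamsBuilder();

    // Register the state store the transformer uses, then attach it by name.
    StoreBuilder<KeyValueStore<String, Long>> lastSeenStore =
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("last-seen-store"),
                    Serdes.String(), Serdes.Long());
    builder.addStateStore(lastSeenStore);

    // transformValues() returns a KStream, so the DSL windowed aggregation
    // can be chained directly after the stateful filter:
    KStream<String, Metric> filtered = builder.<String, Metric>stream("sensor-data")
            .transformValues(TimestampIncrementalFilter::new, "last-seen-store")
            .filter((sensorId, metric) -> metric != null); // drop stale records

    // filtered.groupByKey(...).windowedBy(...).aggregate(...) as usual.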
On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

Hi there,

I'm just starting with Kafka and I'm trying to create a stream processor that, in multiple stages:
- filters messages using a kv store so that only messages with a higher timestamp get processed
- aggregates the message metrics by minute, giving e.g. the avg of those metrics in that minute

The message is simple: the key is the sensor ID and the value is e.g. { timestamp: UNIX_TIMESTAMP, speed: INT }.

I've started by creating a processor to use the kv store and filter old messages:

https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java

Then I was trying to implement windowing. I saw very nice windowing examples for the DSL but none for the Processor API (only a small reference to the windowed store); can someone point me in the right direction?

Now, since I wasn't able to find any example, I tried to use the DSL but haven't found a way to use my processor with it. I saw this

https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration

but it mostly explains transformers, not processors. I also saw after that the example usage of the processor, but `.process(...)` returns void, so I cannot get a KStream from a processor?

Thank you all in advance

--
Alessandro Tagliapietra