Hi Alessandro,

I had a look at your code. Regarding your question about whether you are using the SpecificAvroSerde correctly, take a look at the following documentation:
https://docs.confluent.io/current/streams/developer-guide/datatypes.html
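For example, a minimal configuration sketch (assuming the generated Avro class Metric from your repo and a schema registry at http://localhost:8081, both placeholders here) could look like the following. Note that serdes passed explicitly to DSL operations are not configured automatically from the application properties, so the configure() call matters:

    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
    import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
    import java.util.Collections;

    // Create and configure the serde by hand; the boolean says whether it is a key serde.
    final SpecificAvroSerde<Metric> metricSerde = new SpecificAvroSerde<>();
    metricSerde.configure(
        Collections.singletonMap(
            AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://localhost:8081"),
        false); // false = value serde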
I haven't had the time yet to take a closer look at your problems with the aggregation. I will have a look next week.

Have a nice weekend,
Bruno

On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <[email protected]> wrote:

> So I've started with a new app with the archetype:generate as in
> https://kafka.apache.org/22/documentation/streams/tutorial
>
> I've pushed a sample repo here: https://github.com/alex88/kafka-test
> The Avro schemas are a Metric with 2 fields (timestamp and production) and a MetricList with a list of records (Metric) to be able to do the aggregation manually.
> Right now the aggregation is kept simple just for the purpose of the sample repo, so it is easy to see whether we're getting wrong values.
>
> What I wanted to achieve is:
> - a custom generator that produces 1 message per second with production = 1, with 1 or more separate message keys, which in my case are the sensor IDs generating the data
> - a filter that removes out-of-order messages by keeping a state that stores key (sensor ID) -> last timestamp
> - a window operation that, for this example, just sums the values in each 10-second window
>
> To show where I'm having issues I've set up multiple branches in the repo:
> - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>* is the one I had initially, "Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000003", which I tried to solve using
> https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
> - *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>* is the one after I tried to solve the above problem with the materializer (maybe the SpecificAvroSerde is wrong?)
> - *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>* after fixing issue-02 (by using groupByKey(Grouped.with(Serdes.String(), new SpecificAvroSerde<>()))) everything seems to be working: if you let both the producer and the stream run, you'll see that the stream receives 10 messages (with the timestamp incrementing by 1 second for each message) like this:
>
> S1 with filtered metric{"timestamp": 160000, "production": 1}
> S1 with filtered metric{"timestamp": 161000, "production": 1}
> S1 with filtered metric{"timestamp": 162000, "production": 1}
> S1 with filtered metric{"timestamp": 163000, "production": 1}
> S1 with filtered metric{"timestamp": 164000, "production": 1}
> S1 with filtered metric{"timestamp": 165000, "production": 1}
> S1 with filtered metric{"timestamp": 166000, "production": 1}
> S1 with filtered metric{"timestamp": 167000, "production": 1}
> S1 with filtered metric{"timestamp": 168000, "production": 1}
> S1 with filtered metric{"timestamp": 169000, "production": 1}
>
> and at the 10-second interval something like:
>
> S1 with computed metric {"timestamp": 160000, "production": 10}
> S1 with computed metric {"timestamp": 170000, "production": 10}
> S1 with computed metric {"timestamp": 180000, "production": 10}
>
> and so on...
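For reference, the windowed sum described above would look roughly like this in the DSL (a sketch, not the exact repo code; `metrics` stands for the filtered KStream<String, Metric> and `metricSerde` for a configured SpecificAvroSerde<Metric> as sketched at the top):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.*;
    import java.time.Duration;

    // Group with explicit serdes (the issue-03 fix), then sum production per 10-second window.
    KTable<Windowed<String>, Metric> sums = metrics
        .groupByKey(Grouped.with(Serdes.String(), metricSerde))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        .reduce(
            (agg, next) -> Metric.newBuilder(agg)
                .setProduction(agg.getProduction() + next.getProduction())
                .build(),
            Materialized.with(Serdes.String(), metricSerde));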
> Now there are two problems after stopping and restarting the stream processor (by sending SIGINT via IntelliJ, since I start the class main with it):
> - sometimes the aggregated count is wrong: if a window has been running for 7 seconds (e.g. seconds 11-17) and I restart the stream, after the restart it might just emit a value covering the 3 missing seconds (seconds 18-20), so the aggregated value is 3, not 10
> - sometimes the window outputs twice; in the case where I restart the stream processor I might get as output
>
> S1 with filtered metric{"timestamp": 154000, "production": 1}
> S1 with computed metric {"timestamp": 160000, "production": 5}
> S1 with filtered metric{"timestamp": 155000, "production": 1}
> S1 with filtered metric{"timestamp": 156000, "production": 1}
> S1 with filtered metric{"timestamp": 157000, "production": 1}
> S1 with filtered metric{"timestamp": 158000, "production": 1}
> S1 with filtered metric{"timestamp": 159000, "production": 1}
> S1 with filtered metric{"timestamp": 160000, "production": 1}
> S1 with filtered metric{"timestamp": 161000, "production": 1}
> S1 with computed metric {"timestamp": 160000, "production": 10}
> S1 with filtered metric{"timestamp": 162000, "production": 1}
>
> As you can see, the window for timestamp 160000 is duplicated.
>
> Is this because the window state isn't persisted across restarts?
> My ultimate goal is to have the window part emit only once and resume processing across restarts, while avoiding processing out-of-order data (that's the purpose of the TimestampIncrementalFilter).
>
> Thank you in advance
>
> --
> Alessandro Tagliapietra
>
> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <[email protected]> wrote:
>
> > Hi Bruno,
> >
> > I'm using the Confluent Docker images 5.2.1, so Kafka 2.2.
> > Anyway, I'll try to make a small reproduction repo with all the different cases soon.
> >
> > Thank you
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <[email protected]> wrote:
> >
> >> Hi Alessandro,
> >>
> >> What version of Kafka do you use?
> >>
> >> Could you please give a more detailed example for the issues with the two keys you see?
> >>
> >> Could the following bug be related to the duplicates you see?
> >>
> >> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
> >>
> >> How do you restart the processor?
> >>
> >> Best,
> >> Bruno
> >>
> >> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <[email protected]> wrote:
> >>
> >> > Thank you Bruno,
> >> >
> >> > I'll look into those, however the average is just a simple thing I'm trying right now to get an initial windowing flow working.
> >> > In the future I'll probably still need the actual values for other calculations. We won't have more than 60 elements per window for sure.
> >> >
> >> > So far, to avoid manually serializing/deserializing the ArrayList, I've created an Avro model with an array field containing the values.
> >> > I had issues with suppress as explained here
> >> > https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
> >> > but I got that working.
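On the emit-only-once goal above: in Kafka 2.2 the usual tool is suppress() together with a grace period on the window. A minimal sketch, assuming the windowed KTable `sums` from the earlier sketch (keep in mind suppress advances on stream time, so the final result for a window only appears once newer records arrive):

    import org.apache.kafka.streams.kstream.Suppressed;

    // Emit a single, final result per window; the window definition needs a bounded
    // grace period for this, e.g. TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ZERO).
    sums.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach((windowedKey, metric) ->
            System.out.println(windowedKey.key() + " with computed metric " + metric));

The duplicates after a restart may well be the bug Bruno linked above (KAFKA-7895), where suppress could re-emit results after a restart or rebalance in 2.2.0.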
> >> > So far everything seems to be working, except for a couple of things:
> >> > - if I generate data with 1 key, I correctly get a value every 10 seconds; if I later start generating data with another key (while key 1 is still generating), the windowing emits a value for key 2 only after the timestamp of key 2 reaches the last generated window
> >> > - while generating data, if I restart the processor, as soon as it starts it sometimes generates 2 aggregates for the same window, even though I'm using the suppress
> >> >
> >> > Anyway, I'll look into your link and try to find out the cause of these issues, probably starting from scratch with a simpler example
> >> >
> >> > Thank you for your help!
> >> >
> >> > --
> >> > Alessandro Tagliapietra
> >> >
> >> > On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <[email protected]> wrote:
> >> >
> >> > > Hi Alessandro,
> >> > >
> >> > > Have a look at this Kafka Streams usage pattern for computing a (windowed) average without using an ArrayList:
> >> > >
> >> > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> >> > >
> >> > > The advantage of this pattern over the ArrayList approach is the reduced space needed to compute the aggregate. Note that you will still need to implement a SerDe. However, the SerDe should be a bit easier to implement than a SerDe for an ArrayList.
> >> > >
> >> > > Hope that helps.
> >> > >
> >> > > Best,
> >> > > Bruno
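The pattern behind that wiki link boils down to aggregating a (sum, count) pair and dividing at the end. A sketch of the idea, where SumCount is a hypothetical little pair class (public double sum; public long count) with its own serde, here called sumCountSerde; neither name comes from the thread:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.*;
    import java.time.Duration;

    // Aggregate into (sum, count) instead of a list, then map to the average.
    KTable<Windowed<String>, Double> averages = metrics
        .groupByKey(Grouped.with(Serdes.String(), metricSerde))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        .aggregate(
            () -> new SumCount(0.0, 0L),
            (sensorId, metric, sc) -> new SumCount(sc.sum + metric.getProduction(), sc.count + 1),
            Materialized.with(Serdes.String(), sumCountSerde))
        .mapValues(sc -> sc.count == 0 ? 0.0 : sc.sum / sc.count);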
> >> > > On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <[email protected]> wrote:
> >> > >
> >> > > > Sorry, but it turned out to be harder than I thought.
> >> > > >
> >> > > > To have the custom aggregation working I need an ArrayList of all the values in the window; so far my aggregate DSL call creates an ArrayList in the initializer and adds each value to the list in the aggregator.
> >> > > > Then I think I'll have to provide a serde to match the changed output type of that method.
> >> > > > I was looking at
> >> > > > https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> >> > > > but that seems more geared towards a list of longs and already uses a long serde.
> >> > > > I'm currently trying to implement another Avro model that has a field of type array, so I can use the regular Avro serializer for this.
> >> > > > Should I create my own serdes instead, or is this the right way?
> >> > > >
> >> > > > Thank you in advance
> >> > > >
> >> > > > --
> >> > > > Alessandro Tagliapietra
> >> > > >
> >> > > > On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <[email protected]> wrote:
> >> > > >
> >> > > > > Thank you Bruno and Matthias,
> >> > > > >
> >> > > > > I've modified the transformer to implement the ValueTransformerWithKey interface and everything is working fine.
> >> > > > > I now have to window the data and manually aggregate each window's data, since I have to compute some averages and sums of differences.
> >> > > > > So far I'm just having some issues with message types, since I'm changing the data type when aggregating the window, but I think it's an easy problem.
> >> > > > >
> >> > > > > Thank you again
> >> > > > > Best
> >> > > > >
> >> > > > > --
> >> > > > > Alessandro Tagliapietra
> >> > > > >
> >> > > > > On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <[email protected]> wrote:
> >> > > > >
> >> > > > >> Hi Alessandro,
> >> > > > >>
> >> > > > >> the `TransformerSupplier` is internally wrapped with a `ProcessorSupplier`, so the statement
> >> > > > >>
> >> > > > >> "`transform` is essentially equivalent to adding the Transformer via Topology#addProcessor() to your processor topology"
> >> > > > >>
> >> > > > >> is correct.
> >> > > > >>
> >> > > > >> If you do not change the key, you should definitely use one of the overloads of `transformValues` to avoid internal data redistribution. In your case the overload with `ValueTransformerWithKeySupplier`, as suggested by Matthias, would fit.
> >> > > > >>
> >> > > > >> Best,
> >> > > > >> Bruno
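For reference, a timestamp filter in that shape could look roughly like this (a sketch only, not the gist's exact code; the store name "last-timestamps" is a placeholder, and the store must be registered on the topology, see the wiring sketch further down):

    import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class TimestampIncrementalFilter
            implements ValueTransformerWithKey<String, Metric, Metric> {

        private KeyValueStore<String, Long> lastSeen;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            lastSeen = (KeyValueStore<String, Long>) context.getStateStore("last-timestamps");
        }

        @Override
        public Metric transform(final String sensorId, final Metric metric) {
            final Long last = lastSeen.get(sensorId);
            if (last != null && metric.getTimestamp() <= last) {
                // Out of order: transformValues() forwards a null value here, so the
                // topology needs a subsequent filter((k, v) -> v != null) to drop it.
                return null;
            }
            lastSeen.put(sensorId, metric.getTimestamp());
            return metric;
        }

        @Override
        public void close() {}
    }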
> >> > > > >> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <[email protected]> wrote:
> >> > > > >>
> >> > > > >> > There is also `ValueTransformerWithKey` that gives you read-only access to the key.
> >> > > > >> >
> >> > > > >> > -Matthias
> >> > > > >> >
> >> > > > >> > On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> >> > > > >> > > Hi Bruno,
> >> > > > >> > >
> >> > > > >> > > Thank you for the quick answer.
> >> > > > >> > >
> >> > > > >> > > I'm actually trying to do that, since it seems there is really no way to have it use `Processor<K, V>`.
> >> > > > >> > > I just wanted (if that would've made any sense) to use the Processor in both DSL and non-DSL pipelines.
> >> > > > >> > >
> >> > > > >> > > Anyway, regarding `transformValues()`, I don't think I can use it, as I need the message key: that is the discriminating value for the filter (I want to exclude old values per sensor ID, so per message key).
> >> > > > >> > >
> >> > > > >> > > Right now I have this
> >> > > > >> > > https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> >> > > > >> > > and I'm using it with `transform()`.
> >> > > > >> > >
> >> > > > >> > > One thing I've found confusing is this
> >> > > > >> > > https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> >> > > > >> > >
> >> > > > >> > > "transform is essentially equivalent to adding the Transformer via Topology#addProcessor() to your processor topology
> >> > > > >> > > <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>."
> >> > > > >> > >
> >> > > > >> > > Is it? Doesn't `transform` need a TransformerSupplier while `addProcessor` uses a ProcessorSupplier?
> >> > > > >> > >
> >> > > > >> > > Thank you again for your help
> >> > > > >> > >
> >> > > > >> > > --
> >> > > > >> > > Alessandro Tagliapietra
> >> > > > >> > >
> >> > > > >> > > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <[email protected]> wrote:
> >> > > > >> > >
> >> > > > >> > >> Hi Alessandro,
> >> > > > >> > >>
> >> > > > >> > >> Have you considered using `transform()` (actually, in your case you should use `transformValues()`) instead of `.process()`? `transform()` and `transformValues()` are stateful operations similar to `.process()`, but they return a `KStream`. On a `KStream` you can then apply a windowed aggregation.
> >> > > > >> > >>
> >> > > > >> > >> Hope that helps.
> >> > > > >> > >>
> >> > > > >> > >> Best,
> >> > > > >> > >> Bruno
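Putting that suggestion together with the filter sketched earlier, the wiring could look roughly like this (a sketch; the topic name "metrics" is a placeholder):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.state.Stores;

    final StreamsBuilder builder = new StreamsBuilder();

    // Register the store and name it in transformValues() so the transformer can access it.
    builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("last-timestamps"),
        Serdes.String(), Serdes.Long()));

    // Unlike process(), transformValues() returns a KStream, so windowing can follow.
    final KStream<String, Metric> filtered = builder
        .<String, Metric>stream("metrics", Consumed.with(Serdes.String(), metricSerde))
        .transformValues(TimestampIncrementalFilter::new, "last-timestamps")
        .filter((sensorId, metric) -> metric != null);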
> >> > > > >> > >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <[email protected]> wrote:
> >> > > > >> > >>
> >> > > > >> > >>> Hi there,
> >> > > > >> > >>>
> >> > > > >> > >>> I'm just starting with Kafka and I'm trying to create a stream processor that, in multiple stages:
> >> > > > >> > >>> - filters messages using a kv store so that only messages with a higher timestamp get processed
> >> > > > >> > >>> - aggregates the message metrics by minute, giving e.g. the avg of those metrics in that minute
> >> > > > >> > >>>
> >> > > > >> > >>> The message is simple: the key is the sensor ID and the value is e.g. { timestamp: UNIX_TIMESTAMP, speed: INT }.
> >> > > > >> > >>>
> >> > > > >> > >>> I've started by creating a processor to use the kv store and filter old messages:
> >> > > > >> > >>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> >> > > > >> > >>>
> >> > > > >> > >>> Then I was trying to implement windowing. I saw very nice windowing examples for the DSL but none for the Processor API (only a small reference to the windowed store); can someone point me in the right direction?
> >> > > > >> > >>>
> >> > > > >> > >>> Now, since I wasn't able to find any example, I tried to use the DSL but haven't found a way to use my processor with it. I saw
> >> > > > >> > >>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> >> > > > >> > >>> but it explains mostly transformers, not processors. I also saw after that the example usage of the processor, but `.process(...)` returns void, so I cannot get a KStream from a processor?
> >> > > > >> > >>>
> >> > > > >> > >>> Thank you all in advance
> >> > > > >> > >>>
> >> > > > >> > >>> --
> >> > > > >> > >>> Alessandro Tagliapietra
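On the Processor API windowing question at the bottom of the thread: a window store can also be used directly from a Processor. A rough sketch, assuming the { timestamp, speed } value from the original question as a generated class with getTimestamp()/getSpeed(), and a window store registered under the hypothetical name "speed-per-minute"; the DSL route discussed above is usually simpler. An average would keep a (sum, count) pair per window, as in the SumCount sketch above; this keeps only a running sum:

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.WindowStore;
    import org.apache.kafka.streams.state.WindowStoreIterator;

    public class MinuteSumProcessor implements Processor<String, Metric> {

        private WindowStore<String, Double> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            store = (WindowStore<String, Double>) context.getStateStore("speed-per-minute");
        }

        @Override
        public void process(final String sensorId, final Metric metric) {
            // Align the record to the start of its minute window.
            final long windowStart = (metric.getTimestamp() / 60_000L) * 60_000L;
            double sum = 0.0;
            // Fetch the value of the single window starting at windowStart, if present.
            try (final WindowStoreIterator<Double> it =
                     store.fetch(sensorId, windowStart, windowStart)) {
                if (it.hasNext()) {
                    sum = it.next().value;
                }
            }
            store.put(sensorId, sum + metric.getSpeed(), windowStart);
        }

        @Override
        public void close() {}
    }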
