I've started from a new app generated with archetype:generate, as described in
https://kafka.apache.org/22/documentation/streams/tutorial

I've pushed a sample repo here: https://github.com/alex88/kafka-test
There are two Avro schemas: Metric, with 2 fields (timestamp and production),
and MetricList, which holds a list of Metric records so that I can do the
aggregation manually.
For now the aggregation is deliberately simple, so that the sample repo stays
small and wrong values are easy to spot.

What I wanted to achieve is:
 - have a custom generator that produces 1 message per second with
production = 1, using 1 or more separate message keys, which in my case are
the IDs of the sensors generating the data
 - a filter that removes out-of-order messages by keeping a state store that
maps key (sensorID) -> last timestamp
 - a window operation that, for this example, just sums the values in each
10-second window
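
To make the filter step concrete, here is a minimal plain-Java sketch of the
idea. It is not the code from the repo: the class and method names are made
up, and a HashMap stands in for the Kafka Streams key-value state store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the filter: the HashMap plays the role of the
// state store that maps sensorID -> last accepted timestamp.
class LastTimestampFilter {
    private final Map<String, Long> lastTimestampBySensor = new HashMap<>();

    // Returns true if the message should pass, i.e. its timestamp is strictly
    // greater than the last one accepted for the same sensor ID.
    boolean accept(String sensorId, long timestamp) {
        Long last = lastTimestampBySensor.get(sensorId);
        if (last != null && timestamp <= last) {
            return false; // out of order (or repeated): drop it
        }
        lastTimestampBySensor.put(sensorId, timestamp);
        return true;
    }
}
```

Each sensor ID is tracked independently, so an old timestamp on S1 does not
affect what is accepted for S2.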

To show where I'm having issues I've set up multiple branches in the repo:
 - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>* is the
first error I hit, "Failed to flush state store
KSTREAM-AGGREGATE-STATE-STORE-0000000003", which I tried to solve using
https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
 - *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>* is the
state after I tried to solve the above problem with the materializer (maybe
the SpecificAvroSerde is wrong?)
 - *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>*: after
fixing issue-02 (by using groupByKey(Grouped.with(Serdes.String(), new
SpecificAvroSerde<>()))) everything seems to work. If you leave both
the producer and the stream running, you'll see that the stream receives 10
messages (with the timestamp incrementing by 1 second for each message) like
this:

S1 with filtered metric{"timestamp": 160000, "production": 1}
S1 with filtered metric{"timestamp": 161000, "production": 1}
S1 with filtered metric{"timestamp": 162000, "production": 1}
S1 with filtered metric{"timestamp": 163000, "production": 1}
S1 with filtered metric{"timestamp": 164000, "production": 1}
S1 with filtered metric{"timestamp": 165000, "production": 1}
S1 with filtered metric{"timestamp": 166000, "production": 1}
S1 with filtered metric{"timestamp": 167000, "production": 1}
S1 with filtered metric{"timestamp": 168000, "production": 1}
S1 with filtered metric{"timestamp": 169000, "production": 1}

and, at each 10-second interval, something like:

S1 with computed metric {"timestamp": 160000, "production": 10}
S1 with computed metric {"timestamp": 170000, "production": 10}
S1 with computed metric {"timestamp": 180000, "production": 10}

and so on...
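
For reference, the numbers above are what a plain tumbling-window sum gives.
This is only a plain-Java sketch with made-up names, assuming windows aligned
to multiples of 10000 ms; it is not the Kafka Streams code from the repo.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a 10-second tumbling-window sum per sensor ID.
class TumblingWindowSum {
    static final long WINDOW_SIZE_MS = 10_000L;

    // sensorID -> (window start -> summed production)
    private final Map<String, Map<Long, Integer>> sums = new HashMap<>();

    // Start of the window that contains the given (non-negative) timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Adds one production value to the window its timestamp falls into.
    void add(String sensorId, long timestampMs, int production) {
        sums.computeIfAbsent(sensorId, k -> new HashMap<>())
            .merge(windowStart(timestampMs), production, Integer::sum);
    }

    // Current sum for a sensor and window start, or 0 if nothing was added.
    int sum(String sensorId, long windowStartMs) {
        Map<Long, Integer> perWindow = sums.get(sensorId);
        if (perWindow == null) {
            return 0;
        }
        Integer value = perWindow.get(windowStartMs);
        return value == null ? 0 : value;
    }
}
```

Feeding it the 10 messages with timestamps 160000 to 169000 and production = 1
puts all of them in the window starting at 160000, which sums to 10.
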
Now there are two problems. After stopping and restarting the stream
processor (by sending SIGINT via IntelliJ, since I start the main class with
it), the following happens:
 - sometimes the aggregated count is wrong: if the window has been
accumulating for 7 seconds (e.g. seconds 11-17) and I restart the stream,
after the restart it might emit a value for just the 3 remaining seconds
(seconds 18-20), so the aggregated value is 3 instead of 10
 - sometimes the window outputs twice: in the example where I restart the
stream processor, I might get as output

S1 with filtered metric{"timestamp": 154000, "production": 1}
S1 with computed metric {"timestamp": 160000, "production": 5}
S1 with filtered metric{"timestamp": 155000, "production": 1}
S1 with filtered metric{"timestamp": 156000, "production": 1}
S1 with filtered metric{"timestamp": 157000, "production": 1}
S1 with filtered metric{"timestamp": 158000, "production": 1}
S1 with filtered metric{"timestamp": 159000, "production": 1}
S1 with filtered metric{"timestamp": 160000, "production": 1}
S1 with filtered metric{"timestamp": 161000, "production": 1}
S1 with computed metric {"timestamp": 160000, "production": 10}
S1 with filtered metric{"timestamp": 162000, "production": 1}

As you can see, the window for timestamp 160000 is emitted twice.

Is this because the window state isn't persisted across restarts?
My ultimate goal is to have the windowing step emit each window only once and
resume processing across restarts, while skipping out-of-order data
(that's the purpose of the TimestampIncrementalFilter).
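
To be explicit about what I mean by "emit only once", here is a plain-Java
sketch of the behaviour I'm after for a single key. The names are made up, it
assumes timestamps arrive in increasing order (which the filter should
guarantee), and it says nothing about how Kafka Streams implements this or how
the state would survive a restart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: a window is emitted exactly once, when the highest
// timestamp seen so far reaches the end of that window.
class EmitOnceWindowSum {
    static final long WINDOW_SIZE_MS = 10_000L;

    // window start -> running sum, for windows that are still open
    private final TreeMap<Long, Integer> openWindows = new TreeMap<>();
    private long maxTimestampMs = Long.MIN_VALUE;

    // Adds one value and returns the windows closed by it as "start=sum".
    List<String> add(long timestampMs, int production) {
        long start = timestampMs - (timestampMs % WINDOW_SIZE_MS);
        openWindows.merge(start, production, Integer::sum);
        maxTimestampMs = Math.max(maxTimestampMs, timestampMs);

        List<String> closed = new ArrayList<>();
        // [start, start + size) is final once time has reached its end.
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + WINDOW_SIZE_MS <= maxTimestampMs) {
            Map.Entry<Long, Integer> entry = openWindows.pollFirstEntry();
            closed.add(entry.getKey() + "=" + entry.getValue());
        }
        return closed;
    }
}
```

With this model, the messages 160000 to 169000 produce no output, and the
message at 170000 produces the single result for window 160000.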

Thank you in advance

--
Alessandro Tagliapietra


On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Hi Bruno,
>
> I'm using the confluent docker images 5.2.1, so kafka 2.2.
> Anyway I'll try to make a small reproduction repo with all the different
> cases soon.
>
> Thank you
>
> --
> Alessandro Tagliapietra
>
>
> On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <br...@confluent.io> wrote:
>
>> Hi Alessandro,
>>
>> What version of Kafka do you use?
>>
>> Could you please give a more detailed example for the issues with the two
>> keys you see?
>>
>> Could the following bug be related to the duplicates you see?
>>
>>
>> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
>>
>> How do you restart the processor?
>>
>> Best,
>> Bruno
>>
>> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>>
>> > Thank you Bruno,
>> >
>> > I'll look into those, however average is just a simple thing I'm trying
>> > right now just to get an initial windowing flow working.
>> > In the future I'll probably still need the actual values for other
>> > calculations. We won't have more than 60 elements per window for sure.
>> >
>> > So far to not manually serialize/deserialize the array list I've
>> created an
>> > Avro model with an array field containing the values.
>> > I had issues with suppress as explained here
>> >
>> >
>> >
>> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
>> >
>> > but I got that working.
>> > So far everything seems to be working, except a couple things:
>> >  - if I generate data with 1 key, I correctly get a value each 10
>> seconds,
>> > if I later start generating data with another key (while key 1 is still
>> > generating) the windowing emits a value only after the timestamp of key
>> 2
>> > reaches the last generated window
>> >  - while generating data, if I restart the processor as soon as it
>> starts
>> > it sometimes generates 2 aggregates for the same window even if I'm
>> using
>> > the suppress
>> >
>> > Anyway, I'll look into your link and try to find out the cause of these
>> > issues, probably starting from scratch with a simpler example
>> >
>> > Thank you for your help!
>> >
>> > --
>> > Alessandro Tagliapietra
>> >
>> > On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <br...@confluent.io>
>> wrote:
>> >
>> > > Hi Alessandro,
>> > >
>> > > Have a look at this Kafka Usage Pattern for computing averages without
>> > > using an ArrayList.
>> > >
>> > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
>> > > ?
>> > >
>> > > The advantages of this pattern over the ArrayList approach is the
>> reduced
>> > > space needed to compute the aggregate. Note that you will still need
>> to
>> > > implement a SerDe. However, the SerDe should be a bit easier to
>> implement
>> > > than a SerDe for an ArrayList.
>> > >
>> > > Hope that helps.
>> > >
>> > > Best,
>> > > Bruno
>> > >
>> > > On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
>> > > tagliapietra.alessan...@gmail.com> wrote:
>> > >
>> > > > Sorry but it seemed harder than I thought,
>> > > >
>> > > > to have the custom aggregation working I need to get an ArrayList of
>> > all
>> > > > the values in the window, so far my aggregate DSL method creates an
>> > > > ArrayList on the initializer and adds each value to the list in the
>> > > > aggregator.
>> > > > Then I think I'll have to provide a serde to change the output
>> type of
>> > > > that method.
>> > > > I was looking at
>> > > >
>> > > >
>> > >
>> >
>> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
>> > > > but
>> > > > that seems more towards a list of longs and already uses longSerde.
>> > > > I'm currently trying to implement another avro model that has a
>> field
>> > of
>> > > > type array so I can use the regular avro serializer to implement
>> this.
>> > > > Should I create my own serdes instead or is this the right way?
>> > > >
>> > > > Thank you in advance
>> > > >
>> > > > --
>> > > > Alessandro Tagliapietra
>> > > >
>> > > > On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
>> > > > tagliapietra.alessan...@gmail.com> wrote:
>> > > >
>> > > > > Thank you Bruno and Matthias,
>> > > > >
>> > > > > I've modified the transformer to implement the
>> > ValueTransformerWithKey
>> > > > > interface and everything is working fine.
>> > > > > I've now to window the data and manually aggregate each window
>> data
>> > > since
>> > > > > I've to do some averages and sum of differences.
>> > > > > So far I'm just having some issues with message types since I'm
>> > > changing
>> > > > > the data type when aggregating the window but I think it's an easy
>> > > > problem.
>> > > > >
>> > > > > Thank you again
>> > > > > Best
>> > > > >
>> > > > > --
>> > > > > Alessandro Tagliapietra
>> > > > >
>> > > > > On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <
>> br...@confluent.io>
>> > > > wrote:
>> > > > >
>> > > > >> Hi Alessandro,
>> > > > >>
>> > > > >> the `TransformerSupplier` is internally wrapped with a
>> > > > `ProcessorSupplier`,
>> > > > >> so the statement
>> > > > >>
>> > > > >> `transform` is essentially equivalent to adding the Transformer
>> via
>> > > > >> Topology#addProcessor() to your processor topology
>> > > > >>
>> > > > >> is correct.
>> > > > >>
>> > > > >> If you do not change the key, you should definitely use one of
>> the
>> > > > >> overloads of `transformValues` to avoid internal data
>> > redistribution.
>> > > In
>> > > > >> your case the overload with `ValueTransformerWithKeySupplier` as
>> > > > suggested
>> > > > >> by Matthias would fit.
>> > > > >>
>> > > > >> Best,
>> > > > >> Bruno
>> > > > >>
>> > > > >> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
>> > > matth...@confluent.io
>> > > > >
>> > > > >> wrote:
>> > > > >>
>> > > > >> > There is also `ValueTransformerWithKey` that gives you
>> read-only
>> > > access
>> > > > >> > to the key.
>> > > > >> >
>> > > > >> > -Matthias
>> > > > >> >
>> > > > >> > On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
>> > > > >> > > Hi Bruno,
>> > > > >> > >
>> > > > >> > > Thank you for the quick answer.
>> > > > >> > >
>> > > > >> > > I'm actually trying to do that since it seems there is
>> really no
>> > > way
>> > > > >> to
>> > > > >> > > have it use `Processor<K, V>`.
>> > > > >> > > I just wanted (if that would've made any sense) to use the
>> > > Processor
>> > > > >> in
>> > > > >> > > both DSL and non-DSL pipelines.
>> > > > >> > >
>> > > > >> > > Anyway, regarding `transformValues()` I don't think I can
>> use it
>> > > as
>> > > > I
>> > > > >> > need
>> > > > >> > > the message key since that is the discriminating value for
>> the
>> > > > filter
>> > > > >> (I
>> > > > >> > > want to exclude old values per sensor ID so per message key)
>> > > > >> > >
>> > > > >> > > Right now I've this
>> > > > >> > >
>> > > > >> >
>> > > > >>
>> > > >
>> > >
>> >
>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
>> > > > >> > > and
>> > > > >> > > I'm using it with `transform()`.
>> > > > >> > >
>> > > > >> > > One thing I've found confusing is this
>> > > > >> > >
>> > > > >> >
>> > > > >>
>> > > >
>> > >
>> >
>> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
>> > > > >> > >
>> > > > >> > > transform is essentially equivalent to adding the Transformer
>> > via
>> > > > >> > >> Topology#addProcessor() to your processor topology
>> > > > >> > >> <
>> > > > >> >
>> > > > >>
>> > > >
>> > >
>> >
>> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology
>> > > > >> > >
>> > > > >> > >> .
>> > > > >> > >
>> > > > >> > >
>> > > > >> > > is it? Doesn't `transform` need a TransformerSupplier while
>> > > > >> `addProcessor`
>> > > > >> > > uses a ProcessorSupplier?
>> > > > >> > >
>> > > > >> > > Thank you again for your help
>> > > > >> > >
>> > > > >> > > --
>> > > > >> > > Alessandro Tagliapietra
>> > > > >> > >
>> > > > >> > >
>> > > > >> > > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
>> > br...@confluent.io
>> > > >
>> > > > >> > wrote:
>> > > > >> > >
>> > > > >> > >> Hi Alessandro,
>> > > > >> > >>
>> > > > >> > >> Have you considered using `transform()` (actually in your
>> case
>> > > you
>> > > > >> > should
>> > > > >> > >> use `transformValues()`) instead of `.process()`?
>> `transform()`
>> > > and
>> > > > >> > >> `transformValues()` are stateful operations similar to
>> > `.process`
>> > > > but
>> > > > >> > they
>> > > > >> > >> return a `KStream`. On a `KStream` you can then apply a
>> > windowed
>> > > > >> > >> aggregation.
>> > > > >> > >>
>> > > > >> > >> Hope that helps.
>> > > > >> > >>
>> > > > >> > >> Best,
>> > > > >> > >> Bruno
>> > > > >> > >>
>> > > > >> > >>
>> > > > >> > >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
>> > > > >> > >> tagliapietra.alessan...@gmail.com> wrote:
>> > > > >> > >>
>> > > > >> > >>> Hi there,
>> > > > >> > >>>
>> > > > >> > >>> I'm just starting with Kafka and I'm trying to create a
>> stream
>> > > > >> > processor
>> > > > >> > >>> that in multiple stages:
>> > > > >> > >>>  - filters messages using a kv store so that only messages
>> > with
>> > > > >> higher
>> > > > >> > >>> timestamp gets processed
>> > > > >> > >>>  - aggregates the message metrics by minute giving e.g. the
>> > avg
>> > > of
>> > > > >> > those
>> > > > >> > >>> metrics in that minute
>> > > > >> > >>>
>> > > > >> > >>> The message is simple, the key is the sensor ID and the
>> value
>> > is
>> > > > >> e.g. {
>> > > > >> > >>> timestamp: UNIX_TIMESTAMP, speed: INT }.
>> > > > >> > >>>
>> > > > >> > >>> I've started by creating a processor to use the kv store
>> and
>> > > > filter
>> > > > >> old
>> > > > >> > >>> messages:
>> > > > >> > >>>
>> > > > >> > >>>
>> > > > >> > >>>
>> > > > >> > >>
>> > > > >> >
>> > > > >>
>> > > >
>> > >
>> >
>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
>> > > > >> > >>>
>> > > > >> > >>> Then I was trying to implement windowing, I saw very nice
>> > > > windowing
>> > > > >> > >>> examples for the DSL but none for the Processor API (only a
>> > > small
>> > > > >> > >> reference
>> > > > >> > >>> to the windowed store), can someone point me in the right
>> > > > direction?
>> > > > >> > >>>
>> > > > >> > >>> Now, since I wasn't able to find any example I tried to use
>> > the
>> > > > DSL
>> > > > >> but
>> > > > >> > >>> haven't found a way to use my processor with it, I saw this
>> > > > >> > >>>
>> > > > >> > >>>
>> > > > >> > >>
>> > > > >> >
>> > > > >>
>> > > >
>> > >
>> >
>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
>> > > > >> > >>> but
>> > > > >> > >>> it explains mostly transformers not processors. I also saw
>> > after
>> > > > >> that
>> > > > >> > the
>> > > > >> > >>> example usage of the processor but `.process(...)` returns
>> > void,
>> > > > so
>> > > > >> I
>> > > > >> > >>> cannot have a KStream from a processor?
>> > > > >> > >>>
>> > > > >> > >>> Thank you all in advance
>> > > > >> > >>>
>> > > > >> > >>> --
>> > > > >> > >>> Alessandro Tagliapietra
>> > > > >> > >>>
>> > > > >> > >>
>> > > > >> > >
>> > > > >> >
>> > > > >> >
>> > > > >>
>> > > > >
>> > > >
>> > >
>> >
>>
>
