Hi Bruno,

What you are asking for is a common request. There is a KIP in the works, KIP-328: Ability to suppress updates for KTables (https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables), that should suit the requirements you've outlined.
In the meantime, I'll see if I can come up with an alternative approach over the next few days.

-Bill

On Thu, Jul 19, 2018 at 12:07 PM Bruno Bottazzini <bruno.bottazz...@targatelematics.com> wrote:

> Hello,
>
> We have a question about how Kafka Streams works, or at least we are
> having some trouble making it work.
>
> Our goal is to group the messages we receive from a Kafka topic by user
> and window them, so that we can aggregate the messages received in each
> window (5 minutes). Then, I'd like to collect all the aggregates in each
> window and process them at once, adding them to a report of all the
> messages received in that 5-minute interval.
>
> The last point seems to be the tough part, as Kafka Streams doesn't seem
> to provide (at least we can't find it :() anything that can collect all
> the window-related data in a "finite" stream to be processed in one
> place.
>
> The file (implemented_code.txt) contains the code we have implemented,
> including at least one of our attempts to make this work.
>
> You can find its output in the file (result.txt).
>
> For each window there are many log lines, and they are interleaved with
> those of other windows.
>
> What I'd like to have is something like:
>
> // Hypothetical implementation
> windowedMessages.streamWindows((interval, window) -> process(interval,
> window));
>
> where the method process would be something like:
>
> // Hypothetical implementation
> void process(Interval interval, WindowStream<UserId, List<Message>>
> windowStream) {
>     // Create report for the whole window
>     Report report = new Report(nameFromInterval());
>     // Loop on the finite iterable that represents the window content
>     for (WindowStreamEntry<UserId, List<Message>> entry: windowStream)
>     {
>         report.addLine(entry.getKey(), entry.getValue());
>     }
>     report.close();
> }
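
To make the request above concrete, here is a minimal plain-Java sketch of the underlying idea: buffer every update per 5-minute window and hand each window over exactly once, after a later record shows that the window has ended. It deliberately does not use the Kafka Streams API. The class name WindowReportSketch, the String user IDs and messages, the report format, and the choice to drop late records (no grace period) are all assumptions made for illustration, not anything from the original code or from KIP-328.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: no Kafka dependency, timestamps are assumed non-negative.
class WindowReportSketch {
    static final long WINDOW_MS = 5 * 60 * 1000L; // 5-minute tumbling windows

    // windowStart -> (userId -> messages); TreeMaps keep windows and users sorted
    private final TreeMap<Long, Map<String, List<String>>> open = new TreeMap<>();
    // Highest timestamp seen so far; a window closes once this reaches its end
    private long streamTime = Long.MIN_VALUE;

    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    /** Buffers one message and returns one report per window that just closed. */
    List<String> add(String userId, String message, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long start = windowStart(timestampMs);
        if (start + WINDOW_MS > streamTime) {
            open.computeIfAbsent(start, k -> new TreeMap<>())
                .computeIfAbsent(userId, k -> new ArrayList<>())
                .add(message);
        } // else: the window is already closed, so the late record is dropped
        return flushClosed();
    }

    private List<String> flushClosed() {
        List<String> reports = new ArrayList<>();
        while (!open.isEmpty() && open.firstKey() + WINDOW_MS <= streamTime) {
            Map.Entry<Long, Map<String, List<String>>> window = open.pollFirstEntry();
            StringBuilder report = new StringBuilder("window@" + window.getKey());
            // The window content is now finite, so it can be processed in one place
            window.getValue().forEach((user, messages) ->
                report.append(' ').append(user).append('=').append(messages));
            reports.add(report.toString());
        }
        return reports;
    }

    public static void main(String[] args) {
        WindowReportSketch sketch = new WindowReportSketch();
        sketch.add("alice", "a1", 1_000L);
        sketch.add("bob", "b1", 2_000L);
        // The first record of the next window closes the previous one
        sketch.add("alice", "a2", 300_000L).forEach(System.out::println);
    }
}
```

In this sketch a window is only reported when a record with a later timestamp arrives, so a quiet stream leaves its last window open. That trade-off is part of why the real feature needs careful design rather than a simple callback.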