Hello Bruno,

We discussed the callback approach before, but then realized that this
can still be achieved with the proposed API. In the "Final window
results per key" section, John showed how to do that. Note that the
resulting stream will have exactly one record per key, with the value
representing the "final result" for that key.
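For illustration, here is a minimal plain-Python simulation of those semantics (not the Kafka Streams API itself): tumbling 5-minute windows whose per-key aggregate is held back until the window closes, so exactly one final record is emitted per key per window. The helper names and the zero-grace-period assumption are mine, not from the KIP.

```python
from collections import defaultdict

WINDOW_MS = 5 * 60 * 1000  # 5-minute tumbling windows

def window_start(ts_ms):
    # Tumbling-window start for a timestamp (aligned to the epoch).
    return ts_ms - (ts_ms % WINDOW_MS)

def final_window_results(records):
    """records: iterable of (timestamp_ms, key, value) tuples.

    Buffers per-(window, key) aggregates and emits each one exactly once,
    when observed event time has passed the window end -- mimicking
    suppress-until-window-close with no grace period.
    """
    buffers = defaultdict(list)   # (window_start, key) -> list of values
    emitted = []
    observed = 0                  # max event time seen so far
    for ts, key, value in sorted(records):
        observed = max(observed, ts)
        # Flush every window whose end is at or before observed time.
        for w, k in sorted(buffers):
            if w + WINDOW_MS <= observed:
                emitted.append((w, k, buffers.pop((w, k))))
        buffers[(window_start(ts), key)].append(value)
    # End of input: close whatever is still buffered.
    for w, k in sorted(buffers):
        emitted.append((w, k, buffers.pop((w, k))))
    return emitted
```

For example, three records in the first window and one record five minutes later yield one final record per key for the first window, flushed as soon as event time moves past its end.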


Guozhang


On Fri, Jul 20, 2018 at 8:13 AM, Bruno Bottazzini <
bruno.bottazz...@targatelematics.com> wrote:

> Bill,
>
> After reading the documentation, it does look really close to our
> need; however, I have a doubt about it.
>
> One small question.
>
> I was also expecting a callback that Kafka would invoke after the
> whole period has passed, and this callback would receive an iterable
> object containing all the aggregated information collected in that
> period.
>
> Will it be possible when using grace() or suppress()?
>
> Best Regards,
>
> Bruno
>
> On gio, 2018-07-19 at 12:59 -0400, Bill Bejeck wrote:
> > Hi Bruno,
> >
> > What you are asking is a common request.  There is a KIP in the
> > works,
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+
> > to+suppress+updates+for+KTables,
> > that should suit the requirements you've outlined.
> >
> > In the meantime, I'll see if I can come up with an alternative
> > approach
> > over the next few days.
> >
> > -Bill
> >
> > On Thu, Jul 19, 2018 at 12:07 PM Bruno Bottazzini <
> > bruno.bottazz...@targatelematics.com> wrote:
> >
> > >
> > > Hello,
> > >
> > > We have a question about how Kafka Streams works. Or at least we
> > > are having some trouble making it work.
> > >
> > > The goal is to group messages we receive from a Kafka topic by
> > > user and window them, in order to aggregate the messages received
> > > within the window (5 minutes). Then, we'd like to collect all the
> > > aggregates in each window so we can process them at once, adding
> > > them to a report of all the messages received in the 5-minute
> > > interval.
> > >
> > > The last point seems to be the tough part, as Kafka Streams
> > > doesn't seem to provide (at least we can't find it) anything that
> > > can collect all the window-related records into a "finite" stream
> > > to be processed in one place.
> > >
> > > The attached file (implemented_code.txt) contains the code we
> > > have implemented, including at least one of our attempts to make
> > > it work.
> > >
> > > You can find its output in the file (result.txt).
> > >
> > > For each window there are many log lines, and they are mixed with
> > > those of the other windows.
> > >
> > > What I'd like to have is something like:
> > >
> > > // Hypothetical implementation
> > > windowedMessages.streamWindows((interval, window) -> process(interval, window));
> > >
> > > where method process would be something like:
> > >
> > > // Hypothetical implementation
> > > void process(Interval interval, WindowStream<UserId, List<Message>> windowStream) {
> > >     // Create a report for the whole window
> > >     Report report = new Report(nameFromInterval());
> > >     // Loop over the finite iterable that represents the window content
> > >     for (WindowStreamEntry<UserId, List<Message>> entry : windowStream) {
> > >         report.addLine(entry.getKey(), entry.getValue());
> > >     }
> > >     report.close();
> > > }
> > >
> > >
>



-- 
-- Guozhang
