Hello,

We have a question about how Kafka Streams works, or at least we are
having some trouble making it work.

What we want to achieve is to group the messages we receive from a
Kafka topic by user and window them, in order to aggregate the messages
received in each window (5 minutes). Then, I'd like to collect all the
aggregates of each window so I can process them at once, adding them to
a report of all the messages received in that 5-minute interval.
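To make the intended grouping concrete, here is a minimal plain-Java model of it. No Kafka is involved, and it is for illustration only. Each message goes to the tumbling window whose start is its timestamp rounded down to a multiple of the window size. It is then appended to the list for its (window, user) pair. `Msg` and `TumblingWindows` are hypothetical names, user IDs are plain strings, and timestamps are assumed to be epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical input message: user ID, payload, epoch-millisecond timestamp.
record Msg(String userId, String text, long timestampMs) {}

final class TumblingWindows {
    private TumblingWindows() {}

    // Start of the tumbling window [start, start + sizeMs) that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Groups messages by window start, then by user, keeping arrival order inside each list.
    static Map<Long, Map<String, List<String>>> groupByWindowAndUser(List<Msg> msgs, long sizeMs) {
        Map<Long, Map<String, List<String>>> result = new TreeMap<>();
        for (Msg m : msgs) {
            long start = windowStart(m.timestampMs(), sizeMs);
            result.computeIfAbsent(start, k -> new TreeMap<>())
                  .computeIfAbsent(m.userId(), k -> new ArrayList<>())
                  .add(m.text());
        }
        return result;
    }
}
```

With 5-minute windows (`sizeMs = 300_000`), a message with timestamp 1531502765000 falls into the window starting at 1531502700000.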

The last point seems to be the hard part, since Kafka Streams doesn't
seem to provide anything (at least we can't find it :() that collects
everything belonging to a window into a "finite" stream to be processed
in one place.

The file implemented_code.txt contains the code we have written,
including at least one of our attempts to make this work.

You can find its output in the file result.txt.

In that output, each window produces many log lines, and they are
interleaved with the lines of other windows.
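One way to read the interleaved output: the `KTable` produced by a windowed `aggregate` is an update stream, so `toStream()` forwards a new value for a (user, window) key when that aggregate changes, in arrival order. With record caching enabled (the default, controlled by `cache.max.bytes.buffering` and `commit.interval.ms`), some updates are coalesced before being forwarded, but the output is still not grouped by window. The stdlib-only simulation below mimics the caching-disabled case. `Event` and `ChangelogSimulation` are hypothetical names, the key format is only loosely modeled on the logged keys, and this is not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical input event: user ID, payload, epoch-millisecond timestamp.
record Event(String userId, String text, long timestampMs) {}

final class ChangelogSimulation {
    private ChangelogSimulation() {}

    // Every input record updates one (user, window) aggregate and immediately emits
    // its new value, so the output follows arrival order rather than window order,
    // and the same key can appear several times.
    static List<String> emitUpdates(List<Event> events, long sizeMs) {
        Map<String, List<String>> aggregates = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Event e : events) {
            long start = e.timestampMs() - Math.floorMod(e.timestampMs(), sizeMs);
            String key = e.userId() + "@" + start + "/" + (start + sizeMs);
            List<String> agg = aggregates.computeIfAbsent(key, k -> new ArrayList<>());
            agg.add(e.text());
            emitted.add(key + " " + agg); // snapshot of the aggregate after this update
        }
        return emitted;
    }
}
```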

What I'd like to have is something like:

// Hypothetical implementation
windowedMessages.streamWindows((interval, window) -> process(interval, window));

where the process method would be something like:

// Hypothetical implementation
void process(Interval interval, WindowStream<UserId, List<Message>> windowStream) {
    // Create a report for the whole window
    Report report = new Report(nameFromInterval(interval));
    // Loop over the finite iterable that represents the window content
    for (WindowStreamEntry<UserId, List<Message>> entry : windowStream) {
        report.addLine(entry.getKey(), entry.getValue());
    }
    report.close();
}
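The behavior the hypothetical `streamWindows`/`process` pair asks for can be modeled outside Kafka. Buffer each user's messages per window and track stream time as the largest timestamp seen. Once stream time reaches the window end plus a grace period, hand the window's complete content to a callback in a single call. Records arriving for an already closed window are dropped. This is a minimal single-threaded, in-memory sketch under those assumptions. `WindowReportCollector`, its callback and the grace period are hypothetical and not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical collector: hands each closed window (start time, user -> messages)
// to a callback exactly once.
final class WindowReportCollector {
    private final long sizeMs;
    private final long graceMs;
    private final BiConsumer<Long, Map<String, List<String>>> onWindowClosed;
    // Open windows keyed by start time; inner maps keyed by user ID.
    private final TreeMap<Long, Map<String, List<String>>> open = new TreeMap<>();
    private long streamTimeMs = Long.MIN_VALUE;

    WindowReportCollector(long sizeMs, long graceMs,
                          BiConsumer<Long, Map<String, List<String>>> onWindowClosed) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
        this.onWindowClosed = onWindowClosed;
    }

    // Adds one message; returns false if its window had already closed (late record dropped).
    boolean add(String userId, String text, long timestampMs) {
        long start = timestampMs - Math.floorMod(timestampMs, sizeMs);
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        boolean accepted = start + sizeMs + graceMs > streamTimeMs;
        if (accepted) {
            open.computeIfAbsent(start, k -> new TreeMap<>())
                .computeIfAbsent(userId, k -> new ArrayList<>())
                .add(text);
        }
        flushClosed();
        return accepted;
    }

    // Emits, oldest first, every window whose end plus grace is at or before stream time.
    private void flushClosed() {
        while (!open.isEmpty()) {
            Map.Entry<Long, Map<String, List<String>>> oldest = open.firstEntry();
            if (oldest.getKey() + sizeMs + graceMs > streamTimeMs) {
                break;
            }
            open.pollFirstEntry();
            onWindowClosed.accept(oldest.getKey(), oldest.getValue());
        }
    }
}
```

A design note: the sketch assumes a single consumer sees every user. In a partitioned Kafka Streams application keyed by user, each instance only sees the users of its own partitions, so a report covering all users of a window would still have to be assembled in one place.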

Contents of implemented_code.txt:

StreamsBuilder builder = new StreamsBuilder();
KStream<UserId, Message> messages = builder.stream("KAFKA_TOPIC");

// Group the messages by key (user) and assign them to tumbling windows of SIZE_MS
TimeWindowedKStream<UserId, Message> windowedMessages = messages
        .groupByKey()
        .windowedBy(TimeWindows.of(SIZE_MS));

// Collect the messages of each (user, window) pair into a list
KTable<Windowed<UserId>, List<Message>> messagesAggregatedByWindow = windowedMessages
        .aggregate(
                LinkedList::new,
                new MyAggregator<>(),
                Materialized.with(new MessageKeySerde(), new MessageListSerde())
        );

// Log each update forwarded for a (user, window) aggregate
messagesAggregatedByWindow
        .toStream()
        .foreach((key, value) ->
                log.info("({}), KEY {} MESSAGE {}", value.size(), key, value.toString()));

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();

Contents of result.txt:

KEY [UserId(82770583)@1531502760000/1531502770000] Message [Message(userId=UserId(82770583),message="a"),Message(userId=UserId(82770583),message="b"),Message(userId=UserId(82770583),message="d")]
KEY [UserId(77082590)@1531502760000/1531502770000] Message [Message(userId=UserId(77082590),message="g")]
KEY [UserId(85077691)@1531502750000/1531502760000] Message [Message(userId=UserId(85077691),message="h")]
KEY [UserId(79117307)@1531502380000/1531502390000] Message [Message(userId=UserId(79117307),message="e")]
KEY [UserId(73176289)@1531502760000/1531502770000] Message [Message(userId=UserId(73176289),message="r"),Message(userId=UserId(73176289),message="q")]
KEY [UserId(92077080)@1531502760000/1531502770000] Message [Message(userId=UserId(92077080),message="w")]
KEY [UserId(78530050)@1531502760000/1531502770000] Message [Message(userId=UserId(78530050),message="t")]
KEY [UserId(64640536)@1531502760000/1531502770000] Message [Message(userId=UserId(64640536),message="y")]
