Dear team,

I have a question but I don’t see documentation about it. I hope you can help 
me.
I have a Kafka producer and a Kafka consumer, both the first one and the second 
one are written in Java (Java 11).

The Kafka producer has start moment and end moment. On the other hand Kafka 
consumer is working with aggregate windowed operations. The business logic is:

final KStream<String, AggregateDto> transactions = 
builder.stream(kafkaProperties.getTopic(), Consumed.with(Serdes.String(), 
aggregateSerde));

// Topology
transactions
        .groupBy(this::groupedByTimeStampAndProtocolName)
        .windowedBy( TimeWindows
                .of( Duration.ofSeconds( kafkaProperties.getAggregateSeconds() 
))
                .grace( Duration.ofMillis( 
kafkaProperties.getGraceAggregateMillis() )))
        .aggregate(
                tool::emptyAggregate,
                this::processNewRecord,
                Materialized.<String, AggregateDto, WindowStore<Bytes, 
byte[]>>as(TRANSACTION_AGGREGATE)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(aggregateSerde)
        )
        
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach(sendAggregatesToCassandra);

All are working properly and Kafka is helping a lot, but we have a trouble. 
Like I said before, Kafka producer has start and end moment and Kafka consumer 
is working with windows.

Please, imagine we need N windowed operations to consume all the data from the 
Kafka producer, the Kafka consumer consumes all data from moment 0 to moment 
N-1.
We cannot process the last window because Kafka retains it, (for maintains the 
data consistency) but the producer will never send data anymore.

Is there any way to consume the last window (for instance after 15 minutes or 1 
hour…) in the Kafka consumer?


Thanks in advance for all the information and help.



Best regards.


        
Sebastián Manjón David
Senior Java Software Engineer
dsebast...@sonoc.io <mailto:dsebast...@sonoc.io>
SONOC
C/ Josefa Amar y Borbón, 10, 4ª · 50001 Zaragoza, España
Tlf: +34 917019888 · www.sonoc.io <http://www.sonoc.io/>

Reply via email to