Dear team,

I have a question, but I don't see any documentation about it; I hope you can help me. I have a Kafka producer and a Kafka consumer, both written in Java (Java 11).
The Kafka producer has a start moment and an end moment. The Kafka consumer, on the other hand, works with windowed aggregate operations. The business logic is:

    final KStream<String, AggregateDto> transactions =
        builder.stream(kafkaProperties.getTopic(), Consumed.with(Serdes.String(), aggregateSerde));

    // Topology
    transactions
        .groupBy(this::groupedByTimeStampAndProtocolName)
        .windowedBy(TimeWindows
            .of(Duration.ofSeconds(kafkaProperties.getAggregateSeconds()))
            .grace(Duration.ofMillis(kafkaProperties.getGraceAggregateMillis())))
        .aggregate(
            tool::emptyAggregate,
            this::processNewRecord,
            Materialized.<String, AggregateDto, WindowStore<Bytes, byte[]>>as(TRANSACTION_AGGREGATE)
                .withKeySerde(Serdes.String())
                .withValueSerde(aggregateSerde))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach(sendAggregatesToCassandra);

Everything works properly and Kafka is helping us a lot, but we have one problem. As I said, the producer has a start and an end moment, while the consumer works with windows. Imagine we need N windowed operations to consume all the data from the producer: the consumer processes the data from window 0 to window N-1, but we cannot process the last window because Kafka Streams retains it (to maintain data consistency), and the producer will never send data again. Is there any way to consume the last window (for instance, after 15 minutes or 1 hour) in the Kafka consumer?

Thanks in advance for all the information and help.

Best regards,

Sebastián Manjón David
Senior Java Software Engineer
dsebast...@sonoc.io
SONOC
C/ Josefa Amar y Borbón, 10, 4ª · 50001 Zaragoza, España
Tlf: +34 917019888 · www.sonoc.io
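P.S. To make the symptom concrete, here is a minimal, self-contained sketch (not the Kafka Streams API; all names are hypothetical) of my understanding of why the last window never emits: `suppress(untilWindowCloses(...))` releases a window only once stream time, which advances solely on the timestamps of newly arrived records, passes the window end plus the grace period. When the producer stops, stream time freezes and the final window stays suppressed forever.

```java
import java.time.Duration;

// Toy model of Kafka Streams stream time and window closing.
// NOT the real API: LastWindowSketch, windowClosed and onRecord are
// hypothetical names used only to illustrate the behaviour.
public class LastWindowSketch {
    // Stream time = max record timestamp observed so far.
    static long streamTime = Long.MIN_VALUE;

    // A record's timestamp can only move stream time forward.
    static void onRecord(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
    }

    // suppress(untilWindowCloses) emits a window only when
    // stream time >= window end + grace.
    static boolean windowClosed(long windowEndMs, long graceMs) {
        return streamTime >= windowEndMs + graceMs;
    }

    public static void main(String[] args) {
        long grace = Duration.ofMillis(500).toMillis();

        // Last record from the producer falls in window [10_000, 20_000).
        onRecord(15_000);
        long lastWindowEnd = 20_000;

        // Producer stops: no further records, stream time stays at 15_000,
        // so the suppressed window is never released.
        System.out.println(windowClosed(lastWindowEnd, grace)); // prints "false"

        // Any later record (even a dummy "tick" on the same topic) would
        // advance stream time past windowEnd + grace and release the window.
        onRecord(21_000);
        System.out.println(windowClosed(lastWindowEnd, grace)); // prints "true"
    }
}
```

If this model is right, one workaround would be a periodic heartbeat record on the input topic to keep stream time moving; I would appreciate confirmation or a better approach.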