Hello Sebastián,

Thanks for the question. I think the reason you don't find documentation for this is that it's quite unusual to have a data stream that just stops at some point and never sends any more data.
Can you elaborate a little on whether you meant that the whole input topic just "ends" at some point, or whether there are maybe just different producers responsible for different keys, and each key has a finite lifespan?

As you noted, the issue is that the "untilWindowCloses" directive uses stream time, which stops advancing when the input stream stops.

If it's really the case that the whole input topic has an "end", and if the producer knows when the end is reached, one thing it could do is produce one last "end of topic" message to every partition, with a timestamp calculated to advance the stream time of every partition (a sketch follows below the quoted message). This might be tricky if you have other intermediate repartition operations, though.

I hope this helps,
-John

On Mon, 2020-09-14 at 15:30 +0200, David Sebastián Manjón wrote:
> Dear team,
>
> I have a question, but I don't see documentation about it. I hope you can
> help me.
> I have a Kafka producer and a Kafka consumer, both written in Java (Java 11).
>
> The Kafka producer has a start moment and an end moment. The Kafka consumer,
> on the other hand, works with windowed aggregate operations. The business
> logic is:
>
> final KStream<String, AggregateDto> transactions =
>     builder.stream(kafkaProperties.getTopic(),
>         Consumed.with(Serdes.String(), aggregateSerde));
>
> // Topology
> transactions
>     .groupBy(this::groupedByTimeStampAndProtocolName)
>     .windowedBy(TimeWindows
>         .of(Duration.ofSeconds(kafkaProperties.getAggregateSeconds()))
>         .grace(Duration.ofMillis(kafkaProperties.getGraceAggregateMillis())))
>     .aggregate(
>         tool::emptyAggregate,
>         this::processNewRecord,
>         Materialized.<String, AggregateDto, WindowStore<Bytes, byte[]>>as(TRANSACTION_AGGREGATE)
>             .withKeySerde(Serdes.String())
>             .withValueSerde(aggregateSerde))
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     .foreach(sendAggregatesToCassandra);
>
> It is all working properly and Kafka is helping a lot, but we have one
> problem. As I said before, the Kafka producer has a start and an end moment,
> and the Kafka consumer works with windows.
>
> Please imagine we need N windowed operations to consume all the data from
> the Kafka producer; the Kafka consumer consumes all the data from moment 0
> to moment N-1.
> We cannot process the last window because Kafka retains it (to maintain data
> consistency), but the producer will never send any more data.
>
> Is there any way to consume the last window (for instance after 15 minutes or
> 1 hour…) in the Kafka consumer?
>
> Thanks in advance for all the information and help.
>
> Best regards.
>
> Sebastián Manjón David
> Senior Java Software Engineer
> dsebast...@sonoc.io
> SONOC
> C/ Josefa Amar y Borbón, 10, 4ª · 50001 Zaragoza, España
> Tlf: +34 917019888 · www.sonoc.io
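To make the "end of topic" marker idea concrete, here is a minimal, untested
sketch of the producer side. It assumes the producer knows the topic's
partition count and the window size/grace that the Streams app uses; the
topic name "transactions", the partition count, and the "end-of-topic" key
are illustrative assumptions, not taken from your code:

    import java.time.Duration;
    import java.time.Instant;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class EndOfTopicMarker {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            // Illustrative values; in a real application these would come
            // from the same config the Streams app reads.
            String topic = "transactions";
            int numPartitions = 3;   // must match the topic's partition count
            Duration windowSize = Duration.ofSeconds(60);
            Duration grace = Duration.ofMillis(500);

            // Pick a timestamp past the end of the last open window plus
            // grace, so that stream time advances far enough to close it.
            // (Assumes the last real record's timestamp is close to "now".)
            long markerTimestamp = Instant.now()
                    .plus(windowSize)
                    .plus(grace)
                    .toEpochMilli();

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Stream time is tracked per partition, so every partition
                // needs to see the advanced timestamp.
                for (int partition = 0; partition < numPartitions; partition++) {
                    producer.send(new ProducerRecord<>(
                            topic, partition, markerTimestamp, "end-of-topic", ""));
                }
                producer.flush();
            }
        }
    }

Two caveats: the Streams app would have to ignore the marker records
themselves (for instance with a filterNot() on the sentinel key before the
groupBy()), and, as mentioned above, the groupBy() introduces a repartition
topic whose partitions are keyed differently, so you would still have to
verify that the advanced timestamp reaches every partition downstream of the
repartition.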