Steve Zhou created KAFKA-13290:
----------------------------------

             Summary: My timeWindows last aggregated message never emit until a 
new message coming 
                 Key: KAFKA-13290
                 URL: https://issues.apache.org/jira/browse/KAFKA-13290
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.6.2
         Environment: Development
            Reporter: Steve Zhou


I have a stream code which aggregate 1 minutes data. 

It works as expected if data comes continuously, 

If we stop producer, then i found the last aggregated message does not emit 
until new message coming, even the new message has different key.

 

Following is my sample code, @Bean
 public KStream<String, AggregateMetricsFields> kStream(StreamsBuilder 
streamBuilder) {

  KStream<String, AggregateMetricsFields> aggregatedData = streamBuilder
              .stream(dataTopic, dataConsumed)
              .groupByKey(Grouped.with(
                            stringSerde,
                            aggregateValueSerde))
              
.windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
             .aggregate(this::initialize, this::aggregateFields,
                              materializedAsWindowStore(windowedStoreName, 
stringSerde,
                              AggregateMetricsFieldsSerde))
           
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
           .withName(windowedFlowSuppressNodeName))
            .toStream().map((key, aggregateMetrics) -> {
           return KeyValue.pair(key.key(), aggregateMetrics);
   });
 aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
 return aggregatedFlowData;
 }

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to