Steve Zhou created KAFKA-13290:
----------------------------------

             Summary: My timeWindows last aggregated message never emit until a new message coming
                 Key: KAFKA-13290
                 URL: https://issues.apache.org/jira/browse/KAFKA-13290
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.6.2
         Environment: Development
            Reporter: Steve Zhou

I have a Kafka Streams application that aggregates data over 1-minute windows. It works as expected while data arrives continuously. However, if we stop the producer, the last aggregated message is not emitted until a new message arrives, even if the new message has a different key.

Following is my sample code:

    @Bean
    public KStream<String, AggregateMetricsFields> kStream(StreamsBuilder streamBuilder) {
        KStream<String, AggregateMetricsFields> aggregatedData = streamBuilder
            .stream(dataTopic, dataConsumed)
            .groupByKey(Grouped.with(stringSerde, aggregateValueSerde))
            // Tumbling windows of windowDuration with a 10 ms grace period
            .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
            .aggregate(this::initialize, this::aggregateFields,
                materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde))
            // Emit only the final result of each window, once the window has closed
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
                .withName(windowedFlowSuppressNodeName))
            .toStream()
            // Replace the windowed key with the plain record key
            .map((key, aggregateMetrics) -> KeyValue.pair(key.key(), aggregateMetrics));

        aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
        return aggregatedData;
    }



--
This message was sent by Atlassian Jira
(v8.3.4#803005)