Sorry for the typo in my previous mail. "Whey" should be "Why" in "Whey does the window final result is not emitted after the window has elapsed?"

I have browsed the Kafka source code and found the cause of the behaviour I described.
org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor
has the following code:

    @Override
    public void process(final K key, final Change<V> value) {
        buffer(key, value);
        enforceConstraints();
    }

In the code above, the call to enforceConstraints() is what emits window
results, under certain conditions. The window begins when process() handles
the first record, and some time later the window closes. But
enforceConstraints() only runs inside process(), so until process() is invoked
again (which happens only when another record arrives) there is no chance to
emit the window result.

Is there a configuration option to emit the window result without waiting for
another record to arrive?

I am using Kafka 2.1.0 as shipped in Confluent Open Source Edition 5.1.0.

jingguo yao <yaojing...@gmail.com> wrote on Sun, Dec 30, 2018 at 10:53 PM:
>
> I followed [1] to code a simple example to try suppress operator.
>
> Here is the simple code:
>
> final Serde<String> stringSerde = Serdes.String();
> final StreamsBuilder builder = new StreamsBuilder();
> builder.stream("TextLinesTopic", Consumed.with(Serdes.String(),
>     Serdes.String()))
>   .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
>   .groupBy((key, word) -> word,
>     Grouped.keySerde(stringSerde).withValueSerde(stringSerde))
>
>   .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0)))
>   .count(Materialized.with(Serdes.String(), Serdes.Long()))
>   .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>   .toStream()
>   .foreach(
>     (key, value) -> {
>       System.out.printf("key: %s, value: %d\n", key, value);
>     });
>
> I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0. If I
> send one text line "hello", nothing will be printed even I wait for
> more than 3 seconds (the window size). Since the time longer than the
> window size has elapsed, I think that key and value should be printed.
>
> But if I send another text line "hello", key and value will be
> printed.
>
> Can anyone explain this behavior? I have browsed the Kafka
> documentation. But I can't find an explanation.
>
> [1] http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
>
> --
> Jingguo

--
Jingguo
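
P.S. To make the behaviour above concrete, here is a toy model of it. This is only a sketch under my own assumptions, not the real KTableSuppressProcessor: the class name SuppressSketch, the string output format, and the simplified late-record handling are all made up for illustration. It assumes that the only clock the operator sees is the largest record timestamp observed so far, and that a window with zero grace counts as closed once that timestamp reaches the window end.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of record-driven suppression. Hypothetical; NOT the Kafka Streams implementation. */
public class SuppressSketch {
    private final long windowSizeMs;
    // window start -> (key -> count), in insertion order
    private final Map<Long, Map<String, Long>> buffer = new LinkedHashMap<>();
    // Largest record timestamp seen so far; it only moves when a record arrives.
    private long observedTime = Long.MIN_VALUE;

    public SuppressSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Mirrors the shape of process(): buffer the record, then check what can be emitted. */
    public List<String> process(String key, long timestampMs) {
        observedTime = Math.max(observedTime, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        // Assumption: with zero grace, a record for an already closed window is dropped.
        if (windowStart + windowSizeMs > observedTime) {
            buffer.computeIfAbsent(windowStart, w -> new LinkedHashMap<>())
                  .merge(key, 1L, Long::sum);
        }
        return enforceConstraints();
    }

    /** Emits and removes every buffered window whose end is at or before the observed time. */
    private List<String> enforceConstraints() {
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, Map<String, Long>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> window = it.next();
            if (window.getKey() + windowSizeMs <= observedTime) {
                window.getValue().forEach(
                    (k, count) -> emitted.add(k + "@" + window.getKey() + "=" + count));
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        SuppressSketch suppress = new SuppressSketch(3000L);
        // First "hello" at t=1000: window [0, 3000) is still open, nothing is emitted.
        System.out.println(suppress.process("hello", 1000L));
        // No matter how long we wait here, nothing calls enforceConstraints().
        // Second "hello" at t=5000: the observed time passes 3000, so the
        // count for window [0, 3000) is emitted now.
        System.out.println(suppress.process("hello", 5000L));
    }
}
```

In this model the first call returns an empty list and the second call returns the count for the first window, which matches what I see: the result only comes out when another record pushes the observed time past the window end.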