> After some time, the window closes. This is not correct. Windows are based on event-time, and because no new input record is processed, the window is not closed. That is the reason why you don't get any output. Only a new input record can advance "stream time" and close the window.
In practice, when data flows continuously, this should not be a issue though. -Matthias On 12/31/18 8:22 AM, jingguo yao wrote: > Sorry for my typo in the mail. "Whey" should be "Why" in "Whey does > the window final result is not emitted after the window has elapsed?" > > I have browsed the Kafka source code and found the cause of the > mentioned behaviour. > > org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor > has the following code: > > @Override > public void process(final K key, final Change<V> value) { > buffer(key, value); > enforceConstraints(); > } > > enforceConstraints method invocation emits window results under some > conditions in the above code. > > After process method processes the first record, the window begins. > After some time, the window closes. But before process is invoked > again (triggered by receiving another record), there is no chance to > emit the window result. > > Are there some configuration options to emit the window result without > waiting for another record to arrive? > > And I using Kafka 2.1.0 contained in Confluent Open Source Edition > 5.1.0. > > jingguo yao <yaojing...@gmail.com> 于2018年12月30日周日 下午10:53写道: >> >> I followed [1] to code a simple example to try suppress operator. >> >> Here is the simple code: >> >> final Serde<String> stringSerde = Serdes.String(); >> final StreamsBuilder builder = new StreamsBuilder(); >> builder.stream("TextLinesTopic", Consumed.with(Serdes.String(), >> Serdes.String())) >> .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) >> .groupBy((key, word) -> word, >> Grouped.keySerde(stringSerde).withValueSerde(stringSerde)) >> >> .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0))) >> .count(Materialized.with(Serdes.String(), Serdes.Long())) >> >> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) >> .toStream() >> .foreach( >> (key, value) -> { >> System.out.printf("key: %s, value: %d\n", key, value); >> }); >> >> I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0. If I >> send one text line "hello", nothing will be printed even I wait for >> more than 3 seconds (the window size). Since the time longer than the >> window size has elapsed, I think that key and value should be printed. >> >> But if I send another text line "hello", key and value will be >> printed. >> >> Can anyone explain this behavior? I have browsed the Kafka >> documentation. But I can't find an explanation. >> >> [1] >> http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results >> >> >> -- >> Jingguo > > >
signature.asc
Description: OpenPGP digital signature