Guozhang: Thanks for your kind help.
On Sat, Jan 5, 2019 at 3:28 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> Thanks for the detailed description.
>
> 1) Yes, stream time can be advanced by any record.
> 2) Given your description, another way to work around the situation is to
> let your class send a final record with its timestamp set to the
> class-end-time plus a small delta (think of it as a sentinel "tick" record
> just for advancing the clock), so that stream time can still be advanced
> at the end of the last class.
>
> Guozhang
>
>
> On Thu, Jan 3, 2019 at 11:43 PM jingguo yao <yaojing...@gmail.com> wrote:
>
> > Guozhang:
> >
> > Yes, my case is a real production scenario.
> >
> > I am monitoring online live-broadcast classes. I need a summary of
> > each 5-minute period for each class. Each class has a classroom id. I
> > report class activity data to a Kafka topic. The classroom id is used
> > to partition the data. Here are my Kafka Streams steps:
> >
> > 1. groupByKey: by classroom id.
> > 2. windowedBy: with a 5-minute tumbling window.
> > 3. aggregate: compute a summary over the window.
> > 4. Send the aggregate result to other external systems.
> >
> > Each class is about one hour long. After the class ends, no more data
> > is sent for that class. Each night there are dozens of classes.
> >
> > I have done more tests with my code. It seems that the record that
> > advances the window for class A does not need to be a record for
> > class A. A new record for any class can advance the window. Is this
> > behavior guaranteed by Kafka Streams? Even if it is guaranteed, the
> > last window for the last class of the night can't be delivered until
> > a new record arrives for a new class the following night.
> >
> >
> > On Fri, Jan 4, 2019 at 2:59 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > Hello Jingguo,
> > >
> > > Is this case (i.e. you only have data over 57 minutes, and no new data
> > > afterwards) a real production scenario? In stream processing we usually
> > > expect the input data to stream in continuously, and I'm curious to
> > > learn more about your use case and why it would not have further data
> > > after a period of time.
> > >
> > > ATM, if you really want to work around this issue you can use a
> > > system-time-based `punctuate` call, which is lower-level functionality
> > > than the `suppress` call in the DSL.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Jan 3, 2019 at 6:50 AM jingguo yao <yaojing...@gmail.com> wrote:
> > >
> > > > Hi, Matthias
> > > >
> > > > I am doing a 5-minute tumbling window analysis over a 57-minute data
> > > > flow, and I want only one final result per window, so I need suppress.
> > > > The 57-minute period can be divided into about 12 windows. The results
> > > > of the first 11 windows can be delivered downstream. But the final
> > > > result for the last 2-minute window can never be delivered downstream,
> > > > since there is no new record to advance the window.
> > > >
> > > > Is there any workaround to deliver the result for the last window in
> > > > my situation?
> > > >
> > > > -- Jingguo
> > > >
> > > > On Wed, Jan 2, 2019 at 10:27 PM Matthias J. Sax <matth...@confluent.io> wrote:
> > > > >
> > > > > > After some time, the window closes.
> > > > >
> > > > > This is not correct. Windows are based on event time, and because no
> > > > > new input record is processed, the window is not closed. That is the
> > > > > reason why you don't get any output. Only a new input record can
> > > > > advance "stream time" and close the window.
> > > > >
> > > > > In practice, when data flows continuously, this should not be an
> > > > > issue though.
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 12/31/18 8:22 AM, jingguo yao wrote:
> > > > > > Sorry for my typo in the mail. "Whey" should be "Why" in "Whey does
> > > > > > the window final result is not emitted after the window has
> > > > > > elapsed?"
> > > > > >
> > > > > > I have browsed the Kafka source code and found the cause of the
> > > > > > mentioned behaviour.
> > > > > >
> > > > > > org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor
> > > > > > has the following code:
> > > > > >
> > > > > >     @Override
> > > > > >     public void process(final K key, final Change<V> value) {
> > > > > >         buffer(key, value);
> > > > > >         enforceConstraints();
> > > > > >     }
> > > > > >
> > > > > > In the above code, the enforceConstraints invocation emits window
> > > > > > results under some conditions.
> > > > > >
> > > > > > After the process method processes the first record, the window
> > > > > > begins. After some time, the window closes. But before process is
> > > > > > invoked again (triggered by receiving another record), there is no
> > > > > > chance to emit the window result.
> > > > > >
> > > > > > Are there any configuration options to emit the window result
> > > > > > without waiting for another record to arrive?
> > > > > >
> > > > > > I am using Kafka 2.1.0 as contained in Confluent Open Source
> > > > > > Edition 5.1.0.
> > > > > >
> > > > > > On Sun, Dec 30, 2018 at 10:53 PM jingguo yao <yaojing...@gmail.com> wrote:
> > > > > >>
> > > > > >> I followed [1] to code a simple example to try the suppress operator.
> > > > > >>
> > > > > >> Here is the simple code:
> > > > > >>
> > > > > >>     final Serde<String> stringSerde = Serdes.String();
> > > > > >>     final StreamsBuilder builder = new StreamsBuilder();
> > > > > >>     builder.stream("TextLinesTopic",
> > > > > >>                    Consumed.with(Serdes.String(), Serdes.String()))
> > > > > >>         .flatMapValues(value ->
> > > > > >>             Arrays.asList(value.toLowerCase().split("\\W+")))
> > > > > >>         .groupBy((key, word) -> word,
> > > > > >>             Grouped.keySerde(stringSerde).withValueSerde(stringSerde))
> > > > > >>         .windowedBy(TimeWindows.of(Duration.ofSeconds(3))
> > > > > >>             .grace(Duration.ofMillis(0)))
> > > > > >>         .count(Materialized.with(Serdes.String(), Serdes.Long()))
> > > > > >>         .suppress(Suppressed.untilWindowCloses(
> > > > > >>             Suppressed.BufferConfig.unbounded()))
> > > > > >>         .toStream()
> > > > > >>         .foreach(
> > > > > >>             (key, value) -> {
> > > > > >>                 System.out.printf("key: %s, value: %d\n", key, value);
> > > > > >>             });
> > > > > >>
> > > > > >> I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0.
> > > > > >> If I send one text line "hello", nothing is printed even if I wait
> > > > > >> for more than 3 seconds (the window size). Since more time than the
> > > > > >> window size has elapsed, I think that the key and value should be
> > > > > >> printed.
> > > > > >>
> > > > > >> But if I send another text line "hello", the key and value are
> > > > > >> printed.
> > > > > >>
> > > > > >> Can anyone explain this behavior? I have browsed the Kafka
> > > > > >> documentation, but I can't find an explanation.
> > > > > >>
> > > > > >> [1] http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> Jingguo
> > > >
> > > >
> > > > --
> > > > Jingguo
> > >
> > >
> > > --
> > > -- Guozhang
> >
> >
> > --
> > Jingguo
>
>
> --
> -- Guozhang

-- Jingguo
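
The behaviour discussed in this thread (a window's final result is held back until some later record advances stream time) can be illustrated with a small toy model. The sketch below is plain Java and does not use Kafka Streams at all; the class `StreamTimeSketch`, its `process` and `emitted` methods, and the `"tick"` key are hypothetical names chosen for illustration. It assumes a 5-minute tumbling window with zero grace, one record per minute for 57 minutes, and that stream time is simply the largest timestamp seen so far across all keys. One point the model makes visible: for the sentinel record to release the last window, its timestamp has to reach the end of that window (minute 60 here), not merely the time of the last real record.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: NOT Kafka Streams code. It mimics "emit a window's final
// result once stream time has passed the window end" with zero grace.
final class StreamTimeSketch {
    static final long WINDOW_MS = 5 * 60_000L; // 5-minute tumbling window

    // Stream time = largest record timestamp seen so far, across all keys.
    private long streamTime = Long.MIN_VALUE;
    // Buffered (not yet emitted) counts: window start -> key -> count.
    private final TreeMap<Long, Map<String, Long>> buffered = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    void process(String key, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % WINDOW_MS);
        if (windowStart + WINDOW_MS <= streamTime) {
            return; // late record for an already closed window: dropped
        }
        buffered.computeIfAbsent(windowStart, w -> new TreeMap<>())
                .merge(key, 1L, Long::sum);
        streamTime = Math.max(streamTime, timestampMs);

        // Results are only released here, i.e. when a record arrives.
        Iterator<Map.Entry<Long, Map<String, Long>>> it =
                buffered.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> window = it.next();
            if (window.getKey() + WINDOW_MS > streamTime) {
                break; // this window and all later ones are still open
            }
            for (Map.Entry<String, Long> count : window.getValue().entrySet()) {
                emitted.add(count.getKey() + "@" + (window.getKey() / 60_000L)
                        + "min=" + count.getValue());
            }
            it.remove();
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        StreamTimeSketch sketch = new StreamTimeSketch();
        // One record per minute for a 57-minute class: minutes 0..56.
        for (int minute = 0; minute < 57; minute++) {
            sketch.process("classA", minute * 60_000L);
        }
        System.out.println("windows emitted before tick: " + sketch.emitted().size());

        // Sentinel record under a different key, stamped at the end of the
        // last window. Any key advances stream time in this model.
        sketch.process("tick", 60 * 60_000L);
        System.out.println("windows emitted after tick: " + sketch.emitted().size());
    }
}
```

In this model the sentinel record opens a window of its own that stays buffered, so a real topology would still need to decide how to treat such records downstream.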