Hi, yes you are right. I forgot that the interval is set by default when enabling event time.
Also your comment about triggering the window is correct. Technically, you don't need a record that falls into the next window, just a watermark that is past the window boundary. In your case, watermarks only advance when the assigner sees more records, so you'd need a record with a timestamp of at least 2017-12-14 13:10:15 (or 16), because the watermark assigner subtracts 10 seconds.

Given the current watermark assigner, there is no other way than sending more records to trigger a window computation. You can implement a custom assigner that also emits watermarks without data, but that would tie the event-time watermarks to the clock of the generating machine, so watermarks would no longer be purely data-driven.

Best, Fabian

2017-12-14 17:25 GMT+01:00 Plamen Paskov <[email protected]>:

> Hi Fabian,
>
> Thank you for your response! I think it's not necessary to do that because
> i have a call to anyway:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> which do exactly what you say. It set the watermark interval to 200ms .
> I think i found the problem and it is the default event-time trigger attached
> to the assigner?.
> According to the docs here
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html
> : "*all the event-time window assigners have an EventTimeTrigger as default
> trigger.
> This trigger simply fires once the watermark passes the end of a window.*" .
> All i have to do in order to trigger the computation is to send an event
> which will fall in "next" window.
> So the question now is how can i set trigger to fire in regular intervals
> (e.g. every 5 seconds) using table API?
>
>
> On 14.12.2017 17:57, Fabian Hueske wrote:
>
> Hi,
>
> you are using a BoundedOutOfOrdernessTimestampExtractor to generate
> watermarks.
> The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark
> assigner and only generates watermarks if a watermark interval is
> configured.
> Without watermarks, the query cannot "make progress" and only computes its
> result when the program is closed (sources emit a MAX_LONG watermark when
> being canceled).
>
> Long story short: you need to configure the watermark interval:
> env.getConfig().setAutoWatermarkInterval(100L);
>
> Best, Fabian
>
> 2017-12-14 16:30 GMT+01:00 Plamen Paskov <[email protected]>:
>
>> Hi,
>>
>> I'm trying to run the following streaming program in my local flink 1.3.2
>> environment. The program compile and run without any errors but the print()
>> call doesn't display anything. Once i stop the program i receive all
>> aggregated data. Any ideas how to make it output regularly or when new data
>> come/old data updated?
>>
>> package flink;
>>
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.java.Slide;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>
>> import java.sql.Timestamp;
>>
>> public class StreamingJob {
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>         StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
>>
>>         SingleOutputStreamOperator<WC> input = env
>>                 .socketTextStream("localhost", 9000, "\n")
>>                 .map(new MapFunction<String, WC>() {
>>                     @Override
>>                     public WC map(String value) throws Exception {
>>                         String[] row = value.split(",");
>>                         Timestamp timestamp = Timestamp.valueOf(row[2]);
>>                         return new WC(row[0], Long.valueOf(row[1]), timestamp);
>>                     }
>>                 })
>>                 .assignTimestampsAndWatermarks(
>>                         new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
>>                             @Override
>>                             public long extractTimestamp(WC element) {
>>                                 return element.dt.getTime();
>>                             }
>>                         });
>>
>>         tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");
>>
>>         Table table = tEnv.scan("WordCount")
>>                 .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
>>                 .groupBy("w, word")
>>                 .select("word, frequency.sum as frequency, w.start as dt");
>>         DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
>>         result.print();
>>
>>         env.execute();
>>     }
>>
>>     public static class WC {
>>         public String word;
>>         public long frequency;
>>         public Timestamp dt;
>>
>>         public WC() {
>>         }
>>
>>         public WC(String word, long frequency, Timestamp dt) {
>>             this.word = word;
>>             this.frequency = frequency;
>>             this.dt = dt;
>>         }
>>
>>         @Override
>>         public String toString() {
>>             return "WC " + word + " " + frequency + " " + dt.getTime();
>>         }
>>     }
>> }
>>
>>
>> Sample input:
>>
>> hello,1,2017-12-14 13:10:01
>> ciao,1,2017-12-14 13:10:02
>> hello,1,2017-12-14 13:10:03
>> hello,1,2017-12-14 13:10:04
>>
>>
>> Thanks
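
For anyone reading this thread later, the "13:10:15" figure in the top reply can be checked with a small plain-Java sketch that does not depend on Flink. It rests on assumptions that are not spelled out in the thread: that the bounded-out-of-orderness assigner emits a watermark equal to the largest timestamp seen minus the 10 second bound, that sliding windows are aligned to multiples of the 5 second slide, and that the default event-time trigger fires once the watermark reaches the last millisecond of the window (window end minus 1 ms). The class and method names are made up for this illustration.

```java
import java.sql.Timestamp;

// Illustrative model only; it mimics the arithmetic, it is not Flink code.
class WatermarkSketch {
    static final long MAX_OUT_OF_ORDERNESS_MS = 10_000L; // Time.seconds(10) in the job
    static final long WINDOW_SLIDE_MS = 5_000L;          // .every("5.seconds") in the job

    // Assumption: watermark = largest timestamp seen so far minus the allowed lateness.
    static long watermark(long maxSeenTimestampMs) {
        return maxSeenTimestampMs - MAX_OUT_OF_ORDERNESS_MS;
    }

    // Exclusive end of the earliest sliding window containing the timestamp.
    // Valid when the window size is a multiple of the slide (10 s and 5 s here):
    // the earliest window ends one slide after the latest window start <= timestamp.
    static long earliestWindowEnd(long timestampMs) {
        long latestStart = timestampMs - Math.floorMod(timestampMs, WINDOW_SLIDE_MS);
        return latestStart + WINDOW_SLIDE_MS;
    }

    // Assumption: the trigger fires when the watermark reaches windowEnd - 1 ms.
    static boolean fires(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    // Smallest record timestamp whose watermark makes the window fire.
    static long minTimestampToFire(long windowEndMs) {
        return windowEndMs - 1 + MAX_OUT_OF_ORDERNESS_MS;
    }

    public static void main(String[] args) {
        long first = Timestamp.valueOf("2017-12-14 13:10:01").getTime();
        long last = Timestamp.valueOf("2017-12-14 13:10:04").getTime();

        long windowEnd = earliestWindowEnd(first);
        System.out.println("first window ends at:      " + new Timestamp(windowEnd));
        System.out.println("watermark after last row:  " + new Timestamp(watermark(last)));
        System.out.println("fires with sample input?   " + fires(windowEnd, watermark(last)));
        System.out.println("min timestamp to fire:     " + new Timestamp(minTimestampToFire(windowEnd)));
    }
}
```

Under these assumptions the first window covering the sample rows ends at 13:10:05, the watermark after the last sample row is only 13:09:54, and the window fires once a record stamped 13:10:14.999 or later arrives, which is 13:10:15 for second-resolution input like the sample.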
