Hi Sam, do you have an idea what the timestamps in your data are, i.e. the tuple.f1 field you're extracting. What you could try is instead of windowing simply print your data and observe the timestamps. Maybe we can learn something from this about why the window doesn't trigger.
Best, Aljoscha On Wed, Mar 1, 2017 at 12:13 AM, Sam Huang <sam.hu...@reflektion.com> wrote: > Hi, > > I'm using *Flink* *1.2.0* to read from *Kafka*-0.8.1.1_2.10. > > I have written a *flink* streaming job that creates (event) time based > window and then computes some stats. However, the window function is never > called. I used the debug watermark code and noticed that no watermark is > generated. If I read from file, then only one watermark is generated. Here > is my code (reading from *kafka*)- > > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > FlinkUtil.createExecutionEnvironment(args); > > // Read from kafka (it works as the following print statement works) > DataStream<String> jsonEventStream = > JsonEventStreamReader.readStream(env); > // jsonEventStream.print(); > > jsonEventStream > .flatMap(new strToTupleFlatMapFunImpl()) > > .assignTimestampsAndWatermarks(getRawJsonTimestampsAndWatermarksAssigner()) > .flatMap(new jsonToTupleListFlatMapFunImpl()) > .keyBy(0, 1, 2) > .timeWindow(Time.seconds(60)) > .allowedLateness(Time.seconds(10)) > .reduce(new ReduceFunImpl(), new WindowFunImpl()) // reduce fun gets > called but not window fun > .addSink(new InfluxDBSink(INFLUXDB_DB)); > > env.execute(); > } > > private static BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>> > getRawJsonTimestampsAndWatermarksAssigner() { > return new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, > Long>>(Time.seconds(WINDOW_LATENESS)) { > @Override > public long extractTimestamp(Tuple2<String, Long> tuple) { > return tuple.f1; > } > }; > } > > > public static StreamExecutionEnvironment createExecutionEnvironment(String[] > args) throws IOException { > ParameterTool params = ParameterTool.fromArgs(args); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.getConfig().setGlobalJobParameters(params); > //env.getConfig().setAutoWatermarkInterval(1000); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > return env; > } > > > Any help will be appreciated. > > Thank you, > > Sam >