HiI'm using Flink 1.2.0 to read from Kafka- have written a flink
streaming job that creates (event) time based window and then computes some
stats. However, the window function is never called. I used the debug
watermark code and noticed that no watermark is generated. If I read from
file, then only one watermark is generated. Here is my code (reading from
kafka)- public static void main(String[] args) throws Exception {       
StreamExecutionEnvironment env = FlinkUtil.createExecutionEnvironment(args);    
// Read from kafka and reading works as the following print statement works     
DataStream jsonEventStream = JsonEventStreamReader.readStream(env);       
// jsonEventStream.print();       jsonEventStream            .flatMap(new
.flatMap(new jsonToTupleListFlatMapFunImpl())           
.transform("WatermarkDebugger", tmp.getType(), new WatermarkDebugger<>());      
.keyBy(0, 1, 2)            .timeWindow(Time.seconds(60))           
.allowedLateness(Time.seconds(10))            .reduce(new ReduceFunImpl(),
new WindowFunImpl())   // reduce fun is called but not window           
.addSink(new InfluxDBSink(INFLUXDB_DB));        env.execute();    }   
private static BoundedOutOfOrdernessTimestampExtractor<Tuple2&lt;String,
Long>> getRawJsonTimestampsAndWatermarksAssigner() {        return new
Long>>(Time.seconds(WINDOW_LATENESS)) {            @Override           
public long extractTimestamp(Tuple2<String, Long> tuple) {               
return tuple.f1;            }        };    }    public static
StreamExecutionEnvironment createExecutionEnvironment(String[] args) throws
IOException {        ParameterTool params = ParameterTool.fromArgs(args);       
StreamExecutionEnvironment env =
env.enableCheckpointing(params.getLong("--flink.checkpointing", 5000));       
return env;    }

View this message in context: 
Sent from the Apache Flink User Mailing List archive. mailing list archive at 

Reply via email to