Hi Team, I have a Flink job that consumes from a Kafka topic and tries to create tumbling windows (using TUMBLE) grouped by a few columns such as eventId and eventName. The Kafka topic contains comma-separated values like the following:
event1,Util1,1647614467000,0.12
event1,Util1,1647614527000,0.26
event1,Util1,1647614587000,0.71
event2,Util2,1647614647000,0.08
event2,Util2,1647614707000,0.32
event2,Util2,1647614767000,0.23
event2,Util2,1647614827000,0.85
event1,Util1,1647614887000,0.08
event1,Util1,1647614947000,0.32

Here is the Flink code I’m using for this:

    import java.time.Duration
    import org.apache.flink.api.common.eventtime.WatermarkStrategy
    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.java.tuple.Tuple4
    import org.apache.flink.connector.kafka.source.KafkaSource
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.table.api.Expressions.`$`
    import org.apache.flink.table.api.Schema
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

    fun main() {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        val tableEnv = StreamTableEnvironment.create(env)

        // Read each Kafka record as a plain string, starting from the earliest offset.
        val kafkaSource = KafkaSource.builder<String>()
            .setBootstrapServers("localhost:9092")
            .setTopics("an-topic")
            .setGroupId("testGroup")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(SimpleStringSchema())
            .build()

        val kafkaStream = env.fromSource(
            kafkaSource,
            WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)),
            "KafkaSource"
        )

        // Parse the CSV line into a Tuple4 (fields f0..f3).
        val kafkaRowMapper = kafkaStream.map(RowMapper())

        // Derive the event-time attribute from f2 (epoch milliseconds) and declare its watermark.
        val finalTable = tableEnv.fromDataStream(
            kafkaRowMapper,
            Schema.newBuilder()
                .columnByExpression("proc_time", "PROCTIME()")
                .columnByExpression("event_time", "TO_TIMESTAMP_LTZ(f2, 3)")
                .watermark("event_time", "event_time - INTERVAL '20' SECOND")
                .build()
        ).renameColumns(
            `$`("f0").`as`("eventId"),
            `$`("f1").`as`("eventName"),
            `$`("f3").`as`("eventValue")
        )
        tableEnv.createTemporaryView("finalTable", finalTable)

        val sqlQuery =
            "SELECT eventId, eventName, TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS event_time_new, " +
            "LAST_VALUE(eventValue) AS eventValue FROM finalTable " +
            "GROUP BY eventId, eventName, TUMBLE(event_time, INTERVAL '1' MINUTE)"
        val resultTable = tableEnv.sqlQuery(sqlQuery)

        tableEnv.toDataStream(resultTable).print()
        env.execute("TestJob")
    }

    class RowMapper : MapFunction<String, Tuple4<String, String, Long, Float>> {
        override fun map(value: String): Tuple4<String, String, Long, Float> {
            val lineArray = value.split(",")
            return Tuple4(lineArray[0], lineArray[1], lineArray[2].toLong(), lineArray[3].toFloat())
        }
    }

When I use proc_time instead of event_time, the windows are created and I see results. I suspect that the code is correct but the way I'm testing it is wrong. How should I test this for event time? Can someone please help me find the mistake I’m making here?

Thanks in advance.
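
For reference when reasoning about the event-time case, here is a minimal plain-Kotlin sketch (no Flink dependency) of how the sample records above map onto 1-minute tumbling windows, and where a watermark that lags 20 seconds behind the highest timestamp would stand after the last record. It is a simplified model, not Flink's implementation: it assumes a single partition and ignores idle partitions, parallelism, and the 1 ms boundary detail. The names Event, parseEvent, windowEnd, watermarkAfter and pendingWindowEnds are hypothetical helpers introduced only for this illustration.

```kotlin
// Simplified model of event-time tumbling windows; plain Kotlin, no Flink dependency.
// All names below are hypothetical helpers for illustration only.
data class Event(val eventId: String, val eventName: String, val timestampMs: Long, val value: Float)

const val WINDOW_SIZE_MS = 60_000L      // mirrors INTERVAL '1' MINUTE in the query
const val WATERMARK_DELAY_MS = 20_000L  // mirrors INTERVAL '20' SECOND in the watermark

// Parses one record in the same CSV layout as the Kafka messages in the post.
fun parseEvent(line: String): Event {
    val parts = line.split(",")
    return Event(parts[0], parts[1], parts[2].toLong(), parts[3].toFloat())
}

// Exclusive end of the epoch-aligned tumbling window that contains the timestamp.
fun windowEnd(timestampMs: Long): Long =
    timestampMs - (timestampMs % WINDOW_SIZE_MS) + WINDOW_SIZE_MS

// Modelled watermark after all events were seen: highest timestamp minus the delay.
fun watermarkAfter(events: List<Event>): Long =
    events.maxOf { it.timestampMs } - WATERMARK_DELAY_MS

// Window ends the modelled watermark has not reached yet (these windows would still be open).
fun pendingWindowEnds(events: List<Event>): List<Long> {
    val watermark = watermarkAfter(events)
    return events.map { windowEnd(it.timestampMs) }.distinct().filter { it > watermark }.sorted()
}

// The nine sample records from the post.
val sampleLines = listOf(
    "event1,Util1,1647614467000,0.12",
    "event1,Util1,1647614527000,0.26",
    "event1,Util1,1647614587000,0.71",
    "event2,Util2,1647614647000,0.08",
    "event2,Util2,1647614707000,0.32",
    "event2,Util2,1647614767000,0.23",
    "event2,Util2,1647614827000,0.85",
    "event1,Util1,1647614887000,0.08",
    "event1,Util1,1647614947000,0.32"
)

fun main() {
    val events = sampleLines.map(::parseEvent)
    println("watermark after last record: ${watermarkAfter(events)}")
    println("window ends still open: ${pendingWindowEnds(events)}")
}
```

In this model the watermark after the last record is 1647614927000, so the windows ending at 1647614940000 and 1647615000000 are still open. Each record sits 7 seconds into its own minute, so under these assumptions a window can only close once a record from two minutes later has arrived. Whether this matches the behaviour of the real job depends on factors the sketch leaves out, such as the number of Kafka partitions and whether all of them receive data.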