Hi, I have a question about late-arriving records. We are using Flink 1.7 with Kafka consumers of version 2.11.0.11. On my sink operator, which converts the table to a stream that is pushed to Elasticsearch, I can see the metric "numLateRecordsDropped".
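For context, this is my understanding of when a record gets counted as late, written as a simplified sketch in plain Java. It is not Flink API and all names in it are hypothetical. It assumes the watermark is the highest timestamp seen so far minus the 10000 ms bound from the rowtime definition in the snippet below, that the window is the 5-minute tumbling window from the query, and that a record is dropped once the watermark has passed the last timestamp of its window. It ignores that watermarks are only emitted periodically.

```java
// Simplified model of bounded-out-of-orderness lateness; not Flink API.
// Class and method names are hypothetical, chosen only for illustration.
final class LatenessSketch {
    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L; // TUMBLE(ts, INTERVAL '5' MINUTE)
    static final long MAX_DELAY_MS = 10_000L;          // watermarksPeriodicBounded(10000)

    // Bounded watermark: highest timestamp seen so far minus the allowed delay.
    static long watermark(long maxSeenTs) {
        return maxSeenTs - MAX_DELAY_MS;
    }

    // Exclusive end of the tumbling window that contains ts (assumes ts >= 0).
    static long windowEnd(long ts) {
        return ts - (ts % WINDOW_SIZE_MS) + WINDOW_SIZE_MS;
    }

    // A record is late when the watermark has already reached the
    // last timestamp (end - 1) of the window it belongs to.
    static boolean isLate(long ts, long maxSeenTs) {
        return windowEnd(ts) - 1 <= watermark(maxSeenTs);
    }

    public static void main(String[] args) {
        long maxSeen = 311_000L; // newest event so far, 5 min 11 s after epoch
        System.out.println(isLate(299_000L, maxSeen)); // record for window [0, 300000)
        System.out.println(isLate(301_000L, maxSeen)); // record for window [300000, 600000)
    }
}
```

If this model is right, a record only has to trail the newest event on its partition by a little more than 10 seconds across a window boundary to be dropped, even when the consumers themselves show no lag.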
My Kafka consumers don't seem to have any lag, and the events are processed properly. Routing these late events to a side output doesn't seem to be possible with tables. Below is the snippet:

    tableEnv.connect(new Kafka()
            /* setting of all kafka properties */
            .startFromLatest())
        .withSchema(new Schema()
            .field("sid", Types.STRING())
            .field("_zpsbd6", Types.STRING())
            .field("r1", Types.STRING())
            .field("r2", Types.STRING())
            .field("r5", Types.STRING())
            .field("r10", Types.STRING())
            .field("isBot", Types.BOOLEAN())
            .field("botcode", Types.STRING())
            .field("ts", Types.SQL_TIMESTAMP())
                .rowtime(new Rowtime()
                    .timestampsFromField("recvdTime")
                    .watermarksPeriodicBounded(10000)
                )
        )
        .withFormat(new Json().deriveSchema())
        .inAppendMode()
        .registerTableSource("sourceTopic");

    String sql = "SELECT sid, _zpsbd6 as ip, COUNT(*) as total_hits, "
        + "TUMBLE_START(ts, INTERVAL '5' MINUTE) as tumbleStart, "
        + "TUMBLE_END(ts, INTERVAL '5' MINUTE) as tumbleEnd FROM sourceTopic "
        + "WHERE r1='true' or r2='true' or r5='true' or r10='true' and isBot='true' "
        + "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), sid, _zpsbd6";

    Table source = tableEnv.sqlQuery(sql);

---> This is where the metric shows the late records being dropped, while executing the GROUP BY operation.

Is there a way to get a side output of these records so that I can debug this better?

Thanks,
~Ramya.