Hi,

I have a question about late-arriving records.
We are using Flink 1.7 with Kafka consumers of version 2.11.0.11.
In my sink operator, which converts this table to a stream that is pushed to
Elasticsearch, I can see the metric *numLateRecordsDropped*.

My Kafka consumers don't seem to have any lag, and the events are processed
properly. Routing these late events to a side output doesn't seem to be
possible with tables. Below is the snippet:

        tableEnv.connect(new Kafka()
          /* setting of all kafka properties */
               .startFromLatest())
               .withSchema(new Schema()
                       .field("sid", Types.STRING())
                       .field("_zpsbd6", Types.STRING())
                       .field("r1", Types.STRING())
                       .field("r2", Types.STRING())
                       .field("r5", Types.STRING())
                       .field("r10", Types.STRING())
                       .field("isBot", Types.BOOLEAN())
                       .field("botcode", Types.STRING())
                       .field("ts", Types.SQL_TIMESTAMP())
                       .rowtime(new Rowtime()
                               .timestampsFromField("recvdTime")
                               .watermarksPeriodicBounded(10000)
                       )
               )
               .withFormat(new Json().deriveSchema())
               .inAppendMode()
               .registerTableSource("sourceTopic");
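The `watermarksPeriodicBounded(10000)` setting above means the watermark trails the largest `ts` seen so far by 10 seconds, so a record whose timestamp is 10 seconds or more behind the newest record already seen falls behind the watermark. Below is a minimal, stdlib-only Java sketch of that progression (not Flink code; the class and method names are hypothetical), assuming the watermark is exactly the maximum timestamp seen minus the bound. Flink emits periodic watermarks on a timer rather than after every record, so the real watermark can only lag behind the values computed here.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink: replays event timestamps (epoch millis)
// in arrival order and returns the watermark after each record, assuming
// watermark = largest timestamp seen so far - bound.
class BoundedWatermarkSketch {

    static long[] watermarksAfterEach(long[] arrivalOrderTs, long boundMs) {
        long[] watermarks = new long[arrivalOrderTs.length];
        long maxSeen = Long.MIN_VALUE;
        for (int i = 0; i < arrivalOrderTs.length; i++) {
            // The watermark never moves backwards: an out-of-order record
            // does not lower maxSeen.
            maxSeen = Math.max(maxSeen, arrivalOrderTs[i]);
            watermarks[i] = maxSeen - boundMs;
        }
        return watermarks;
    }

    public static void main(String[] args) {
        // 00:00:00, 00:00:20, then a record from 00:00:05 that arrives
        // after the watermark has already reached 00:00:10
        long[] ts = {0L, 20_000L, 5_000L};
        System.out.println(Arrays.toString(watermarksAfterEach(ts, 10_000L)));
    }
}
```

Running `main` prints `[-10000, 10000, 10000]`: the third record (timestamp 5000) is already behind the watermark of 10000 when it arrives. Whether it is actually dropped depends on whether its window has already fired.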

        String sql = "SELECT sid, _zpsbd6 as ip, COUNT(*) as total_hits, "
                + "TUMBLE_START(ts, INTERVAL '5' MINUTE) as tumbleStart, "
                + "TUMBLE_END(ts, INTERVAL '5' MINUTE) as tumbleEnd "
                + "FROM sourceTopic "
                // AND binds tighter than OR, so this filter reads as
                // r1 OR r2 OR r5 OR (r10 AND isBot); add parentheses if that is not intended
                + "WHERE r1='true' or r2='true' or r5='true' or r10='true' and isBot='true' "
                + "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), sid, _zpsbd6";
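The `TUMBLE(ts, INTERVAL '5' MINUTE)` grouping puts every row into exactly one 5-minute window, and `TUMBLE_END` is the exclusive upper bound of that window. A small sketch of the window bounds for a given timestamp, assuming windows aligned to the epoch with no offset; `TumbleWindowSketch` is a hypothetical name, not a Flink class.

```java
import java.time.Instant;

// Hypothetical helper mirroring the bounds of TUMBLE(ts, INTERVAL '5' MINUTE),
// assuming windows are aligned to the epoch with no offset.
class TumbleWindowSketch {
    static final long SIZE_MS = 5 * 60 * 1000L;

    // Inclusive start, like TUMBLE_START.
    static long windowStart(long tsMs) {
        // floorMod keeps the alignment correct for negative timestamps too
        return tsMs - Math.floorMod(tsMs, SIZE_MS);
    }

    // Exclusive end, like TUMBLE_END.
    static long windowEnd(long tsMs) {
        return windowStart(tsMs) + SIZE_MS;
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2019-01-01T10:07:30Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(windowStart(ts)) + " .. "
                + Instant.ofEpochMilli(windowEnd(ts)));
    }
}
```

For 10:07:30 this prints `2019-01-01T10:05:00Z .. 2019-01-01T10:10:00Z`. The window can only fire once the watermark reaches its last millisecond (10:09:59.999), which with a 10-second bound requires a record with a timestamp of 10:10:09.999 or later.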

        Table source = tableEnv.sqlQuery(sql);

This query is where the metric reports numLateRecordsDropped, specifically in
the operator that executes the GROUP BY (the windowed aggregation).

Is there a way to get these late records as a side output, so that I can debug
this better?
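In the DataStream API, `WindowedStream#sideOutputLateData(OutputTag)` routes such records to a side output instead of dropping them. As an offline debugging aid, the rule that decides which records are counted in numLateRecordsDropped can also be replayed over a captured list of timestamps. A minimal stdlib-only sketch, assuming a 5-minute epoch-aligned tumbling window, zero allowed lateness, a watermark equal to the largest timestamp seen minus 10000 ms, and the rule that a record is dropped once its window's last timestamp (end minus 1 ms) is at or below the current watermark. `LateRecordReplay` is a hypothetical name, and the returned list only plays the role of a side output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stdlib-only replay of the late-record rule for a 5-minute
// tumbling event-time window with zero allowed lateness. Not Flink code.
class LateRecordReplay {
    static final long WINDOW_MS = 5 * 60 * 1000L;

    // Returns the timestamps that would be dropped as late; a real job could
    // route these to a side output instead of discarding them.
    static List<Long> lateRecords(long[] arrivalOrderTs, long boundMs) {
        List<Long> late = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : arrivalOrderTs) {
            if (maxSeen != Long.MIN_VALUE) {
                // Watermark produced by the records seen before this one
                // (approximation: Flink emits it periodically, so it can lag).
                long watermark = maxSeen - boundMs;
                long windowEnd = ts - Math.floorMod(ts, WINDOW_MS) + WINDOW_MS;
                // Dropped if the window's last timestamp is already covered,
                // i.e. the window has already fired.
                if (windowEnd - 1 <= watermark) {
                    late.add(ts);
                }
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }

    public static void main(String[] args) {
        long m = 60_000L;
        // 00:04:50, 00:05:15, 00:04:58 (17 s behind the newest record), 00:05:01
        long[] arrivals = {4 * m + 50_000, 5 * m + 15_000, 4 * m + 58_000, 5 * m + 1_000};
        System.out.println(lateRecords(arrivals, 10_000L));
    }
}
```

With the four timestamps in `main`, it prints `[298000]`: the record at 298000 ms belongs to the window ending at 300000 ms, but the watermark has already reached 305000 ms when it arrives. The record at 301000 ms is also behind the watermark but is kept, because its window (ending at 600000 ms) has not fired yet.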

Thanks,
~Ramya.
