Hi Svend,

Your solution should work well in Flink 1.13.0 and later, because those versions include many related improvements.

> as per [1]

Yes. "table.exec.source.idle-timeout" is not a table-level parameter but a global one. It applies to all table sources that declare a watermark clause but do not use SOURCE_WATERMARK().

> as per [2]

Yes.

> If that is correct, I guess I can simply use the DataStream connector for
> that specific topic and then convert it to a Table.

Yes, and please use SOURCE_WATERMARK() when converting the DataStream to a Table, as in the following demo:

    Table table =
        tableEnv.fromDataStream(
            dataStream,
            Schema.newBuilder()
                .XXXX // other schema logic
                .watermark("columnName", "SOURCE_WATERMARK()")
                .build());

I would like to invite Jark and Timo to double-check, as they are more familiar with this issue.

Best,
JING ZHANG

On Sat, May 29, 2021 at 3:34 PM, Svend <stream...@svend.xyz> wrote:

> Hi everyone,
>
> My Flink streaming application consumes several Kafka topics, one of which
> receives traffic in a burst once per day.
>
> I would like that topic not to hold back the progress of the watermark.
>
> Most of my code is currently using the SQL API and in particular the Table
> API Kafka connector.
>
> I have read about the idle source configuration mechanism, could you
> please confirm my understanding that:
>
> * as per [1]: when I'm using the Table API Kafka connector, we currently
> do not have the possibility to specify the idle source parameter
> specifically for each topic, although we can set it globally on
> the StreamTableEnvironment with the "table.exec.source.idle-timeout"
> parameter
>
> * as per [2]: when using the DataStream Kafka connector, we can set the
> idle source parameter specifically for each topic by specifying
> ".withIdleness()" to the WatermarkStrategy.
>
> If that is correct, I guess I can simply use the DataStream connector for
> that specific topic and then convert it to a Table.
>
> Thanks a lot!
>
> Svend
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#source-per-partition-watermarks
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-and-timestamp-extractionwatermark-emission
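
P.S. For reference, the approach discussed above (the DataStream Kafka connector with a per-topic idleness setting, followed by conversion to a Table) could be sketched roughly as follows. This is only a minimal sketch assuming Flink 1.13 with the Kafka connector and the Table API bridge on the classpath. The topic name, broker address, group id, view name, column name "rowtime", and both durations are hypothetical placeholders, and the 20-second out-of-orderness bound is just one possible watermark strategy.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BurstyTopicToTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Hypothetical connection settings; replace with your own.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "bursty-topic-reader");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("bursty-topic", new SimpleStringSchema(), props);

        // Per-topic idleness: if this source emits nothing for one minute, it is
        // marked idle and stops holding back downstream watermarks.
        // Without an explicit timestamp assigner, the Kafka record timestamp is used.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                        .withIdleness(Duration.ofMinutes(1)));

        DataStream<String> stream = env.addSource(consumer);

        // Expose the record timestamp as a "rowtime" column and reuse the
        // DataStream watermarks through SOURCE_WATERMARK().
        Table table =
                tableEnv.fromDataStream(
                        stream,
                        Schema.newBuilder()
                                .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                                .watermark("rowtime", "SOURCE_WATERMARK()")
                                .build());

        // Register the table so it can be joined with the SQL-defined Kafka tables.
        tableEnv.createTemporaryView("bursty_events", table);
        table.printSchema();
    }
}
```

The other topics can stay on the Table API Kafka connector. If they also need an idle timeout, the global "table.exec.source.idle-timeout" option mentioned above remains available for them.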