Awesome, thanks a lot for clarifications Jing Zhang, it's very useful. Best,
Svend On Sun, 30 May 2021, at 6:27 AM, JING ZHANG wrote: > Hi Svend, > Your solution could work well in Flink 1.13.0 and Flink 1.13.0+ because those > version provides many related improvements. > > > as per [1] > Yes, "table.exec.source.idle-timeout" is not table-level parameter, but a > global parameter, It would apply to all those table sources which with > watermark clause but not use SOURCE WATERMARK > > as per [2] > Yes. > > If that is correct, I guess I can simply use the DataStream connector for > > that specific topic and then convert it to a Table. > Yes, and please use SOURCE_WATERMARK() when convert DataStream to Table, like > the following demo: > Table table = > tableEnv.fromDataStream( > dataStream, > Schema.*newBuilder*() > .XXXX // other logical > .watermark("columnName", "SOURCE_WATERMARK()") > .build()); > I would like to invite Jark And Timo to double check, they are more familiar > with the issue. > > Best, > JING ZHANG > > > Svend <stream...@svend.xyz> 于2021年5月29日周六 下午3:34写道: >> __ >> Hi everyone, >> >> My Flink streaming application consumes several Kafka topics, one of which >> receiving traffic in burst once per day. >> >> I would like that topic not to hold back the progress of the watermark. >> >> Most of my code is currently using the SQL API and in particular the Table >> API Kafka connector. >> >> I have read about the idle source configuration mechanism, could you please >> confirm my understanding that: >> >> * as per [1]: when I'm using the Table API Kafka connector, we currently do >> not have the possibility to specify the idle source parameter specifically >> for each topic, although we can set it globally on the >> StreamTableEnvironment with the "table.exec.source.idle-timeout" parameter >> >> * as per [2]: when using the DataStream Kafka connector, we can set the idle >> source parameter specifically for each topic by specifying ".withIdleness()" >> to the WatermarkStrategy. >> >> If that is correct, I guess I can simply use the DataStream connector for >> that specific topic and then convert it to a Table. >> >> Thanks a lot! >> >> Svend >> >> >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#source-per-partition-watermarks >> >> [2] >> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-and-timestamp-extractionwatermark-emission >> >> >>