Awesome, thanks a lot for the clarifications Jing Zhang, this is very useful.

Best,

Svend

On Sun, 30 May 2021, at 6:27 AM, JING ZHANG wrote:
> Hi Svend,
> Your solution should work well in Flink 1.13.0 and later, because those 
> versions provide many related improvements.
> 
> > as per [1]
> Yes, "table.exec.source.idle-timeout" is not a table-level parameter but a 
> global one. It applies to all table sources that declare a watermark 
> clause but do not use SOURCE_WATERMARK().
> > as per [2]
> Yes.
> > If that is correct, I guess I can simply use the DataStream connector for 
> > that specific topic and then convert it to a Table.
> Yes, and please use SOURCE_WATERMARK() when converting the DataStream to a 
> Table, as in the following demo:
> Table table =
>         tableEnv.fromDataStream(
>                 dataStream,
>                 Schema.newBuilder()
>                         .XXXX // other logic
>                         .watermark("columnName", "SOURCE_WATERMARK()")
>                         .build());
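For completeness, a compilable sketch of that conversion, on the assumption that the stream carries `Row` elements with record timestamps; the `rowtime` metadata column name follows the Flink 1.13 `fromDataStream` API, while the helper name is made up for illustration:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SourceWatermarkDemo {

    // Convert a DataStream (whose WatermarkStrategy already handles idleness)
    // into a Table that reuses those watermarks via SOURCE_WATERMARK().
    public static Table toTable(StreamTableEnvironment tableEnv,
                                DataStream<Row> dataStream) {
        return tableEnv.fromDataStream(
                dataStream,
                Schema.newBuilder()
                        // expose the stream-record timestamp as a rowtime column
                        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                        // keep the DataStream watermarks instead of recomputing them
                        .watermark("rowtime", "SOURCE_WATERMARK()")
                        .build());
    }
}
```

With this schema, the bursty topic's idleness handling stays entirely in the DataStream watermark strategy, and the resulting Table inherits it unchanged.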
> I would like to invite Jark and Timo to double-check; they are more familiar 
> with this issue.
> 
> Best,
> JING ZHANG
> 
> 
Svend <stream...@svend.xyz> wrote on Sat, 29 May 2021 at 3:34 PM:
>> Hi everyone,
>> 
>> My Flink streaming application consumes several Kafka topics, one of which 
>> receives traffic in a burst once per day.
>> 
>> I would like that topic not to hold back the progress of the watermark.
>> 
>> Most of my code is currently using the SQL API and in particular the Table 
>> API Kafka connector.
>> 
>> I have read about the idle source configuration mechanism. Could you please 
>> confirm my understanding that:
>> 
>> * as per [1]: when I'm using the Table API Kafka connector, we currently do 
>> not have the possibility to specify the idle source parameter specifically 
>> for each topic, although we can set it globally on the 
>> StreamTableEnvironment with the "table.exec.source.idle-timeout" parameter
>> 
>> * as per [2]: when using the DataStream Kafka connector, we can set the idle 
>> source parameter specifically for each topic by calling ".withIdleness()" 
>> on the WatermarkStrategy.
>> 
>> If that is correct, I guess I can simply use the DataStream connector for 
>> that specific topic and then convert it to a Table.
>> 
>> Thanks a lot!
>> 
>> Svend
>> 
>> 
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#source-per-partition-watermarks
>> 
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-and-timestamp-extractionwatermark-emission
>> 
>> 
>> 
