Awesome, thanks a lot for the clarifications, Jing Zhang, they're very useful.
Best,
Svend
On Sun, 30 May 2021, at 6:27 AM, JING ZHANG wrote:
> Hi Svend,
> Your solution should work well in Flink 1.13.0 and later, because those
> versions provide many related improvements.
>
> > as per [1]
> Yes, "table.exec.source.idle-timeout" is not a table-level parameter but a
> global one. It applies to all table sources that declare a watermark clause
> but do not use SOURCE_WATERMARK().
> > as per [2]
> Yes.
> > If that is correct, I guess I can simply use the DataStream connector for
> > that specific topic and then convert it to a Table.
> Yes, and please use SOURCE_WATERMARK() when converting the DataStream to a
> Table, as in the following demo:
>
>     Table table =
>         tableEnv.fromDataStream(
>             dataStream,
>             Schema.newBuilder()
>                 .XXXX // other logic
>                 .watermark("columnName", "SOURCE_WATERMARK()")
>                 .build());
>
> I would like to invite Jark and Timo to double-check, as they are more
> familiar with the issue.
>
> Best,
> JING ZHANG
>
>
> On Sat, 29 May 2021 at 3:34 PM, Svend <[email protected]> wrote:
>> Hi everyone,
>>
>> My Flink streaming application consumes several Kafka topics, one of which
>> receives traffic in a burst once per day.
>>
>> I would like that topic not to hold back the progress of the watermark.
>>
>> Most of my code is currently using the SQL API and in particular the Table
>> API Kafka connector.
>>
>> I have read about the idle source configuration mechanism. Could you please
>> confirm my understanding that:
>>
>> * as per [1]: when I'm using the Table API Kafka connector, we currently do
>> not have the possibility to specify the idle source parameter specifically
>> for each topic, although we can set it globally on the
>> StreamTableEnvironment with the "table.exec.source.idle-timeout" parameter
>>
>> * as per [2]: when using the DataStream Kafka connector, we can set the idle
>> source parameter specifically for each topic by calling ".withIdleness()"
>> on the WatermarkStrategy.
>>
>> If that is correct, I guess I can simply use the DataStream connector for
>> that specific topic and then convert it to a Table.
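
For readers of the thread, here is a rough sketch of why marking the bursty topic as idle matters. It is a toy model in plain Java, not Flink code: the class `IdleWatermarkSketch`, its `Input` type and the `combinedWatermark` method are hypothetical names made up for illustration. It only assumes the general rule that a downstream operator's watermark is the minimum over its inputs, and that inputs flagged idle are left out of that minimum.

```java
import java.util.List;
import java.util.OptionalLong;

/** Toy model only: not Flink code, all names here are hypothetical. */
public class IdleWatermarkSketch {

    /** Snapshot of one input (e.g. one topic) as seen by a downstream operator. */
    public static final class Input {
        final String name;
        final long watermark;
        final boolean idle;

        public Input(String name, long watermark, boolean idle) {
            this.name = name;
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /** Minimum watermark over the inputs not marked idle; empty if all are idle. */
    public static OptionalLong combinedWatermark(List<Input> inputs) {
        return inputs.stream()
                .filter(in -> !in.idle)
                .mapToLong(in -> in.watermark)
                .min();
    }

    public static void main(String[] args) {
        // The bursty topic last advanced to 100; the busy one is at 1000.
        List<Input> allActive = List.of(
                new Input("busy-topic", 1000L, false),
                new Input("bursty-topic", 100L, false));
        List<Input> burstyIdle = List.of(
                new Input("busy-topic", 1000L, false),
                new Input("bursty-topic", 100L, true));

        // Without idleness, the silent topic pins the combined watermark.
        System.out.println(combinedWatermark(allActive).getAsLong());  // 100
        // Once it is flagged idle, the busy topic alone drives progress.
        System.out.println(combinedWatermark(burstyIdle).getAsLong()); // 1000
    }
}
```

In this model, the per-topic idleness setting corresponds to deciding when the `idle` flag flips for one input only, whereas a global timeout would apply the same rule to every input.
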
>>
>> Thanks a lot!
>>
>> Svend
>>
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#source-per-partition-watermarks
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-and-timestamp-extractionwatermark-emission
>>
>>
>>