yuchengxin commented on code in PR #22767:
URL: https://github.com/apache/flink/pull/22767#discussion_r1257911008

##########
docs/content/docs/dev/table/concepts/time_attributes.md:
##########
@@ -99,6 +99,105 @@ GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
 ```

+#### Advanced watermark features
+
+In previous versions, many advanced watermark features (such as watermark alignment) were easy to use through the DataStream API but not in SQL, so version 1.18 extends these features to make them available in SQL as well.
+
+{{< hint warning >}}
+**Note:** Only source connectors that implement the `SupportsWatermarkPushDown` interface (e.g. Kafka, Pulsar) can use these advanced features. If a source does not implement the `SupportsWatermarkPushDown` interface but the task is configured with these parameters, the task runs normally, but the parameters do not take effect.
+
+All of these features can be configured with dynamic table options or the 'OPTIONS' hint. If a feature is configured both in the dynamic table options and in the 'OPTIONS' hint, the options in the 'OPTIONS' hint take precedence. If the 'OPTIONS' hint is used for the same source table in multiple places, the first hint is used.
+{{< /hint >}}
+
+##### I. Configure watermark emit strategy
+There are two strategies to emit watermarks in Flink:
+
+- on-periodic: emit watermarks periodically
+- on-event: emit a watermark for each event
+
+In the DataStream API, the user can choose the emit strategy through the WatermarkGenerator interface ([Writing WatermarkGenerators]({{< ref "docs/dev/datastream/event-time/generating_watermarks" >}}#writing-watermarkgenerators)). For SQL tasks, watermarks are emitted periodically by default, with a default period of 200ms, which can be changed with the parameter `pipeline.auto-watermark-interval`. If you need to emit a watermark for each event, you can configure it in the source table as follows:
+
+```sql
+-- configure in table options
+CREATE TABLE user_actions (
+  ...
+  user_action_time TIMESTAMP(3),
+  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
+) WITH (
+  'scan.watermark.emit.strategy'='on-event',

Review Comment:
   modified

##########
docs/content/docs/dev/table/concepts/time_attributes.md:
##########
@@ -99,6 +99,105 @@ GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
 ```

+#### Advanced watermark features
+
+In previous versions, many advanced watermark features (such as watermark alignment) were easy to use through the DataStream API but not in SQL, so version 1.18 extends these features to make them available in SQL as well.
+
+{{< hint warning >}}
+**Note:** Only source connectors that implement the `SupportsWatermarkPushDown` interface (e.g. Kafka, Pulsar) can use these advanced features. If a source does not implement the `SupportsWatermarkPushDown` interface but the task is configured with these parameters, the task runs normally, but the parameters do not take effect.
+
+All of these features can be configured with dynamic table options or the 'OPTIONS' hint. If a feature is configured both in the dynamic table options and in the 'OPTIONS' hint, the options in the 'OPTIONS' hint take precedence. If the 'OPTIONS' hint is used for the same source table in multiple places, the first hint is used.
+{{< /hint >}}
+
+##### I. Configure watermark emit strategy
+There are two strategies to emit watermarks in Flink:
+
+- on-periodic: emit watermarks periodically
+- on-event: emit a watermark for each event
+
+In the DataStream API, the user can choose the emit strategy through the WatermarkGenerator interface ([Writing WatermarkGenerators]({{< ref "docs/dev/datastream/event-time/generating_watermarks" >}}#writing-watermarkgenerators)). For SQL tasks, watermarks are emitted periodically by default, with a default period of 200ms, which can be changed with the parameter `pipeline.auto-watermark-interval`.
If you need to emit a watermark for each event, you can configure it in the source table as follows:
+
+```sql
+-- configure in table options
+CREATE TABLE user_actions (
+  ...
+  user_action_time TIMESTAMP(3),
+  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
+) WITH (
+  'scan.watermark.emit.strategy'='on-event',
+  ...
+)
+```
+
+Of course, you can also use the `OPTIONS` hint:
+```sql
+-- use 'OPTIONS' hint
+select ... from source_table /*+ OPTIONS('scan.watermark.emit.strategy'='on-periodic') */

Review Comment:
   modified

##########
docs/content/docs/dev/table/concepts/time_attributes.md:
##########
@@ -99,6 +99,105 @@ GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
 ```

+#### Advanced watermark features
+
+In previous versions, many advanced watermark features (such as watermark alignment) were easy to use through the DataStream API but not in SQL, so version 1.18 extends these features to make them available in SQL as well.
+
+{{< hint warning >}}
+**Note:** Only source connectors that implement the `SupportsWatermarkPushDown` interface (e.g. Kafka, Pulsar) can use these advanced features. If a source does not implement the `SupportsWatermarkPushDown` interface but the task is configured with these parameters, the task runs normally, but the parameters do not take effect.
+
+All of these features can be configured with dynamic table options or the 'OPTIONS' hint. If a feature is configured both in the dynamic table options and in the 'OPTIONS' hint, the options in the 'OPTIONS' hint take precedence. If the 'OPTIONS' hint is used for the same source table in multiple places, the first hint is used.
+{{< /hint >}}
+
+##### I.
Configure watermark emit strategy
+There are two strategies to emit watermarks in Flink:
+
+- on-periodic: emit watermarks periodically
+- on-event: emit a watermark for each event
+
+In the DataStream API, the user can choose the emit strategy through the WatermarkGenerator interface ([Writing WatermarkGenerators]({{< ref "docs/dev/datastream/event-time/generating_watermarks" >}}#writing-watermarkgenerators)). For SQL tasks, watermarks are emitted periodically by default, with a default period of 200ms, which can be changed with the parameter `pipeline.auto-watermark-interval`. If you need to emit a watermark for each event, you can configure it in the source table as follows:
+
+```sql
+-- configure in table options
+CREATE TABLE user_actions (
+  ...
+  user_action_time TIMESTAMP(3),
+  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
+) WITH (
+  'scan.watermark.emit.strategy'='on-event',
+  ...
+)
+```
+
+Of course, you can also use the `OPTIONS` hint:
+```sql
+-- use 'OPTIONS' hint
+select ... from source_table /*+ OPTIONS('scan.watermark.emit.strategy'='on-periodic') */
+```
+
+##### II. Configure the idle-timeout of source table
+
+If a split/partition/shard in the source table does not send event data for some time, the `WatermarkGenerator` receives no new data from which to generate watermarks. Such data sources are called idle inputs or idle sources. This is a problem if some other partition is still sending event data: the downstream operator's watermark is the minimum of the watermarks of all upstream parallel data sources, and since the idle split/partition/shard is not generating new watermarks, the downstream operator's watermark does not advance. However, if an idle timeout is configured, the split/partition/shard is marked as idle when no event data is sent within the timeout, and downstream operators ignore this idle source when calculating new watermarks.
+
+A global idle timeout can be defined in SQL with the `table.exec.source.idle-timeout` parameter, which takes effect for every source table. However, if you want to set a different idle timeout for each source table, you can configure it in the source table with the `scan.watermark.idle-timeout` parameter like this:
+
+```sql
+-- configure in table options
+CREATE TABLE user_actions (
+  ...
+  user_action_time TIMESTAMP(3),
+  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
+) WITH (
+  'scan.watermark.idle-timeout'='1min',

Review Comment:
   modified