yuchengxin commented on code in PR #22767:
URL: https://github.com/apache/flink/pull/22767#discussion_r1257911008


##########
docs/content/docs/dev/table/concepts/time_attributes.md:
##########
@@ -99,6 +99,105 @@ GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
 
 ```
 
+#### Advanced watermark features
+
+In previous versions, many advanced watermark features (such as watermark alignment) were easy to use through the DataStream API but hard to use in SQL. Flink 1.18 extends these features to SQL so that users can use them there as well.
+
+{{< hint warning >}}
+**Note:** Only source connectors that implement the `SupportsWatermarkPushDown` interface (e.g. Kafka, Pulsar) can use these advanced features. If a source does not implement `SupportsWatermarkPushDown` but the job is configured with these options, the job still runs normally, but the options do not take effect.
+
+All of these features can be configured either in the table options (the `WITH` clause of the table DDL) or with the `OPTIONS` hint. If a feature is configured in both places, the value in the `OPTIONS` hint takes precedence. If the same source table is given `OPTIONS` hints in multiple places, only the first hint is used.
+{{< /hint >}}
+
+##### I. Configure watermark emit strategy
+Flink supports two strategies for emitting watermarks:
+
+- `on-periodic`: emit watermarks periodically.
+- `on-event`: emit a watermark for every event.
+
+In the DataStream API, users choose the emit strategy through the `WatermarkGenerator` interface ([Writing WatermarkGenerators]({{< ref "docs/dev/datastream/event-time/generating_watermarks" >}}#writing-watermarkgenerators)). In SQL jobs, watermarks are emitted periodically by default, with a default period of 200ms that can be changed with the `pipeline.auto-watermark-interval` option. To emit a watermark for every event instead, configure the source table as follows:
+
+```sql
+-- configure in table options
+CREATE TABLE user_actions (
+  ...
+  user_action_time TIMESTAMP(3),
+  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
+) WITH (
+  'scan.watermark.emit.strategy'='on-event',

Review Comment:
   modified
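For reference, a complete sketch of the emit-strategy setup described in the diff, using the corrected option name and separators. It assumes the Flink SQL client and the Kafka connector (listed in the note as supporting `SupportsWatermarkPushDown`); the `user_name` column, topic, broker address, and group id are hypothetical placeholders, and the 1-second interval is an arbitrary example value.

```sql
-- Job-level: emit periodic watermarks every second instead of the 200ms default.
SET 'pipeline.auto-watermark-interval' = '1s';

-- Table-level: emit a watermark for every event read from this source.
CREATE TABLE user_actions (
  user_name STRING,                                    -- hypothetical column
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_actions',                            -- placeholder topic
  'properties.bootstrap.servers' = 'localhost:9092',   -- placeholder broker
  'properties.group.id' = 'user-actions-demo',         -- placeholder group id
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'scan.watermark.emit.strategy' = 'on-event'
);
```

The `SET` statement changes the session configuration, so it applies to statements submitted afterwards in the same session.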



##########
docs/content/docs/dev/table/concepts/time_attributes.md:
##########
@@ -99,6 +99,105 @@ GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
 
+  ...
+)
+```
+
+Alternatively, you can set the option with the `OPTIONS` hint:
+```sql
+-- use 'OPTIONS' hint
+select ... from source_table /*+ OPTIONS('scan.watermark.emit.strategy'='on-periodic') */

Review Comment:
   modified
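The precedence rules from the note in the diff can be exercised with queries like the following. This is a sketch that assumes a `user_actions` table with a `user_action_time` column, declared with `'scan.watermark.emit.strategy'='on-event'` in its `WITH` clause; the effects described in the comments restate the note rather than independently verified behavior.

```sql
-- The hint takes precedence over the table option for this query,
-- so watermarks from user_actions are emitted periodically here.
SELECT user_action_time
FROM user_actions /*+ OPTIONS('scan.watermark.emit.strategy'='on-periodic') */;

-- The same source table is hinted twice with conflicting values.
-- Per the note, only the first hint ('on-periodic') is used.
SELECT user_action_time
FROM user_actions /*+ OPTIONS('scan.watermark.emit.strategy'='on-periodic') */
UNION ALL
SELECT user_action_time
FROM user_actions /*+ OPTIONS('scan.watermark.emit.strategy'='on-event') */;
```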



##########
docs/content/docs/dev/table/concepts/time_attributes.md:
##########
@@ -99,6 +99,105 @@ GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
 
+```
+
+##### II. Configure the idle timeout of a source table
+
+If a split/partition/shard of the source table produces no events for some time, its `WatermarkGenerator` gets no new data from which to generate a watermark either; such inputs are called idle inputs or idle sources. This is a problem when other partitions are still producing events: a downstream operator's watermark is the minimum of the watermarks of all its parallel upstream inputs, so while the idle split/partition/shard generates no new watermark, the downstream operator's watermark cannot advance. If an idle timeout is configured, a split/partition/shard that produces no events within the timeout is marked as idle, and downstream operators ignore it when computing their new watermark.
+
+A global idle timeout can be set in SQL with the `table.exec.source.idle-timeout` option, which applies to every source table. To set a different idle timeout for an individual source table, configure the `scan.watermark.idle-timeout` option on that table like this:
+
+```sql
+-- configure in table options
+CREATE TABLE user_actions (
+  ...
+  user_action_time TIMESTAMP(3),
+  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
+) WITH (
+  'scan.watermark.idle-timeout'='1min',

Review Comment:
   modified
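The idle-input reasoning in the diff can be summarized as follows, where $W_i$ is the current watermark of upstream input $i$ and $A$ is the set of inputs not marked idle (notation introduced here for illustration):

$$
W_{\text{downstream}} = \min_{i \in A} W_i
$$

Worked example: three partitions have watermarks 10:05, 10:07, and 10:00, and the third has stopped receiving events. Without an idle timeout, $A$ contains all three inputs, so the downstream watermark stays at $\min(10{:}05,\ 10{:}07,\ 10{:}00) = 10{:}00$ for as long as the third partition is silent. Once the timeout marks it idle, $A$ contains only the first two inputs and the downstream watermark becomes $\min(10{:}05,\ 10{:}07) = 10{:}05$.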
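A sketch of combining the global and per-table idle timeouts in the SQL client. The 30-second and 1-minute values are arbitrary, and `user_actions` is assumed to be a source table with a `user_action_time` column; per the diff text, the table-level option sets a different timeout for that table only.

```sql
-- Global default: any split/partition/shard without events for 30 seconds
-- is marked idle; applies to every source table in the job.
SET 'table.exec.source.idle-timeout' = '30s';

-- Per-table setting through the OPTIONS hint: this source uses a
-- 1-minute idle timeout instead of the global value.
SELECT user_action_time
FROM user_actions /*+ OPTIONS('scan.watermark.idle-timeout'='1min') */;
```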



-- 
This is an automated message from the Apache Git Service.