godfreyhe commented on code in PR #20577: URL: https://github.com/apache/flink/pull/20577#discussion_r952122673
########## docs/content/docs/dev/table/sql/queries/hints.md: ########## @@ -84,4 +84,323 @@ insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select ``` +## Query Hints + +### Join Hints + +#### LOOKUP Hint + +{{< label Streaming >}} + +The LOOKUP hint allows users to suggest the Flink optimizer to: +1. use synchronous(sync) or asynchronous(async) lookup function +2. configure the async parameters +3. enable delayed retry strategy for lookup + +```sql +SELECT /*+ LOOKUP(key=value[, key=value]*) */ + +key: + stringLiteral + +value: + stringLiteral +``` + +The available hint options: + +<table class="table table-bordered"> +<thead> +<tr> + <th>option type</th> + <th>option name</th> + <th>required</th> + <th>value type</th> + <th>default value</th> + <th class="text-left">description</th> +</tr> +</thead> +<tbody> +<tr> + <td rowspan="1">table</td> + <td>table</td> + <td>Y</td> + <td>string</td> + <td>N/A</td> + <td>the table name of the lookup source</td> +</tr> +<tr> + <td rowspan="4">async</td> + <td>async</td> + <td>N</td> + <td>boolean</td> + <td>N/A</td> + <td>value can be 'true' or 'false' to suggest the planner choose the corresponding lookup function. + If the backend lookup source does not support the suggested lookup mode, it will take no effect.</td> +</tr> +<tr> + <td>output-mode</td> + <td>N</td> + <td>string</td> + <td>ordered</td> + <td>value can be 'ordered' or 'allow_unordered'.<br />'allow_unordered' means if users allow unordered result, it will attempt to use AsyncDataStream.OutputMode.UNORDERED when it does not affect the correctness of the result, otherwise ORDERED will be still used. It is consistent with <br />`ExecutionConfigOptions#TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE`.</td> +</tr> +<tr> + <td>capacity</td> + <td>N</td> + <td>integer</td> + <td>100</td> + <td>the buffer capacity for the backend asyncWaitOperator of the lookup join operator.</td> +</tr> +<tr> + <td>timeout</td> + <td>N</td> + <td>duration</td> + <td>300s</td> + <td>timeout from first invoke to final completion of asynchronous operation, may include multiple retries, and will be reset in case of failover</td> +</tr> +<tr> + <td rowspan="4">retry</td> + <td>retry-predicate</td> + <td>N</td> + <td>string</td> + <td>N/A</td> + <td>can be 'lookup_miss' which will enable retry if lookup result is empty.</td> +</tr> +<tr> + <td>retry-strategy</td> + <td>N</td> + <td>string</td> + <td>N/A</td> + <td>can be 'fixed_delay'</td> +</tr> +<tr> + <td>fixed-delay</td> + <td>N</td> + <td>duration</td> + <td>N/A</td> + <td>delay time for the 'fixed_delay' strategy</td> +</tr> +<tr> + <td>max-attempts</td> + <td>N</td> + <td>integer</td> + <td>N/A</td> + <td>max attempt number of the 'fixed_delay' strategy</td> +</tr> +</tbody> +</table> + +Note: +- 'table' option is required, only table name is supported(keep consistent with which in the FROM clause), alias name is not supported currently(will be supported in later versions). +- async options are all optional, will use default value if not configured. +- there is no default value for retry options, all retry options should be set to valid values when need to enable retry. + +##### 1. Use Sync And Async Lookup Function +If the connector has both capabilities of async and sync lookup, users can give the option value 'async'='false' +to suggest the planner to use the sync lookup or 'async'='true' to use the async lookup: + +Example: +```sql +-- suggest the optimizer to use sync lookup +LOOKUP('table'='Customers', 'async'='false') + +-- suggest the optimizer to use async lookup +LOOKUP('table'='Customers', 'async'='true') +``` +Note: the optimizer prefers async lookup if no 'async' option is specified, it will always use sync lookup when: +1. the connector only implements the sync lookup +2. user enables 'TRY_RESOLVE' mode of 'table.optimizer.non-deterministic-update.strategy' and the +optimizer has checked there's correctness issue caused by non-deterministic update. + +##### 2. Configure The Async Parameters +Users can configure the async parameters via async options on async lookup mode. + +Example: +```sql +-- configure the async parameters: 'output-mode', 'capacity', 'timeout', can set single one or multi params +LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s') +``` +Note: the async options are consistent with the async options in [job level Execution Options]({{< ref "docs/dev/table/config" >}}#execution-options), +will use job level configuration if not set. Another difference is that the scope of the LOOKUP hint +is smaller, limited to the table name corresponding to the hint option set in the current lookup +operation (other lookup operations will not be affected by the LOOKUP hint). + +e.g., if the job level configuration is: +```gitexclude +table.exec.async-lookup.output-mode: ORDERED +table.exec.async-lookup.buffer-capacity: 100 +table.exec.async-lookup.timeout: 180s +``` + +then the following hints: +```sql +1. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered') +2. LOOKUP('table'='Customers', 'async'='true', 'timeout'='300s') +``` + +are equivalent to: +```sql +1. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s') +2. LOOKUP('table'='Customers', 'async'='true', 'output-mode'='ordered', 'capacity'='100', 'timeout'='300s') +``` + +##### 3. Enable Delayed Retry Strategy For Lookup +Delayed retry for lookup join is intended to solve the problem of delayed updates in external system +which cause unexpected enrichment with stream data. The hint option 'retry-predicate'='lookup_miss' +can enable retry on both sync and async lookup, only fixed delay retry strategy is supported currently. + +Options of fixed delay retry strategy: +```gitexclude +'retry-strategy'='fixed_delay' +-- fixed delay duration +'fixed-delay'='10s' +-- max number of retry(counting from the retry operation, if set to '1', then a single lookup process +-- for a specific lookup key will actually execute up to 2 lookup requests) +'max-attempts'='3' +``` + +Example: +1. enable retry on async lookup +```sql +LOOKUP('table'='Customers', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') +``` + +2. enable retry on sync lookup +```sql +LOOKUP('table'='Customers', 'async'='false', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') +``` + +If the lookup source only has one capability, then the 'async' mode option can be omitted: + +```sql +LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') +``` + +##### Further Notes + +###### Effect Of Enabling Caching On Retries +[FLIP-221](https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job) adds caching support for lookup source, Review Comment: +1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org