Hi everyone,

Great to see that this discussion is gaining momentum. Let me answer your questions below:

@Hao

1. SEARCH_KEY planner representation and options

Yes, internally nothing in the planner should change. The PTFs are just syntactic sugar for rules and nodes that already exist. Regarding the options, we already support passing options via lookup hints, see also [1]. The option keys are in sync with the hints, so I'm currently not proposing any new functionality. If there is a need, I'm sure we can extend the existing interfaces.
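
For illustration, here is a minimal sketch of the hint-based syntax whose option keys the PTF reuses (table and column names are borrowed from the examples further down in this thread; `proc_time` is an assumed processing-time attribute):

```
SELECT /*+ LOOKUP('table'='dim', 'async'='true', 'retry-predicate'='lookup_miss',
                  'retry-strategy'='fixed_delay', 'fixed-delay'='10s') */ *
FROM t1
JOIN dim FOR SYSTEM_TIME AS OF t1.proc_time AS d
  ON t1.k1 = d.k1 AND t1.k2 = d.k2;
```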

2. Do we need to introduce a `DataStream` resource in SQL first?

A clear no. We want to stay SQL compliant. The result remains a table in which the changeflag becomes an additional column. All other operations on tables should still be possible.
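
As a minimal sketch of what this means in practice (all identifiers below are illustrative only): the PTF result is an ordinary table, the changeflag is just one more column of that table, and further SQL composes as usual.

```
-- Sketch only: aggregate the result of the PTF like any other table.
SELECT t.k, COUNT(*) AS cnt
FROM FROM_CHANGELOG(s) AS t
GROUP BY t.k;
```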

@Shengkai:

1. Timing of Option Consumption

The planner is able to consume these options at an early stage. We can and should enforce literals at this location, so that the map can be accessed via type inference, similar to CallContext#getArgumentValue().
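
A hedged sketch of what enforcing literals implies for users (the rejection behavior is my reading of the answer above, not wording from the FLIP; `t_other.flag` is a hypothetical column):

```
SELECT *
FROM
  t_other,
  LATERAL SEARCH_KEY(
    input => t,
    on_key => DESCRIPTOR(k),
    lookup => t_other.name,
    -- constant map: the planner can read it during type inference
    options => MAP['async', 'true']
  )
-- By contrast, a non-constant entry such as
--   options => MAP['async', t_other.flag]
-- could presumably be rejected during validation, since its value is not
-- available at planning time.
```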

2. Column Name Conflicts in SEARCH_KEY

The resolution logic should be similar to SystemTypeInference [2]. Thus, conflicting columns can be named `value` and `value0`.
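
To make the suffixing concrete, here is a purely hypothetical sketch (which occurrence keeps the original name, and the exact output schema, depend on the final resolution logic in the FLIP):

```
-- Assume both t_other and the looked-up table t expose a column named `value`.
-- Following SystemTypeInference-style resolution, one occurrence keeps the
-- name `value` and the conflicting one is exposed as `value0`:
SELECT `value`, value0
FROM
  t_other,
  LATERAL SEARCH_KEY(
    input => t,
    on_key => DESCRIPTOR(k),
    lookup => t_other.name
  )
```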

3. Details on Correlated PTFs

I'm not suggesting a user-defined correlated PTF. Similar to how window TVFs are "built-in PTFs" (and we added user-defined PTFs later), the correlated PTFs only exist in the planner. They actually already exist today if you use a PTF and LATERAL. They will be translated into a LogicalTableScan and LogicalCorrelate, but fail at code generation, which only supports regular table functions.

So implementation-wise correlated PTFs are just a planner/optimizer/RelNode translation step. I will try to make this clearer in the FLIP.

4. Specifying Literal Values in SEARCH_KEY Function

Can you give an example for a complex temporal condition?

@Lincoln:

1. Naming of the SEARCH_KEY

I don't have a strong opinion on naming here. My biggest concern was that both vector search and full text search are also a LOOKUP into a different system. So I wanted to focus on what we are looking for, which is KEY, VECTOR, or TEXT. We can also name them LOOKUP_KEY, LOOKUP_VECTOR, and LOOKUP_TEXT. What do you think?

Thanks,
Timo

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#lookup
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java#L240

On 15.04.25 07:46, Lincoln Lee wrote:
+1 for the FLIP, the snapshot and changelog related PTFs are a nice addition
to Flink SQL!

I have a question about the naming of SEARCH_KEY: considering that it is
a simplified alternative to a lookup join, and that the word 'search' is
more likely to remind users of keyword searching, which is a bit different
from joins in SQL, would it be better to follow the naming of lookup, e.g.,
```
SELECT *
FROM
   t1,
   LATERAL LOOKUP(
     table => dim,
     key => DESCRIPTOR(k1,k2),
     t1, t2...)
```


Best,
Lincoln Lee


Shengkai Fang <fskm...@gmail.com> wrote on Tue, Apr 15, 2025 at 10:12:

Thanks for the FLIP, it helps us a lot in developing features like
VECTOR_SEARCH. But I have some questions about it:

1. Timing of Option Consumption for SEARCH_KEY Parameters
For the FOR SYSTEM_TIME AS OF syntax, the planner leverages hints and
catalog tables to load the table scan during the toRel phase. However, if
users use the SEARCH_KEY function, is the planner able to consume these
options at the same early stage?

2. Handling Column Name Conflicts in SEARCH_KEY Output Schema
What is the output schema behavior for the SEARCH_KEY function when the
left and right tables have columns with conflicting names?
How can users resolve these name conflicts to ensure unambiguous access to
the desired columns?

3. Details on Correlated PTFs
Could you elaborate on correlated PTFs? What is the API design for
implementing correlated PTFs? How does a PTF retrieve the required model
and lookup function during execution?

4. Specifying Literal Values in SEARCH_KEY Function
How can users include literal values in the SEARCH_KEY function parameters?
The FOR SYSTEM_TIME AS OF syntax allows users to define complex temporal
conditions. Does SEARCH_KEY support equivalent flexibility for specifying
dynamic or literal values in its parameters?

Best,
Shengkai



Hao Li <h...@confluent.io.invalid> wrote on Thu, Apr 3, 2025 at 00:12:

Hi Timo,

One question I have is: what is the SEARCH_KEY result schema you have in
mind? Can it output multiple rows for every row in the left table, or does
it need to pack the result into a single row as an array?

Thanks,
Hao

On Mon, Mar 24, 2025 at 10:20 AM Hao Li <h...@confluent.io> wrote:

Thanks Timo for the FLIP! This is a great improvement to the Flink SQL
syntax around tables. I have two clarification questions:

1. For SEARCH_KEY
```
SELECT *
FROM
   t_other,
   LATERAL SEARCH_KEY(
     input => t,
     on_key => DESCRIPTOR(k),
     lookup => t_other.name,
     options => MAP[
       'async', 'true',
       'retry-predicate', 'lookup_miss',
       'retry-strategy', 'fixed_delay',
       'fixed-delay', '10s'
     ]
   )
```
Table `t` needs to be an existing `LookupTableSource` [1], right? And we
will rewrite it to `StreamPhysicalLookupJoin` [2] or a similar operator
during the physical optimization phase.
Also, to support passing options, do we need to extend `LookupContext` [3]
to have a `getOptions` or `getRuntimeOptions` method?

2. For FROM_CHANGELOG
```
SELECT * FROM FROM_CHANGELOG(s) AS t;
```
Do we need to introduce a `DataStream` resource in SQL first?


Hao




[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala#L41
[3] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L82


On Fri, Mar 21, 2025 at 6:25 AM Timo Walther <twal...@apache.org> wrote:

Hi everyone,

I would like to start a discussion about FLIP-517: Better Handling of
Dynamic Table Primitives with PTFs [1].

In the past months, I have spent a significant amount of time with SQL
semantics and the SQL standard around PTFs while designing and
implementing FLIP-440 [2]. For those of you who have not taken a look
into the standard, the concept of Polymorphic Table Functions (PTFs)
enables syntax for implementing custom SQL operators. In my opinion,
they are kind of a revolution in the SQL language. PTFs can take scalar
values, tables, models (in Flink), and column lists as arguments. With
these primitives, we can address shortcomings in the Flink SQL
language by leveraging syntax and semantics.

I would like to introduce a couple of built-in PTFs with the goal of making
the handling of dynamic tables easier for users. Once users understand
how a PTF works, they can easily select from a list of functions to
approach a table for snapshots, changelogs, or searching.

The FLIP proposes:

SNAPSHOT()
SEARCH_KEY()
TO_CHANGELOG()
FROM_CHANGELOG()

I'm aware that this is a delicate topic and might lead to controversial
discussions. I hope that, with concise naming and syntax, the benefit
over the existing syntax becomes clear.

There are more useful PTFs to come, but these are the ones that I
currently see as the most fundamental for telling a well-rounded story
around Flink SQL.

Looking forward to your feedback.

Thanks,
Timo

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-517%3A+Better+Handling+of+Dynamic+Table+Primitives+with+PTFs
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=298781093





