Hi Jim,

Thanks for taking a deep look into the FLIP. I'm happy to answer your questions:

1. Pass through semantics and partition columns
We won't support pass through semantics in the first version. However, I don't think columns would be duplicated unless they use a different name. For example, the PTF can of course emit the partition column as part of the "payload" output.

2. Calcite tickets for missing features
We will create the corresponding tickets once the FLIP is approved and help contribute those features back.

3. Change of interfaces for multiple output tables
For now, I think using a STATEMENT SET should be enough for side output semantics. I have added an example for that in section 5.2.3.2. In the future, we are still free to add more methods to Context, let the function implement additional interfaces, or use more code generation together with @ArgumentHints.

4. More explanation for empty semantics
I added: "Empty semantics are required to model outer joins or to return values for empty input (e.g. 0 for a COUNT(*) on an empty table). A PTF's finish() method can still emit rows and thus output data even if no input record was seen. The finish() method needs to access state to check whether input was present."
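To make the finish() behavior concrete, here is a minimal plain-Java sketch. It assumes a simplified model in which the runtime calls eval() once per input row and finish() once at the end of input; the class, its fields, and its methods are hypothetical and are NOT the FLIP's actual PTF interfaces. It shows a COUNT(*)-style function that still emits 0 for an empty table and reads its state to tell whether any input was seen.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, NOT the Flink ProcessTableFunction API. It only models
// how a function with empty semantics can emit a row from finish() even when
// no input row was ever seen.
public class EmptySemanticsSketch {

    // Per-partition state (hypothetical stand-in for state declared via @StateHint).
    static final class CountState {
        long count = 0L;
        boolean seenInput = false;
    }

    private final CountState state = new CountState();
    private final List<String> output = new ArrayList<>();

    // Called once per input row.
    void eval(String row) {
        state.seenInput = true;
        state.count++;
    }

    // Called once at the end of input; emits a result even for empty input.
    void finish() {
        // Reading the state tells finish() whether input was present.
        output.add("count=" + state.count + (state.seenInput ? "" : " (no input)"));
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        EmptySemanticsSketch empty = new EmptySemanticsSketch();
        empty.finish();
        System.out.println(empty.output()); // [count=0 (no input)]

        EmptySemanticsSketch nonEmpty = new EmptySemanticsSketch();
        nonEmpty.eval("a");
        nonEmpty.eval("b");
        nonEmpty.finish();
        System.out.println(nonEmpty.output()); // [count=2]
    }
}
```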

5. Adding Pass Through Semantics later
That should be easy to do, as it is purely an optimizer concern and won't require UDF interface changes other than adding another ArgumentTrait.

6. Typo
Fixed.

7. Why not upsert?
Added: "Upsert mode is only an optimization, and we avoid premature optimization in the first version. Even for the Kafka connector we don't trust the input and convert to retract internally. Passing upsert keys and letting users deal with them would complicate the interfaces. It's also difficult to tell the optimizer whether upsert keys could have been preserved after the PTF. We can still offer an additional interface that functions can implement for power users. But retract should get the job done."
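As an illustration of "convert to retract internally", here is a minimal plain-Java sketch that normalizes an upsert stream into a retract stream by materializing the last value per key. It assumes string keys and values and that a null value encodes a delete; the class and method names are hypothetical, it is not Flink's internal implementation, and the +I/-U/+U/-D labels only mirror Flink's RowKind notation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: turn an untrusted upsert stream into a retract stream
// by keeping the last value per key and emitting explicit retractions.
public class UpsertToRetractSketch {

    private final Map<String, String> lastValueByKey = new HashMap<>();
    private final List<String> out = new ArrayList<>();

    // Non-null value means "insert or update"; null value means "delete this key".
    void onUpsert(String key, String value) {
        String previous = lastValueByKey.get(key);
        if (value == null) {
            if (previous != null) {                // ignore deletes for unknown keys
                lastValueByKey.remove(key);
                out.add("-D " + key + "=" + previous);
            }
        } else if (previous == null) {
            lastValueByKey.put(key, value);
            out.add("+I " + key + "=" + value);
        } else if (!previous.equals(value)) {      // drop no-op duplicate upserts
            lastValueByKey.put(key, value);
            out.add("-U " + key + "=" + previous);
            out.add("+U " + key + "=" + value);
        }
    }

    List<String> output() {
        return out;
    }

    public static void main(String[] args) {
        UpsertToRetractSketch s = new UpsertToRetractSketch();
        s.onUpsert("k1", "a");
        s.onUpsert("k1", "a");   // duplicate upsert, no output
        s.onUpsert("k1", "b");
        s.onUpsert("k2", null);  // delete of an unknown key, no output
        s.onUpsert("k1", null);
        System.out.println(s.output()); // [+I k1=a, -U k1=a, +U k1=b, -D k1=b]
    }
}
```

The price of this normalization is state for every key, which is why skipping it (i.e. upsert mode) is an optimization rather than a requirement.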

8. Updates to SESSION window
For "timecol", we can implement this in a backwards-compatible way, i.e. support both names. The reordering of columns might cause issues for SELECT * queries, but CompiledPlan should not be affected. At the very least, we should ensure that this is the case.

9. Implement existing window TVFs
It will be possible to implement custom window functions. They might not be as performant as the built-in ones, e.g. they may not leverage local/global aggregation. But at least we make complex use cases possible.

10. Support for State TTL
Supporting state TTL will be easy. We just need to add a parameter to @StateHint and pass it through.

Thanks,
Timo


On 07.10.24 03:46, Jim Hughes wrote:
Hi Timo,

After thinking about it more today, I have some additional questions /
topics:

9.  For the existing window TVFs, GROUP BYs over the window start and end
are optimized to return only one result per group.  With the proposal as
is, would it be possible to implement a replacement/copy of the temporal
windows which is optimized in a similar way?  (The statement "The optimizer
optimizes around PTFs." makes me think that this would not be possible.)
10. How hard is it to have support for State TTL with the initial
implementation?  From the list of future work, that seems to be the first /
most important one that I'd find missing.

Cheers,

Jim

On Tue, Oct 1, 2024 at 5:02 PM Jim Hughes <jhug...@confluent.io> wrote:

Hi Timo,

Thanks for the FLIP!  Also, thanks for saying that this has been in your
head for a few years; there is a ton here.

1. For the pass through semantics, if the partition columns are already
listed in the pass through columns, they are not duplicated, right?
2. A number of places mention that Calcite doesn't support XYZ.  Do we
have tickets for that work?
3. I like the scoping section.  Relative to "More than two tables is out
of scope for this FLIP.", will we need to change any of the interfaces in a
major way to support multiple output tables in the future?
4. The section "Empty Semantics" under Scoping is a bit terse and I'll
admit that I don't understand it.  Could you say more in the FLIP there?
5. For Pass Through Semantics, will adding PASS THROUGH later be easy to
do?  Or is there some reason to avoid it completely?
6. nit: Under `TimedLastValue`, there is a line which is copied from the
above "The on_time descriptor takes two arguments in this case to forward
the time attributes of each side."
7. Will upsert mode be possible later?  Can you say more about why upsert
is not supported?  (I can guess, but it seems like a brief discussion in
the FLIP would be useful.)
8. In terms of the two bullets at the end of the migration plan, I am for both
changing the order of SESSION window columns and changing the name from
TIMECOL to on_time (or supporting both names?).  Is there any downside to
doing so?

Thanks,

Jim

On Mon, Sep 23, 2024 at 6:38 PM Timo Walther <twal...@apache.org> wrote:

Hi everyone,

I'm super excited to start a discussion about FLIP-440: User-defined SQL
operators / ProcessTableFunction (PTF) [1].

This FLIP has been in my head for many years and Flink 2.0 is a good
time to open up Flink SQL to a new category of use cases. Following the
principle "Make simple things easy, and complex ones possible", I would
like to propose a new UDF type "ProcessTableFunction" that gives users
access to Flink's primitives for advanced stream processing. This should
unblock people when hitting shortcomings in Flink SQL and expand the
scope of SQL from analytical to more event-driven applications.

This proposal is by no means a full replacement of DataStream API.
DataStream API will always provide the full power of Flink whereas PTFs
provide at least a necessary toolbox to cover ~80% of all use cases
without leaving the SQL ecosystem. The SQL ecosystem is a great
foundation with a well-defined type system, catalog integration, CDC
support, and built-in functions/operators. PTFs complete it by offering
a standard compliant extension point.

Looking forward to your feedback.

Thanks,
Timo

[1] https://cwiki.apache.org/confluence/x/pQnPEQ



