Hey Timo,

I think this will be a very valuable addition to Flink SQL. It's necessary to drive SQL and Table API adoption further, since it can help with stateful upgrades. It will close a few gaps between SQL and DataStream, enabling more people to use the SQL type system and catalog ecosystem, which are not available in DataStream.
+1 for the proposal.

Best,
Dawid

On Mon, 14 Oct 2024 at 14:07, Timo Walther <twal...@apache.org> wrote:

> Hi Jim,
>
> thanks for taking a deep look into the FLIP. Happy to answer your
> questions:
>
> 1. Pass through semantics and partition columns
> We won't support pass through semantics in the first version. But I
> don't think that columns are duplicated unless they use a different
> name. E.g. the PTF can of course emit the partition column as part of
> the "payload" output.
>
> 2. Calcite tickets for missing features
> We will create the corresponding tickets once the FLIP is approved and
> help contribute those back.
>
> 3. Change of interfaces for multiple output tables
> Currently, I think using a STATEMENT SET should be enough for side
> output semantics. I have added an example in section 5.2.3.2 for that.
> We are still free to add more methods to Context, let the function
> implement additional interfaces or use more code generation together
> with @ArgumentHints.
>
> 4. More explanation for empty semantics
> I added: "Empty semantics are required to model outer joins or returning
> values for empty input (e.g. 0 for a COUNT(*) on an empty table). A
> PTF's finish() method can still emit rows and thus output data even if
> no input record was seen. The finish() method needs to access state to
> check whether input was present."
>
> 5. Adding Pass Through Semantics later
> That should be easy to do, as it is a pure optimizer thing and won't
> require UDF interface changes other than adding another ArgumentTrait.
>
> 6. Typo
> Fixed.
>
> 7. Why not upsert?
> Added: "Upsert mode is only an optimization. No premature optimization
> in the first version.
> Even for the Kafka connector we don't trust the input and convert to
> retract internally.
> Passing upsert keys and letting users deal with them would complicate
> the interfaces. It's also difficult to report back to the optimizer
> whether upsert keys could have been preserved after the PTF.
> We can still offer an additional interface that functions can implement
> for power users. But retract should get the job done."
>
> 8. Updates to SESSION window
> For the "timecol", we can implement this in a backwards-compatible way,
> i.e. support both names. The reordering of columns might cause issues
> for SELECT * queries, but CompiledPlan should not be affected by this.
> At least we should ensure that this is the case.
>
> 9. Implement existing window TVFs
> It will be possible to implement custom window functions. They might not
> be as performant as the built-in ones, e.g. they would not leverage
> local/global aggregation. But at least we make complex use cases
> possible.
>
> 10. Support for State TTL
> Supporting state TTL will be easy. We just need to add a parameter to
> @StateHint and pass it through.
>
> Thanks,
> Timo
>
>
> On 07.10.24 03:46, Jim Hughes wrote:
> > Hi Timo,
> >
> > After thinking about it more today, I have some additional questions /
> > topics:
> >
> > 9. For the existing window TVFs, GROUP BYs over the window start and end
> > are optimized to return only one result per group. With the proposal as
> > is, would it be possible to implement a replacement/copy of the temporal
> > windows which is optimized in a similar way? (The statement "The
> > optimizer optimizes around PTFs." makes me think that this would not be
> > possible.)
> > 10. How hard is it to have support for State TTL with the initial
> > implementation? From the list of future work, that seems to be the first
> > one / most important that I'd find missing.
> >
> > Cheers,
> >
> > Jim
> >
> > On Tue, Oct 1, 2024 at 5:02 PM Jim Hughes <jhug...@confluent.io> wrote:
> >
> >> Hi Timo,
> >>
> >> Thanks for the FLIP! Also, thanks for saying that this has been in your
> >> head for a few years; there is a ton here.
> >>
> >> 1. For the pass through semantics, if the partition columns are already
> >> listed in the pass through columns, they are not duplicated, right?
> >> 2. A number of places mention that Calcite doesn't support XYZ. Do we
> >> have tickets for that work?
> >> 3. I like the scoping section. Relative to "More than two tables is out
> >> of scope for this FLIP.", will we need to change any of the interfaces
> >> in a major way to support multiple output tables in the future?
> >> 4. The section "Empty Semantics" under Scoping is a bit terse and I'll
> >> admit that I don't understand it. Could you say more in the FLIP there?
> >> 5. For Pass Through Semantics, will adding PASS THROUGH later be easy to
> >> do? Or is there some reason to avoid it completely?
> >> 6. nit: Under `TimedLastValue`, there is a line copied from above:
> >> "The on_time descriptor takes two arguments in this case to forward
> >> the time attributes of each side."
> >> 7. Will upsert mode be possible later? Can you say more about why upsert
> >> is not supported? (I can guess, but it seems like a brief discussion in
> >> the FLIP would be useful.)
> >> 8. In terms of the two bullets at the end of the migration plan, I am
> >> for both changing the order of SESSION window columns and changing the
> >> name from TIMECOL to on_time (or support both names?). Is there any
> >> downside to doing so?
> >>
> >> Thanks,
> >>
> >> Jim
> >>
> >> On Mon, Sep 23, 2024 at 6:38 PM Timo Walther <twal...@apache.org> wrote:
> >>
> >>> Hi everyone,
> >>>
> >>> I'm super excited to start a discussion about FLIP-440: User-defined
> >>> SQL operators / ProcessTableFunction (PTF) [1].
> >>>
> >>> This FLIP has been in my head for many years and Flink 2.0 is a good
> >>> time to open up Flink SQL to a new category of use cases. Following the
> >>> principle "Make simple things easy, and complex ones possible", I would
> >>> like to propose a new UDF type "ProcessTableFunction" that gives users
> >>> access to Flink's primitives for advanced stream processing. This
> >>> should unblock people when hitting shortcomings in Flink SQL and expand
> >>> the scope of SQL from analytical to more event-driven applications.
> >>>
> >>> This proposal is by no means a full replacement of the DataStream API.
> >>> The DataStream API will always provide the full power of Flink, whereas
> >>> PTFs provide at least a necessary toolbox to cover ~80% of all use
> >>> cases without leaving the SQL ecosystem. The SQL ecosystem is a great
> >>> foundation with a well-defined type system, catalog integration, CDC
> >>> support, and built-in functions/operators. PTFs complete it by offering
> >>> a standard-compliant extension point.
> >>>
> >>> Looking forward to your feedback.
> >>>
> >>> Thanks,
> >>> Timo
> >>>
> >>> [1] https://cwiki.apache.org/confluence/x/pQnPEQ
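To make the "empty semantics" from point 4 of Timo's reply more concrete, here is a minimal plain-Python model of the idea. It is a sketch under stated assumptions, not Flink's ProcessTableFunction API: only finish() is named in the thread, eval() merely follows the naming convention of Flink's other UDFs, and the base class `PtfLikeFunction`, the `collect()` helper, the `run()` driver, and the two example functions are hypothetical. State is modeled as plain instance fields; in Flink it would presumably live in managed state (e.g. declared via @StateHint, as mentioned in point 10). `PadIfEmpty` is a simplified single-input stand-in for outer-join null padding.

```python
from typing import Any, Iterable, List


class PtfLikeFunction:
    """Hypothetical base class (NOT Flink's API): eval() per row, finish() once."""

    def __init__(self) -> None:
        self.collected: List[Any] = []  # rows emitted by the function

    def collect(self, row: Any) -> None:
        self.collected.append(row)

    def eval(self, row: Any) -> None:
        raise NotImplementedError

    def finish(self) -> None:
        pass


class CountStar(PtfLikeFunction):
    """COUNT(*) that still emits 0 when the input table is empty."""

    def __init__(self) -> None:
        super().__init__()
        self.count = 0  # stands in for function state

    def eval(self, row: Any) -> None:
        self.count += 1  # only update state; the result is emitted in finish()

    def finish(self) -> None:
        # finish() runs even if eval() was never called, so empty input yields 0.
        self.collect(self.count)


class PadIfEmpty(PtfLikeFunction):
    """Simplified outer-join-like behaviour: pass rows through, or emit a
    single padding row if no input was seen."""

    def __init__(self, padding: Any = None) -> None:
        super().__init__()
        self.padding = padding
        self.seen_input = False  # state that finish() must consult

    def eval(self, row: Any) -> None:
        self.seen_input = True
        self.collect(row)

    def finish(self) -> None:
        # Without this state, finish() could not tell whether input was present.
        if not self.seen_input:
            self.collect(self.padding)


def run(fn: PtfLikeFunction, rows: Iterable[Any]) -> List[Any]:
    """Hypothetical driver: call eval() for each row, then finish() exactly once."""
    for row in rows:
        fn.eval(row)
    fn.finish()
    return fn.collected
```

For example, `run(CountStar(), [])` returns `[0]` and `run(PadIfEmpty(), [])` returns `[None]`, while `run(PadIfEmpty(), [1, 2])` passes the rows through unchanged. The point of the model is the one made in the thread: because finish() is invoked regardless of input, it can produce output for an empty table, but only if it can read state recorded by earlier eval() calls.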