Hi Timo,

Thank you for the well structured and referenced FLIP. This looks really useful.

Some thoughts:

1. I wonder which class of scenarios PTFs do not handle well, such that the DataStream API would be preferable or required, assuming we have PTF Map and List support?

2. I see

    /**
     * Registers a timer to be fired when the event time watermark passes the given time.
     * Replaces an existing timer under the same name.
     */
    void registerOnTime(String name, T time);

Does this mean: “Registers a timer. After that time interval, the watermark is progressed to that time by the framework and the timer is fired.”?

3. When it says “A PTF can still make progress in time without an explicit on_time attribute”, how would it do this? Is it business as usual with the watermark generator, or is there also something else the PTF can do?

4. When it says “in case of onWatermark the output is rowtime = TimeContext.followingWatermark() - 1”, it would be good to state in the FLIP the thinking behind this.

5. I agree with Fabian that currentWatermark and followingWatermark are confusing.

6. On void registerOnPeriodicTime(String name, T relativeTime, Duration period); what is relativeTime? relativeTime sounds like a duration.

Kind regards,
David.

From: Fabian Hüske <fhue...@confluent.io.INVALID>
Date: Monday, 4 November 2024 at 15:41
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [EXTERNAL] Re: [DISCUSS] FLIP-440: User-defined SQL operators / ProcessTableFunction (PTF)

Hi Timo,

Thanks for the detailed and very well structured FLIP document! This is an important feature and will enable many more use-cases for Flink SQL and Table API.

I have a few questions / suggestions:

1. "Scoping and Simplifications", "Partition and Order Semantics":
"By default, we require a partitioning for tables with set semantics." What does "by default" mean? Will there be a case where we won't require partitioning? What's the non-default case?

2. "Public Interfaces", "ProcessTableFunction":
2.1 Can the `ProcessTableFunction.getKind()` method be `final`?
2.2 "Public Interfaces", "ProcessTableFunction", `OnTimerContext`: Should we also expose the timestamp for which the timer was set?
2.3 (minor) "As" in `Context.getTimeContextAs(Class<T>)` implies IMO that the method returns an instance of T. Maybe rename to `getTimeContext(Class<T>)` or `getTimeContextFor(Class<T>)`.

3. "Public Interfaces", "TimeContext":
3.1 `currentTime()`: this method name is a bit confusing IMO. I would say the "current time" is the WM time, because the WM drives the global event-time clock of the query. I think it would be good to make it clear that the method returns the event-time column of the current row.
3.2 `currentWatermark()`: should we rather emit `0` when no WM has been received yet?
3.3 `followingWatermark()`: TBH, I find this name quite confusing and it took me a while until I understood its purpose. If I understand correctly, it only makes sense to call it in the `onTimer()` method because only there do we know the next watermark. Would it be possible to rename it to `triggeringWatermark()` and ensure that it is only callable in `onTimer()` and not in `eval()` and `finish()`?
3.4 Are the methods to register periodic timers necessary? How would they be used? How much do we help the user? What becomes easier / less boilerplate code? How much do we confuse them with additional API surface?

4. "Public Interfaces", "TableSemantics":
4.1 Rename to `TableArgSemantics`?
4.2 Why are the partitionBy and orderBy column indexes in two-dimensional arrays? Wouldn't simple arrays be sufficient? Using indexes also means that we don't allow partitionBy/orderBy expressions, right? There's a simple workaround using CTEs, views, or nested queries, but I just want to mention it.
4.3 Why are the copartition args not indexes but names?
4.4 Rename methods `xxxColumns()` -> `xxxColumnIdxs()`?

5. "Public Interfaces", "TypeInference":
5.1 Could `getStaticArguments()` return an empty list instead of Optional<List<StaticArgument>>?

6. "Public Interfaces", "FunctionDefinition":
6.1 `getInputChangelogMode()`: do we need to be able to specify different requirements if a PTF supports multiple inputs?
6.2 `getOutputChangelogMode()`: doesn't the mode depend on the input changelog mode? Wouldn't we need to pass this information into the method?

7. "Proposed Changes", "Implementation Details", "Time Semantics":
7.1 "Watermark by default": How is the described behavior different from the standard behavior? All timers fire on watermarks. The only difference is that we cannot set timers to row timestamps, but that depends on the developer of the function and not on the framework. I don't disagree with that behavior, I'm just not sure if this needs to be mentioned at all.
7.2 "Output Timestamp": I'm not sure if the output timestamp behavior results in limitations on how PTFs can be used. This is a power-user API and it might make sense to allow developers to choose what timestamp to emit. They should of course make sure that the timestamp is not less than the current watermark. For example, our implementation of OVER aggregates could not be implemented without setting explicit timestamps because we only collect rows in `eval()` and emit them in `onTimer()` but preserve the per-row timestamp.

8. "Proposed Changes", "Implementation Details", "Query Evolution":
8.1 I agree that the uid should be optional. However, why not use a generated name (PTF-name + count suffix, UUID) if it isn't set? Then we don't need to block multiple invocations per query, which might not be uncommon (UNION ALL multiple PTF branches). Sure, users could just set the uid, but do we want to fail a query for that?

Finally, I found a few minor mistakes that you might want to fix:

1. "Public Interfaces", "ProcessTableFunction": "The collector is globally available for both eval(), finish(), and onTimer()". Usage of "both" although there are three items listed.
2. "Public Interfaces", "TypeInference": Should the first `table()` method use `EnumSet<ArgTrait> traits` instead of `EnumSet<StaticArgumentTraits> traits`?
3. "Proposed Changes", "Implementation Details", "Query Evolution": Wouldn't the side-output example fail due to the same uids?

Finally, I have a question regarding the compatibility with the new async state access patterns. AFAICT, the PTF proposal should work nicely with these new APIs. Is that assessment correct?

Thanks,
Fabian

On Mon, Nov 4, 2024 at 2:41 PM Timo Walther <twal...@apache.org> wrote:

> Hi David, Hi Shengkai,
>
> > can I apply a PTF to a stream that doesn't have a time attribute?
>
> Yes, time attributes are optional. This is why the REQUIRES_TIME_ATTRIBUTE argument trait exists. If no on_time has been specified in the SQL call and the REQUIRES_TIME_ATTRIBUTE trait is not present, timers will fire on watermark by default. If there is no watermark present, the timers will not fire - similar to DataStream API. For built-in functions I suggest that we set the REQUIRES_TIME_ATTRIBUTE to return early errors.
>
> > is @StateHint CountState state referring to the class named Count?
> > This appears to be a typo.
>
> Yes it was a typo. Fixed. Thanks!
>
> > 1. How do users register PTF to Flink? It looks like users can use CREATE FUNCTION to register the PTF.
>
> PTFs behave like scalar or table functions in this regard. You can use CREATE FUNCTION, TableEnvironment.createFunction, or inline in Table API using call(PTFClass.class). No special case.
>
> > 2. Can the input parameter of PTF be a view? If the input parameter is a table, how can the developer of PTF know the schema of the input table? Without type information, I don't know how to extract fields from the Row or RowData.
>
> Yes, the input parameter can be a table, view or CTE. In theory also a subquery should work but that might need more work in the Calcite parser/validator.
>
> Regarding type information, this is good feedback. I added the type information to the Context, under Context.tableSemanticsFor(arg).getDataType. Built-in functions won't need this as they can also access the type via getTypeInference in CallContext.
>
> > 3. I see the API change mentioned that `getInputChangelogMode` and `getOutputChangelogMode` will be added to FunctionDefinition? Can we just add these two methods to ProcessTableFunction? After all, other functions (e.g. scalar function) don't need these two methods.
>
> Unfortunately, this is not possible. As mentioned in the FLIP, "Following the specification of FunctionDefinition: Instances of this class provide all details necessary to validate a function call and perform planning.". So everything for planning should be present in FunctionDefinition. For ScalarFunction and other UDFs we can make these methods `final`, so it won't be possible to change the append-only mode.
>
> > 4. If an insert-only stream is converted to a changelog stream by the PTF, what is the upsert key of the changelog stream?
>
> Upsert keys won't be considered because only retract mode will be supported.
>
> > 5. I see that a new type, DESCRIPTOR, has been added. Can the user declare a type as DESCRIPTOR directly in the DDL, or is the type only available to PTFs? If it is only used for PTFs, do we need to provide the DataTypes#DESCRIPTOR() method to allow the user to declare the type? Or is it just a type for internal use like DistinctType. Also, can you add the conversion relationship between this type and other types?
>
> DESCRIPTOR is a logical type similar to SymbolType. In order to define input signatures, I thought it makes sense to also expose the DataTypes#DESCRIPTOR() method. It is a bit more public and more useful than SymbolType. Similar to INTERVAL it should not be used in DDL. It cannot be cast to other types.
> I updated the section in the FLIP with more information.
>
> Best,
> Timo
>
> On 03.11.24 07:12, Shengkai Fang wrote:
> > Hi, Timo.
> >
> > Thanks for your proposal. This FLIP greatly extends the ease of use of SQL!
> > But I have some questions about this FLIP:
> >
> > 1. How do users register PTF to Flink? It looks like users can use CREATE FUNCTION to register the PTF.
> >
> > 2. Can the input parameter of PTF be a view? If the input parameter is a table, how can the developer of PTF know the schema of the input table? Without type information, I don't know how to extract fields from the Row or RowData.
> >
> > 3. I see the API change mentioned that `getInputChangelogMode` and `getOutputChangelogMode` will be added to FunctionDefinition? Can we just add these two methods to ProcessTableFunction? After all, other functions (e.g. scalar function) don't need these two methods.
> >
> > 4. If an insert-only stream is converted to a changelog stream by the PTF, what is the upsert key of the changelog stream?
> >
> > 5. I see that a new type, DESCRIPTOR, has been added. Can the user declare a type as DESCRIPTOR directly in the DDL, or is the type only available to PTFs? If it is only used for PTFs, do we need to provide the DataTypes#DESCRIPTOR() method to allow the user to declare the type? Or is it just a type for internal use like DistinctType. Also, can you add the conversion relationship between this type and other types?
> >
> > Best,
> > Shengkai
> >
> > On Sat, Nov 2, 2024 at 00:03, David Anderson <da...@alpinegizmo.com> wrote:
> >
> >> Timo, thanks for the response. I have a few more questions.
> >>
> >>> as mentioned in "Scoping and Simplifications" a PTF will not support late events. It will filter them out. We have to solve the late events topic at an earlier stage in the SQL pipeline. This is a different FLIP discussion.
> >>> Not every SQL operator should deal with late events in a different way.
> >>
> >> So long as we can someday cleanly handle late events, I'm okay with this.
> >>
> >> Some followup questions: can I apply a PTF to a stream that doesn't have a time attribute? The section on time and watermarks seems to allow for this, but it also seems to expect that watermarks are present, regardless. What if they aren't? I'm wondering if there's a case where there are no watermarks, and a PTF registers a timer that can never fire.
> >>
> >> And one unrelated question:
> >>
> >> In the CountWithTimeout example, is @StateHint CountState state referring to the class named Count? This appears to be a typo.
> >>
> >> David
> >>
> >> On Fri, Nov 1, 2024 at 3:03 PM Timo Walther <twal...@apache.org> wrote:
> >>
> >>> Hi David,
> >>>
> >>> as mentioned in "Scoping and Simplifications" a PTF will not support late events. It will filter them out. We have to solve the late events topic at an earlier stage in the SQL pipeline. This is a different FLIP discussion. Not every SQL operator should deal with late events in a different way.
> >>>
> >>> > Is there a guarantee that watermarking will be applied upstream of the split between the two statements in the resulting job graph?
> >>>
> >>> Yes, the uid will make the PTF unique in the entire job graph. If this is not possible (e.g. because the two PTF invocations are used in completely different statements and a common subgraph cannot be determined), we can throw an error.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>> On 01.11.24 12:13, David Anderson wrote:
> >>>>> 3. Change of interfaces for multiple output tables
> >>>>> Currently, I think using a STATEMENT SET should be enough for side output semantics. I have added an example in section 5.2.3.2 for that.
> >>>>
> >>>> I question whether this really works.
> >>>> Is there a guarantee that watermarking will be applied upstream of the split between the two statements in the resulting job graph? Otherwise, important use cases like sending late events to a side output will behave non-deterministically, and be useless.
> >>>>
> >>>> David
> >>>>
> >>>> On Fri, Nov 1, 2024 at 10:26 AM Timo Walther <twal...@apache.org> wrote:
> >>>>
> >>>>> Hi Xuyang,
> >>>>>
> >>>>> thanks for the good questions.
> >>>>>
> >>>>> 1. What happens if the TTLs for these different StateHints are not the same?
> >>>>>
> >>>>> The eval() fully determines available state and their TTL. Helper methods such as onTimer() and finish() can reference a subset of declared state. It is not necessary that the helper methods declare all state properties one more time. The name should be sufficient and we should forbid setting additional properties.
> >>>>>
> >>>>> 2. I believe the named arguments introduced in FLIP-387[1] can also be applied to this ProcessTableFunction, right?
> >>>>>
> >>>>> Absolutely, the PTF actually needs named arguments, especially for optional fields such as uid or on_time. For forward compatibility, I would even suggest that PTFs only support named arguments. Not sure if we can enforce that.
> >>>>>
> >>>>> 3. Will we expose the original RowKind in the eval method's row input?
> >>>>>
> >>>>> Yes, it's likely that only advanced users will make use of that. In that case users have to work with Row/RowData. It's more likely that built-in functions will make use of this. The default changelog mode for both input and output is append.
> >>>>>
> >>>>> 4. Are we allowing users to define both styles simultaneously
> >>>>>
> >>>>> Yes. Context is optional. And state access in helper methods (finish/onTimer) as well.
> >>>>> This reduces the overhead in case a PTF runs in a container/other process.
> >>>>>
> >>>>> I will update the FLIP to reflect these answers.
> >>>>>
> >>>>> Thanks,
> >>>>> Timo
> >>>>>
> >>>>> On 01.11.24 05:10, Xuyang wrote:
> >>>>>> Hi, Timo.
> >>>>>>
> >>>>>> Thank you for this great work! When I previously introduced the session window TVF, I was contemplating how to enable users to define a PTF in SQL. I'm glad to see this work being discussed and that it has improved the integration with the DataStream API.
> >>>>>>
> >>>>>> After reading the entire FLIP, I have a few questions that I hope you can address.
> >>>>>>
> >>>>>> 1. I noticed that in the example, the same field (e.g., CountState) can declare a StateHint in the eval, onTimer, and finish methods. What happens if the TTLs for these different StateHints are not the same?
> >>>>>>
> >>>>>> 2. I believe the named arguments introduced in FLIP-387[1] can also be applied to this ProcessTableFunction, right?
> >>>>>>
> >>>>>> 3. In our UDAFs, we expect users to provide accumulate and retract methods to handle input data for +I/+U and -U/-D. However, in the eval method of a ScalarFunction/UDTF, users do not have visibility into the input's RowKind. In the new PTF, will we expose the original RowKind in the eval method's row input, allowing users to determine the row's RowKind themselves?
> >>>>>>
> >>>>>> 4. I noticed that in the examples, the eval method sometimes includes the Context, @StateHint fields, and the input data (Row input), while other times it only consists of the input data. Are we allowing users to define both styles simultaneously?
> >>>>>>
> >>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures
> >>>>>>
> >>>>>> --
> >>>>>>
> >>>>>> Best!
> >>>>>> Xuyang
> >>>>>>
> >>>>>> At 2024-10-31 21:57:37, "Timo Walther" <twal...@apache.org> wrote:
> >>>>>>> Hi everyone,
> >>>>>>>
> >>>>>>> thanks for all the feedback I received so far. I had very healthy discussions with various people both online and offline at Current and Flink Forward Berlin. The general user responses were also very positive. The FLIP should be ready to start a VOTE thread.
> >>>>>>>
> >>>>>>> This is the last call for feedback. I would start a VOTE tomorrow if there are no objections. Happy to take further feedback during implementation as well.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>> On 30.10.24 14:34, Timo Walther wrote:
> >>>>>>>> Hi Jim,
> >>>>>>>>
> >>>>>>>> 3. Multiple output tables
> >>>>>>>>
> >>>>>>>> > Does the target_table need to be specified in the SELECT clause?
> >>>>>>>>
> >>>>>>>> No. Similar to reading from a regular table. The filter column must not be part of SELECT part.
> >>>>>>>>
> >>>>>>>> > It seems like the two target_table could have separate schemas defined.
> >>>>>>>>
> >>>>>>>> That is true. The SELECT is responsible for transforming the columns into the target table's schema. The output row of the PTF might be a union of various columns in this case.
> >>>>>>>>
> >>>>>>>> 10. Support for State TTL
> >>>>>>>>
> >>>>>>>> > I'd be strongly in favor of doing any interface / base work we need in the initial implementation so that state size can be managed.
> >>>>>>>>
> >>>>>>>> I agree, State TTL is crucial.
> >>>>>>>> I updated the FLIP and added interfaces to StateTypeStrategy and @StateHint.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>> On 23.10.24 17:59, Jim Hughes wrote:
> >>>>>>>>> Hi Timo,
> >>>>>>>>>
> >>>>>>>>> Thank you for the answers. I have a few clarifications inlined.
> >>>>>>>>>
> >>>>>>>>> On Mon, Oct 14, 2024 at 8:07 AM Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>>> 3. Change of interfaces for multiple output tables
> >>>>>>>>>> Currently, I think using a STATEMENT SET should be enough for side output semantics. I have added an example in section 5.2.3.2 for that. We are still free to add more methods to Context, let the function implement additional interfaces or use more code generation together with @ArgumentHints.
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Does the target_table need to be specified in the SELECT clause? Or could it read
> >>>>>>>>>
> >>>>>>>>> EXECUTE STATEMENT SET BEGIN
> >>>>>>>>>   INSERT INTO main SELECT a, b FROM FunctionWithSideOutput(input => data, uid => 'only_once') WHERE target_table = 'main';
> >>>>>>>>>   INSERT INTO side SELECT a, b FROM FunctionWithSideOutput(input => data, uid => 'only_once') WHERE target_table = 'side';
> >>>>>>>>> END;
> >>>>>>>>>
> >>>>>>>>> Separately, for clarity, it seems like the two target_table could have separate schemas defined.
> >>>>>>>>>
> >>>>>>>>>> 10. Support for State TTL
> >>>>>>>>>> Supporting state TTL will be easy. We just need to add a parameter to @StateHint and pass it through.
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> If PTFs can have state, I'd be strongly in favor of doing any interface / base work we need in the initial implementation so that state size can be managed. If it is just sufficient to have hints in the interface, awesome!
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>>
> >>>>>>>>> Jim

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: Building C, IBM Hursley Office, Hursley Park Road, Winchester, Hampshire SO21 2JN