Hi, Timo. Thanks for your proposal. This FLIP greatly improves the ease of use of SQL! I have some questions about it:
1. How do users register a PTF with Flink? It looks like users can use CREATE FUNCTION to register the PTF.

2. Can the input parameter of a PTF be a view? If the input parameter is a table, how can the developer of the PTF know the schema of the input table? Without type information, I don't know how to extract fields from the Row or RowData.

3. The API changes mention that `getInputChangelogMode` and `getOutputChangelogMode` will be added to FunctionDefinition. Can we just add these two methods to ProcessTableFunction? After all, other functions (e.g. scalar functions) don't need these two methods.

4. If an insert-only stream is converted to a changelog stream by the PTF, what is the upsert key of the changelog stream?

5. I see that a new type, DESCRIPTOR, has been added. Can the user declare a type as DESCRIPTOR directly in the DDL, or is the type only available to PTFs? If it is only used for PTFs, do we need to provide the DataTypes#DESCRIPTOR() method to allow the user to declare the type? Or is it just a type for internal use, like DistinctType? Also, can you add the conversion relationship between this type and other types?

Best,
Shengkai

David Anderson <da...@alpinegizmo.com> wrote on Sat, Nov 2, 2024 at 00:03:

> Timo, thanks for the response. I have a few more questions. > > as mentioned in "Scoping and Simplifications" a PTF will not support > > late events. It will filter them out. We have to solve the late events > > topic at an earlier stage in the SQL pipeline. This is a different FLIP > > discussion. Not every SQL operator should deal with late events in a > > different way. > > > So long as we can someday cleanly handle late events, I'm okay with this. > > Some followup questions: can I apply a PTF to a stream that doesn't have a > time attribute? The section on time and watermarks seems to allow for this, > but it also seems to expect that watermarks are present, regardless. What > if they aren't? 
I'm wondering if there's a case where there are no > watermarks, and a PTF registers a timer that can never fire. > > And one unrelated question: > > In the CountWithTimeout example, is @StateHint CountState state referring > to the class named Count? This appears to be a typo. > > David > > On Fri, Nov 1, 2024 at 3:03 PM Timo Walther <twal...@apache.org> wrote: > > > Hi David, > > > > as mentioned in "Scoping and Simplifications" a PTF will not support > > late events. It will filter them out. We have to solve the late events > > topic at an earlier stage in the SQL pipeline. This is a different FLIP > > discussion. Not every SQL operator should deal with late events in a > > different way. > > > > > Is there a guarantee that watermarking will be applied upstream of > > the split between the two statements in the resulting job graph? > > > > Yes, the uid will make the PTF unique in the entire job graph. If this > > is not possible (e.g. because the two PTF invocations are used in > > completely different statements and a common subgraph cannot be > > determined), we can throw an error. > > > > Regards, > > Timo > > > > > > On 01.11.24 12:13, David Anderson wrote: > > >> 3. Change of interfaces for multiple output tables > > >> Currently, I think using a STATEMENT SET should be enough for side > > >> output semantics. I have added an example in section 5.2.3.2 for that. > > > > > > I question whether this really works. Is there a guarantee that > > > watermarking will be applied upstream of the split between the two > > > statements in the resulting job graph? Otherwise, important use cases > > like > > > sending late events to a side output will behave non-deterministically, > > and > > > be useless. > > > > > > David > > > > > > > > > On Fri, Nov 1, 2024 at 10:26 AM Timo Walther <twal...@apache.org> > wrote: > > > > > >> Hi Xuyang, > > >> > > >> thanks for the good questions. > > >> > > >> 1. 
What happens if the TTLs for these different StateHints are not the > > >> same? > > >> > > >> The eval() fully determines available state and their TTL. Helper > > >> methods such as onTimer() and finish() can reference a subset of > > >> declared state. It is not necessary that the helper methods declare > all > > >> state properties one more time. The name should be sufficient and we > > >> should forbid setting additional properties. > > >> > > >> 2. I believe the named arguments introduced in FLIP-387[1] can also be > > >> applied to this ProcessTableFunction, right? > > >> > > >> Absolutely, the PTF actually needs named arguments. Esp. for optional > > >> fields such as uid or on_time. For forward compatibility, I would even > > >> suggest that PTFs only support named arguments. Not sure if we can > > >> enforce that. > > >> > > >> 3. Will we expose the original RowKind in the eval method's row input? > > >> > > >> Yes, it's likely that only advanced users will make use of that. In > that > > >> case users have to work with Row/RowData. It's likely that rather > > >> built-in functions will make use of this. The default changelog mode > for > > >> both input and output is append. > > >> > > >> 4. Are we allowing users to define both styles simultaneously? > > >> > > >> Yes. Context is optional. And state access in helper methods > > >> (finish/onTimer) as well. This reduces the overhead in case a PTF runs > > >> in a container/other process. > > >> > > >> I will update the FLIP to reflect these answers. > > >> > > >> Thanks, > > >> Timo > > >> > > >> > > >> > > >> On 01.11.24 05:10, Xuyang wrote: > > >>> Hi, Timo. > > >>> > > >>> Thank you for this great work! When I previously introduced the > session > > >> window TVF, I was contemplating > > >>> > > >>> how to enable users to define a PTF in SQL. I'm glad to see this work > > >> being discussed and that it has > > >>> > > >>> improved the integration with the DataStream API. 
> > >>> > > >>> After reading the entire flip, I have a few questions that I hope you > > >> can address. > > >>> > > >>> 1. I noticed that in the example, the same field (e.g., CountState) > can > > >> declare a StateHint in the eval, onTimer, > > >>> > > >>> and finish methods. What happens if the TTLs for these different > > >> StateHints are not the same? > > >>> > > >>> 2. I believe the named arguments introduced in FLIP-387[1] can also > be > > >> applied to this ProcessTableFunction, right? > > >>> > > >>> 3. In our UDAFs, we expect users to provide accumulate and retract > > >> methods to handle input data for +I/+U and -U/-D. > > >>> > > >>> However, in the eval method of a ScalarFunction/UDTF, users do not > have > > >> visibility into the input's RowKind. In the new PTF, > > >>> > > >>> will we expose the original RowKind in the eval method's row input, > > >> allowing users to determine the row's RowKind themselves? > > >>> > > >>> 4. I noticed that in the examples, the eval method sometimes includes > > >> the Context, @StateHint fields, and the input data (Row > > >>> > > >>> input), while other times it only consists of the input data. Are we > > >> allowing users to define both styles simultaneously? > > >>> > > >>> > > >>> > > >>> > > >>> [1] > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> -- > > >>> > > >>> Best! > > >>> Xuyang > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> At 2024-10-31 21:57:37, "Timo Walther" <twal...@apache.org> wrote: > > >>>> Hi everyone, > > >>>> > > >>>> thanks for all the feedback I received so far. I had very healthy > > >>>> discussions with various people both online and offline at Current > and > > >>>> Flink Forward Berlin. The general user responses were also very > > >>>> positive. The FLIP should be ready to start a VOTE thread. 
> > >>>> > > >>>> This is the last call for feedback. I would start a VOTE tomorrow if > > >>>> there are no objections. Happy to take further feedback during > > >>>> implementation as well. > > >>>> > > >>>> Thanks, > > >>>> Timo > > >>>> > > >>>> On 30.10.24 14:34, Timo Walther wrote: > > >>>>> Hi Jim, > > >>>>> > > >>>>> 3. Multiple output tables > > >>>>> > > >>>>> > Does the target_table need to be specified in the SELECT > clause? > > >>>>> > > >>>>> No. Similar to reading from a regular table. The filter column must > > not > > >>>>> be part of the SELECT part. > > >>>>> > > >>>>> > It seems like the two target_table could have separate schemas > > >> defined. > > >>>>> > > >>>>> That is true. The SELECT is responsible for transforming the columns > > into > > >>>>> the target table's schema. The output row of the PTF might be a > union > > >> of > > >>>>> various columns in this case. > > >>>>> > > >>>>> 10. Support for State TTL > > >>>>> > > >>>>> > I'd be strongly in favor of doing any interface / base work we > > >> need in > > >>>>> > the initial implementation so that state size can be managed. > > >>>>> > > >>>>> I agree, State TTL is crucial. I updated the FLIP and added > > interfaces > > >>>>> to StateTypeStrategy and @StateHint. > > >>>>> > > >>>>> Cheers, > > >>>>> Timo > > >>>>> > > >>>>> > > >>>>> > > >>>>> On 23.10.24 17:59, Jim Hughes wrote: > > >>>>>> Hi Timo, > > >>>>>> > > >>>>>> Thank you for the answers. I have a few clarifications inlined. > > >>>>>> > > >>>>>> On Mon, Oct 14, 2024 at 8:07 AM Timo Walther <twal...@apache.org> > > >> wrote: > > >>>>>> > > >>>>>>> 3. Change of interfaces for multiple output tables > > >>>>>>> Currently, I think using a STATEMENT SET should be enough for > side > > >>>>>>> output semantics. I have added an example in section 5.2.3.2 for > > >> that. 
> > >>>>>>> We are still free to add more methods to Context, let the > function > > >>>>>>> implement additional interfaces or use more code generation > > together > > >>>>>>> with @ArgumentHints. > > >>>>>>> > > >>>>>> > > >>>>>> Does the target_table need to be specified in the SELECT clause? > Or > > >>>>>> could > > >>>>>> it read > > >>>>>> > > >>>>>> EXECUTE STATEMENT SET BEGIN > > >>>>>> INSERT INTO main SELECT a, b FROM > > FunctionWithSideOutput(input => > > >>>>>> data, > > >>>>>> uid = 'only_once') WHERE target_table = 'main'; > > >>>>>> INSERT INTO side SELECT a, b FROM > > FunctionWithSideOutput(input => > > >>>>>> data, > > >>>>>> uid = 'only_once') WHERE target_table = 'side'; > > >>>>>> END; > > >>>>>> > > >>>>>> Separately, for clarity, it seems like the two target_table could > > have > > >>>>>> separate schemas defined. > > >>>>>> > > >>>>>> > > >>>>>>> 10. Support for State TTL > > >>>>>>> Supporting state TTL will be easy. We just need to add a > parameter > > to > > >>>>>>> @StateHint and pass it through. > > >>>>>>> > > >>>>>> > > >>>>>> If PTFs can have state, I'd be strongly in favor of doing any > > >> interface / > > >>>>>> base work we need in the initial implementation so that state size > > >> can be > > >>>>>> managed. If it is just sufficient to have hints in the interface, > > >>>>>> awesome! > > >>>>>> > > >>>>>> Cheers, > > >>>>>> > > >>>>>> Jim > > >>>>>> > > >>>>> > > >> > > >> > > > > > > > >