Timo, thanks for the response. I have a few more questions.

> as mentioned in "Scoping and Simplifications" a PTF will not support
> late events. It will filter them out. We have to solve the late events
> topic at an earlier stage in the SQL pipeline. This is a different FLIP
> discussion. Not every SQL operator should deal with late events in a
> different way.

So long as we can someday cleanly handle late events, I'm okay with this.

Some follow-up questions:

Can I apply a PTF to a stream that doesn't have a time attribute? The section on time and watermarks seems to allow for this, but it also seems to expect that watermarks are present regardless. What if they aren't? I'm wondering if there's a case where there are no watermarks, and a PTF registers a timer that can never fire.

And one unrelated question: in the CountWithTimeout example, is @StateHint CountState state referring to the class named Count? This appears to be a typo.

David

On Fri, Nov 1, 2024 at 3:03 PM Timo Walther <twal...@apache.org> wrote:

> Hi David,
>
> as mentioned in "Scoping and Simplifications" a PTF will not support
> late events. It will filter them out. We have to solve the late events
> topic at an earlier stage in the SQL pipeline. This is a different FLIP
> discussion. Not every SQL operator should deal with late events in a
> different way.
>
> > Is there a guarantee that watermarking will be applied upstream of
> > the split between the two statements in the resulting job graph?
>
> Yes, the uid will make the PTF unique in the entire job graph. If this
> is not possible (e.g. because the two PTF invocations are used in
> completely different statements and a common subgraph cannot be
> determined), we can throw an error.
>
> Regards,
> Timo
>
>
> On 01.11.24 12:13, David Anderson wrote:
> >> 3. Change of interfaces for multiple output tables
> >> Currently, I think using a STATEMENT SET should be enough for side
> >> output semantics. I have added an example in section 5.2.3.2 for that.
> >
> > I question whether this really works. Is there a guarantee that
> > watermarking will be applied upstream of the split between the two
> > statements in the resulting job graph? Otherwise, important use cases like
> > sending late events to a side output will behave non-deterministically, and
> > be useless.
> >
> > David
> >
> >
> > On Fri, Nov 1, 2024 at 10:26 AM Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi Xuyang,
> >>
> >> thanks for the good questions.
> >>
> >> 1. What happens if the TTLs for these different StateHints are not the
> >> same?
> >>
> >> The eval() fully determines available state and their TTL. Helper
> >> methods such as onTimer() and finish() can reference a subset of
> >> declared state. It is not necessary that the helper methods declare all
> >> state properties one more time. The name should be sufficient and we
> >> should forbid setting additional properties.
> >>
> >> 2. I believe the named arguments introduced in FLIP-387[1] can also be
> >> applied to this ProcessTableFunction, right?
> >>
> >> Absolutely, the PTF actually needs named arguments, especially for
> >> optional fields such as uid or on_time. For forward compatibility, I
> >> would even suggest that PTFs only support named arguments. Not sure if
> >> we can enforce that.
> >>
> >> 3. Will we expose the original RowKind in the eval method's row input?
> >>
> >> Yes, it's likely that only advanced users will make use of that. In that
> >> case users have to work with Row/RowData. It's more likely that
> >> built-in functions will make use of this. The default changelog mode for
> >> both input and output is append.
> >>
> >> 4. Are we allowing users to define both styles simultaneously
> >>
> >> Yes. Context is optional. And state access in helper methods
> >> (finish/onTimer) as well. This reduces the overhead in case a PTF runs
> >> in a container/other process.
> >>
> >> I will update the FLIP to reflect these answers.
> >>
> >> Thanks,
> >> Timo
> >>
> >>
> >>
> >> On 01.11.24 05:10, Xuyang wrote:
> >>> Hi, Timo.
> >>>
> >>> Thank you for this great work! When I previously introduced the session
> >>> window TVF, I was contemplating how to enable users to define a PTF in SQL.
> >>> I'm glad to see this work being discussed and that it has
> >>> improved the integration with the DataStream API.
> >>>
> >>> After reading the entire FLIP, I have a few questions that I hope you
> >>> can address.
> >>>
> >>> 1. I noticed that in the example, the same field (e.g., CountState) can
> >>> declare a StateHint in the eval, onTimer, and finish methods. What
> >>> happens if the TTLs for these different StateHints are not the same?
> >>>
> >>> 2. I believe the named arguments introduced in FLIP-387[1] can also be
> >>> applied to this ProcessTableFunction, right?
> >>>
> >>> 3. In our UDAFs, we expect users to provide accumulate and retract
> >>> methods to handle input data for +I/+U and -U/-D. However, in the eval
> >>> method of a ScalarFunction/UDTF, users do not have visibility into the
> >>> input's RowKind. In the new PTF, will we expose the original RowKind in
> >>> the eval method's row input, allowing users to determine the row's
> >>> RowKind themselves?
> >>>
> >>> 4. I noticed that in the examples, the eval method sometimes includes
> >>> the Context, @StateHint fields, and the input data (Row input), while
> >>> other times it only consists of the input data. Are we allowing users
> >>> to define both styles simultaneously?
> >>>
> >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures
> >>>
> >>> --
> >>>
> >>> Best!
> >>> Xuyang
> >>>
> >>>
> >>> At 2024-10-31 21:57:37, "Timo Walther" <twal...@apache.org> wrote:
> >>>> Hi everyone,
> >>>>
> >>>> thanks for all the feedback I received so far. I had very healthy
> >>>> discussions with various people both online and offline at Current and
> >>>> Flink Forward Berlin. The general user responses were also very
> >>>> positive. The FLIP should be ready to start a VOTE thread.
> >>>>
> >>>> This is the last call for feedback. I would start a VOTE tomorrow if
> >>>> there are no objections. Happy to take further feedback during
> >>>> implementation as well.
> >>>>
> >>>> Thanks,
> >>>> Timo
> >>>>
> >>>> On 30.10.24 14:34, Timo Walther wrote:
> >>>>> Hi Jim,
> >>>>>
> >>>>> 3. Multiple output tables
> >>>>>
> >>>>> > Does the target_table need to be specified in the SELECT clause?
> >>>>>
> >>>>> No. Similar to reading from a regular table. The filter column must not
> >>>>> be part of the SELECT part.
> >>>>>
> >>>>> > It seems like the two target_table could have separate schemas
> >>>>> > defined.
> >>>>>
> >>>>> That is true. The SELECT is responsible for transforming the columns
> >>>>> into the target table's schema. The output row of the PTF might be a
> >>>>> union of various columns in this case.
> >>>>>
> >>>>> 10. Support for State TTL
> >>>>>
> >>>>> > I'd be strongly in favor of doing any interface / base work we need in
> >>>>> > the initial implementation so that state size can be managed.
> >>>>>
> >>>>> I agree, State TTL is crucial. I updated the FLIP and added interfaces
> >>>>> to StateTypeStrategy and @StateHint.
> >>>>>
> >>>>> Cheers,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 23.10.24 17:59, Jim Hughes wrote:
> >>>>>> Hi Timo,
> >>>>>>
> >>>>>> Thank you for the answers. I have a few clarifications inlined.
> >>>>>>
> >>>>>> On Mon, Oct 14, 2024 at 8:07 AM Timo Walther <twal...@apache.org> wrote:
> >>>>>>
> >>>>>>> 3. Change of interfaces for multiple output tables
> >>>>>>> Currently, I think using a STATEMENT SET should be enough for side
> >>>>>>> output semantics. I have added an example in section 5.2.3.2 for that.
> >>>>>>> We are still free to add more methods to Context, let the function
> >>>>>>> implement additional interfaces or use more code generation together
> >>>>>>> with @ArgumentHints.
> >>>>>>>
> >>>>>>
> >>>>>> Does the target_table need to be specified in the SELECT clause? Or
> >>>>>> could it read
> >>>>>>
> >>>>>> EXECUTE STATEMENT SET BEGIN
> >>>>>>   INSERT INTO main SELECT a, b
> >>>>>>     FROM FunctionWithSideOutput(input => data, uid => 'only_once')
> >>>>>>     WHERE target_table = 'main';
> >>>>>>   INSERT INTO side SELECT a, b
> >>>>>>     FROM FunctionWithSideOutput(input => data, uid => 'only_once')
> >>>>>>     WHERE target_table = 'side';
> >>>>>> END;
> >>>>>>
> >>>>>> Separately, for clarity, it seems like the two target_table could have
> >>>>>> separate schemas defined.
> >>>>>>
> >>>>>>
> >>>>>>> 10. Support for State TTL
> >>>>>>> Supporting state TTL will be easy. We just need to add a parameter to
> >>>>>>> @StateHint and pass it through.
> >>>>>>>
> >>>>>>
> >>>>>> If PTFs can have state, I'd be strongly in favor of doing any interface /
> >>>>>> base work we need in the initial implementation so that state size can be
> >>>>>> managed. If it is just sufficient to have hints in the interface,
> >>>>>> awesome!
> >>>>>>
> >>>>>> Cheers,
> >>>>>>
> >>>>>> Jim
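
P.S. To make the timer question at the top of this message concrete, here is a toy model of the concern. It is only a sketch under stated assumptions: ToyTimerService and its methods are hypothetical names invented for illustration, not part of Flink or of the FLIP, and the model assumes that an event-time timer fires only once a watermark at or beyond its timestamp has arrived.

```python
import heapq


class ToyTimerService:
    """Hypothetical stand-in for an event-time timer service (not a Flink API)."""

    def __init__(self):
        self.watermark = float("-inf")  # no watermark has been seen yet
        self._timers = []               # min-heap of pending timer timestamps
        self.fired = []                 # timestamps of timers that have fired

    def register_timer(self, timestamp):
        heapq.heappush(self._timers, timestamp)

    def advance_watermark(self, watermark):
        # Assumption: event time only moves forward when a watermark arrives.
        self.watermark = max(self.watermark, watermark)
        while self._timers and self._timers[0] <= self.watermark:
            self.fired.append(heapq.heappop(self._timers))

    def pending(self):
        return sorted(self._timers)


# Input with watermarks: the timer fires once the watermark passes it.
with_wm = ToyTimerService()
with_wm.register_timer(1_000)
with_wm.advance_watermark(1_500)

# Input without watermarks: nothing ever advances event time,
# so the timer stays pending indefinitely.
without_wm = ToyTimerService()
without_wm.register_timer(1_000)
```

In this model the second timer can never fire, which is the situation I'm asking about: whether registering a timer on an input without watermarks should be rejected up front, or is allowed and silently never fires.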