Hi, Timo.

Thanks for your proposal. This FLIP greatly improves the usability of SQL!
I have a few questions about it:

1. How do users register a PTF with Flink? It looks like users can use CREATE
FUNCTION to register the PTF.

2. Can the input parameter of a PTF be a view? If the input parameter is a
table, how can the developer of the PTF know the schema of the input table?
Without type information, I don't know how to extract fields from the Row
or RowData.

3. I see the API changes mention that `getInputChangelogMode` and
`getOutputChangelogMode` will be added to FunctionDefinition. Can we just
add these two methods to ProcessTableFunction instead? After all, other
functions (e.g. scalar functions) don't need these two methods.

4. If an insert-only stream is converted to a changelog stream by the PTF,
what is the upsert key of the changelog stream?

5. I see that a new type, DESCRIPTOR, has been added. Can the user declare
a type as DESCRIPTOR directly in the DDL, or is the type only available to
PTFs? If it is only used for PTFs, do we need to provide the
DataTypes#DESCRIPTOR() method to allow the user to declare the type? Or is
it just a type for internal use, like DistinctType? Also, can you document
the conversions between this type and other types?
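
To make question 3 concrete, here is a rough sketch of the alternative I have
in mind. It uses simplified stand-in types (FunctionDefinitionSketch,
ProcessTableFunctionSketch, Kind) that I made up for illustration; they are
not the real Flink classes. The insert-only default is an assumption based on
the statement elsewhere in this thread that the default changelog mode for
input and output is append.

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch only: every type below is a hypothetical stand-in, not a Flink class.
final class ChangelogModeSketch {

    // Simplified stand-in for the row kinds of a changelog.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Stand-in for FunctionDefinition, left unchanged in this variant.
    interface FunctionDefinitionSketch {
        String name();
    }

    // A scalar function never sees the changelog methods.
    static final class UpperSketch implements FunctionDefinitionSketch {
        @Override
        public String name() { return "UPPER"; }
    }

    // Only the PTF base class carries the two methods; both default to insert-only.
    abstract static class ProcessTableFunctionSketch implements FunctionDefinitionSketch {
        Set<Kind> getInputChangelogMode() { return EnumSet.of(Kind.INSERT); }
        Set<Kind> getOutputChangelogMode() { return EnumSet.of(Kind.INSERT); }
    }

    // A PTF that consumes an insert-only input and emits a full changelog.
    static final class ToChangelogSketch extends ProcessTableFunctionSketch {
        @Override
        public String name() { return "TO_CHANGELOG"; }

        @Override
        Set<Kind> getOutputChangelogMode() { return EnumSet.allOf(Kind.class); }
    }

    public static void main(String[] args) {
        ProcessTableFunctionSketch ptf = new ToChangelogSketch();
        System.out.println(ptf.name() + " input:  " + ptf.getInputChangelogMode());
        System.out.println(ptf.name() + " output: " + ptf.getOutputChangelogMode());
    }
}
```

With a layout like this, the planner would presumably have to check for a
ProcessTableFunction before asking for the changelog modes, which may be the
reason the FLIP puts the methods on FunctionDefinition.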

Best,
Shengkai

On Sat, Nov 2, 2024 at 00:03 David Anderson <da...@alpinegizmo.com> wrote:

> Timo, thanks for the response. I have a few more questions.
>
> > as mentioned in "Scoping and Simplifications" a PTF will not support
> > late events. It will filter them out. We have to solve the late events
> > topic at an earlier stage in the SQL pipeline. This is a different FLIP
> > discussion. Not every SQL operator should deal with late events in a
> > different way.
>
>
> So long as we can someday cleanly handle late events, I'm okay with this.
>
> Some follow-up questions: can I apply a PTF to a stream that doesn't have a
> time attribute? The section on time and watermarks seems to allow for this,
> but it also seems to expect that watermarks are present, regardless. What
> if they aren't? I'm wondering if there's a case where there are no
> watermarks, and a PTF registers a timer that can never fire.
>
> And one unrelated question:
>
> In the CountWithTimeout example, is @StateHint CountState state referring
> to the class named Count? This appears to be a typo.
>
> David
>
> On Fri, Nov 1, 2024 at 3:03 PM Timo Walther <twal...@apache.org> wrote:
>
> > Hi David,
> >
> > as mentioned in "Scoping and Simplifications" a PTF will not support
> > late events. It will filter them out. We have to solve the late events
> > topic at an earlier stage in the SQL pipeline. This is a different FLIP
> > discussion. Not every SQL operator should deal with late events in a
> > different way.
> >
> >  > Is there a guarantee that watermarking will be applied upstream of
> > the split between the two statements in the resulting job graph?
> >
> > Yes, the uid will make the PTF unique in the entire job graph. If this
> > is not possible (e.g. because the two PTF invocations are used in
> > completely different statements and a common subgraph cannot be
> > determined), we can throw an error.
> >
> > Regards,
> > Timo
> >
> >
> > On 01.11.24 12:13, David Anderson wrote:
> > >> 3. Change of interfaces for multiple output tables
> > >> Currently, I think using a STATEMENT SET should be enough for side
> > >> output semantics. I have added an example in section 5.2.3.2 for that.
> > >
> > > I question whether this really works. Is there a guarantee that
> > > watermarking will be applied upstream of the split between the two
> > > statements in the resulting job graph? Otherwise, important use cases like
> > > sending late events to a side output will behave non-deterministically and
> > > be useless.
> > >
> > > David
> > >
> > >
> > > On Fri, Nov 1, 2024 at 10:26 AM Timo Walther <twal...@apache.org> wrote:
> > >
> > >> Hi Xuyang,
> > >>
> > >> thanks for the good questions.
> > >>
> > >> 1. What happens if the TTLs for these different StateHints are not the
> > >> same?
> > >>
> > >> The eval() method fully determines the available state and its TTL. Helper
> > >> methods such as onTimer() and finish() can reference a subset of the
> > >> declared state. The helper methods do not need to declare all state
> > >> properties again. The name should be sufficient, and we should forbid
> > >> setting additional properties there.
> > >>
> > >> 2. I believe the named arguments introduced in FLIP-387[1] can also be
> > >> applied to this ProcessTableFunction, right?
> > >>
> > >> Absolutely, the PTF actually needs named arguments, especially for
> > >> optional fields such as uid or on_time. For forward compatibility, I
> > >> would even suggest that PTFs only support named arguments. Not sure if we
> > >> can enforce that.
> > >>
> > >> 3. Will we expose the original RowKind in the eval method's row input?
> > >>
> > >> Yes, though it's likely that only advanced users will make use of that. In
> > >> that case, users have to work with Row/RowData. It's more likely that
> > >> built-in functions will make use of this. The default changelog mode for
> > >> both input and output is append.
> > >>
> > >> 4. Are we allowing users to define both styles simultaneously?
> > >>
> > >> Yes. Context is optional. And state access in helper methods
> > >> (finish/onTimer) as well. This reduces the overhead in case a PTF runs
> > >> in a container/other process.
> > >>
> > >> I will update the FLIP to reflect these answers.
> > >>
> > >> Thanks,
> > >> Timo
> > >>
> > >>
> > >>
> > >> On 01.11.24 05:10, Xuyang wrote:
> > >>> Hi, Timo.
> > >>>
> > >>> Thank you for this great work! When I previously introduced the session
> > >>> window TVF, I was contemplating how to enable users to define a PTF in
> > >>> SQL. I'm glad to see this work being discussed and that it has improved
> > >>> the integration with the DataStream API.
> > >>>
> > >>> After reading the entire FLIP, I have a few questions that I hope you can
> > >>> address.
> > >>>
> > >>> 1. I noticed that in the example, the same field (e.g., CountState) can
> > >>> declare a StateHint in the eval, onTimer, and finish methods. What happens
> > >>> if the TTLs for these different StateHints are not the same?
> > >>>
> > >>> 2. I believe the named arguments introduced in FLIP-387[1] can also be
> > >>> applied to this ProcessTableFunction, right?
> > >>>
> > >>> 3. In our UDAFs, we expect users to provide accumulate and retract methods
> > >>> to handle input data for +I/+U and -U/-D. However, in the eval method of a
> > >>> ScalarFunction/UDTF, users do not have visibility into the input's RowKind.
> > >>> In the new PTF, will we expose the original RowKind in the eval method's
> > >>> row input, allowing users to determine the row's RowKind themselves?
> > >>>
> > >>> 4. I noticed that in the examples, the eval method sometimes includes the
> > >>> Context, @StateHint fields, and the input data (Row input), while other
> > >>> times it only consists of the input data. Are we allowing users to define
> > >>> both styles simultaneously?
> > >>>
> > >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures
> > >>>
> > >>> --
> > >>>
> > >>>       Best!
> > >>>       Xuyang
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> At 2024-10-31 21:57:37, "Timo Walther" <twal...@apache.org> wrote:
> > >>>> Hi everyone,
> > >>>>
> > >>>> thanks for all the feedback I received so far. I had very healthy
> > >>>> discussions with various people both online and offline at Current and
> > >>>> Flink Forward Berlin. The general user responses were also very
> > >>>> positive. The FLIP should be ready to start a VOTE thread.
> > >>>>
> > >>>> This is the last call for feedback. I would start a VOTE tomorrow if
> > >>>> there are no objections. Happy to take further feedback during
> > >>>> implementation as well.
> > >>>>
> > >>>> Thanks,
> > >>>> Timo
> > >>>>
> > >>>> On 30.10.24 14:34, Timo Walther wrote:
> > >>>>> Hi Jim,
> > >>>>>
> > >>>>> 3. Multiple output tables
> > >>>>>
> > >>>>>    > Does the target_table need to be specified in the SELECT clause?
> > >>>>>
> > >>>>> No. Similar to reading from a regular table, the filter column does not
> > >>>>> need to be part of the SELECT clause.
> > >>>>>
> > >>>>>    > It seems like the two target_table could have separate schemas
> > >>>>>    > defined.
> > >>>>>
> > >>>>> That is true. The SELECT is responsible for transforming the columns into
> > >>>>> the target table's schema. The output row of the PTF might be a union of
> > >>>>> various columns in this case.
> > >>>>>
> > >>>>> 10. Support for State TTL
> > >>>>>
> > >>>>>    > I'd be strongly in favor of doing any interface / base work we need
> > >>>>>    > in the initial implementation so that state size can be managed.
> > >>>>>
> > >>>>> I agree, State TTL is crucial. I updated the FLIP and added interfaces
> > >>>>> to StateTypeStrategy and @StateHint.
> > >>>>>
> > >>>>> Cheers,
> > >>>>> Timo
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On 23.10.24 17:59, Jim Hughes wrote:
> > >>>>>> Hi Timo,
> > >>>>>>
> > >>>>>> Thank you for the answers.  I have a few clarifications inlined.
> > >>>>>>
> > >>>>>> On Mon, Oct 14, 2024 at 8:07 AM Timo Walther <twal...@apache.org> wrote:
> > >>>>>>
> > >>>>>>> 3. Change of interfaces for multiple output tables
> > >>>>>>> Currently, I think using a STATEMENT SET should be enough for side
> > >>>>>>> output semantics. I have added an example in section 5.2.3.2 for that.
> > >>>>>>> We are still free to add more methods to Context, let the function
> > >>>>>>> implement additional interfaces or use more code generation together
> > >>>>>>> with @ArgumentHints.
> > >>>>>>>
> > >>>>>>
> > >>>>>> Does the target_table need to be specified in the SELECT clause? Or
> > >>>>>> could it read
> > >>>>>>
> > >>>>>> EXECUTE STATEMENT SET BEGIN
> > >>>>>>       INSERT INTO main SELECT a, b
> > >>>>>>         FROM FunctionWithSideOutput(input => data, uid => 'only_once')
> > >>>>>>         WHERE target_table = 'main';
> > >>>>>>       INSERT INTO side SELECT a, b
> > >>>>>>         FROM FunctionWithSideOutput(input => data, uid => 'only_once')
> > >>>>>>         WHERE target_table = 'side';
> > >>>>>> END;
> > >>>>>>
> > >>>>>> Separately, for clarity, it seems like the two target_table could have
> > >>>>>> separate schemas defined.
> > >>>>>>
> > >>>>>>
> > >>>>>>> 10. Support for State TTL
> > >>>>>>> Supporting state TTL will be easy. We just need to add a parameter to
> > >>>>>>> @StateHint and pass it through.
> > >>>>>>>
> > >>>>>>
> > >>>>>> If PTFs can have state, I'd be strongly in favor of doing any interface /
> > >>>>>> base work we need in the initial implementation so that state size can be
> > >>>>>> managed.  If it is just sufficient to have hints in the interface,
> > >>>>>> awesome!
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>>
> > >>>>>> Jim
> > >>>>>>
