Hi David,

as mentioned in "Scoping and Simplifications" a PTF will not support late events. It will filter them out. We have to solve the late events topic at an earlier stage in the SQL pipeline. This is a different FLIP discussion. Not every SQL operators should deal with late events in a different way.

> Is there a guarantee that watermarking will be applied upstream of the split between the two statements in the resulting job graph?

Yes, the uid will make the PTF unique in the entire job graph. If this is not possible (e.g. because the two PTF invocations are used in completely different statements and a common subgraph cannot be determined), we can throw an errror.

Regards,
Timo


On 01.11.24 12:13, David Anderson wrote:
3. Change of interfaces for multiple output tables
Currently, I think using a STATEMENT SET should be enough for side
output semantics. I have added an example in section 5.2.3.2 for that.

I question whether this really works. Is there a guarantee that
watermarking will be applied upstream of the split between the two
statements in the resulting job graph? Otherwise, important use cases like
sending late events to a side output will behave non-deterministically, and
be useless.

David


On Fri, Nov 1, 2024 at 10:26 AM Timo Walther <twal...@apache.org> wrote:

Hi Xuyang,

thanks for the good questions.

1. What happens if the TTLs for these different StateHints are not the
same?

The eval() fully determines available state and their TTL. Helper
methods such as onTimer() and finish() can references a subset of
declared state. It is not necessary that the helper methods declare all
state properties one more time. The name should be sufficient and we
should forbid setting additional properties.

2. I believe the named arguments introduced in FLIP-387[1] can also be
applied to this ProcessTableFunction, right?

Absolutely, the PTF actually needs named arguments. Esp for optional
fields such uid or on_time. For forward compatibility, I would even
suggest that PTFs only support named arguments. Not sure if we can
enforce that.

3. Will we expose the original RowKind in the eval method's row input?

Yes, it's likely that only advanced users will take use of that. In that
case users have to work with Row/RowData. It's likely that rather
build-in functions will make use of this. The default changelog mode for
both input and output is append.

4. Are we allowing users to define both styles simultaneously

Yes. Context is optional. And state access in helper methods
(finish/onTimer) as well. This reduces the overhead in case a PTF runs
in a container/other process.

I will update the FLIP to reflect these answers.

Thanks,
Timo



On 01.11.24 05:10, Xuyang wrote:
Hi, Timo.

Thank you for this great work! When I previously introduced the session
window TVF, I was contemplating

how to enable users to define a PTF in SQL. I'm glad to see this work
being discussed and that it has

improved the integration with the DataStream API.

After reading the entire flip, I have a few questions that I hope you
can address.

1. I noticed that in the example, the same field (e.g., CountState) can
declare a StateHint in the eval, onTimer,

and finish methods. What happens if the TTLs for these different
StateHints are not the same?

2. I believe the named arguments introduced in FLIP-387[1] can also be
applied to this ProcessTableFunction, right?

3. In our UDAFs, we expect users to provide accumulate and retract
methods to handle input data for +I/+U and -U/-D.

However, in the eval method of a ScalarFunction/UDTF, users do not have
visibility into the input's RowKind. In the new PTF,

will we expose the original RowKind in the eval method's row input,
allowing users to determine the row's RowKind themselves?

4. I noticed that in the examples, the eval method sometimes includes
the Context, @StateHint fields, and the input data (Row

input), while other times it only consists of the input data. Are we
allowing users to define both styles simultaneously?




[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures







--

      Best!
      Xuyang





At 2024-10-31 21:57:37, "Timo Walther" <twal...@apache.org> wrote:
Hi everyone,

thanks for all the feedback I received so far. I had very healthy
discussions with various people both online and offline at Current and
Flink Forward Berlin. The general user responses were also very
positive. The FLIP should be ready to start a VOTE thread.

This is the last call for feedback. I would start a VOTE tomorrow if
there are no objections. Happy to take further feedback during
implementation as well.

Thanks,
Timo

On 30.10.24 14:34, Timo Walther wrote:
Hi Jim,

3. Multiple output tables

   > Does the target_table need to be specified in the SELECT clause?

No. Similar to reading from a regular table. The filter column must not
be part of SELECT part.

   > It seems like the two target_table could have separate schemas
defined.

That is true. The SELECT is responsible to transforms the columns into
the target table's schema. The output row of the PTF might be a union
of
various columns in this case.

10. Support for State TTL

   > I'd be strongly in favor of doing any interface / base work we
need in
   > the initial implementation so that state size can be managed.

I agree, State TTL is crucial. I updated the FLIP and added interfaces
to StateTypeStrategy and @StateHint.

Cheers,
Timo



On 23.10.24 17:59, Jim Hughes wrote:
Hi Timo,

Thank you for the answers.  I have a few clarifications inlined.

On Mon, Oct 14, 2024 at 8:07 AM Timo Walther <twal...@apache.org>
wrote:

3. Change of interfaces for multiple output tables
Currently, I think using a STATEMENT SET should be enough for side
output semantics. I have added an example in section 5.2.3.2 for
that.
We are still free to add more methods to Context, let the function
implement additional interfaces or use more code generation together
with @ArgumentHints.


Does the target_table need to be specified in the SELECT clause?  Or
could
it read

EXECUTE STATEMENT SET BEGIN
      INSERT INTO main SELECT a, b FROM FunctionWithSideOutput(input =>
data,
uid = 'only_once') WHERE target_table = 'main';
      INSERT INTO side SELECT a, b FROM FunctionWithSideOutput(input =>
data,
uid = 'only_once') WHERE target_table = 'side';
END;

Separately, for clarity, it seems like the two target_table could have
separate schemas defined.


10. Support for State TTL
Supporting state TTL will be easy. We just need to add a parameter to
@StateHint and pass it through.


If PTFs can have state, I'd be strongly in favor of doing any
interface /
base work we need in the initial implementation so that state size
can be
managed.  If it is just sufficient to have hints in the interface,
awesome!

Cheers,

Jim






Reply via email to