Hey Lincoln,

Thanks for opening the discussion.

To be honest, I am not convinced that emitting from close() there is a contract that was envisioned and thus should be maintained. As far as I can see, it affects only TableFunction, because it has the collect() method. None of the other UDFs (ScalarFunction, AggregateFunction) have a means to emit records from close().

I am also not sure what the consequences would be for other operators that expect a TableFunction to emit records only when eval() is called, or whether such operators even exist.

If this is something we are certain we want to support, I'd be much more comfortable adding finish() to TableFunction instead. I would be happy to hear opinions from the Table API folks.
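A minimal sketch of what a finish() hook on TableFunction could look like, modelled in plain Java so it runs without Flink. `ModelTableFunction`, `BatchingSplitFunction` and the `finish()` hook are hypothetical illustration names, not Flink's TableFunction API. Because a table function already has collect(), the hook would not need a Collector parameter; it only has to run after the last eval() while emitting is still allowed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for TableFunction; NOT Flink's class or API.
abstract class ModelTableFunction<T> {
    private final List<T> output = new ArrayList<>();

    /** Plays the role of TableFunction#collect: emits one row downstream. */
    protected final void collect(T row) {
        output.add(row);
    }

    List<T> output() {
        return output;
    }

    /** Proposed hook: called once after the last eval(), while collect() is still valid. */
    public void finish() {}

    /** Cleanup only; emitting rows here would not be part of the contract. */
    public void close() {}
}

/** Example: buffers words and emits them once at least three are pending. */
class BatchingSplitFunction extends ModelTableFunction<String> {
    private final List<String> pending = new ArrayList<>();

    public void eval(String line) {
        for (String word : line.split(" ")) {
            pending.add(word);
        }
        if (pending.size() >= 3) {
            flush();
        }
    }

    @Override
    public void finish() {
        flush(); // leftovers are emitted here instead of in close()
    }

    private void flush() {
        pending.forEach(this::collect);
        pending.clear();
    }
}
```

With this shape, close() stays a pure cleanup method and the "emit leftovers" logic has an explicit place in the lifecycle.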

Best,

Dawid

On 14/09/2022 15:55, Lincoln Lee wrote:
Thanks @Piotr for your valuable input!

I did a quick read of the previous discussion you mentioned. It seems my
FLIP title doesn't give a clear scope and has caused some confusion. If my
understanding is correct, the UDFs in your context are user-implemented
`org.apache.flink.api.common.functions.Function`s, while the
`UserDefinedFunction` I mentioned in the FLIP is limited to the flink-table
module, located in `org.apache.flink.table.functions`.

Here's a use case we ran into recently (which is indeed the motivation for
this proposal): one of our users implemented an
`org.apache.flink.table.functions.TableFunction`; the simplified
pseudo-code is as follows:

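```java
import java.util.List;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

// Simplified: In/Out, the queue and the helper methods are user-defined and elided.
class XFunction extends TableFunction<Out> {

    @Override
    public void open(FunctionContext context) throws Exception {
        initMemQueue();
        initPythonProc();
    }

    // Called once per input row; emits a result only if one is already available.
    public void eval(In in) {
        queue.offer(in);
        Out out = queue.poll();
        if (out != null) {
            collect(out);
        }
    }

    // Legacy pattern: wait for the Python process, then flush the remaining results.
    @Override
    public void close() throws Exception {
        waitForPythonFinish();
        List<Out> outputs = drainQueue();
        outputs.forEach(this::collect);
    }
}
```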
This worked well in older Flink versions, but when they recently attempted
an upgrade, the 'flush' logic in the legacy close() method of
`TableFunction` no longer worked properly.

Before proposing the FLIP, I also considered a `flush()` extension on
`org.apache.flink.api.common.functions.Function`, because some SQL
operators are also affected, but that is not within the scope of this
FLIP; maybe we can discuss it in another thread.

I hope this helps explain the motivation; your comments are welcome!

Best,
Lincoln Lee


Piotr Nowojski <pnowoj...@apache.org> wrote on Wed, 14 Sep 2022 at 16:56:

Hi Lincoln,

Thanks for the proposal. Have you seen the old discussion about adding this
`finish()` method? [1] We didn't add it to UDFs, as we didn't see a
motivation (maybe we have missed something), and at the same time it wasn't
that easy. Plain `finish()` wouldn't be enough. Users would need a way to
output records from the `finish()` call, so it would have to be typed with
the user record (`finish(Collector<T> output)`). On the other hand, we
couldn't find an example where a user would actually need the `finish()`
call in a UDF, as it seemed to us that it only makes sense for
operators/functions that buffer records. Note that back then, during the
discussion, we were referring to this method as `flush()` or `drain()`.
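A minimal sketch of the typed signature described above, assuming a hypothetical `FinishableFunction` interface (not a Flink API). The local `Collector` interface mirrors the two methods of `org.apache.flink.util.Collector` but is declared here only to keep the example self-contained; `PairingFunction` and `ListCollector` are likewise illustration-only names.

```java
import java.util.ArrayList;
import java.util.List;

/** Local stand-in mirroring org.apache.flink.util.Collector. */
interface Collector<T> {
    void collect(T record);

    void close();
}

/** Hypothetical UDF interface: finish() must be typed with the output record. */
interface FinishableFunction<IN, OUT> {
    void process(IN value, Collector<OUT> out);

    /** Called once at end of input; default: nothing buffered, nothing to emit. */
    default void finish(Collector<OUT> out) {}
}

/** A buffering UDF, the only kind for which finish() seems to be needed. */
class PairingFunction implements FinishableFunction<Integer, String> {
    private Integer pending; // first element of an incomplete pair

    @Override
    public void process(Integer value, Collector<String> out) {
        if (pending == null) {
            pending = value;
            return;
        }
        out.collect(pending + "+" + value);
        pending = null;
    }

    @Override
    public void finish(Collector<String> out) {
        if (pending != null) {
            out.collect(pending + "+<none>"); // flush the unpaired leftover
            pending = null;
        }
    }
}

/** Collector that simply records everything it receives. */
class ListCollector<T> implements Collector<T> {
    final List<T> records = new ArrayList<>();

    public void collect(T record) {
        records.add(record);
    }

    public void close() {}
}
```

Feeding 1, 2, 3 emits "1+2" during processing; only the finish(out) call can emit the leftover "3+<none>", which is why a plain, untyped finish() would not be enough.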

Can you shed some more light and provide more details on the exact
motivating example behind this proposal?

Best,
Piotrek

[1] https://lists.apache.org/thread/gmr9r3n3ktojt4bhoxz4t8qho6h7d1rp

On Wed, 14 Sep 2022 at 08:22, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hello everyone,

   I'd like to open a discussion on FLIP-260 [1]: Expose Finish Method for
UserDefinedFunction. This gives users who rely on finish logic in the
legacy close() method (< 1.14) a way to migrate to the new finish()
method.

   The task lifecycle was changed in FLINK-22972 [2]: a new finish() phase
was introduced (extracting the 'finish' part out of 'close') and the
dispose() method was removed. This change was also applied in the table
module (e.g., `AbstractMapBundleOperator` for mini-batch operations), but
it did not cover UserDefinedFunction, which only exposes the open() and
close() APIs for custom logic. Users who rely on the legacy close()
behavior may get wrong results or hit runtime errors after upgrading to the
new version. Strictly speaking, this is a bug caused by the breaking
change, but since fixing it requires a public API change, we propose this
FLIP.
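The lifecycle difference can be illustrated with a simplified plain-Java model. This is a hypothetical sketch, not Flink code: `ModelOutput`, `LegacyFlushingFunction` and `LifecycleModel` are illustration-only names, and the model simply counts records emitted after the finish phase as dropped, whereas in Flink the symptom may be wrong results or a runtime error.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the two task lifecycles; NOT Flink's real classes.

/** Downstream output: accepts records until the task has signalled end of data. */
class ModelOutput<T> {
    final List<T> emitted = new ArrayList<>();
    int dropped = 0;
    private boolean ended = false;

    void collect(T record) {
        if (ended) {
            dropped++; // in this model, records arriving after end of data are lost
        } else {
            emitted.add(record);
        }
    }

    void endOfData() {
        ended = true;
    }
}

/** Buffers all inputs and flushes them in close(), like the legacy XFunction. */
class LegacyFlushingFunction {
    private final List<String> buffer = new ArrayList<>();
    private final ModelOutput<String> out;

    LegacyFlushingFunction(ModelOutput<String> out) {
        this.out = out;
    }

    void eval(String in) {
        buffer.add(in.toUpperCase());
    }

    void close() {
        buffer.forEach(out::collect); // "flush in close" pattern
        buffer.clear();
    }
}

class LifecycleModel {
    /** Before 1.14 (modelled): close() did both flushing and cleanup, before end of data. */
    static ModelOutput<String> runLegacy(List<String> inputs) {
        ModelOutput<String> out = new ModelOutput<>();
        LegacyFlushingFunction f = new LegacyFlushingFunction(out);
        inputs.forEach(f::eval);
        f.close();
        out.endOfData();
        return out;
    }

    /** 1.14+ (modelled): the finish phase ends the data; close() runs afterwards. */
    static ModelOutput<String> runNew(List<String> inputs) {
        ModelOutput<String> out = new ModelOutput<>();
        LegacyFlushingFunction f = new LegacyFlushingFunction(out);
        inputs.forEach(f::eval);
        out.endOfData(); // finish phase: the UDF has no hook to flush here
        f.close();       // the flush attempt arrives too late
        return out;
    }
}
```

In the model, the legacy run delivers both buffered records, while the new run delivers none and drops both, because the only flush point the UDF has (close()) now comes after the finish phase.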

   Looking forward to your comments or feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+UserDefinedFunction
[2] https://issues.apache.org/jira/browse/FLINK-22972

Best,
Lincoln Lee
