Thanks @Piotr for your valuable input!

I did a quick read of the previous discussion you mentioned. It seems my FLIP
title doesn't give a clear scope and has caused some confusion. If my
understanding is correct, the UDFs in your context are user-implemented
`org.apache.flink.api.common.functions.Function`s, while the
`UserDefinedFunction` I mentioned in the FLIP is limited to the flink-table
module, located in `org.apache.flink.table.functions`.

Here's a use case we ran into recently (which is indeed the motivation for
this proposal): one of our users implemented an
`org.apache.flink.table.functions.TableFunction`; the simplified
pseudo-code is as follows:

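```java
import java.util.List;
import java.util.Queue;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

// Simplified: In/Out and the helper methods stand in for the user's real logic.
public class XFunction extends TableFunction<Out> {

    // In-memory queues shared with an external Python process.
    private transient Queue<In> inputQueue;
    private transient Queue<Out> outputQueue;

    @Override
    public void open(FunctionContext context) throws Exception {
        initMemQueue();   // creates inputQueue and outputQueue
        initPythonProc(); // starts the Python process that consumes inputQueue
    }

    public void eval(In in) {
        inputQueue.offer(in);
        // Emit a result only if the Python process has already produced one.
        Out out = outputQueue.poll();
        if (out != null) {
            collect(out);
        }
    }

    @Override
    public void close() throws Exception {
        // 'Flush': wait for the remaining results and emit them before shutdown.
        waitForPythonFinish();
        List<Out> outputs = drainQueue();
        outputs.forEach(this::collect);
    }
}
```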
It worked well on older Flink versions, but after they recently upgraded,
the 'flush' logic in the legacy close() method of `TableFunction` no longer
works properly: records emitted from close() can lead to wrong results or
runtime errors.
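
To illustrate the failure mode without any Flink dependency, here is a
minimal Java model of the new lifecycle. `LifecycleSim`, `BufferingUdf`, and
the rule that the downstream output stops accepting records before close()
runs are simplifying assumptions meant to mirror the reported symptom, not
Flink's actual runtime code.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSim {

    /** Downstream output; in this model it stops accepting records once closed. */
    static final class Output {
        final List<String> received = new ArrayList<>();
        boolean open = true;

        void collect(String record) {
            if (open) {
                received.add(record);
            } // records emitted after the output is closed are silently dropped
        }
    }

    /** A UDF that buffers records and flushes the buffer in one lifecycle hook. */
    static final class BufferingUdf {
        final List<String> buffer = new ArrayList<>();
        final Output out;
        final boolean flushInFinish;

        BufferingUdf(Output out, boolean flushInFinish) {
            this.out = out;
            this.flushInFinish = flushInFinish;
        }

        void eval(String in) {
            buffer.add(in);
        }

        void finish() {
            if (flushInFinish) {
                flush();
            }
        }

        void close() {
            if (!flushInFinish) {
                flush(); // legacy pattern: flushing from close()
            }
        }

        private void flush() {
            buffer.forEach(out::collect);
            buffer.clear();
        }
    }

    /** Runs eval* -> finish -> (downstream finished) -> close, as assumed by this model. */
    public static List<String> run(boolean flushInFinish, List<String> inputs) {
        Output out = new Output();
        BufferingUdf udf = new BufferingUdf(out, flushInFinish);
        inputs.forEach(udf::eval);
        udf.finish();
        out.open = false; // downstream no longer accepts records when close() runs
        udf.close();
        return out.received;
    }

    public static void main(String[] args) {
        List<String> inputs = List.of("a", "b", "c");
        System.out.println("flush in close():  " + run(false, inputs)); // []
        System.out.println("flush in finish(): " + run(true, inputs));  // [a, b, c]
    }
}
```

In this model the same buffering logic loses its output when flushed from
close(), but delivers everything when flushed from a finish() hook, which is
the kind of migration this proposal aims to make possible for
`UserDefinedFunction`.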

Before proposing the FLIP, I also considered a `flush()` extension on
`org.apache.flink.api.common.functions.Function`, since some SQL operators
are also affected, but that is not included in the scope of this FLIP;
maybe we can discuss it in a separate thread.
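
For reference, such an extension, with the output typed by the user record
as Piotr pointed out (`finish(Collector<T> output)`), could look roughly like
the sketch below. `FlushableFunction`, `BatchingFunction`, and the local
`Collector` interface (a stand-in for Flink's `org.apache.flink.util.Collector`)
are hypothetical names for illustration only, not existing or proposed Flink
APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class FlushSketch {

    /** Local stand-in for a typed output collector. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Hypothetical extension: a function that can emit buffered records at end of input. */
    interface FlushableFunction<T> {
        void flush(Collector<T> out) throws Exception;
    }

    /** Groups inputs into fixed-size batches; a trailing partial batch is only emitted by flush(). */
    static final class BatchingFunction implements FlushableFunction<List<Integer>> {
        private final int batchSize;
        private final List<Integer> pending = new ArrayList<>();

        BatchingFunction(int batchSize) {
            this.batchSize = batchSize;
        }

        void process(int value, Collector<List<Integer>> out) {
            pending.add(value);
            if (pending.size() == batchSize) {
                out.collect(new ArrayList<>(pending));
                pending.clear();
            }
        }

        @Override
        public void flush(Collector<List<Integer>> out) {
            if (!pending.isEmpty()) {
                out.collect(new ArrayList<>(pending));
                pending.clear();
            }
        }
    }

    /** Feeds all values through the function, then calls flush() at "end of input". */
    public static List<List<Integer>> runBatching(int batchSize, int... values) {
        List<List<Integer>> emitted = new ArrayList<>();
        BatchingFunction fn = new BatchingFunction(batchSize);
        for (int v : values) {
            fn.process(v, emitted::add);
        }
        fn.flush(emitted::add);
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runBatching(2, 1, 2, 3, 4, 5)); // [[1, 2], [3, 4], [5]]
    }
}
```

Without the flush() call, the trailing `[5]` batch would never be emitted,
which is the buffering case where such a hook matters.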

I hope this helps explain the motivation, and your comments are welcome!

Best,
Lincoln Lee


On Wed, Sep 14, 2022 at 16:56, Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi Lincoln,
>
> Thanks for the proposal. Have you seen the old discussion about adding this
> `finish()` method? [1] We didn't add it to UDFs, as we didn't see a
> motivation (maybe we have missed something), and at the same time it wasn't
> that easy. Plain `finish()` wouldn't be enough. Users would need a way to
> output records from the `finish()` call, so it would have to be typed with
> the user record (`finish(Collector<T> output)`). On the other hand, we
> couldn't find an example where a user would actually need the `finish()`
> call in a UDF, as it seemed to us it only makes sense for
> operators/functions that are buffering records. Note that back then, during the
> discussion, we were referring to this method as `flush()` or `drain()`.
>
> Can you shed some more light and provide more details on the exact
> motivating example behind this proposal?
>
> Best,
> Piotrek
>
> [1] https://lists.apache.org/thread/gmr9r3n3ktojt4bhoxz4t8qho6h7d1rp
>
> On Wed, Sep 14, 2022 at 08:22, Lincoln Lee <lincoln.8...@gmail.com> wrote:
>
> > Hello everyone,
> >
> >   I’d like to open a discussion on FLIP-260[1]: expose finish method for
> > UserDefinedFunction. This gives users who rely on finish logic in the
> > legacy close() method (< 1.14) a chance to migrate to the new finish()
> > method.
> >
> >   The task lifecycle was changed in FLINK-22972[2]: a new finish() phase
> > was introduced (extracting the ‘finish’ part out of ‘close’) and the
> > dispose() method was removed. This change was also applied in the table
> > module (e.g., `AbstractMapBundleOperator` for mini-batch operations) but
> > did not cover UserDefinedFunction, which only exposes the open() and
> > close() APIs for custom usage. Users who rely on the legacy close() API
> > may encounter wrong results or runtime errors after upgrading to the new
> > version. Strictly speaking, this is a bug caused by the breaking change,
> > but since fixing it requires a public API change, we propose this FLIP.
> >
> >   Looking forward to your comments or feedback.
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+UserDefinedFunction
> > [2] https://issues.apache.org/jira/browse/FLINK-22972
> >
> > Best,
> > Lincoln Lee
> >
>
