Hey Lincoln, Thanks for opening the discussion.
To be honest, I am not convinced that emitting from close() is a contract that was ever envisioned and thus should be maintained. As far as I can see, it affects only TableFunction, because it has the collect method. None of the other UDFs (ScalarFunction, AggregateFunction) have a means to emit records from close().
I am also not sure what the consequences would be for the interplay with other operators that expect a TableFunction to emit only when eval is called. I am not sure whether any such operators exist.
If it is a thing that we are certain we want to support, I'd be much more comfortable adding finish() to the TableFunction instead. Would be happy to hear opinions from the Table API folks.
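To make the idea concrete, here is a minimal sketch of a finish() hook on a table function, assuming a lifecycle of open -> eval* -> finish -> close. `SketchTableFunction`, `DelayByOneFunction`, and `FinishSketch` are hypothetical stand-ins written only for illustration; they are not Flink classes, and the plain list merely stands in for the downstream collector.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a table function base class; NOT Flink's actual API.
abstract class SketchTableFunction<T> {
    private List<T> sink; // stands in for the downstream collector

    final void setSink(List<T> sink) {
        this.sink = sink;
    }

    // Emits one record downstream.
    protected final void collect(T record) {
        sink.add(record);
    }

    void open() {}

    // Assumed hook: called once after the last eval(); emitting is still allowed here.
    void finish() {}

    // Resource cleanup only; this sketch emits nothing from close().
    void close() {}
}

// Emits each input one call late, so exactly one record is pending at end of input.
class DelayByOneFunction extends SketchTableFunction<String> {
    private final Queue<String> queue = new ArrayDeque<>();

    void eval(String in) {
        queue.offer(in);
        if (queue.size() > 1) {
            collect(queue.poll());
        }
    }

    @Override
    void finish() {
        // Drain whatever is still buffered.
        String pending;
        while ((pending = queue.poll()) != null) {
            collect(pending);
        }
    }

    @Override
    void close() {
        queue.clear();
    }
}

class FinishSketch {
    // Drives the assumed lifecycle: open -> eval* -> finish -> close.
    static List<String> run(List<String> inputs) {
        List<String> out = new ArrayList<>();
        DelayByOneFunction fn = new DelayByOneFunction();
        fn.setSink(out);
        fn.open();
        for (String in : inputs) {
            fn.eval(in);
        }
        fn.finish();
        fn.close();
        return out;
    }
}
```

In this sketch the buffered record is emitted from finish(), and close() is left with cleanup only, so nothing depends on collect being usable during close().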
Best,
Dawid

On 14/09/2022 15:55, Lincoln Lee wrote:
Thanks @Piotr for your valuable input!

I did a quick read of the previous discussion you mentioned. It seems my FLIP title doesn't give a clear scope here and causes some confusion. If my understanding is correct, the UDFs in your context are the user-implemented `org.apache.flink.api.common.functions.Function`s, while the `UserDefinedFunction` I mentioned in the FLIP is limited to the flink-table module and is located in `org.apache.flink.table.functions`.

Here's a use case we've met recently (which is indeed the motivation for this proposal): one of our users implemented an `org.apache.flink.table.functions.TableFunction`; the simplified pseudo-code is as below:

```
class XFunction extends TableFunction<Out> {

  open(FunctionContext context) {
    initMemQueue();
    initPythonProc();
  }

  eval(In in) {
    // hand the input over, then emit a result if one is already available
    queue.offer(in);
    Out out = queue.poll();
    if (out != null) {
      collect(out);
    }
  }

  close() {
    // 'flush' logic: wait for the external process, then emit what is left
    waitForPythonFinish();
    List<Out> outputs = drainQueue();
    outputs.forEach(out -> collect(out));
  }
}
```

It worked well in lower Flink versions, but since they attempted an upgrade recently, the 'flush' logic in the legacy close method of `TableFunction` no longer works properly.

Before proposing the FLIP, I also considered a `flush()` extension on `org.apache.flink.api.common.functions.Function`, because some SQL operators are also related, but that is currently not in the scope of this FLIP; maybe we can discuss it in another thread.

I hope this helps explain the reasoning, and I welcome your comments here!

Best,
Lincoln Lee

Piotr Nowojski <pnowoj...@apache.org> wrote on Wed, 14 Sep 2022 at 16:56:

Hi Lincoln,

Thanks for the proposal. Have you seen the old discussion about adding this `finish()` method? [1] We didn't add it to UDFs, as we didn't see a motivation (maybe we have missed something), and at the same time it wasn't that easy. Plain `finish()` wouldn't be enough. Users would need a way to output records from the `finish()` call, so it would have to be typed with the user record (`finish(Collector<T> output)`). On the other hand, we couldn't find an example where a user would actually need the `finish()` call in a UDF, as it seemed to us that it only makes sense for operators/functions that are buffering records. Note that back then, during the discussion, we were referring to this method as `flush()` or `drain()`.

Can you shed some more light and provide more details on the exact motivating example behind this proposal?

Best,
Piotrek

[1] https://lists.apache.org/thread/gmr9r3n3ktojt4bhoxz4t8qho6h7d1rp

On Wed, 14 Sep 2022 at 08:22, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hello everyone,

I'd like to open a discussion on FLIP-260 [1]: expose finish method for UserDefinedFunction. This gives users who rely on finish logic in the legacy close() method (< 1.14) a way to migrate to the new finish() method.

The task lifecycle was changed in FLINK-22972 [2]: a new finish() phase was introduced (the 'finish' part was extracted out of 'close') and the dispose() method was removed. This change was also made in the table module (e.g., `AbstractMapBundleOperator` for mini-batch operation), but it did not cover the UserDefinedFunction, which only exposes the open() and close() APIs for custom usage. Users who rely on the legacy close() API may encounter wrong results or suffer runtime errors after upgrading to the new version. Strictly speaking, it is a bug caused by the breaking change, but because the fix involves a public API change, we propose this FLIP.

Looking forward to your comments or feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+UserDefinedFunction
[2] https://issues.apache.org/jira/browse/FLINK-22972

Best,
Lincoln Lee
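For illustration only, the typed `finish(Collector<T> output)` shape mentioned in Piotr's reply could look roughly like the sketch below. `SketchCollector`, `FlushableFunction`, `BatchSum`, and `TypedFinishSketch` are hypothetical names invented for this example and are not part of Flink; the sketch only assumes that a buffering function receives an output handle in both its per-record call and its finish call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal collector; not Flink's Collector interface.
interface SketchCollector<T> {
    void collect(T record);
}

// Hypothetical function contract with a typed finish hook.
interface FlushableFunction<I, O> {
    void process(I in, SketchCollector<O> out);

    // Called once at end of input so buffered state can still be emitted.
    void finish(SketchCollector<O> out);
}

// Emits the sum of every full batch; finish() emits the trailing partial batch.
class BatchSum implements FlushableFunction<Integer, Integer> {
    private final int batchSize;
    private int count;
    private int sum;

    BatchSum(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void process(Integer in, SketchCollector<Integer> out) {
        sum += in;
        count++;
        if (count == batchSize) {
            out.collect(sum);
            sum = 0;
            count = 0;
        }
    }

    @Override
    public void finish(SketchCollector<Integer> out) {
        if (count > 0) {
            out.collect(sum);
            sum = 0;
            count = 0;
        }
    }
}

class TypedFinishSketch {
    // Feeds all inputs, then signals end of input through finish().
    static List<Integer> run(int batchSize, List<Integer> inputs) {
        List<Integer> out = new ArrayList<>();
        BatchSum fn = new BatchSum(batchSize);
        for (Integer in : inputs) {
            fn.process(in, out::add);
        }
        fn.finish(out::add);
        return out;
    }
}
```

Without the finish call, the trailing partial batch in this sketch would never be emitted, which is the buffering case described in the thread.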