Hi Ron,
thanks for opening this discussion and proposing a FLIP. This feature
has been requested multiple times before and it is definitely something
that the Flink community wants. However, we have to ensure that this
change fits into the overall architecture and doesn't break core
assumptions. The function stack is already pretty complex today, and we
should isolate the change from other components as much as possible.
Some feedback from my side:
1) Filesystem abstraction
I share Martijn's concerns. This FLIP seems to be HDFS-centric to me.
There are other filesystems such as S3 that should be considered as
well. It would be good to use Flink's filesystem abstraction to load
resources.
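For example, fetching a remote jar could go through this abstraction so
that every supported scheme (hdfs://, s3://, ...) takes the same code
path. A rough sketch; the helper class is made up, only the
FileSystem/Path calls are existing Flink API:

import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// Sketch: fetch a remote resource via Flink's FileSystem abstraction,
// so no filesystem-specific (e.g. HDFS) client code is needed.
public final class ResourceFetcher {

    public static void fetch(URI remoteUri, Path localTarget) throws Exception {
        FileSystem sourceFs = FileSystem.get(remoteUri); // resolves the scheme
        FileSystem targetFs = localTarget.getFileSystem();
        try (InputStream in = sourceFs.open(new Path(remoteUri));
                OutputStream out =
                        targetFs.create(localTarget, FileSystem.WriteMode.NO_OVERWRITE)) {
            in.transferTo(out); // JDK 9+; use a manual buffer loop on JDK 8
        }
    }
}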
2) Table API
Currently, the FLIP only mentions SQL syntax. Please also provide a
programmatic API. So far we offer `TableEnvironment.createFunction`, but
all of its overloads take only classes or instances; we should provide a
method that takes a class name string and resource information.
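Something along these lines, purely as a sketch (the parameter layout
and the ResourceUri type are placeholders, pending point 3):

// Hypothetical Table API counterpart of CREATE FUNCTION ... USING JAR:
// takes the implementation class as a string plus resource information,
// instead of a Class<?> or an instance.
void createFunction(
        String path,                    // e.g. "mycat.mydb.MyFunc"
        String className,               // e.g. "org.apache.udf1"
        List<ResourceUri> resourceUris, // e.g. a jar on HDFS or S3
        boolean ignoreIfExists);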
3) ResourceUri
You mention ResourceUri in the interface of CatalogFunction. Where does
this class come from? What does it contain?
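If it is meant to mirror what Hive's metastore exposes for functions, I
would expect roughly the following shape (just my guess, for the sake of
discussion):

import java.util.Objects;

// Guessed shape, modeled after the ResourceUri in Hive's metastore API:
// a resource type plus the URI string pointing at the artifact.
public final class ResourceUri {

    public enum ResourceType { JAR, FILE, ARCHIVE }

    private final ResourceType resourceType;
    private final String uri; // e.g. "hdfs://path/to/myudfs.jar"

    public ResourceUri(ResourceType resourceType, String uri) {
        this.resourceType = Objects.requireNonNull(resourceType);
        this.uri = Objects.requireNonNull(uri);
    }

    public ResourceType getResourceType() { return resourceType; }

    public String getUri() { return uri; }
}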
4) Function registration vs. function usage
You mentioned that `CREATE FUNCTION ... USING JAR` will directly load
resources into the user class loader. This would break our current
assumptions. Currently, `CREATE` statements are pure metadata catalog
operations. For `CREATE TABLE`, we check neither for available
connectors nor for the validity of the provided options. We should stick
to this principle and keep `CREATE FUNCTION` an easy "put metadata into
catalog" operation.
In the end, the function usage is the important piece and not the
function registration. The planner needs to validate whether resources
are still available and load them when looking up a function in a catalog.
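To make this concrete with the proposed syntax (the comments describe
the principle I am arguing for, not existing behavior):

// Registration: a pure metadata operation. Nothing is downloaded,
// loaded, or validated at this point.
tableEnv.executeSql(
    "CREATE FUNCTION myFunc AS 'org.apache.udf1' USING JAR 'hdfs://myudfs.jar'");

// Usage: only now the planner looks the function up in the catalog,
// checks that the jar is still reachable, loads it, and validates
// that the class exists and is a valid UDF.
tableEnv.executeSql("SELECT myFunc(1)").print();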
5) Cluster execution
It would be great if you could elaborate a bit on the implementation by
mentioning internal classes. Does `pipeline.jars` support HDFS or other
file systems already? I think currently it is meant only for submitting
local files via the blob store. So how are the HDFS/S3 resources fetched
during execution in the cluster? Which entity fetches them?
6) API implementation
Could you elaborate roughly what should go into Planner,
FunctionCatalog, CatalogManager, TableEnvironment and higher layers?
What happens in case of an EXPLAIN? Will the loaded resources be cleaned
up?
What happens if TableEnvironment is used in an interactive session for
hundreds of submissions? Will the class loader grow infinitely?
I think we will need some "Resource Manager" entity (either in
CatalogManager or as a separate entity in TableEnvironment) that
performs bookkeeping of used/loaded resources. This entity should also
provide a UserClassLoader that inherits from the classloader given via
EnvironmentSettings.
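Roughly what I have in mind; every name in this sketch is made up:

import java.net.URL;
import java.net.URLClassLoader;
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping entity owned by the TableEnvironment (or the
// CatalogManager). It tracks which resources were loaded and owns the
// user class loader, so it can also answer the cleanup questions above.
public final class FunctionResourceManager implements AutoCloseable {

    // Child of the class loader configured via EnvironmentSettings.
    private final MutableURLClassLoader userClassLoader;

    // Bookkeeping of used/loaded resources, keyed by resource URI.
    private final Map<String, URL> registeredResources = new HashMap<>();

    public FunctionResourceManager(ClassLoader parentFromEnvironmentSettings) {
        this.userClassLoader = new MutableURLClassLoader(parentFromEnvironmentSettings);
    }

    /** Registers a resource once; repeated registrations are no-ops. */
    public void register(String resourceUri, URL localJar) {
        registeredResources.computeIfAbsent(resourceUri, ignored -> {
            userClassLoader.addURL(localJar);
            return localJar;
        });
    }

    /** Class loader to be used for function lookup and code generation. */
    public ClassLoader getUserClassLoader() {
        return userClassLoader;
    }

    /** Cleanup hook for long-running interactive sessions. */
    @Override
    public void close() throws Exception {
        userClassLoader.close();
        registeredResources.clear();
    }

    // URLClassLoader with a public addURL, so jars can be appended lazily.
    private static final class MutableURLClassLoader extends URLClassLoader {
        MutableURLClassLoader(ClassLoader parent) {
            super(new URL[0], parent);
        }

        @Override
        public void addURL(URL url) {
            super.addURL(url);
        }
    }
}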
The planner should return which resources it needs in the end. Take
`SELECT MyFunc(1)` as an example: this function will be executed on the
client side during constant folding, thus the resources of `MyFunc` are
not necessary anymore in the cluster. Only the planner knows this after
optimization. This might require changing the signature of
Planner#translate to return
Tuple2<List<Transformation<?>>, List<Resources>>, as sketched below.
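Sketched against today's interface ("Resources" standing in for whatever
type point 3 settles on):

// Today: List<Transformation<?>> translate(List<ModifyOperation> modifyOperations);
// Sketched change, so the caller learns which resources survived optimization:
Tuple2<List<Transformation<?>>, List<Resources>> translate(
        List<ModifyOperation> modifyOperations);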
However, in Table API we resolve functions eagerly so we need to keep
the resources forever and should provide a cleanup method.
Regarding FLINK-15635, my colleague Francesco is currently working on
resolving this issue.
Regards,
Timo
On 24.03.22 at 09:18, Martijn Visser wrote:
Hi Ron,
Thanks for creating the FLIP. You're talking about both local and remote
resources. With regards to remote resources, how do you see this working
with Flink's filesystem abstraction? I did read in the FLIP that Hadoop
dependencies are not packaged, but I would hope that we do that for all
filesystem implementations. I don't think it's a good idea to have any
tight coupling to file system implementations, especially since at some
point we could also externalize file system implementations (like we're
already doing for connectors). I think the FLIP would be better if it
did not refer to "Hadoop" as the only remote resource provider but used
a more generic term, since there are more options than Hadoop.
I'm also thinking about security/operations implications: would it be
possible for bad actor X to create a JAR that influences other running
jobs, leaks data or credentials, or does anything else harmful? If so, I
think it would also be good to have an option to disable this feature
completely. I think there are roughly two types of companies who run
Flink: those who open it up for everyone to use (here the feature would
be welcomed) and those who need to follow certain minimum standards and
have a more closed Flink ecosystem. The latter usually want to validate
a JAR upfront before making it available, even at the expense of speed,
because it gives them more control over what will be running in their
environment.
Best regards,
Martijn Visser
https://twitter.com/MartijnVisser82
On Wed, 23 Mar 2022 at 16:47, 刘大龙 <ld...@zju.edu.cn> wrote:
-----Original Message-----
From: "Peter Huang" <huangzhenqiu0...@gmail.com>
Sent: 2022-03-23 11:13:32 (Wednesday)
To: dev <dev@flink.apache.org>
Cc:
Subject: Re: Re: Re: [DISCUSS] FLIP-214 Support Advanced Function DDL
Hi Ron,
Thanks for reviving the discussion of this work. The design looks good.
A small typo in the FLIP: it is currently marked as already released in
1.16.
Best Regards
Peter Huang
On Tue, Mar 22, 2022 at 10:58 PM Mang Zhang <zhangma...@163.com> wrote:
Hi Yuxia,
Thanks for your reply. Your reminder is very important!
Since we download the file to a local directory, we must remember to
clean it up when the Flink client exits.
--
Best regards,
Mang Zhang
At 2022-03-23 10:02:26, "罗宇侠(莫辞)"
<luoyuxia.luoyu...@alibaba-inc.com.INVALID> wrote:
Hi Ron, thanks for starting this discussion, some Spark/Hive users will
benefit from it. The FLIP looks good to me. I just have two minor
questions:
1. For the syntax explanation, I see it's "Create .... function as
identifier....". I think the word "identifier" may not be
self-descriptive, since it's actually not a random name but the name of
the class that provides the implementation for the function to be
created. Maybe it would be clearer to use "class_name" instead of
"identifier", just like what Hive [1] and Spark [2] do.
2. >> If the resource used is a remote resource, it will first download
the resource to a local temporary directory, which will be generated
using a UUID, and then register the local path to the user class loader.
Given the above explanation in the FLIP, it seems that for statement
sets like
""
Create function as org.apache.udf1 using jar 'hdfs://myudfs.jar';
Create function as org.apache.udf2 using jar 'hdfs://myudfs.jar';
""
it will download the resource 'hdfs://myudfs.jar' twice. So is it
possible to provide some cache mechanism so that we won't need to
download/store it twice?
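For instance, something as simple as a download cache keyed by the URI
might be enough (just a sketch of the idea, all names made up):

import java.net.URI;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: cache downloads by URI so repeated USING JAR clauses that
// point at the same resource are fetched and stored only once.
public final class ResourceDownloadCache {

    private final Map<URI, Path> localCopies = new ConcurrentHashMap<>();

    /** Returns the local copy, downloading at most once per URI. */
    public Path getOrDownload(URI remote, Downloader downloader) {
        return localCopies.computeIfAbsent(remote, downloader::download);
    }

    /** Whatever actually performs the copy (e.g. via Flink's FileSystem). */
    public interface Downloader {
        Path download(URI remote);
    }
}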
Best regards,
Yuxia
[1]
https://cwiki.apache.org/confluence/display/hive/languagemanual+ddl
[2]
https://spark.apache.org/docs/3.0.0-preview/sql-ref-syntax-ddl-create-function.html
------------------------------------------------------------------
From: Mang Zhang <zhangma...@163.com>
Date: 2022-03-22 11:35:24
To: <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-214 Support Advanced Function DDL
Hi Ron, thank you so much for this suggestion, it is very good.
In our company, using custom UDFs is very inconvenient: the code needs
to be packaged into the job jar, and users cannot reuse an existing UDF
jar directly; they can only pass the jar reference in the startup
command.
If we implement this feature, users can focus on their own business
development.
I can also contribute if needed.
--
Best regards,
Mang Zhang
At 2022-03-21 14:57:32, "刘大龙" <ld...@zju.edu.cn> wrote:
Hi, everyone
I would like to open a discussion on supporting advanced function DDL.
This proposal is a continuation of FLIP-79, in which Flink function DDL
is defined. Until now it is only partially released, as function DDL
with user-defined resources has not been clearly discussed and
implemented. It is an important feature: with support for registering
UDFs with custom jar resources, users can use UDFs much more easily
without having to put jars under the classpath in advance.
Looking forward to your feedback.
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-214+Support+Advanced+Function+DDL
Best,
Ron
Hi Peter, thanks for your feedback. This work also includes your effort;
thank you very much.