Hi Ted,

The design doc is in late draft status and proposes support for SQL DDL statements (CREATE TABLE, CREATE FUNCTION, etc.). The question about registering JARs came up because we need a way to distribute JAR files that contain the code of user-defined functions.
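Distributing the JARs is only half of the requirement. The UDF classes in them must also become loadable. The sketch below shows only the loading side. It assumes the JAR files have already been copied to local paths and uses a plain `java.net.URLClassLoader` to resolve a UDF class by name. The class `UdfJarLoader` and its method `loadUdfClass` are hypothetical and not part of Flink. Flink's own user-code classloading is more involved than this.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper (not part of Flink): resolves a UDF class from locally available JAR files. */
public final class UdfJarLoader {

    private UdfJarLoader() {}

    /**
     * Creates a classloader over the given JAR files and loads {@code className} through it.
     * Standard parent-first delegation applies, so classes visible to {@code parent}
     * are returned from there before the JARs are searched.
     */
    public static Class<?> loadUdfClass(List<Path> jarPaths, String className, ClassLoader parent)
            throws MalformedURLException, ClassNotFoundException {
        List<URL> urls = new ArrayList<>();
        for (Path jar : jarPaths) {
            urls.add(jar.toUri().toURL());
        }
        // The loader is deliberately left open: classes it defines may load further
        // classes from the same JARs later, which is not supported once it is closed.
        URLClassLoader loader = new URLClassLoader(urls.toArray(new URL[0]), parent);
        return Class.forName(className, true, loader);
    }

    public static void main(String[] args) throws Exception {
        // With no JARs given, resolution falls back to the parent classloader.
        Class<?> c = loadUdfClass(Collections.emptyList(), "java.lang.String",
                UdfJarLoader.class.getClassLoader());
        System.out.println(c.getName()); // prints "java.lang.String"
    }
}
```

In a real setup the `jarPaths` would point at the locally cached copies of the distributed JARs, and `className` would come from the CREATE FUNCTION statement.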
The design doc will soon be shared on the dev mailing list to gather feedback from the community.

Best,
Fabian

2018-05-16 10:45 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:

> bq. In a design document, Timo mentioned that we can ship multiple JAR files
>
> Mind telling us where the design doc can be retrieved?
>
> Thanks
>
> On Wed, May 16, 2018 at 1:29 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi,
> >
> > I'm not sure if we need to modify the existing method.
> > What we need is a bit different from what registerCachedFile() provides.
> > The method ensures that a file is copied to each TaskManager and can be
> > locally accessed from a function's RuntimeContext.
> > In our case, we don't need to access the file but would like to make sure
> > that it is loaded into the class loader.
> > So, we could also just add a method like registerUserJarFile().
> >
> > In a design document, Timo mentioned that we can ship multiple JAR files
> > with a job.
> > So, we could also implement the UDF shipping logic by loading the JAR
> > file(s) to the client and distributing them from there.
> > In that case, we would not need to add a new method to the execution
> > environment.
> >
> > Best,
> > Fabian
> >
> > 2018-05-15 3:50 GMT+02:00 Rong Rong <walter...@gmail.com>:
> >
> > > +1. This could be very useful for "dynamic" UDFs.
> > >
> > > Just to clarify, if I understand correctly, we are trying to use an ENUM
> > > indicator to
> > > (1) Replace the current Boolean isExecutable flag.
> > > (2) Provide additional information used by ExecutionEnvironment to decide
> > > when/where to use the file in the DistributedCache.
> > >
> > > In this case, DistributedCache.CacheType or DistributedCache.FileType
> > > sounds more intuitive. What do you think?
> > >
> > > Also, I was wondering whether there is any other useful information about
> > > the cached file that should be passed to the runtime.
> > > If we are just talking about including the library in the classloader,
> > > can we directly extend the interface with
> > >
> > > public void registerCachedFile(
> > >     String filePath,
> > >     String name,
> > >     boolean executable,
> > >     boolean includeInClassLoader)
> > >
> > > Thanks,
> > > Rong
> > >
> > > On Sun, May 13, 2018 at 11:14 PM, Shuyi Chen <suez1...@gmail.com> wrote:
> > >
> > > > Hi Flink devs,
> > > >
> > > > In an effort to support loading external libraries and creating UDFs
> > > > from external libraries using DDL in Flink SQL, we want to use Flink's
> > > > Blob Server to distribute the external libraries at runtime and load
> > > > those libraries into the user code classloader automatically.
> > > >
> > > > However, the current [Stream]ExecutionEnvironment.registerCachedFile
> > > > interface is limited to registering executable or non-executable blobs.
> > > > It's not possible to tell at runtime whether the blob files are
> > > > libraries that should be loaded into the user code classloader in
> > > > RuntimeContext. Therefore, I want to propose adding an enum called
> > > > *BlobType* to explicitly indicate the type of the Blob file being
> > > > distributed, and the following interface in [Stream]ExecutionEnvironment
> > > > to support it. In general, I think the new BlobType information can be
> > > > used by the Flink runtime to preprocess the Blob files if needed.
> > > >
> > > > /**
> > > >  * Registers a file at the distributed cache under the given name. The file
> > > >  * will be accessible from any user-defined function in the (distributed)
> > > >  * runtime under a local path. Files may be local files (as long as all
> > > >  * relevant workers have access to them), or files in a distributed file
> > > >  * system. The runtime will copy the files temporarily to a local cache,
> > > >  * if needed.
> > > >  *
> > > >  * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can
> > > >  * be obtained inside UDFs via
> > > >  * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()}
> > > >  * and provides access to the
> > > >  * {@link org.apache.flink.api.common.cache.DistributedCache} via
> > > >  * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
> > > >  *
> > > >  * @param filePath The path of the file, as a URI (e.g. "file:///some/path"
> > > >  *                 or "hdfs://host:port/and/path")
> > > >  * @param name The name under which the file is registered.
> > > >  * @param blobType The type of the Blob file.
> > > >  */
> > > > public void registerCachedFile(String filePath, String name,
> > > >     DistributedCache.BlobType blobType) {...}
> > > >
> > > > Optionally, we can add another method to register UDF JARs, which will be
> > > > implemented using the interface above:
> > > >
> > > > public void registerJarFile(String filePath, String name) {...}
> > > >
> > > > The following existing interface will be marked as deprecated:
> > > >
> > > > public void registerCachedFile(String filePath, String name, boolean executable) {...}
> > > >
> > > > And the following interface will be implemented using the new interface
> > > > proposed above with an EXECUTABLE BlobType:
> > > >
> > > > public void registerCachedFile(String filePath, String name) {...}
> > > >
> > > > Thanks a lot.
> > > > Shuyi
> > > >
> > > > "So you have to trust that the dots will somehow connect in your future."
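The following self-contained sketch gives the proposal a concrete shape. `CachedFileRegistry` is a hypothetical stand-in and not Flink's `ExecutionEnvironment`. The enum constants `FILE`, `EXECUTABLE` and `LIBRARY` are also assumptions, since the thread only names `EXECUTABLE`. The sketch shows four pieces:

- the typed `registerCachedFile`;
- the boolean variant mapped onto it;
- `registerJarFile` built on top of it;
- a query that a runtime could use to pick the files to add to the user-code classloader.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical stand-in for the proposed API; this is NOT Flink's ExecutionEnvironment.
 * The enum constants FILE, EXECUTABLE and LIBRARY are assumptions for illustration.
 */
public class CachedFileRegistry {

    /** Proposed type indicator (BlobType in the proposal; CacheType/FileType were also suggested). */
    public enum BlobType { FILE, EXECUTABLE, LIBRARY }

    /** One registration: where the file lives and what kind of file it is. */
    static final class Entry {
        final String filePath;
        final BlobType blobType;

        Entry(String filePath, BlobType blobType) {
            this.filePath = filePath;
            this.blobType = blobType;
        }
    }

    // Keyed by registration name; insertion order is kept for predictable iteration.
    private final Map<String, Entry> entries = new LinkedHashMap<>();

    /** Proposed general method: every registration carries an explicit type. */
    public void registerCachedFile(String filePath, String name, BlobType blobType) {
        // This sketch rejects duplicate names.
        if (entries.containsKey(name)) {
            throw new IllegalArgumentException("cache file " + name + " already exists");
        }
        entries.put(name, new Entry(filePath, blobType));
    }

    /** Existing boolean variant, expressed through the new method (deprecated in the proposal). */
    @Deprecated
    public void registerCachedFile(String filePath, String name, boolean executable) {
        registerCachedFile(filePath, name, executable ? BlobType.EXECUTABLE : BlobType.FILE);
    }

    /** Optional convenience method for UDF JARs, built on the general method. */
    public void registerJarFile(String filePath, String name) {
        registerCachedFile(filePath, name, BlobType.LIBRARY);
    }

    /** Paths a runtime could add to the user-code classloader. */
    public List<String> libraryPaths() {
        List<String> paths = new ArrayList<>();
        for (Entry e : entries.values()) {
            if (e.blobType == BlobType.LIBRARY) {
                paths.add(e.filePath);
            }
        }
        return Collections.unmodifiableList(paths);
    }

    /** Returns the registered type, or null if nothing is registered under {@code name}. */
    public BlobType typeOf(String name) {
        Entry e = entries.get(name);
        return e == null ? null : e.blobType;
    }

    public static void main(String[] args) {
        CachedFileRegistry env = new CachedFileRegistry();
        env.registerCachedFile("file:///some/path/lookup.csv", "lookup", false);
        env.registerJarFile("hdfs:///libs/my-udfs.jar", "udfs");
        System.out.println(env.typeOf("lookup")); // prints "FILE"
        System.out.println(env.libraryPaths());   // prints "[hdfs:///libs/my-udfs.jar]"
    }
}
```

Rong's alternative, an extra `boolean includeInClassLoader` parameter, would carry the same information for this one case. The enum form leaves room to add further file types later without yet another overload.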