bq. In a design document, Timo mentioned that we can ship multiple JAR files
Mind telling us where the design doc can be retrieved? Thanks

On Wed, May 16, 2018 at 1:29 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> I'm not sure if we need to modify the existing method.
> What we need is a bit different from what registerCachedFile() provides.
> The method ensures that a file is copied to each TaskManager and can be
> locally accessed from a function's RuntimeContext.
> In our case, we don't need to access the file but would like to make sure
> that it is loaded into the class loader.
> So, we could also just add a method like registerUserJarFile().
>
> In a design document, Timo mentioned that we can ship multiple JAR files
> with a job.
> So, we could also implement the UDF shipping logic by loading the JAR
> file(s) to the client and distributing them from there.
> In that case, we would not need to add a new method to the execution
> environment.
>
> Best,
> Fabian
>
> 2018-05-15 3:50 GMT+02:00 Rong Rong <walter...@gmail.com>:
>
> > +1. This could be very useful for "dynamic" UDFs.
> >
> > Just to clarify, if I understand correctly, we are trying to use an ENUM
> > indicator to
> > (1) replace the current boolean isExecutable flag, and
> > (2) provide additional information used by ExecutionEnvironment to decide
> > when/where to use the cached file.
> >
> > In this case, DistributedCache.CacheType or DistributedCache.FileType
> > sounds more intuitive. What do you think?
> >
> > Also, I was wondering whether there is any other useful information about
> > the cached file that should be passed to the runtime.
> >
> > If we are just talking about including the library in the classloader,
> > can we directly extend the interface with
> >
> > public void registerCachedFile(
> >     String filePath,
> >     String name,
> >     boolean executable,
> >     boolean includeInClassLoader)
> >
> > Thanks,
> > Rong
> >
> > On Sun, May 13, 2018 at 11:14 PM, Shuyi Chen <suez1...@gmail.com> wrote:
> >
> > > Hi Flink devs,
> > >
> > > In an effort to support loading external libraries and creating UDFs from
> > > external libraries using DDL in Flink SQL, we want to use Flink’s Blob
> > > Server to distribute the external libraries at runtime and load those
> > > libraries into the user code classloader automatically.
> > >
> > > However, the current [Stream]ExecutionEnvironment.registerCachedFile
> > > interface is limited to registering executable or non-executable blobs.
> > > At runtime, it is not possible to tell whether the blob files are libraries
> > > that should be loaded into the user code classloader in RuntimeContext.
> > > Therefore, I want to propose adding an enum called *BlobType* that explicitly
> > > indicates the type of the Blob file being distributed, and the following
> > > interface in [Stream]ExecutionEnvironment to support it. In general, I
> > > think the new BlobType information can be used by the Flink runtime to
> > > preprocess the Blob files if needed.
> > >
> > > /**
> > >  * Registers a file at the distributed cache under the given name. The file will be accessible
> > >  * from any user-defined function in the (distributed) runtime under a local path. Files
> > >  * may be local files (as long as all relevant workers have access to it), or files in a
> > >  * distributed file system. The runtime will copy the files temporarily to a local cache, if needed.
> > >  *
> > >  * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
> > >  * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access to
> > >  * {@link org.apache.flink.api.common.cache.DistributedCache} via
> > >  * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
> > >  *
> > >  * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
> > >  * @param name The name under which the file is registered.
> > >  * @param blobType The type of the Blob file.
> > >  */
> > > public void registerCachedFile(String filePath, String name, DistributedCache.BlobType blobType) {...}
> > >
> > > Optionally, we can add another method for registering UDF JARs, implemented
> > > on top of the method above:
> > >
> > > public void registerJarFile(String filePath, String name) {...}
> > >
> > > The following existing method will be marked as deprecated:
> > >
> > > public void registerCachedFile(String filePath, String name, boolean executable) {...}
> > >
> > > And the following method will be implemented using the new method
> > > proposed above with an EXECUTABLE BlobType:
> > >
> > > public void registerCachedFile(String filePath, String name) {...}
> > >
> > > Thanks a lot.
> > > Shuyi
> > >
> > > "So you have to trust that the dots will somehow connect in your future."
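To compare the enum-based proposal with the existing boolean overloads, here is a minimal sketch of how the registration methods could fit together. The enum constants `NON_EXECUTABLE`, `EXECUTABLE` and `LIBRARY`, and the classes `CachedFileEntry` and `CachedFileRegistry`, are hypothetical names, not Flink API; the class only records registrations and does not ship anything. It maps the two-argument overload to `NON_EXECUTABLE`, mirroring the current `registerCachedFile(filePath, name)`, which delegates with `executable = false`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the proposed enum; the constant names are assumptions,
// not part of Flink's DistributedCache API.
enum BlobType {
    NON_EXECUTABLE, // plain data file, read through the DistributedCache
    EXECUTABLE,     // file that should be marked executable on the worker
    LIBRARY         // JAR that should be added to the user code classloader
}

// A registered entry: where the file lives and how the runtime should treat it.
record CachedFileEntry(String filePath, BlobType blobType) {}

// Hypothetical stand-in for the registration methods on [Stream]ExecutionEnvironment.
class CachedFileRegistry {
    private final Map<String, CachedFileEntry> entries = new LinkedHashMap<>();

    // Proposed method: the caller states the blob type explicitly.
    public void registerCachedFile(String filePath, String name, BlobType blobType) {
        // Rejecting duplicate names is this sketch's choice.
        if (entries.containsKey(name)) {
            throw new IllegalArgumentException("cache file " + name + " already exists");
        }
        entries.put(name, new CachedFileEntry(filePath, blobType));
    }

    // Existing boolean overload, kept for compatibility and mapped onto the enum.
    @Deprecated
    public void registerCachedFile(String filePath, String name, boolean executable) {
        registerCachedFile(filePath, name,
                executable ? BlobType.EXECUTABLE : BlobType.NON_EXECUTABLE);
    }

    // Two-argument overload: defaults to a non-executable file.
    public void registerCachedFile(String filePath, String name) {
        registerCachedFile(filePath, name, BlobType.NON_EXECUTABLE);
    }

    // Optional convenience method for UDF JARs.
    public void registerJarFile(String filePath, String name) {
        registerCachedFile(filePath, name, BlobType.LIBRARY);
    }

    public CachedFileEntry get(String name) {
        return entries.get(name);
    }
}
```

Because the boolean overload delegates to the enum version, existing callers keep working while the new type carries the "load into the classloader" information.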
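Fabian's point is that a UDF JAR only needs to end up in the class loader, not be read as a file. A minimal sketch of that step, assuming the JARs have already been copied to local paths on the worker, is a child `java.net.URLClassLoader` over those paths. `UdfJarLoader` and `loadJars` are hypothetical names, and this does not show how Flink actually builds its user code classloader.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical helper: builds a classloader over JARs that were shipped to the
// worker and already copied to local paths (e.g. by the DistributedCache).
final class UdfJarLoader {
    private UdfJarLoader() {}

    static URLClassLoader loadJars(List<Path> localJars, ClassLoader parent) throws IOException {
        URL[] urls = new URL[localJars.size()];
        for (int i = 0; i < localJars.size(); i++) {
            Path jar = localJars.get(i);
            // Fail early if the shipped file is missing on this worker.
            if (!Files.isRegularFile(jar)) {
                throw new IOException("not a regular file: " + jar);
            }
            // Convert the local path to a file: URL that URLClassLoader understands.
            urls[i] = jar.toUri().toURL();
        }
        // URLClassLoader delegates parent-first: classes visible to `parent` win.
        return new URLClassLoader(urls, parent);
    }
}
```

Plain `URLClassLoader` resolves parent-first. Flink's own user code classloader defaults to child-first resolution, so a real implementation would need to respect that setting rather than copy this delegation order.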