Hi Ted,

The design doc is in late draft status and proposes support for SQL DDL
statements (CREATE TABLE, CREATE FUNCTION, etc.).
The question about registering JARs came up because we need a way to
distribute JAR files that contain the code of user-defined functions.
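
To illustrate the class-loading part only: below is a minimal sketch, using
plain JDK classes rather than any Flink API, of how a set of shipped JAR
files could be made visible through a child class loader. The class and
method names (UserJarLoading, classLoaderFor) are hypothetical and do not
come from the design doc.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: shows the general idea of
// exposing already-distributed JAR files to user code via a class loader.
final class UserJarLoading {

    private UserJarLoading() {}

    /**
     * Builds a class loader that can resolve classes and resources from
     * the given JAR files, delegating to the given parent first.
     */
    static URLClassLoader classLoaderFor(List<Path> jarFiles, ClassLoader parent)
            throws IOException {
        List<URL> urls = new ArrayList<>();
        for (Path jar : jarFiles) {
            // Path -> URI -> URL keeps special characters correctly escaped.
            urls.add(jar.toUri().toURL());
        }
        return new URLClassLoader(urls.toArray(new URL[0]), parent);
    }
}
```

A UDF class could then be looked up by name with loadClass() on the returned
loader; how Flink would actually wire this into the user code class loader is
left open here.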

The design doc will soon be shared on the dev mailing list to gather
feedback from the community.

Best, Fabian

2018-05-16 10:45 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:

> bq. In a design document, Timo mentioned that we can ship multiple JAR
> files
>
> Mind telling us where the design doc can be retrieved ?
>
> Thanks
>
> On Wed, May 16, 2018 at 1:29 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi,
> >
> > I'm not sure if we need to modify the existing method.
> > What we need is a bit different from what registerCachedFile() provides.
> > The method ensures that a file is copied to each TaskManager and can be
> > locally accessed from a function's RuntimeContext.
> > In our case, we don't need to access the file but would like to make sure
> > that it is loaded into the class loader.
> > So, we could also just add a method like registerUserJarFile().
> >
> > In a design document, Timo mentioned that we can ship multiple JAR files
> > with a job.
> > So, we could also implement the UDF shipping logic by loading the JAR
> > file(s) to the client and distributing them from there.
> > In that case, we would not need to add a new method to the execution
> > environment.
> >
> > Best,
> > Fabian
> >
> > 2018-05-15 3:50 GMT+02:00 Rong Rong <walter...@gmail.com>:
> >
> > > +1. This could be very useful for "dynamic" UDF.
> > >
> > > Just to clarify, if I understand correctly, we are trying to use an ENUM
> > > indicator to
> > > (1) Replace the current Boolean isExecutable flag.
> > > (2) Provide additional information used by ExecutionEnvironment to decide
> > > when/where to use the DistributedCache file.
> > >
> > > In this case, DistributedCache.CacheType or DistributedCache.FileType
> > > sounds more intuitive, what do you think?
> > >
> > > Also, I was wondering whether there is any other useful information about
> > > the cached file that should be passed to the runtime.
> > > If we are just talking about including the library in the classloader,
> > > can we directly extend the interface with
> > >
> > > public void registerCachedFile(
> > >     String filePath,
> > >     String name,
> > >     boolean executable,
> > >     boolean includeInClassLoader)
> > >
> > >
> > > Thanks,
> > > Rong
> > >
> > > On Sun, May 13, 2018 at 11:14 PM, Shuyi Chen <suez1...@gmail.com> wrote:
> > >
> > > > Hi Flink devs,
> > > >
> > > > In an effort to support loading external libraries and creating UDFs from
> > > > external libraries using DDL in Flink SQL, we want to use Flink’s Blob
> > > > Server to distribute the external libraries at runtime and load those
> > > > libraries into the user code classloader automatically.
> > > >
> > > > However, the current [Stream]ExecutionEnvironment.registerCachedFile
> > > > interface is limited to registering executable or non-executable blobs.
> > > > It’s not possible to tell at runtime whether the blob files are libraries
> > > > that should be loaded into the user code classloader in RuntimeContext.
> > > > Therefore, I propose adding an enum called *BlobType* that explicitly
> > > > indicates the type of the Blob file being distributed, and the following
> > > > interface in [Stream]ExecutionEnvironment to support it. In general, I
> > > > think the new BlobType information can be used by the Flink runtime to
> > > > preprocess the Blob files if needed.
> > > >
> > > > /**
> > > >  * Registers a file at the distributed cache under the given name. The file
> > > >  * will be accessible from any user-defined function in the (distributed)
> > > >  * runtime under a local path. Files may be local files (as long as all
> > > >  * relevant workers have access to it), or files in a distributed file
> > > >  * system. The runtime will copy the files temporarily to a local cache, if
> > > >  * needed.
> > > >  *
> > > >  * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can
> > > >  * be obtained inside UDFs via
> > > >  * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()}
> > > >  * and provides access
> > > >  * {@link org.apache.flink.api.common.cache.DistributedCache} via
> > > >  * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
> > > >  *
> > > >  * @param filePath The path of the file, as a URI (e.g. "file:///some/path"
> > > >  *     or "hdfs://host:port/and/path")
> > > >  * @param name The name under which the file is registered.
> > > >  * @param blobType indicating the type of the Blob file
> > > >  */
> > > >
> > > > public void registerCachedFile(String filePath, String name,
> > > >     DistributedCache.BlobType blobType) {...}
> > > >
> > > > Optionally, we can add another interface to register UDF JARs, which
> > > > will be implemented using the interface above.
> > > >
> > > > public void registerJarFile(String filePath, String name) {...}
> > > >
> > > > The existing interface below will be marked deprecated:
> > > >
> > > > public void registerCachedFile(String filePath, String name,
> > > >     boolean executable) {...}
> > > >
> > > > And the following interface will be implemented using the new interface
> > > > proposed above with an EXECUTABLE BlobType:
> > > >
> > > > public void registerCachedFile(String filePath, String name) {...}
> > > >
> > > > Thanks a lot.
> > > > Shuyi
> > > >
> > > > "So you have to trust that the dots will somehow connect in your
> > > > future."
> > > >
> > >
> >
>
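
For readers following the thread, the proposed overloads can be sketched as a
small standalone registry. This is an illustration under stated assumptions,
not Flink code: only the EXECUTABLE constant is named in the thread, so the
FILE and LIBRARY constants, the CachedFileRegistry class, and the typeOf() and
libraryPaths() helpers are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed DistributedCache.BlobType.
// Only EXECUTABLE appears in the thread; FILE and LIBRARY are assumed names.
enum BlobType { FILE, EXECUTABLE, LIBRARY }

// Toy registry that mimics the proposed registerCachedFile overloads.
final class CachedFileRegistry {

    private static final class Entry {
        final String filePath;
        final BlobType blobType;

        Entry(String filePath, BlobType blobType) {
            this.filePath = filePath;
            this.blobType = blobType;
        }
    }

    // Insertion-ordered so that libraryPaths() is deterministic.
    private final Map<String, Entry> entries = new LinkedHashMap<>();

    // Proposed new method: the type is stated explicitly.
    void registerCachedFile(String filePath, String name, BlobType blobType) {
        entries.put(name, new Entry(filePath, blobType));
    }

    // Proposed optional convenience method for UDF JARs.
    void registerJarFile(String filePath, String name) {
        registerCachedFile(filePath, name, BlobType.LIBRARY);
    }

    // Existing boolean variant, kept as a deprecated delegate.
    @Deprecated
    void registerCachedFile(String filePath, String name, boolean executable) {
        registerCachedFile(filePath, name,
                executable ? BlobType.EXECUTABLE : BlobType.FILE);
    }

    // Returns the registered type for a name, or null if unknown.
    BlobType typeOf(String name) {
        Entry entry = entries.get(name);
        return entry == null ? null : entry.blobType;
    }

    // What a runtime could consult to decide which files to add to the
    // user code class loader.
    List<String> libraryPaths() {
        List<String> paths = new ArrayList<>();
        for (Entry entry : entries.values()) {
            if (entry.blobType == BlobType.LIBRARY) {
                paths.add(entry.filePath);
            }
        }
        return paths;
    }
}
```

The point of the enum over a second boolean flag, as discussed above, is that
the runtime can branch on a single type value, and new kinds of files can be
added without another overload.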
