Yes, we are using the addJar functionionality of the JobGraph as well for the SQL Client.

I think the execution environment is not the right place to specify jars. The location of the jars depends on the submission method. If a local path is specified in the main() method of a packaged Flink jar, it would not work when such a program is submitted through the REST API.

Regards,
Timo

Am 16.05.18 um 14:32 schrieb Aljoscha Krettek:
I think this functionality is already there, we just have to expose it in the 
right places: ClusterClient.submitJob() takes a JobGraph, JobGraph has method 
addJar() for adding jars that need to be in the classloader for executing a 
user program.

On 16. May 2018, at 12:34, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Ted,

The design doc is in late draft status and proposes support for SQL DDL
statements (CREATE TABLE, CREATE  FUNCTION, etc.).
The question about registering JARs came up because we need a way to
distribute JAR files that contain the code of user-defined functions.

The design doc will soon be shared on the dev mailing list to gather
feedback from the community.

Best, Fabian

2018-05-16 10:45 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:

bq. In a design document, Timo mentioned that we can ship multiple JAR
files

Mind telling us where the design doc can be retrieved ?

Thanks

On Wed, May 16, 2018 at 1:29 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

I'm not sure if we need to modify the existing method.
What we need is a bit different from what registerCachedFile() provides.
The method ensures that a file is copied to each TaskManager and can be
locally accessed from a function's RuntimeContext.
In our case, we don't need to access the file but would like to make sure
that it is loaded into the class loader.
So, we could also just add a method like registerUserJarFile().

In a design document, Timo mentioned that we can ship multiple JAR files
with a job.
So, we could also implement the UDF shipping logic by loading the Jar
file(s) to the client and distribute them from there.
In that case, we would not need to add new method to the execution
environment.

Best,
Fabian

2018-05-15 3:50 GMT+02:00 Rong Rong <walter...@gmail.com>:

+1. This could be very useful for "dynamic" UDF.

Just to clarify, if I understand correctly, we are tying to use an ENUM
indicator to
(1) Replace the current Boolean isExecutable flag.
(2) Provide additional information used by ExecutionEnvironment to
decide
when/where to use the DistributedCached file.

In this case, DistributedCache.CacheType or DistributedCache.FileType
sounds more intuitive, what do you think?

Also, I was wondering is there any other useful information for the
cached
file to be passed to runtime.
If we are just talking about including the library to the classloader,
can
we directly extend the interface with

public void registerCachedFile(
    String filePath,
    String name,
    boolean executable,
    boolean includeInClassLoader)


Thanks,
Rong

On Sun, May 13, 2018 at 11:14 PM, Shuyi Chen <suez1...@gmail.com>
wrote:
Hi Flink devs,

In an effort to support loading external libraries and creating UDFs
from
external libraries using DDL in Flink SQL, we want to use Flink’s
Blob
Server to distribute the external libraries in runtime and load those
libraries into the user code classloader automatically.

However, the current [Stream]ExecutionEnvironment.registerCachedFile
interface limits only to registering executable or non-executable
blobs.
It’s not possible to tell in runtime if the blob files are libraries
and
should be loaded into the user code classloader in RuntimeContext.
Therefore, I want to propose to add an enum called *BlobType*
explicitly
to
indicate the type of the Blob file being distributed, and the
following
interface in [Stream]ExecutionEnvironment to support it. In general,
I
think the new BlobType information can be used by Flink runtime to
preprocess the Blob files if needed.

*/***
** Registers a file at the distributed cache under the given name.
The
file
will be accessible*
** from any user-defined function in the (distributed) runtime under
a
local path. Files*
** may be local files (as long as all relevant workers have access to
it),
or files in a distributed file system.*
** The runtime will copy the files temporarily to a local cache, if
needed.*
***
** <p>The {@link org.apache.flink.api.common.
functions.RuntimeContext}
can
be obtained inside UDFs via*
** {@link
org.apache.flink.api.common.functions.RichFunction#
getRuntimeContext()}
and
provides access*
** {@link org.apache.flink.api.common.ca
<http://org.apache.flink.api.common.ca>che.DistributedCache} via*
** {@link
org.apache.flink.api.common.functions.RuntimeContext#
getDistributedCache()}.*
***
** @param filePath The path of the file, as a URI (e.g.
"file:///some/path"
or "hdfs://host:port/and/path")*
** @param name The name under which the file is registered.*
** @param blobType indicating the type of the Blob file*
**/*

*public void registerCachedFile(String filePath, String name,
DistributedCache.BlobType blobType) {...}*

Optionally, we can add another interface to register UDF Jars which
will
use the interface above to implement.

*public void registerJarFile(String filePath, String name) {...}*

The existing interface in the following will be marked deprecated:

*public void registerCachedFile(String filePath, String name, boolean
executable) {...}*

And the following interface will be implemented using the new
interface
proposed above with a EXECUTABLE BlobType:

*public void registerCachedFile(String filePath, String name) { ...
}*
Thanks a lot.
Shuyi

"So you have to trust that the dots will somehow connect in your
future."


Reply via email to