Hi Shuyi,

How is this discussion progressing? We are also looking forward to this
feature.
Thanks.

Shuyi Chen <suez1...@gmail.com> wrote on Fri, Jun 8, 2018 at 3:04 PM:

> Thanks a lot for the comments, Till and Fabian.
>
> The RemoteEnvironment does provide a way to specify jar files at
> construction, but we want the jar files to be specified dynamically in the
> user code, e.g. in a DDL statement, and the jar files might be in a remote
> DFS. As we discussed, I think there are 2 approaches:
>
> 1) Add a new interface env.registerJarFile(jarFiles...), which ships the JAR
> files using JobGraph.addJar(). In this case, all jars will be loaded by
> default at runtime. This approach is the same as how the SQL client ships
> UDF jars now.
> 2) Add a new interface env.registerJarFile(name, jarFiles...). It works
> similarly to env.registerCachedFile(): it registers a set of JAR files
> under a key name. We can then add a new interface in RuntimeContext, as
> Fabian suggests, i.e., RuntimeContext.getClassloaderWithJar(<key name>).
> The user can then load the functions in the remote jar dynamically using
> the returned ClassLoader.
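
Approach 2) can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `NamedJarRegistry` is a hypothetical stand-in for the proposed env.registerJarFile(name, jarFiles...) and RuntimeContext.getClassloaderWithJar(<key name>) interfaces, which are proposals in this thread and not existing Flink APIs. It models only the name-to-jars mapping on top of the JDK's URLClassLoader.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed interfaces; not an existing Flink API.
class NamedJarRegistry {
    private final Map<String, List<URL>> jarsByName = new HashMap<>();

    // Models env.registerJarFile(name, jarFiles...): remember jar locations under a key.
    // Only URL schemes the JVM has a handler for (e.g. file:, http:) are accepted here;
    // a scheme such as hdfs: would need additional setup that this sketch leaves out.
    void registerJarFile(String name, String... jarUris) {
        List<URL> urls = jarsByName.computeIfAbsent(name, k -> new ArrayList<>());
        for (String jarUri : jarUris) {
            try {
                urls.add(URI.create(jarUri).toURL());
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Unsupported jar location: " + jarUri, e);
            }
        }
    }

    // Models RuntimeContext.getClassloaderWithJar(name): a class loader that sees
    // the jars registered under that name. The jars are only opened when a class
    // or resource is first requested from the returned loader.
    URLClassLoader getClassloaderWithJar(String name) {
        List<URL> urls = jarsByName.get(name);
        if (urls == null) {
            throw new IllegalArgumentException("No jars registered under name: " + name);
        }
        return new URLClassLoader(
                urls.toArray(new URL[0]), NamedJarRegistry.class.getClassLoader());
    }
}
```

Because each key gets its own class loader, two keys can point at different versions of the same jar without the classes colliding, which is the benefit claimed for approach 2).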
>
> Comparing the 2 approaches:
>
>    - Approach 1) will be simpler for users.
>    - Approach 2) will allow us to use different versions of a class in the
>    same code, and might solve some dependency conflict issues. Also, in 2)
>    we can load jars on demand, while in 1) all jars will be loaded by
>    default.
>
> I think we can support both interfaces. For the SQL DDL implementation, both
> will work; approach 2) will be more complicated, but comes with the
> benefits stated above. However, the implementation choice should be
> transparent to the end user. Also, I am wondering whether, outside of the
> SQL DDL, this new functionality/interface would be helpful in other
> scenarios. Knowing that might help make the interface better and more
> generic. Thanks a lot.
>
> Shuyi
>
> On Tue, Jun 5, 2018 at 1:47 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
> > We could also offer a feature that users can request classloaders with
> > additional jars.
> > This could work as follows:
> >
> > 1) Users register jar files in the ExecutionEnvironment (similar to
> > cached files) with a name, e.g., env.registerJarFile("~/myJar.jar", "myName");
> > 2) In a function, the user can request a user classloader with the
> > additional classes, e.g., RuntimeContext.getClassloaderWithJar("myName");
> > This could also support loading multiple jar files in the same
> > classloader.
> >
> > IMO, the interesting part of Shuyi's proposal is being able to
> > dynamically load code from remote locations without fetching it to the
> > client first.
> >
> > Best, Fabian
> >
> >
> > 2018-05-29 12:42 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
> >
> > > I see Shuyi's point that it would be nice to allow adding jar files that
> > > should be part of the user code classloader programmatically. Actually,
> > > we expose this functionality in the `RemoteEnvironment`, where you can
> > > specify additional jars in the constructor, which are then shipped to
> > > the cluster. I assume that is exactly the functionality you are looking
> > > for. In that sense, it might be an API inconsistency that we allow it
> > > in some cases and not in others.
> > >
> > > But I could also see the whole functionality of dynamically loading
> > > jars at runtime living entirely in the `UdfSqlOperator`. This, of
> > > course, would entail that one has to take care of cleaning up the
> > > downloaded resources. But it should be possible to first download the
> > > resources and create a custom URLClassLoader at startup, and then use
> > > this class loader when calling into the UDF.
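
The download, load, and clean-up lifecycle described here might look roughly like the sketch below. `DownloadedJarLoader` is a hypothetical helper, not a Flink class and not the `UdfSqlOperator` itself. It assumes the remote jar is reachable through a URL scheme the JVM can open directly, and it uses only JDK calls.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: downloads a jar at startup and removes it again on close().
class DownloadedJarLoader implements AutoCloseable {
    private final Path localCopy;
    private final URLClassLoader classLoader;

    private DownloadedJarLoader(Path localCopy, URLClassLoader classLoader) {
        this.localCopy = localCopy;
        this.classLoader = classLoader;
    }

    // Copy the remote jar to a temp file and build a class loader on top of it.
    static DownloadedJarLoader fetch(URL remoteJar) {
        Path local = null;
        try {
            local = Files.createTempFile("udf-", ".jar");
            try (InputStream in = remoteJar.openStream()) {
                Files.copy(in, local, StandardCopyOption.REPLACE_EXISTING);
            }
            URLClassLoader loader = new URLClassLoader(
                    new URL[] {local.toUri().toURL()},
                    DownloadedJarLoader.class.getClassLoader());
            return new DownloadedJarLoader(local, loader);
        } catch (IOException e) {
            // Do not leave a partial download behind.
            if (local != null) {
                try {
                    Files.deleteIfExists(local);
                } catch (IOException ignored) {
                    // best effort only
                }
            }
            throw new UncheckedIOException(e);
        }
    }

    // Class loader to use when calling into the UDF.
    ClassLoader classLoader() {
        return classLoader;
    }

    Path localCopy() {
        return localCopy;
    }

    // Clean up the downloaded resource: release the jar, then delete the file.
    @Override
    public void close() {
        try {
            classLoader.close();
            Files.deleteIfExists(localCopy);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

An operator could call fetch() when it opens and close() when it closes, so the temp file lives exactly as long as the operator does.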
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, May 16, 2018 at 9:28 PM, Shuyi Chen <suez1...@gmail.com> wrote:
> > >
> > > > Hi Aljoscha, Fabian, Rong, Ted and Timo,
> > > >
> > > > Thanks a lot for the feedback. Let me clarify the usage scenario in a
> > > > bit more detail. The context is that we want to add support for SQL DDL
> > > > in Flink SQL to load UDFs from external JARs located in the local
> > > > filesystem, in HDFS, or at an HTTP endpoint. The local FS option is
> > > > mainly for debugging, so the user can submit the job jar locally; the
> > > > latter two are for production use. Below is an example user application
> > > > with the *CREATE FUNCTION* DDL (Note: grammar and interface not
> > > > finalized yet).
> > > >
> > > > ------------------------------------------------------------
> > > >
> > > > val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > > val tEnv = TableEnvironment.getTableEnvironment(env)
> > > > // setup the DataStream
> > > > //......
> > > >
> > > > // register the DataStream under the name "OrderA"
> > > > tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
> > > > tEnv.sqlDDL(
> > > >   "create function helloFunc as 'com.example.udf.HelloWorld' using jars ('hdfs:///users/david/libraries/my-udf-1.0.1-SNAPSHOT.jar')")
> > > > val result = tEnv.sqlQuery(
> > > >   "SELECT user, helloFunc(product), amount FROM OrderA WHERE amount > 2")
> > > > result.toAppendStream[Order].print()
> > > > env.execute()
> > > >
> > > > ------------------------------------------------------------
> > > >
> > > > The example application above does the following:
> > > > 1) It registers a DataStream as a Calcite table
> > > > (*org.apache.calcite.schema.Table*) under the name "OrderA", so SQL can
> > > > reference the DataStream as table "OrderA".
> > > > 2) It uses the SQL *CREATE FUNCTION* DDL (grammar and interface not
> > > > finalized yet) to create a SQL UDF called *helloFunc* from a JAR
> > > > located in a remote HDFS path.
> > > > 3) It issues a SQL query that uses the *helloFunc* UDF defined above
> > > > and generates a Flink table (*org.apache.flink.table.api.Table*).
> > > > 4) It converts the Flink table back to a DataStream and prints it.
> > > >
> > > > Steps 1), 3), and 4) are already implemented. To implement 2), we need
> > > > to do the following in the *tEnv.sqlDDL()* function:
> > > >
> > > > a) Parse the DDL into a SqlNode to extract the UDF class path
> > > > *udfClasspath*, the UDF remote paths *udfUrls[]*, and the UDF SQL name
> > > > *udfName*.
> > > > b) Use the URLClassLoader to load the JARs specified in *udfUrls[]*,
> > > > and register the SQL UDF *udfClasspath* under the name *udfName* using
> > > > the {Batch/Stream/}TableEnvironment registerFunction methods.
> > > > c) Register the JARs *udfUrls[]* through the
> > > > {Stream}ExecutionEnvironment, so that the JARs can be distributed to
> > > > all the TaskManagers at runtime.
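
Step a) can be illustrated with a small stand-alone sketch. It does not use Calcite or produce a SqlNode; it uses a regular expression over the example statement shown earlier, whose grammar the thread itself marks as not finalized. `CreateFunctionDdl` is a hypothetical holder class; only the field names udfName, udfClasspath, and udfUrls come from the description above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical holder for the three values extracted in step a).
final class CreateFunctionDdl {
    final String udfName;       // SQL name of the function
    final String udfClasspath;  // fully qualified class implementing the UDF
    final List<String> udfUrls; // locations of the JARs that contain the class

    private CreateFunctionDdl(String udfName, String udfClasspath, List<String> udfUrls) {
        this.udfName = udfName;
        this.udfClasspath = udfClasspath;
        this.udfUrls = Collections.unmodifiableList(udfUrls);
    }

    // Matches: create function <name> as '<class>' using jars ('<url>', '<url>', ...)
    private static final Pattern DDL = Pattern.compile(
            "(?i)^\\s*create\\s+function\\s+(\\w+)\\s+as\\s+'([^']+)'"
                    + "\\s+using\\s+jars\\s*\\(([^)]*)\\)\\s*$");

    // Matches each single-quoted entry inside the jar list.
    private static final Pattern QUOTED = Pattern.compile("'([^']*)'");

    static CreateFunctionDdl parse(String ddl) {
        Matcher m = DDL.matcher(ddl);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a CREATE FUNCTION statement: " + ddl);
        }
        List<String> urls = new ArrayList<>();
        Matcher quoted = QUOTED.matcher(m.group(3));
        while (quoted.find()) {
            urls.add(quoted.group(1));
        }
        return new CreateFunctionDdl(m.group(1), m.group(2), urls);
    }
}
```

Steps b) and c) would then consume the parsed values: udfUrls feeds both the client-side class loader and the jar registration on the execution environment, while udfClasspath and udfName feed the function registration.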
> > > >
> > > >
> > > > Since the CREATE FUNCTION DDL is executed within the user application,
> > > > I don't think we have access to the ClusterClient at the point when
> > > > *tEnv.sqlDDL()* is executed. Also, the JARs can be in a remote
> > > > filesystem (which is the main usage scenario), so the user can't
> > > > really prepare the jar statically in advance.
> > > >
> > > > For a normal user application, I think {Stream}ExecutionEnvironment is
> > > > the right place for the functionality, since it provides methods to
> > > > control the job execution and to interact with the outside world, and
> > > > it already does similar things through the *registerCachedFile*
> > > > interface.
> > > >
> > > > However, in that case, the SQL FUNCTION DDL and the SQL client will use
> > > > 2 different routes to register UDF jars, one through
> > > > *JobGraph.jobConfiguration* and the other through *JobGraph.userJars*.
> > > > So *maybe we can, as Fabian suggests, add
> > > > registerUserJarFile()/getUserJarFiles() interfaces in
> > > > {Stream}ExecutionEnvironment, which store the jars internally in a
> > > > List, and when generating the JobGraph, copy the jars to the JobGraph
> > > > using {Stream}ExecutionEnvironment.getUserJarFiles() and
> > > > JobGraph.addJar()* (Note: streaming and batch implementations might
> > > > vary). In that case, both the SQL FUNCTION DDL and the SQL client will
> > > > use *JobGraph.userJars* to ship the UDF jars.
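
The registerUserJarFile()/getUserJarFiles() idea can be sketched with plain stand-in classes. `EnvironmentSketch` and `JobGraphSketch` are hypothetical; they are not Flink's {Stream}ExecutionEnvironment or JobGraph, and jar locations are plain strings here for brevity. Only the method names registerUserJarFile, getUserJarFiles, and addJar are taken from the discussion.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Stand-in for JobGraph; only the addJar()/userJars idea from the thread is modeled.
class JobGraphSketch {
    private final List<String> userJars = new ArrayList<>();

    // Add a jar that must be on the user code class path; duplicates are ignored.
    void addJar(String jarPath) {
        if (!userJars.contains(jarPath)) {
            userJars.add(jarPath);
        }
    }

    List<String> getUserJars() {
        return Collections.unmodifiableList(userJars);
    }
}

// Stand-in for {Stream}ExecutionEnvironment.
class EnvironmentSketch {
    // Jars registered from user code (for example by a DDL statement), kept in a List.
    private final List<String> userJarFiles = new ArrayList<>();

    void registerUserJarFile(String jarPath) {
        userJarFiles.add(jarPath);
    }

    List<String> getUserJarFiles() {
        return Collections.unmodifiableList(userJarFiles);
    }

    // When the job graph is generated, copy every registered jar into it.
    JobGraphSketch createJobGraph() {
        JobGraphSketch jobGraph = new JobGraphSketch();
        for (String jar : getUserJarFiles()) {
            jobGraph.addJar(jar);
        }
        return jobGraph;
    }
}
```

With this shape, jars registered through a DDL statement and jars added by the SQL client would both end up in the same userJars list of the job graph.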
> > > >
> > > > Hope that clarifies better. What do you guys think? Thanks a lot.
> > > >
> > > > Cheers!
> > > > Shuyi
> > > >
> > > > On Wed, May 16, 2018 at 9:45 AM, Rong Rong <walter...@gmail.com> wrote:
> > > >
> > > > > I think the question here is whether registering Jar files (or other
> > > > > executable files) during job submission is sufficient for @shuyi's
> > > > > use case.
> > > > >
> > > > > If I understand correctly, the part about dynamic distribution of
> > > > > the external libraries at runtime is used to deal with DDL/DSL such
> > > > > as:
> > > > >     CREATE FUNCTION my_fun FROM url://<some_remote_jar>
> > > > > during execution. Correct me if I am wrong, @shuyi: the basic
> > > > > assumption that "we can locate and ship all executable JARs during
> > > > > job submission" no longer holds for your use case, right?
> > > > >
> > > > > I guess we are missing details here regarding the "distribution of
> > > > > external libraries in runtime" part. Maybe you can share more
> > > > > examples of this use case. Would this be included in the design doc,
> > > > > @Timo?
> > > > >
> > > > > --
> > > > > Rong
> > > > >
> > > > > On Wed, May 16, 2018 at 5:41 AM, Timo Walther <twal...@apache.org> wrote:
> > > > >
> > > > > > Yes, we are using the addJar functionality of the JobGraph as
> > > > > > well for the SQL Client.
> > > > > >
> > > > > > I think the execution environment is not the right place to
> > > > > > specify jars. The location of the jars depends on the submission
> > > > > > method. If a local path is specified in the main() method of a
> > > > > > packaged Flink jar, it would not work when such a program is
> > > > > > submitted through the REST API.
> > > > > >
> > > > > > Regards,
> > > > > > Timo
> > > > > >
> > > > > > On 16.05.18 at 14:32, Aljoscha Krettek wrote:
> > > > > >
> > > > > > >> I think this functionality is already there, we just have to
> > > > > > >> expose it in the right places: ClusterClient.submitJob() takes a
> > > > > > >> JobGraph, and JobGraph has the method addJar() for adding jars
> > > > > > >> that need to be in the classloader for executing a user program.
> > > > > >>
> > > > > > >> On 16. May 2018, at 12:34, Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > >>>
> > > > > >>> Hi Ted,
> > > > > >>>
> > > > > > >>> The design doc is in late draft status and proposes support for
> > > > > > >>> SQL DDL statements (CREATE TABLE, CREATE FUNCTION, etc.).
> > > > > > >>> The question about registering JARs came up because we need a
> > > > > > >>> way to distribute JAR files that contain the code of
> > > > > > >>> user-defined functions.
> > > > > > >>>
> > > > > > >>> The design doc will soon be shared on the dev mailing list to
> > > > > > >>> gather feedback from the community.
> > > > > >>>
> > > > > >>> Best, Fabian
> > > > > >>>
> > > > > >>> 2018-05-16 10:45 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:
> > > > > >>>
> > > > > >>> bq. In a design document, Timo mentioned that we can ship
> > multiple
> > > > JAR
> > > > > >>>> files
> > > > > >>>>
> > > > > >>>> Mind telling us where the design doc can be retrieved ?
> > > > > >>>>
> > > > > >>>> Thanks
> > > > > >>>>
> > > > > > >>>> On Wed, May 16, 2018 at 1:29 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > >>>>
> > > > > >>>> Hi,
> > > > > >>>>>
> > > > > > >>>>> I'm not sure if we need to modify the existing method.
> > > > > > >>>>> What we need is a bit different from what registerCachedFile()
> > > > > > >>>>> provides.
> > > > > > >>>>> The method ensures that a file is copied to each TaskManager
> > > > > > >>>>> and can be locally accessed from a function's RuntimeContext.
> > > > > > >>>>> In our case, we don't need to access the file but would like
> > > > > > >>>>> to make sure that it is loaded into the class loader.
> > > > > > >>>>> So, we could also just add a method like registerUserJarFile().
> > > > > > >>>>>
> > > > > > >>>>> In a design document, Timo mentioned that we can ship multiple
> > > > > > >>>>> JAR files with a job.
> > > > > > >>>>> So, we could also implement the UDF shipping logic by loading
> > > > > > >>>>> the Jar file(s) to the client and distributing them from there.
> > > > > > >>>>> In that case, we would not need to add a new method to the
> > > > > > >>>>> execution environment.
> > > > > >>>>>
> > > > > >>>>> Best,
> > > > > >>>>> Fabian
> > > > > >>>>>
> > > > > >>>>> 2018-05-15 3:50 GMT+02:00 Rong Rong <walter...@gmail.com>:
> > > > > >>>>>
> > > > > >>>>> +1. This could be very useful for "dynamic" UDF.
> > > > > >>>>>>
> > > > > > >>>>>> Just to clarify, if I understand correctly, we are trying to
> > > > > > >>>>>> use an ENUM indicator to
> > > > > > >>>>>> (1) Replace the current Boolean isExecutable flag.
> > > > > > >>>>>> (2) Provide additional information used by
> > > > > > >>>>>> ExecutionEnvironment to decide when/where to use the
> > > > > > >>>>>> DistributedCache file.
> > > > > > >>>>>>
> > > > > > >>>>>> In this case, DistributedCache.CacheType or
> > > > > > >>>>>> DistributedCache.FileType sounds more intuitive, what do you
> > > > > > >>>>>> think?
> > > > > > >>>>>>
> > > > > > >>>>>> Also, I was wondering whether there is any other useful
> > > > > > >>>>>> information for the cached file to be passed to the runtime.
> > > > > > >>>>>> If we are just talking about including the library in the
> > > > > > >>>>>> classloader, can we directly extend the interface with
> > > > > >>>>>>
> > > > > >>>>>> public void registerCachedFile(
> > > > > >>>>>>     String filePath,
> > > > > >>>>>>     String name,
> > > > > >>>>>>     boolean executable,
> > > > > >>>>>>     boolean includeInClassLoader)
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> Thanks,
> > > > > >>>>>> Rong
> > > > > >>>>>>
> > > > > > >>>>>> On Sun, May 13, 2018 at 11:14 PM, Shuyi Chen <suez1...@gmail.com> wrote:
> > > > > >>>>
> > > > > >>>>> Hi Flink devs,
> > > > > >>>>>>>
> > > > > > >>>>>>> In an effort to support loading external libraries and
> > > > > > >>>>>>> creating UDFs from external libraries using DDL in Flink SQL,
> > > > > > >>>>>>> we want to use Flink’s Blob Server to distribute the external
> > > > > > >>>>>>> libraries at runtime and load those libraries into the user
> > > > > > >>>>>>> code classloader automatically.
> > > > > > >>>>>>>
> > > > > > >>>>>>> However, the current
> > > > > > >>>>>>> [Stream]ExecutionEnvironment.registerCachedFile interface is
> > > > > > >>>>>>> limited to registering executable or non-executable blobs.
> > > > > > >>>>>>> It’s not possible to tell at runtime if the blob files are
> > > > > > >>>>>>> libraries and should be loaded into the user code classloader
> > > > > > >>>>>>> in RuntimeContext. Therefore, I want to propose adding an
> > > > > > >>>>>>> enum called *BlobType* to explicitly indicate the type of the
> > > > > > >>>>>>> Blob file being distributed, and the following interface in
> > > > > > >>>>>>> [Stream]ExecutionEnvironment to support it. In general, I
> > > > > > >>>>>>> think the new BlobType information can be used by the Flink
> > > > > > >>>>>>> runtime to preprocess the Blob files if needed.
> > > > > >>>>>>>
> > > > > >>>>>>> */***
> > > > > >>>>>>> ** Registers a file at the distributed cache under the
> given
> > > > name.
> > > > > >>>>>>>
> > > > > >>>>>> The
> > > > > >>>>
> > > > > >>>>> file
> > > > > >>>>>>
> > > > > >>>>>>> will be accessible*
> > > > > >>>>>>> ** from any user-defined function in the (distributed)
> > runtime
> > > > > under
> > > > > >>>>>>>
> > > > > >>>>>> a
> > > > > >>>>
> > > > > >>>>> local path. Files*
> > > > > >>>>>>> ** may be local files (as long as all relevant workers have
> > > > access
> > > > > to
> > > > > >>>>>>>
> > > > > >>>>>> it),
> > > > > >>>>>>
> > > > > >>>>>>> or files in a distributed file system.*
> > > > > >>>>>>> ** The runtime will copy the files temporarily to a local
> > > cache,
> > > > if
> > > > > >>>>>>> needed.*
> > > > > >>>>>>> ***
> > > > > >>>>>>> ** <p>The {@link org.apache.flink.api.common.
> > > > > >>>>>>>
> > > > > >>>>>> functions.RuntimeContext}
> > > > > >>>>
> > > > > >>>>> can
> > > > > >>>>>>
> > > > > >>>>>>> be obtained inside UDFs via*
> > > > > >>>>>>> ** {@link
> > > > > >>>>>>> org.apache.flink.api.common.functions.RichFunction#
> > > > > >>>>>>>
> > > > > >>>>>> getRuntimeContext()}
> > > > > >>>>>
> > > > > >>>>>> and
> > > > > >>>>>>> provides access*
> > > > > >>>>>>> ** {@link org.apache.flink.api.common.ca
> > > > > >>>>>>> <http://org.apache.flink.api.common.ca
> >che.DistributedCache}
> > > > via*
> > > > > >>>>>>> ** {@link
> > > > > >>>>>>> org.apache.flink.api.common.functions.RuntimeContext#
> > > > > >>>>>>> getDistributedCache()}.*
> > > > > >>>>>>> ***
> > > > > >>>>>>> ** @param filePath The path of the file, as a URI (e.g.
> > > > > >>>>>>>
> > > > > >>>>>> "file:///some/path"
> > > > > >>>>>>
> > > > > >>>>>>> or "hdfs://host:port/and/path")*
> > > > > >>>>>>> ** @param name The name under which the file is
> registered.*
> > > > > >>>>>>> ** @param blobType indicating the type of the Blob file*
> > > > > >>>>>>> **/*
> > > > > >>>>>>>
> > > > > >>>>>>> *public void registerCachedFile(String filePath, String
> name,
> > > > > >>>>>>> DistributedCache.BlobType blobType) {...}*
> > > > > >>>>>>>
> > > > > > >>>>>>> Optionally, we can add another interface to register UDF Jars,
> > > > > > >>>>>>> which will be implemented using the interface above.
> > > > > > >>>>>>>
> > > > > > >>>>>>> public void registerJarFile(String filePath, String name) {...}
> > > > > > >>>>>>>
> > > > > > >>>>>>> The following existing interface will be marked deprecated:
> > > > > > >>>>>>>
> > > > > > >>>>>>> public void registerCachedFile(String filePath, String name, boolean executable) {...}
> > > > > > >>>>>>>
> > > > > > >>>>>>> And the following interface will be implemented using the new
> > > > > > >>>>>>> interface proposed above with an EXECUTABLE BlobType:
> > > > > > >>>>>>>
> > > > > > >>>>>>> public void registerCachedFile(String filePath, String name) { ... }
> > > > > >>>>
> > > > > >>>>> Thanks a lot.
> > > > > >>>>>>> Shuyi
> > > > > >>>>>>>
> > > > > >>>>>>> "So you have to trust that the dots will somehow connect in
> > > your
> > > > > >>>>>>>
> > > > > >>>>>> future."
> > > > > >>>>>
> > > > > >>>>
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > "So you have to trust that the dots will somehow connect in your future."
> > > >
> > >
> >
>
>
> --
> "So you have to trust that the dots will somehow connect in your future."
>
