Hi Marvin,

The discussion was not resolved yet.
Can you tell us a bit about your use case? Maybe that would help with the
decision.
Which of the discussed approaches would work best for your use case and why?

Thanks, Fabian

2018-06-25 13:27 GMT+02:00 Marvin777 <xymaqingxiang...@gmail.com>:

> Hi, Shuyi:
>
> What is the progress of the discussion?  We also look forward to this
> feature.
> Thanks.
>
> Shuyi Chen <suez1...@gmail.com> 于2018年6月8日周五 下午3:04写道:
>
> > Thanks a lot for the comments, Till and Fabian.
> >
> > The RemoteEnvrionment does provide a way to specify jar files at
> > construction, but we want the jar files to be specified dynamically in
> the
> > user code, e.g. in a DDL statement, and the jar files might be in a
> remote
> > DFS. As we discussed, I think there are 2 approaches:
> >
> > 1) add new interface env.registerJarFile(jarFiles...), which ships the
> JAR
> > files using JobGraph.addJar(). In this case, all jars will be loaded by
> > default at runtime. This approach will be the same as how SQL client ship
> > UDF jars now.
> > 2) add new interface env.registerJarFile(name, jarFiles...). It will do
> > similar things as env.registerCachedFile(), which will register a set of
> > Jar files with a key name, and we can add a new interface in
> > RuntimeContext as Fabian suggests, i.e.,
> > RuntimeContext.getClassloaderWithJar(<key name>). Now user will be able
> to
> > load the functions in remote jar dynamically using the returned
> > ClassLoader.
> >
> > Comparing the 2 approaches:
> >
> >    - Approach 1) will be simpler for user to use.
> >    - Approach 2) will allow us to use different versions of a class in
> the
> >    same code, and might solve some dependency conflict issues. Also in
> 2),
> > we
> >    can load Jars on demand, while in 1) all jars will be loaded by
> default.
> >
> > I think we can support both interfaces. On the SQL DDL implementation,
> both
> > will work and approach 2) will be more complicated, but with some nice
> > benefit as stated above. However, the implementation choice should be
> > transparent to the end user. Also, I am wondering outside of the SQL DDL,
> > will these new functionality/interface be helpful in other scenarios?
> > Maybe, that will help make the interface better and more generic. Thanks
> a
> > lot.
> >
> > Shuyi
> >
> > On Tue, Jun 5, 2018 at 1:47 AM Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > We could also offer a feature that users can request classloaders with
> > > additional jars.
> > > This could work as follows:
> > >
> > > 1) Users register jar files in the ExecutionEnvironment (similar to
> > cached
> > > files) with a name, e.g., env.registerJarFile("~/myJar.jar",
> "myName");
> > > 2) In a function, the user can request a user classloader with the
> > > additional classes, e.g., RuntimeContext.
> getClassloaderWithJar("myName");
> > > This could also support to load multiple jar files in the same
> > classloader.
> > >
> > > IMO, the interesting part of Shuyi's proposal is to be able to
> > dynamically
> > > load code from remote locations without fetching it to the client
> first.
> > >
> > > Best, Fabian
> > >
> > >
> > > 2018-05-29 12:42 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
> > >
> > > > I see Shuyi's point that it would nice to allow adding jar files
> which
> > > > should be part of the user code classloader programmatically.
> Actually,
> > > we
> > > > expose this functionality in the `RemoteEnvironment` where you can
> > > specify
> > > > additional jars which shall be shipped to the cluster in the
> > > constructor. I
> > > > assume that is exactly the functionality you are looking for. In that
> > > > sense, it might be an API inconsistency that we allow it for some
> cases
> > > and
> > > > for others not.
> > > >
> > > > But I could also see that the whole functionality of dynamically
> > loading
> > > > jars at runtime could also perfectly live in the `UdfSqlOperator`.
> > This,
> > > of
> > > > course, would entail that one has to take care of clean up of the
> > > > downloaded resources. But it should be possible to first download the
> > > > resources and create a custom URLClassLoader at startup and then use
> > this
> > > > class loader when calling into the UDF.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Wed, May 16, 2018 at 9:28 PM, Shuyi Chen <suez1...@gmail.com>
> > wrote:
> > > >
> > > > > Hi Aljoscha, Fabian, Rong, Ted and Timo,
> > > > >
> > > > > Thanks a lot for the feedback. Let me clarify the usage scenario
> in a
> > > bit
> > > > > more detail. The context is that we want to add support for SQL DDL
> > to
> > > > load
> > > > > UDF from external JARs located either in local filesystem or HDFS
> or
> > a
> > > > HTTP
> > > > > endpoint in Flink SQL. The local FS option is more for debugging
> > > purpose
> > > > > for user to submit the job jar locally, and the later 2 are for
> > > > production
> > > > > uses. Below is an example User application with the *CREATE
> FUNCTION*
> > > DDL
> > > > > (Note: grammar and interface not finalized yet).
> > > > >
> > > > > ------------------------------------------------------------
> > > > > -------------------------------------
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > *val env = StreamExecutionEnvironment.getExecutionEnvironmentval
> > tEnv =
> > > > > TableEnvironment.getTableEnvironment(env)// setup the
> > > > DataStream//......*
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > *// register the DataStream under the name
> > > > > "OrderAtEnv.registerDataStream("OrderA", orderA, 'user, 'product,
> > > > > 'amount)tEnv.sqlDDL(  "create function helloFunc as
> > > > > 'com.example.udf.HelloWorld' using jars
> > > > > ('hdfs:///users/david/libraries/my-udf-1.0.1-SNAPSHOT.jar')")val
> > result
> > > > =
> > > > > tEnv.sqlQuery(  "SELECT user, helloFunc(product), amount FROM
> OrderA
> > > > WHERE
> > > > > amount > 2")result.toAppendStream[Order].print()env.execute()*
> > > > > ------------------------------------------------------------
> > > > > -------------------------------------
> > > > >
> > > > > The example application above does the following:
> > > > > 1) it registers a DataStream as a Calcite table(
> > > > > *org.apache.calcite.schema.Table*) under name "OrderA", so SQL can
> > > > > reference the DataStream as table "OrderA".
> > > > > 2) it uses the SQL *CREATE FUNCTION* DDL (grammar and interface not
> > > > > finalized yet) to create a SQL UDF called *helloFunc* from a JAR
> > > located
> > > > in
> > > > > a remote HDFS path.
> > > > > 3) it issues a sql query that uses the *helloFunc* UDF defined
> above
> > > and
> > > > > generate a Flink table (*org.apache.flink.table.api.Table*)
> > > > > 4) it convert the Flink table back to a DataStream and print it.
> > > > >
> > > > > Step 1), 3), and 4) are already implemented. To implement 2), we
> need
> > > to
> > > > do
> > > > > the following to implement the *tEnv.sqlDDL()* function.
> > > > >
> > > > > a) parse the DDL into a SqlNode to extract the UDF *udfClasspath*,
> > UDF
> > > > > remote path *udfUrls[]* and UDF SQL name *udfName*.
> > > > > b) use the URLClassLoader to load the JARs specified in
> *udfUrls[]*,
> > > and
> > > > > register the SQL UDF using the {Batch/Stream/}TableEnvironment
> > > > > registerFunction methods using*  udfClasspath* under name
> *udfName.*
> > > > > c) register the JARs *udfUrls[]* through the
> > > > {Stream}ExecutionEnvironment,
> > > > > so that the JARs can be distributed to all the TaskManagers during
> > > > runtime.
> > > > >
> > > > >
> > > > > Since the CREATE FUNCTION DDL is executed within the user
> > application,
> > > I
> > > > > dont think we have access to the ClusterClient at the point when
> > > > > *tEnv.sqlDDL()* is executed. Also the JARs can be in a remote
> > > filesystem
> > > > > (which is the main usage scenarios), so the user can't really
> prepare
> > > the
> > > > > jar somehow in advance statically.
> > > > >
> > > > > For normal user application, I think {Stream}ExecutionEnvironment
> is
> > > the
> > > > > right place for the functionality, since it provides methods to
> > control
> > > > the
> > > > > job execution and to interact with the outside world, and also, it
> > > > actually
> > > > > already does similar things provided through the
> *registerCachedFile*
> > > > > interface.
> > > > >
> > > > > However, in such case, SQL FUNCTION DDL and SQL client will use 2
> > > > different
> > > > > routes to register UDF jars, one through
> *JobGraph.jobConfiguration*
> > > and
> > > > > the other through *JobGraph.userJars*. So *maybe we can, as Fabian
> > > > > suggests, add **registerUserJarFile()/getUserJarFiles() interfaces
> > > > > in {Stream}ExecutionEnvironment, which stores the jars internally
> in
> > a
> > > > > List, and when generating JobGraph, copy the jars to the JobGraph
> > using
> > > > > the  {Stream}ExecutionEnvironment.getUserJarFiles() and
> > > > > JobGraph.addJar()* (Note,
> > > > > streaming and batch implementations might vary). In such case, both
> > SQL
> > > > > FUNCTION DDL and SQL client will use *JobGraph.userJars* to ship
> the
> > > UDF
> > > > > jars.
> > > > >
> > > > > Hope that clarifies better. What do you guys think? Thanks a lot.
> > > > >
> > > > > Cheers!
> > > > > Shuyi
> > > > >
> > > > > On Wed, May 16, 2018 at 9:45 AM, Rong Rong <walter...@gmail.com>
> > > wrote:
> > > > >
> > > > > > I think the question here is whether registering Jar files (or
> > other
> > > > > > executable files) during job submission is sufficient for
> @shuyi's
> > > use
> > > > > > case.
> > > > > >
> > > > > > If I understand correctly regarding the part of dynamic
> > distribution
> > > of
> > > > > the
> > > > > > external libraries in runtime. This is used to deal with DDL/DSL
> > such
> > > > as:
> > > > > >     CREATE FUNCTION my_fun FROM url://<some_remote_jar>
> > > > > > during execution. Correct me if I am wrong @shuyi, The basic
> > > assumption
> > > > > > that "we can locate and ship all executable JARs during job
> > > submission"
> > > > > no
> > > > > > longer holds for your use case right?
> > > > > >
> > > > > > I guess we are missing details here regarding the "distribution
> of
> > > > > external
> > > > > > libraries in runtime" part. Maybe you can share more example of
> > this
> > > > use
> > > > > > case. Would this be included in the design doc @Timo?
> > > > > >
> > > > > > --
> > > > > > Rong
> > > > > >
> > > > > > On Wed, May 16, 2018 at 5:41 AM, Timo Walther <
> twal...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > Yes, we are using the addJar functionionality of the JobGraph
> as
> > > well
> > > > > for
> > > > > > > the SQL Client.
> > > > > > >
> > > > > > > I think the execution environment is not the right place to
> > specify
> > > > > jars.
> > > > > > > The location of the jars depends on the submission method. If a
> > > local
> > > > > > path
> > > > > > > is specified in the main() method of a packaged Flink jar, it
> > would
> > > > not
> > > > > > > work when such a program is submitted through the REST API.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Timo
> > > > > > >
> > > > > > > Am 16.05.18 um 14:32 schrieb Aljoscha Krettek:
> > > > > > >
> > > > > > > I think this functionality is already there, we just have to
> > expose
> > > > it
> > > > > in
> > > > > > >> the right places: ClusterClient.submitJob() takes a JobGraph,
> > > > JobGraph
> > > > > > has
> > > > > > >> method addJar() for adding jars that need to be in the
> > classloader
> > > > for
> > > > > > >> executing a user program.
> > > > > > >>
> > > > > > >> On 16. May 2018, at 12:34, Fabian Hueske <fhue...@gmail.com>
> > > wrote:
> > > > > > >>>
> > > > > > >>> Hi Ted,
> > > > > > >>>
> > > > > > >>> The design doc is in late draft status and proposes support
> for
> > > SQL
> > > > > DDL
> > > > > > >>> statements (CREATE TABLE, CREATE  FUNCTION, etc.).
> > > > > > >>> The question about registering JARs came up because we need a
> > way
> > > > to
> > > > > > >>> distribute JAR files that contain the code of user-defined
> > > > functions.
> > > > > > >>>
> > > > > > >>> The design doc will soon be shared on the dev mailing list to
> > > > gather
> > > > > > >>> feedback from the community.
> > > > > > >>>
> > > > > > >>> Best, Fabian
> > > > > > >>>
> > > > > > >>> 2018-05-16 10:45 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:
> > > > > > >>>
> > > > > > >>> bq. In a design document, Timo mentioned that we can ship
> > > multiple
> > > > > JAR
> > > > > > >>>> files
> > > > > > >>>>
> > > > > > >>>> Mind telling us where the design doc can be retrieved ?
> > > > > > >>>>
> > > > > > >>>> Thanks
> > > > > > >>>>
> > > > > > >>>> On Wed, May 16, 2018 at 1:29 AM, Fabian Hueske <
> > > fhue...@gmail.com
> > > > >
> > > > > > >>>> wrote:
> > > > > > >>>>
> > > > > > >>>> Hi,
> > > > > > >>>>>
> > > > > > >>>>> I'm not sure if we need to modify the existing method.
> > > > > > >>>>> What we need is a bit different from what
> > registerCachedFile()
> > > > > > >>>>> provides.
> > > > > > >>>>> The method ensures that a file is copied to each
> TaskManager
> > > and
> > > > > can
> > > > > > be
> > > > > > >>>>> locally accessed from a function's RuntimeContext.
> > > > > > >>>>> In our case, we don't need to access the file but would
> like
> > to
> > > > > make
> > > > > > >>>>> sure
> > > > > > >>>>> that it is loaded into the class loader.
> > > > > > >>>>> So, we could also just add a method like
> > registerUserJarFile().
> > > > > > >>>>>
> > > > > > >>>>> In a design document, Timo mentioned that we can ship
> > multiple
> > > > JAR
> > > > > > >>>>> files
> > > > > > >>>>> with a job.
> > > > > > >>>>> So, we could also implement the UDF shipping logic by
> loading
> > > the
> > > > > Jar
> > > > > > >>>>> file(s) to the client and distribute them from there.
> > > > > > >>>>> In that case, we would not need to add new method to the
> > > > execution
> > > > > > >>>>> environment.
> > > > > > >>>>>
> > > > > > >>>>> Best,
> > > > > > >>>>> Fabian
> > > > > > >>>>>
> > > > > > >>>>> 2018-05-15 3:50 GMT+02:00 Rong Rong <walter...@gmail.com>:
> > > > > > >>>>>
> > > > > > >>>>> +1. This could be very useful for "dynamic" UDF.
> > > > > > >>>>>>
> > > > > > >>>>>> Just to clarify, if I understand correctly, we are tying
> to
> > > use
> > > > an
> > > > > > >>>>>> ENUM
> > > > > > >>>>>> indicator to
> > > > > > >>>>>> (1) Replace the current Boolean isExecutable flag.
> > > > > > >>>>>> (2) Provide additional information used by
> > > ExecutionEnvironment
> > > > to
> > > > > > >>>>>>
> > > > > > >>>>> decide
> > > > > > >>>>
> > > > > > >>>>> when/where to use the DistributedCached file.
> > > > > > >>>>>>
> > > > > > >>>>>> In this case, DistributedCache.CacheType or
> > > > > > DistributedCache.FileType
> > > > > > >>>>>> sounds more intuitive, what do you think?
> > > > > > >>>>>>
> > > > > > >>>>>> Also, I was wondering is there any other useful
> information
> > > for
> > > > > the
> > > > > > >>>>>>
> > > > > > >>>>> cached
> > > > > > >>>>>
> > > > > > >>>>>> file to be passed to runtime.
> > > > > > >>>>>> If we are just talking about including the library to the
> > > > > > classloader,
> > > > > > >>>>>>
> > > > > > >>>>> can
> > > > > > >>>>>
> > > > > > >>>>>> we directly extend the interface with
> > > > > > >>>>>>
> > > > > > >>>>>> public void registerCachedFile(
> > > > > > >>>>>>     String filePath,
> > > > > > >>>>>>     String name,
> > > > > > >>>>>>     boolean executable,
> > > > > > >>>>>>     boolean includeInClassLoader)
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> Thanks,
> > > > > > >>>>>> Rong
> > > > > > >>>>>>
> > > > > > >>>>>> On Sun, May 13, 2018 at 11:14 PM, Shuyi Chen <
> > > > suez1...@gmail.com>
> > > > > > >>>>>>
> > > > > > >>>>> wrote:
> > > > > > >>>>
> > > > > > >>>>> Hi Flink devs,
> > > > > > >>>>>>>
> > > > > > >>>>>>> In an effort to support loading external libraries and
> > > creating
> > > > > > UDFs
> > > > > > >>>>>>>
> > > > > > >>>>>> from
> > > > > > >>>>>
> > > > > > >>>>>> external libraries using DDL in Flink SQL, we want to use
> > > > Flink’s
> > > > > > >>>>>>>
> > > > > > >>>>>> Blob
> > > > > > >>>>
> > > > > > >>>>> Server to distribute the external libraries in runtime and
> > load
> > > > > those
> > > > > > >>>>>>> libraries into the user code classloader automatically.
> > > > > > >>>>>>>
> > > > > > >>>>>>> However, the current [Stream]ExecutionEnvironment.
> > > > > > registerCachedFile
> > > > > > >>>>>>> interface limits only to registering executable or
> > > > non-executable
> > > > > > >>>>>>>
> > > > > > >>>>>> blobs.
> > > > > > >>>>>
> > > > > > >>>>>> It’s not possible to tell in runtime if the blob files are
> > > > > libraries
> > > > > > >>>>>>>
> > > > > > >>>>>> and
> > > > > > >>>>>
> > > > > > >>>>>> should be loaded into the user code classloader in
> > > > RuntimeContext.
> > > > > > >>>>>>> Therefore, I want to propose to add an enum called
> > *BlobType*
> > > > > > >>>>>>>
> > > > > > >>>>>> explicitly
> > > > > > >>>>>
> > > > > > >>>>>> to
> > > > > > >>>>>>
> > > > > > >>>>>>> indicate the type of the Blob file being distributed, and
> > the
> > > > > > >>>>>>>
> > > > > > >>>>>> following
> > > > > > >>>>
> > > > > > >>>>> interface in [Stream]ExecutionEnvironment to support it. In
> > > > > general,
> > > > > > >>>>>>>
> > > > > > >>>>>> I
> > > > > > >>>>
> > > > > > >>>>> think the new BlobType information can be used by Flink
> > runtime
> > > > to
> > > > > > >>>>>>> preprocess the Blob files if needed.
> > > > > > >>>>>>>
> > > > > > >>>>>>> */***
> > > > > > >>>>>>> ** Registers a file at the distributed cache under the
> > given
> > > > > name.
> > > > > > >>>>>>>
> > > > > > >>>>>> The
> > > > > > >>>>
> > > > > > >>>>> file
> > > > > > >>>>>>
> > > > > > >>>>>>> will be accessible*
> > > > > > >>>>>>> ** from any user-defined function in the (distributed)
> > > runtime
> > > > > > under
> > > > > > >>>>>>>
> > > > > > >>>>>> a
> > > > > > >>>>
> > > > > > >>>>> local path. Files*
> > > > > > >>>>>>> ** may be local files (as long as all relevant workers
> have
> > > > > access
> > > > > > to
> > > > > > >>>>>>>
> > > > > > >>>>>> it),
> > > > > > >>>>>>
> > > > > > >>>>>>> or files in a distributed file system.*
> > > > > > >>>>>>> ** The runtime will copy the files temporarily to a local
> > > > cache,
> > > > > if
> > > > > > >>>>>>> needed.*
> > > > > > >>>>>>> ***
> > > > > > >>>>>>> ** <p>The {@link org.apache.flink.api.common.
> > > > > > >>>>>>>
> > > > > > >>>>>> functions.RuntimeContext}
> > > > > > >>>>
> > > > > > >>>>> can
> > > > > > >>>>>>
> > > > > > >>>>>>> be obtained inside UDFs via*
> > > > > > >>>>>>> ** {@link
> > > > > > >>>>>>> org.apache.flink.api.common.functions.RichFunction#
> > > > > > >>>>>>>
> > > > > > >>>>>> getRuntimeContext()}
> > > > > > >>>>>
> > > > > > >>>>>> and
> > > > > > >>>>>>> provides access*
> > > > > > >>>>>>> ** {@link org.apache.flink.api.common.ca
> > > > > > >>>>>>> <http://org.apache.flink.api.common.ca
> > >che.DistributedCache}
> > > > > via*
> > > > > > >>>>>>> ** {@link
> > > > > > >>>>>>> org.apache.flink.api.common.functions.RuntimeContext#
> > > > > > >>>>>>> getDistributedCache()}.*
> > > > > > >>>>>>> ***
> > > > > > >>>>>>> ** @param filePath The path of the file, as a URI (e.g.
> > > > > > >>>>>>>
> > > > > > >>>>>> "file:///some/path"
> > > > > > >>>>>>
> > > > > > >>>>>>> or "hdfs://host:port/and/path")*
> > > > > > >>>>>>> ** @param name The name under which the file is
> > registered.*
> > > > > > >>>>>>> ** @param blobType indicating the type of the Blob file*
> > > > > > >>>>>>> **/*
> > > > > > >>>>>>>
> > > > > > >>>>>>> *public void registerCachedFile(String filePath, String
> > name,
> > > > > > >>>>>>> DistributedCache.BlobType blobType) {...}*
> > > > > > >>>>>>>
> > > > > > >>>>>>> Optionally, we can add another interface to register UDF
> > Jars
> > > > > which
> > > > > > >>>>>>>
> > > > > > >>>>>> will
> > > > > > >>>>>
> > > > > > >>>>>> use the interface above to implement.
> > > > > > >>>>>>>
> > > > > > >>>>>>> *public void registerJarFile(String filePath, String
> name)
> > > > {...}*
> > > > > > >>>>>>>
> > > > > > >>>>>>> The existing interface in the following will be marked
> > > > > deprecated:
> > > > > > >>>>>>>
> > > > > > >>>>>>> *public void registerCachedFile(String filePath, String
> > name,
> > > > > > boolean
> > > > > > >>>>>>> executable) {...}*
> > > > > > >>>>>>>
> > > > > > >>>>>>> And the following interface will be implemented using the
> > new
> > > > > > >>>>>>>
> > > > > > >>>>>> interface
> > > > > > >>>>
> > > > > > >>>>> proposed above with a EXECUTABLE BlobType:
> > > > > > >>>>>>>
> > > > > > >>>>>>> *public void registerCachedFile(String filePath, String
> > > name) {
> > > > > ...
> > > > > > >>>>>>>
> > > > > > >>>>>> }*
> > > > > > >>>>
> > > > > > >>>>> Thanks a lot.
> > > > > > >>>>>>> Shuyi
> > > > > > >>>>>>>
> > > > > > >>>>>>> "So you have to trust that the dots will somehow connect
> in
> > > > your
> > > > > > >>>>>>>
> > > > > > >>>>>> future."
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > "So you have to trust that the dots will somehow connect in your
> > > future."
> > > > >
> > > >
> > >
> >
> >
> > --
> > "So you have to trust that the dots will somehow connect in your future."
> >
>

Reply via email to