Hi Marvin,

The discussion has not been resolved yet. Could you tell us a bit about your use case? That might help with the decision. Which of the discussed approaches would work best for your use case, and why?
Thanks, Fabian

2018-06-25 13:27 GMT+02:00 Marvin777 <xymaqingxiang...@gmail.com>:

Hi Shuyi,

What is the progress of the discussion? We are also looking forward to this feature.
Thanks.

Shuyi Chen <suez1...@gmail.com> wrote on Friday, June 8, 2018 at 3:04 PM:

Thanks a lot for the comments, Till and Fabian.

The RemoteEnvironment does provide a way to specify jar files at construction, but we want the jar files to be specified dynamically in the user code, e.g. in a DDL statement, and the jar files might be in a remote DFS. As we discussed, I think there are 2 approaches:

1) Add a new interface env.registerJarFile(jarFiles...), which ships the JAR files using JobGraph.addJar(). In this case, all jars will be loaded by default at runtime. This is the same approach the SQL client uses to ship UDF jars today.
2) Add a new interface env.registerJarFile(name, jarFiles...). It will do similar things as env.registerCachedFile(): it registers a set of JAR files under a key name, and we can add a new interface in RuntimeContext as Fabian suggests, i.e., RuntimeContext.getClassloaderWithJar(<key name>). Users will then be able to load the functions in remote jars dynamically using the returned ClassLoader.

Comparing the 2 approaches:

- Approach 1) is simpler for users to use.
- Approach 2) allows us to use different versions of a class in the same code, and might solve some dependency conflict issues. Also, in 2) we can load jars on demand, while in 1) all jars are loaded by default.

I think we can support both interfaces. For the SQL DDL implementation, both will work; approach 2) is more complicated, but has the nice benefits stated above. However, the implementation choice should be transparent to the end user.
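To make the contrast concrete, here is a toy, stdlib-only Java model of the two proposed interfaces. The names registerJarFile and getClassloaderWithJar come from this discussion and are not final Flink APIs; the classes below are mocks, not Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the two proposed registerJarFile variants.
public class RegisterJarSketch {

    // Approach 1: all registered jars are shipped and loaded by default.
    static class EnvA {
        final List<String> shippedJars = new ArrayList<>();

        void registerJarFile(String... jarFiles) {
            // Would map to JobGraph.addJar() for each jar.
            shippedJars.addAll(Arrays.asList(jarFiles));
        }
    }

    // Approach 2: jars are registered under a key and loaded on demand.
    static class EnvB {
        final Map<String, List<String>> namedJarSets = new HashMap<>();

        void registerJarFile(String name, String... jarFiles) {
            namedJarSets.put(name, Arrays.asList(jarFiles));
        }

        // Stand-in for RuntimeContext.getClassloaderWithJar(name): only the
        // requested jar set would back the returned ClassLoader.
        List<String> jarsForClassloader(String name) {
            return namedJarSets.getOrDefault(name, Collections.emptyList());
        }
    }

    public static void main(String[] args) {
        EnvA a = new EnvA();
        a.registerJarFile("udf1.jar", "udf2.jar");
        System.out.println("approach 1 loads: " + a.shippedJars);

        EnvB b = new EnvB();
        b.registerJarFile("myUdfs", "udf1.jar");
        b.registerJarFile("otherUdfs", "udf2.jar");
        System.out.println("approach 2 loads on demand: " + b.jarsForClassloader("myUdfs"));
    }
}
```

The model shows why approach 2) can isolate dependency conflicts: each named jar set would get its own classloader, so two sets can contain different versions of the same class.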
Also, I am wondering: outside of the SQL DDL, will this new functionality/interface be helpful in other scenarios? Maybe that would help make the interface better and more generic. Thanks a lot.

Shuyi

On Tue, Jun 5, 2018 at 1:47 AM Fabian Hueske <fhue...@gmail.com> wrote:

We could also offer a feature that lets users request classloaders with additional jars.
This could work as follows:

1) Users register jar files in the ExecutionEnvironment (similar to cached files) with a name, e.g., env.registerJarFile("~/myJar.jar", "myName");
2) In a function, the user can request a user classloader with the additional classes, e.g., RuntimeContext.getClassloaderWithJar("myName");
This could also support loading multiple jar files in the same classloader.

IMO, the interesting part of Shuyi's proposal is being able to dynamically load code from remote locations without fetching it to the client first.

Best, Fabian

2018-05-29 12:42 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

I see Shuyi's point that it would be nice to allow programmatically adding jar files which should be part of the user code classloader. Actually, we already expose this functionality in the `RemoteEnvironment`, where you can specify in the constructor additional jars which shall be shipped to the cluster. I assume that is exactly the functionality you are looking for. In that sense, it might be an API inconsistency that we allow it for some cases and not for others.

But I could also see that the whole functionality of dynamically loading jars at runtime could perfectly live in the `UdfSqlOperator`. This, of course, would entail that one has to take care of cleaning up the downloaded resources.
But it should be possible to first download the resources and create a custom URLClassLoader at startup, and then use this class loader when calling into the UDF.

Cheers,
Till

On Wed, May 16, 2018 at 9:28 PM, Shuyi Chen <suez1...@gmail.com> wrote:

Hi Aljoscha, Fabian, Rong, Ted and Timo,

Thanks a lot for the feedback. Let me clarify the usage scenario in a bit more detail. The context is that we want to add support in Flink SQL for a SQL DDL that loads UDFs from external JARs located in the local filesystem, in HDFS, or at an HTTP endpoint. The local FS option is mostly for debugging, for users who submit the job jar locally; the latter two are for production use. Below is an example user application with the *CREATE FUNCTION* DDL (note: grammar and interface are not finalized yet).
-------------------------------------------------------------------------------------------------

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// set up the DataStream
// ...

// register the DataStream under the name "OrderA"
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
tEnv.sqlDDL(
  "create function helloFunc as 'com.example.udf.HelloWorld' using jars " +
  "('hdfs:///users/david/libraries/my-udf-1.0.1-SNAPSHOT.jar')")
val result = tEnv.sqlQuery(
  "SELECT user, helloFunc(product), amount FROM OrderA WHERE amount > 2")
result.toAppendStream[Order].print()
env.execute()

-------------------------------------------------------------------------------------------------

The example application above does the following:
1) It registers a DataStream as a Calcite table (*org.apache.calcite.schema.Table*) under the name "OrderA", so SQL can reference the DataStream as table "OrderA".
2) It uses the SQL *CREATE FUNCTION* DDL (grammar and interface not finalized yet) to create a SQL UDF called *helloFunc* from a JAR located at a remote HDFS path.
3) It issues a SQL query that uses the *helloFunc* UDF defined above and generates a Flink table (*org.apache.flink.table.api.Table*).
4) It converts the Flink table back to a DataStream and prints it.

Steps 1), 3), and 4) are already implemented. To implement 2), we need to do the following in the *tEnv.sqlDDL()* function:
a) Parse the DDL into a SqlNode to extract the UDF class path *udfClasspath*, the UDF remote paths *udfUrls[]*, and the UDF SQL name *udfName*.
b) Use a URLClassLoader to load the JARs specified in *udfUrls[]*, and register the SQL UDF under the name *udfName* via the {Batch/Stream/}TableEnvironment registerFunction methods, using *udfClasspath*.
c) Register the JARs *udfUrls[]* through the {Stream}ExecutionEnvironment, so that the JARs can be distributed to all the TaskManagers at runtime.

Since the CREATE FUNCTION DDL is executed within the user application, I don't think we have access to the ClusterClient at the point when *tEnv.sqlDDL()* is executed. Also, the JARs can be in a remote filesystem (which is the main usage scenario), so the user can't really prepare the jar statically in advance.

For a normal user application, I think {Stream}ExecutionEnvironment is the right place for this functionality, since it provides methods to control the job execution and to interact with the outside world; also, it already does similar things through the *registerCachedFile* interface.

However, in that case, the SQL FUNCTION DDL and the SQL client would use 2 different routes to register UDF jars: one through *JobGraph.jobConfiguration* and the other through *JobGraph.userJars*.
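Step b) above relies on the standard URLClassLoader mechanism. The following self-contained JDK sketch builds a small jar and resolves content from it through a URLClassLoader. For brevity it loads a text resource rather than a compiled UDF class, but the lookup mechanics are the same (loadClass instead of getResourceAsStream); a remote hdfs:// jar would first need to be fetched, or handled by a suitable URL stream handler. The file and class names are invented for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

public class UdfJarLoading {

    // Build a small jar containing a single resource entry, standing in for
    // a UDF jar fetched from udfUrls[].
    static Path writeJar(Path dir) throws IOException {
        Path jar = dir.resolve("my-udf.jar");
        try (JarOutputStream out = new JarOutputStream(Files.newOutputStream(jar))) {
            out.putNextEntry(new JarEntry("udf-descriptor.txt"));
            out.write("com.example.udf.HelloWorld".getBytes("UTF-8"));
            out.closeEntry();
        }
        return jar;
    }

    // Point a URLClassLoader at the jar and resolve content from it,
    // analogous to loading UDF classes in step b).
    static String loadDescriptor(Path jar) throws IOException {
        try (URLClassLoader cl = new URLClassLoader(
                new URL[] {jar.toUri().toURL()}, UdfJarLoading.class.getClassLoader())) {
            try (InputStream in = cl.getResourceAsStream("udf-descriptor.txt")) {
                return new String(in.readAllBytes(), "UTF-8");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path jar = writeJar(Files.createTempDirectory("udf-jars"));
        System.out.println(loadDescriptor(jar)); // prints com.example.udf.HelloWorld
    }
}
```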
So *maybe we can, as Fabian suggests, add registerUserJarFile()/getUserJarFiles() interfaces in {Stream}ExecutionEnvironment, which store the jars internally in a List, and, when generating the JobGraph, copy the jars to the JobGraph using {Stream}ExecutionEnvironment.getUserJarFiles() and JobGraph.addJar()* (note: streaming and batch implementations might vary). In that case, both the SQL FUNCTION DDL and the SQL client would use *JobGraph.userJars* to ship the UDF jars.

Hope that clarifies things. What do you guys think? Thanks a lot.

Cheers!
Shuyi

On Wed, May 16, 2018 at 9:45 AM, Rong Rong <walter...@gmail.com> wrote:

I think the question here is whether registering jar files (or other executable files) during job submission is sufficient for @shuyi's use case.

If I understand correctly regarding the part about dynamic distribution of external libraries at runtime: this is used to deal with DDL/DSL such as:
CREATE FUNCTION my_fun FROM url://<some_remote_jar>
during execution. Correct me if I am wrong @shuyi, but the basic assumption that "we can locate and ship all executable JARs during job submission" no longer holds for your use case, right?

I guess we are missing details here regarding the "distribution of external libraries at runtime" part. Maybe you can share more examples of this use case. Would this be included in the design doc @Timo?
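The registerUserJarFile()/getUserJarFiles() flow that Shuyi suggests above can be sketched with a toy model. The class and method names mirror the proposal and are not Flink code; the real JobGraph and environment would of course differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model: the environment stores registered jar paths in a list, and at
// job-graph generation time they are copied over via addJar().
public class UserJarShipping {

    static class MockJobGraph {
        final List<String> userJars = new ArrayList<>();

        void addJar(String path) {
            userJars.add(path);
        }
    }

    static class MockEnv {
        private final List<String> userJarFiles = new ArrayList<>();

        void registerUserJarFile(String path) {
            userJarFiles.add(path);
        }

        List<String> getUserJarFiles() {
            return Collections.unmodifiableList(userJarFiles);
        }

        // When generating the JobGraph, every registered jar is shipped
        // through the single JobGraph.userJars route.
        MockJobGraph generateJobGraph() {
            MockJobGraph graph = new MockJobGraph();
            for (String jar : getUserJarFiles()) {
                graph.addJar(jar);
            }
            return graph;
        }
    }

    public static void main(String[] args) {
        MockEnv env = new MockEnv();
        env.registerUserJarFile("hdfs:///libs/my-udf.jar");
        System.out.println(env.generateJobGraph().userJars);
    }
}
```

The point of the model is that both the SQL DDL and the SQL client would funnel jars through the same list, so there is only one shipping route at JobGraph level.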
--
Rong

On Wed, May 16, 2018 at 5:41 AM, Timo Walther <twal...@apache.org> wrote:

Yes, we are using the addJar functionality of the JobGraph for the SQL Client as well.

I think the execution environment is not the right place to specify jars. The location of the jars depends on the submission method. If a local path is specified in the main() method of a packaged Flink jar, it would not work when such a program is submitted through the REST API.

Regards,
Timo

On 16.05.18 at 14:32, Aljoscha Krettek wrote:

I think this functionality is already there, we just have to expose it in the right places: ClusterClient.submitJob() takes a JobGraph, and JobGraph has a method addJar() for adding jars that need to be on the classloader when executing a user program.

On 16. May 2018, at 12:34, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Ted,

The design doc is in late draft status and proposes support for SQL DDL statements (CREATE TABLE, CREATE FUNCTION, etc.).
The question about registering JARs came up because we need a way to distribute JAR files that contain the code of user-defined functions.

The design doc will soon be shared on the dev mailing list to gather feedback from the community.
Best, Fabian

2018-05-16 10:45 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:

bq. In a design document, Timo mentioned that we can ship multiple JAR files

Mind telling us where the design doc can be retrieved?

Thanks

On Wed, May 16, 2018 at 1:29 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

I'm not sure if we need to modify the existing method.
What we need is a bit different from what registerCachedFile() provides.
That method ensures that a file is copied to each TaskManager and can be locally accessed from a function's RuntimeContext.
In our case, we don't need to access the file, but would like to make sure that it is loaded into the class loader.
So, we could also just add a method like registerUserJarFile().

In a design document, Timo mentioned that we can ship multiple JAR files with a job.
So, we could also implement the UDF shipping logic by loading the jar file(s) on the client and distributing them from there.
In that case, we would not need to add a new method to the execution environment.

Best,
Fabian

2018-05-15 3:50 GMT+02:00 Rong Rong <walter...@gmail.com>:

+1. This could be very useful for "dynamic" UDFs.
Just to clarify, if I understand correctly, we are trying to use an ENUM indicator to:
(1) Replace the current boolean isExecutable flag.
(2) Provide additional information used by the ExecutionEnvironment to decide when/where to use the DistributedCache file.

In this case, DistributedCache.CacheType or DistributedCache.FileType sounds more intuitive; what do you think?

Also, I was wondering whether there is any other useful information about the cached file to be passed to the runtime.
If we are just talking about including the library in the classloader, can we directly extend the interface with:

public void registerCachedFile(
    String filePath,
    String name,
    boolean executable,
    boolean includeInClassLoader)

Thanks,
Rong

On Sun, May 13, 2018 at 11:14 PM, Shuyi Chen <suez1...@gmail.com> wrote:

Hi Flink devs,

In an effort to support loading external libraries and creating UDFs from external libraries using DDL in Flink SQL, we want to use Flink's Blob Server to
distribute the external libraries at runtime and load those libraries into the user code classloader automatically.

However, the current [Stream]ExecutionEnvironment.registerCachedFile interface is limited to registering executable or non-executable blobs. It's not possible to tell at runtime whether the blob files are libraries that should be loaded into the user code classloader in RuntimeContext. Therefore, I want to propose adding an enum called *BlobType* to explicitly indicate the type of the blob file being distributed, and the following interface in [Stream]ExecutionEnvironment to support it. In general, I think the new BlobType information can be used by the Flink runtime to preprocess the blob files if needed.

/**
 * Registers a file at the distributed cache under the given name. The file will be accessible
 * from any user-defined function in the (distributed) runtime under a local path. Files
 * may be local files (as long as all relevant workers have access to it), or files in a
 * distributed file system. The runtime will copy the files temporarily to a local cache,
 * if needed.
 *
 * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside
 * UDFs via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
 * provides access to {@link org.apache.flink.api.common.cache.DistributedCache} via
 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
 *
 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
 *                 "hdfs://host:port/and/path")
 * @param name The name under which the file is registered.
 * @param blobType indicating the type of the Blob file
 */
public void registerCachedFile(String filePath, String name, DistributedCache.BlobType blobType) {...}

Optionally, we can add another interface to register UDF jars, which will be implemented using the interface above:

public void registerJarFile(String filePath, String name) {...}

The following existing interface will be marked deprecated:

public void registerCachedFile(String filePath, String name, boolean executable) {...}

And the following interface will be implemented using the new interface proposed above with an EXECUTABLE BlobType:

public void registerCachedFile(String filePath, String name) { ... }

Thanks a lot.
Shuyi

"So you have to trust that the dots will somehow connect in your future."
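For illustration, here is a toy, self-contained sketch of the proposed BlobType enum and how the deprecated boolean overload could map onto it. Only EXECUTABLE appears in the proposal; the FILE and LIBRARY constants, and the in-memory map, are invented for this sketch and are not Flink code.

```java
import java.util.HashMap;
import java.util.Map;

public class BlobTypeSketch {

    // Hypothetical enum replacing the boolean isExecutable flag.
    enum BlobType { FILE, EXECUTABLE, LIBRARY }

    // Stand-in for the environment's cached-file registry.
    static final Map<String, BlobType> cache = new HashMap<>();

    // New interface: the blob type is stated explicitly.
    static void registerCachedFile(String filePath, String name, BlobType blobType) {
        cache.put(name, blobType);
    }

    // Deprecated overload implemented on top of the new one: the boolean
    // flag collapses into FILE vs. EXECUTABLE.
    @Deprecated
    static void registerCachedFile(String filePath, String name, boolean executable) {
        registerCachedFile(filePath, name, executable ? BlobType.EXECUTABLE : BlobType.FILE);
    }

    // registerJarFile() expressed via the same BlobType interface, so the
    // runtime can tell that this blob belongs in the user code classloader.
    static void registerJarFile(String filePath, String name) {
        registerCachedFile(filePath, name, BlobType.LIBRARY);
    }

    public static void main(String[] args) {
        registerCachedFile("file:///tmp/script.sh", "script", true);
        registerJarFile("hdfs:///libs/my-udf.jar", "myUdf");
        System.out.println(cache);
    }
}
```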