To: @Arvid Heise @Caizhi Weng

Thanks for your replies. As stream API code becomes more and more complicated,
we will add more and more third-party dependencies.

This kind of problem will be inevitable. If we rely only on tricks such as
shading the dependencies to solve it, I think that is far from enough.
There should be a once-and-for-all solution. I propose a plan for
discussion, as shown in the figure below:

1. Make the server-side class loader created by
   BlobLibraryCacheManager.DefaultClassLoaderFactory pluggable, so that the
   parent class loader of ChildFirstClassLoader can be varied.
2. Let that parent class loader act as a facade over the multiple client-side
   class loaders, finding classes by polling them in turn.
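
To make the second point concrete, here is a minimal sketch of a polling
facade in plain Java. It is not Flink code and does not touch
BlobLibraryCacheManager; the class name PollingClassLoader and its
constructor are hypothetical, chosen only for illustration. It assumes the
delegate class loaders already exist on the side where it runs.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical facade: one class loader that polls several delegates in order. */
final class PollingClassLoader extends ClassLoader {
    private final List<ClassLoader> delegates;

    PollingClassLoader(ClassLoader parent, ClassLoader... delegates) {
        super(parent);
        this.delegates = Arrays.asList(delegates.clone());
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        // loadClass() calls this only after the regular parent lookup has failed.
        for (ClassLoader delegate : delegates) {
            try {
                return delegate.loadClass(name);
            } catch (ClassNotFoundException ignored) {
                // This delegate does not know the class; try the next one.
            }
        }
        throw new ClassNotFoundException(name);
    }
}
```

Note that the first delegate that knows a class name wins, so if two
delegates ship different versions of the same class, polling alone does not
isolate them from each other.
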
What do you think? Thanks.


------------------------------------------------------------------
From: Arvid Heise <ar...@apache.org>
Sent: Monday, October 18, 2021 17:28
To: Caizhi Weng <tsreape...@gmail.com>
Cc: dev <dev@flink.apache.org>; 百岁 <bai...@dingtalk.com>; user
<u...@flink.apache.org>
Subject: Re: dataStream can not use multiple classloaders

You also must ensure that your SourceFunction is serializable, so it's not 
enough to just refer to some classloader, you must ensure that you have access 
to it also after deserialization on the task managers.

On Mon, Oct 18, 2021 at 4:24 AM Caizhi Weng <tsreape...@gmail.com> wrote:

Hi!

By default there is only one classloader for user code at runtime. The main
method of your code is only executed on the client side. It generates a job
graph and sends it to the cluster.

To avoid class loading conflicts, it is recommended to shade the dependencies
of your source and sink function jars. If you really have to load some
dependencies with different class loaders, you can load them in the open method
of a RichSourceFunction or RichSinkFunction.
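
The two replies above point to the same pattern: ship only serializable
state with the function, and build the class loader after deserialization.
The sketch below illustrates this in plain Java with a round trip through
Java serialization. It is not Flink code: LazyLoaderFunction is a
hypothetical stand-in for a RichSourceFunction or RichSinkFunction, and its
open() method only mirrors the role of Flink's open method. It also assumes
the jar URLs are reachable from the machine where open() runs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical stand-in for a rich function: only serializable state is shipped. */
final class LazyLoaderFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    private final URL[] jarUrls;           // serializable, travels with the function
    private transient ClassLoader loader;  // not serializable, rebuilt in open()

    LazyLoaderFunction(URL[] jarUrls) {
        this.jarUrls = jarUrls.clone();
    }

    /** Mirrors the role of open(): called on the worker after deserialization. */
    void open() {
        loader = new URLClassLoader(jarUrls, LazyLoaderFunction.class.getClassLoader());
    }

    ClassLoader loader() {
        return loader;
    }

    /** Serializes and deserializes the function, as shipping it to a worker would. */
    static LazyLoaderFunction roundTrip(LazyLoaderFunction f)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(f);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyLoaderFunction) in.readObject();
        }
    }
}
```

After roundTrip(), loader() returns null until open() is called again, which
is why a class loader created in the client's main method is not available
on the task managers.
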

On Sat, Oct 16, 2021 at 11:47 PM 百岁 <bai...@dingtalk.com.invalid> wrote:

To: everyone
 I have created a DataStream demo, shown below. It is a very simple example
 that reads stream data from a SourceFunction and sends it to a SinkFunction
 without any processing.
 The point is that the SourceFunction and SinkFunction instances are created
 with two separate URLClassLoaders that have different dependencies, to avoid
 conflicts between them.
 The problem is that when the Flink client submits the job to the server, the
 server side throws a ClassNotFoundException for a class defined in those
 class loaders' dependencies. Obviously the server side does not use the same
 class loaders as the client side.
 How can I solve this problem? Can anyone give me some advice? Thanks a lot.



 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.Iterator;
 import java.util.ServiceLoader;

 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;

 // DTO is the poster's own record type; its definition is not shown.
 public class FlinkStreamDemo {
     public static void main(String[] args) throws Exception {
         // main() runs on the client only; it builds the job graph.
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         SourceFunction<DTO> sourceFunc = createSourceFunction();
         DataStreamSource<DTO> dtoDataStreamSource = env.addSource(sourceFunc);
         SinkFunction<DTO> sinkFunction = createSink();
         dtoDataStreamSource.addSink(sinkFunction);
         env.execute("flink-example");
     }

     // Loads the sink factory through its own URLClassLoader (client side only).
     private static SinkFunction<DTO> createSink() {
         URL[] urls = new URL[]{...};
         ClassLoader classLoader = new URLClassLoader(urls);
         ServiceLoader<ISinkFunctionFactory> loaders =
                 ServiceLoader.load(ISinkFunctionFactory.class, classLoader);
         Iterator<ISinkFunctionFactory> it = loaders.iterator();
         if (it.hasNext()) {
             return it.next().create();
         }
         throw new IllegalStateException();
     }

     // Loads the source factory through a second, separate URLClassLoader.
     private static SourceFunction<DTO> createSourceFunction() {
         URL[] urls = new URL[]{...};
         ClassLoader classLoader = new URLClassLoader(urls);
         ServiceLoader<ISourceFunctionFactory> loaders =
                 ServiceLoader.load(ISourceFunctionFactory.class, classLoader);
         Iterator<ISourceFunctionFactory> it = loaders.iterator();
         if (it.hasNext()) {
             return it.next().create();
         }
         throw new IllegalStateException();
     }

     public interface ISinkFunctionFactory {
         SinkFunction<DTO> create();
     }

     public interface ISourceFunctionFactory {
         SourceFunction<DTO> create();
     }
 }


 from: 
https://issues.apache.org/jira/projects/FLINK/issues/FLINK-24558?filter=allissues
