[ https://issues.apache.org/jira/browse/FLINK-24558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-24558: ----------------------------------- Labels: pull-request-available (was: ) > dataStream can not use multiple classloaders > --------------------------------------------- > > Key: FLINK-24558 > URL: https://issues.apache.org/jira/browse/FLINK-24558 > Project: Flink > Issue Type: Improvement > Components: API / DataStream > Reporter: bai sui > Priority: Minor > Labels: pull-request-available > Attachments: Flink ClassLoader优化 (1).png > > > create a dataStream demo as below,in the demo,create a very simple example, > read stream data from sourceFunction,and send it to sinkFunction without any > processing. > The point is,by creating the instance of SourceFunction and SinkFunction has > used two separately URLClassLoader with different dependencies,for avoiding > the code conflict . > but the problem is flink client send to server ,the server side throw an > classNotFoundException which defined the de classloader dependencies, > Obviously the server side has not use the classloader as client side. > how can I solve the problem ,is there any one can give me some advice ? > thanks a lot > > {code:java} > public class FlinkStreamDemo { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > SourceFunction<DTO> sourceFunc = createSourceFunction(); > DataStreamSource<DTO> dtoDataStreamSource = env.addSource(sourceFunc); > SinkFunction<DTO> sinkFunction = createSink(); > dtoDataStreamSource.addSink(sinkFunction); > env.execute("flink-example"); > } > private static SinkFunction<DTO> createSink() { > URL[] urls = new URL[]{...}; > ClassLoader classLoader = new URLClassLoader(urls); > ServiceLoader<ISinkFunctionFactory> loaders = > ServiceLoader.load(ISinkFunctionFactory.class, classLoader); > Iterator<ISinkFunctionFactory> it = loaders.iterator(); > if (it.hasNext()) { > return it.next().create(); > } > throw new IllegalStateException(); > } > private static SourceFunction<DTO> createSourceFunction() { > URL[] urls = new URL[]{...}; > ClassLoader classLoader = new URLClassLoader(urls); > ServiceLoader<ISourceFunctionFactory> loaders = > ServiceLoader.load(ISourceFunctionFactory.class, classLoader); > Iterator<ISourceFunctionFactory> it = loaders.iterator(); > if (it.hasNext()) { > return it.next().create(); > } > throw new IllegalStateException(); > } > public interface ISinkFunctionFactory { > SinkFunction<DTO> create(); > } > public interface ISourceFunctionFactory { > SourceFunction<DTO> create(); > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)