[ 
https://issues.apache.org/jira/browse/FLINK-24558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bai sui updated FLINK-24558:
----------------------------
    Description: 
Create a DataStream demo as shown below. It is a very simple example that 
reads stream data from a SourceFunction and sends it to a SinkFunction without 
any processing.
The key point is that the SourceFunction and SinkFunction instances are 
created with two separate URLClassLoaders that have different dependencies, in 
order to avoid code conflicts.

The problem is that when the Flink client submits the job to the server, the 
server side throws a ClassNotFoundException for classes defined in the 
dependencies of those classloaders. Evidently the server side does not use 
the same classloaders as the client side.

How can I solve this problem? Can anyone give me some advice? Thanks a lot.
 
{code:java}
public class FlinkStreamDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SourceFunction<DTO> sourceFunc = createSourceFunction();

        DataStreamSource<DTO> dtoDataStreamSource = env.addSource(sourceFunc);

        SinkFunction<DTO> sinkFunction = createSink();

        dtoDataStreamSource.addSink(sinkFunction);

        env.execute("flink-example");
    }

    private static SinkFunction<DTO> createSink() {
        URL[] urls = new URL[]{...};
        ClassLoader classLoader = new URLClassLoader(urls);
        ServiceLoader<ISinkFunctionFactory> loaders = ServiceLoader.load(ISinkFunctionFactory.class, classLoader);
        Iterator<ISinkFunctionFactory> it = loaders.iterator();
        if (it.hasNext()) {
            return it.next().create();
        }
        throw new IllegalStateException();
    }

    private static SourceFunction<DTO> createSourceFunction() {
        URL[] urls = new URL[]{...};
        ClassLoader classLoader = new URLClassLoader(urls);
        ServiceLoader<ISourceFunctionFactory> loaders = ServiceLoader.load(ISourceFunctionFactory.class, classLoader);
        Iterator<ISourceFunctionFactory> it = loaders.iterator();
        if (it.hasNext()) {
            return it.next().create();
        }
        throw new IllegalStateException();
    }

    public interface ISinkFunctionFactory {
        SinkFunction<DTO> create();
    }

    public interface ISourceFunctionFactory {
        SourceFunction<DTO> create();
    }
}
{code}
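
The ClassNotFoundException described above is consistent with how Java 
deserialization works in general: the receiving side resolves every class name 
in the stream against one specific ClassLoader. The following is only a minimal 
sketch of that mechanism in plain Java, not Flink's actual code path. The names 
ClassLoaderDeserializationSketch, IsolatedFunction and LoaderAwareInputStream 
are hypothetical, and the empty URLClassLoader with a null parent is an assumed 
stand-in for a server-side loader that lacks the client's dependency jars.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderDeserializationSketch {

    /** Hypothetical stand-in for a function class that lives in an isolated dependency jar. */
    public static class IsolatedFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;

        public IsolatedFunction(String name) {
            this.name = name;
        }
    }

    /** ObjectInputStream that resolves class names against one explicit ClassLoader. */
    private static class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            // Look the class up by name in the given loader only.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    /** Serializes an object to bytes, as a client would before shipping it. */
    public static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns true if every class in the stream can be resolved by the given loader. */
    public static boolean canDeserialize(byte[] data, ClassLoader loader) {
        try (ObjectInputStream in = new LoaderAwareInputStream(new ByteArrayInputStream(data), loader)) {
            in.readObject();
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = serialize(new IsolatedFunction("sink"));

        // Loader that defined the class: plays the role of the client-side loader.
        ClassLoader clientLoader = IsolatedFunction.class.getClassLoader();
        System.out.println("client-side loader: " + canDeserialize(data, clientLoader));

        // Assumed stand-in for the server: no URLs and no parent, so user classes are invisible.
        try (URLClassLoader serverLoader = new URLClassLoader(new URL[0], null)) {
            System.out.println("server-side loader: " + canDeserialize(data, serverLoader));
        }
    }
}
```

Run as written, the sketch prints "client-side loader: true" and then 
"server-side loader: false". The serialized bytes carry only class names, so 
the URLClassLoader instances created on the client are not part of what is 
shipped, and the receiving side has to find the same classes through its own 
loader.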


  was:
Create a DataStream demo as shown below. It is a very simple example that 
reads stream data from a SourceFunction and sends it to a SinkFunction without 
any processing.
The key point is that the SourceFunction and SinkFunction instances are 
created with two separate URLClassLoaders that have different dependencies, in 
order to avoid code conflicts.

The problem is that when the Flink client submits the job to the server, the 
server side throws a ClassNotFoundException for classes defined in the 
dependencies of those classloaders. Evidently the server side does not use 
the same classloaders as the client side.

How can I solve this problem? Can anyone give me some advice? Thanks a lot.
 
{code:java}
public class FlinkStreamDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SourceFunction<DTO> sourceFunc = createSourceFunction();

        DataStreamSource<DTO> dtoDataStreamSource = env.addSource(sourceFunc);

        SinkFunction<DTO> sinkFunction = createSink();

        dtoDataStreamSource.addSink(sinkFunction);

        env.execute("flink-example");
    }

    private static SinkFunction<DTO> createSink() {
        URL[] urls = new URL[]{};
        ClassLoader classLoader = new URLClassLoader(urls);
        ServiceLoader<ISinkFunctionFactory> loaders = ServiceLoader.load(ISinkFunctionFactory.class, classLoader);
        Iterator<ISinkFunctionFactory> it = loaders.iterator();
        if (it.hasNext()) {
            return it.next().create();
        }
        throw new IllegalStateException();
    }

    private static SourceFunction<DTO> createSourceFunction() {
        URL[] urls = new URL[]{};
        ClassLoader classLoader = new URLClassLoader(urls);
        ServiceLoader<ISourceFunctionFactory> loaders = ServiceLoader.load(ISourceFunctionFactory.class, classLoader);
        Iterator<ISourceFunctionFactory> it = loaders.iterator();
        if (it.hasNext()) {
            return it.next().create();
        }
        throw new IllegalStateException();
    }

    public interface ISinkFunctionFactory {
        SinkFunction<DTO> create();
    }

    public interface ISourceFunctionFactory {
        SourceFunction<DTO> create();
    }
}
{code}



> dataStream can not use multiple classloaders 
> ---------------------------------------------
>
>                 Key: FLINK-24558
>                 URL: https://issues.apache.org/jira/browse/FLINK-24558
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: bai sui
>            Priority: Minor
>
> Create a DataStream demo as shown below. It is a very simple example that 
> reads stream data from a SourceFunction and sends it to a SinkFunction 
> without any processing.
> The key point is that the SourceFunction and SinkFunction instances are 
> created with two separate URLClassLoaders that have different dependencies, 
> in order to avoid code conflicts.
> The problem is that when the Flink client submits the job to the server, the 
> server side throws a ClassNotFoundException for classes defined in the 
> dependencies of those classloaders. Evidently the server side does not use 
> the same classloaders as the client side.
> How can I solve this problem? Can anyone give me some advice? Thanks a lot.
>  
> {code:java}
> public class FlinkStreamDemo {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         SourceFunction<DTO> sourceFunc = createSourceFunction();
>         DataStreamSource<DTO> dtoDataStreamSource = env.addSource(sourceFunc);
>         SinkFunction<DTO> sinkFunction = createSink();
>         dtoDataStreamSource.addSink(sinkFunction);
>         env.execute("flink-example");
>     }
>     private static SinkFunction<DTO> createSink() {
>         URL[] urls = new URL[]{...};
>         ClassLoader classLoader = new URLClassLoader(urls);
>         ServiceLoader<ISinkFunctionFactory> loaders = ServiceLoader.load(ISinkFunctionFactory.class, classLoader);
>         Iterator<ISinkFunctionFactory> it = loaders.iterator();
>         if (it.hasNext()) {
>             return it.next().create();
>         }
>         throw new IllegalStateException();
>     }
>     private static SourceFunction<DTO> createSourceFunction() {
>         URL[] urls = new URL[]{...};
>         ClassLoader classLoader = new URLClassLoader(urls);
>         ServiceLoader<ISourceFunctionFactory> loaders = ServiceLoader.load(ISourceFunctionFactory.class, classLoader);
>         Iterator<ISourceFunctionFactory> it = loaders.iterator();
>         if (it.hasNext()) {
>             return it.next().create();
>         }
>         throw new IllegalStateException();
>     }
>     public interface ISinkFunctionFactory {
>         SinkFunction<DTO> create();
>     }
>     public interface ISourceFunctionFactory {
>         SourceFunction<DTO> create();
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
