
As Yang Wang pointed out, you should use the new plugins mechanism.

If it doesn’t work, first make sure that you are shipping/distributing the 
plugin jars correctly, i.e. that the plugins directory structure is correct on 
the client machine. Next, make sure that the cluster has the same correct 
setup. This matters especially for the standalone/cluster execution modes. For 
YARN, Mesos and Docker, the plugins directory should be shipped to the cluster 
by Flink itself; however, plugin support on YARN is currently semi-broken [1]. 
This is already fixed, but the fix is still waiting to be released in 1.9.2 
and 1.10.
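
As a rough sanity check of the layout, something like the following could be 
run on the client and on each cluster node. It is only a sketch: the class name 
PluginLayoutCheck is made up for illustration, and it assumes the documented 
layout of one subdirectory per plugin under plugins/, each holding at least one 
jar. Treating a jar placed directly in the plugins root as a problem is also an 
assumption on my side.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: reports suspicious entries in a plugins directory.
public class PluginLayoutCheck {

    /** Returns one human-readable line per problem found under pluginsRoot. */
    public static List<String> findProblems(Path pluginsRoot) throws IOException {
        List<String> problems = new ArrayList<>();
        if (!Files.isDirectory(pluginsRoot)) {
            problems.add("missing plugins directory: " + pluginsRoot);
            return problems;
        }
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(pluginsRoot)) {
            for (Path entry : entries) {
                if (Files.isDirectory(entry)) {
                    // Each plugin is expected to live in its own subdirectory with at least one jar.
                    if (countJars(entry) == 0) {
                        problems.add("plugin directory without jars: " + entry.getFileName());
                    }
                } else if (entry.getFileName().toString().endsWith(".jar")) {
                    // Assumption: a jar directly in the root is not picked up as a plugin.
                    problems.add("jar outside a plugin subdirectory: " + entry.getFileName());
                }
            }
        }
        return problems;
    }

    /** Counts the *.jar files directly inside dir. */
    public static int countJars(Path dir) throws IOException {
        int count = 0;
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path ignored : jars) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // Defaults to ./plugins; pass e.g. $FLINK_HOME/plugins as the first argument.
        Path root = Paths.get(args.length > 0 ? args[0] : "plugins");
        List<String> problems = findProblems(root);
        if (problems.isEmpty()) {
            System.out.println("layout looks OK: " + root);
        } else {
            problems.forEach(System.out::println);
        }
    }
}
```

Running it with $FLINK_HOME/plugins as the argument on both sides makes it easy 
to compare the client and the cluster setup.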

If it still doesn’t work, check the TaskManager logs to see which plugins/file 
systems are being loaded during startup. If none are, that's the problem.
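
If the logs are long, a small filter can help narrow them down. The sketch 
below is only an illustration: the class name PluginLogScan and the keywords it 
searches for ("plugin", "filesystem", "file system") are my assumptions, and the 
exact log messages depend on the Flink version, so adjust the keywords as 
needed.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink: filters a log file for plugin/file system lines.
public class PluginLogScan {

    /** Keeps the lines that mention plugins or file systems (case-insensitive). */
    public static List<String> relevantLines(List<String> logLines) {
        return logLines.stream()
                .filter(line -> {
                    String lower = line.toLowerCase(Locale.ROOT);
                    // Assumed keywords; the real messages may be worded differently.
                    return lower.contains("plugin")
                            || lower.contains("filesystem")
                            || lower.contains("file system");
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // First argument: path to a TaskManager log file.
        List<String> lines = Files.readAllLines(Paths.get(args[0]), StandardCharsets.UTF_8);
        List<String> hits = relevantLines(lines);
        if (hits.isEmpty()) {
            System.out.println("no plugin or file system lines found");
        } else {
            hits.forEach(System.out::println);
        }
    }
}
```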


[1] https://issues.apache.org/jira/browse/FLINK-14382 

> You could try the new plugin mechanism.
> Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then put 
> your filesystem-related jars in it.
> Different plugins will be loaded by separate classloaders to avoid conflicts.
> Hi ouywl,
> >>    Thread.currentThread().getContextClassLoader();
> What does this statement mean in your program?
> In addition, can you share your implementation of the customized file system 
> plugin and the related exception?
> Hi all,
>     We have implemented a filesystem plugin to sink data to hdfs1, while the 
> YARN cluster that Flink runs on uses hdfs2. When the job runs, the JobManager 
> uses the configuration of hdfs1 to create the filesystem, so the filesystem 
> plugin conflicts with the Flink components. 
>     The steps we implemented:
>       1. ‘FileSystemEnhance’ extends “FileSystem”.
>       2. ‘FileSystemFactoryEnhance’ implements “FileSystemFactory” and adds 
> Kerberos auth.
>       3. Add a service entry: create a file 
> META-INF/services/org.apache.flink.core.fs.FileSystemFactory that contains 
> the fully qualified class name of “FileSystemFactoryEnhance”.
> And the job main class is:
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // Checkpoint every 60 seconds, exactly-once, retained on cancellation.
>         env.enableCheckpointing(60 * 1000);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().enableExternalizedCheckpoints(
>                 CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>         env.getConfig().enableSysoutLogging();
>         // Kafka consumer properties.
>         Properties props = new Properties();
>         props.put("bootstrap.servers", SERVERS);
>         props.put("group.id", GROUPID);
>         props.put("enable.auto.commit", "true");
>         // props.put("auto.commit.interval.ms", "1000");
>         props.put("session.timeout.ms", "30000");
>         props.put("auto.offset.reset", "latest");
>         props.put("key.deserializer",
>                 org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
>         props.put("value.deserializer", StringDeserializer.class.getName());
>         FlinkKafkaConsumer010<String> consumer011 =
>                 new FlinkKafkaConsumer010<>("zyf_test_2", new SimpleStringSchema(), props);
>         DataStream<String> source = env.addSource(consumer011).setParallelism(1);
>         source.print();
>         Thread.currentThread().getContextClassLoader();
>         // Write each record as a UTF-8 line to HDFS.
>         StreamingFileSink<String> sink = StreamingFileSink
>                 .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"),
>                         new SimpleStringEncoder<String>("UTF-8"))
>                 .build();
>         source.addSink(sink);
>         env.execute();
>     }
> When the job starts, the JobManager fails on the filesystem; according to the 
> log, the JobManager uses the “FileSystemFactoryEnhance” filesystem and this 
> causes the conflict.
> According to 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems
> how can we avoid using “Thread.currentThread().getContextClassLoader()”?
