Hi, As Yang Wang pointed out, you should use the new plugins mechanism.
If it doesn’t work, first make sure that you are shipping/distributing the plugins jars correctly - the correct plugins directory structure both on the client machine. Next make sure that the cluster has the same correct setup. This is especially true for the standalone/cluster execution modes. For yarn, mesos, docker the plugins dir should be shipped to the cluster by Flink itself, however Plugins support in yarn is currently semi broken [1]. This is already fixed, but waiting to be released in 1.9.2 and 1.10. If it still doesn’t work, look for TaskManager logs what plugins/file systems are being loaded during the startup. If none, that's the problem. Piotrek [1] https://issues.apache.org/jira/browse/FLINK-14382 <https://issues.apache.org/jira/browse/FLINK-14382> > On 18 Dec 2019, at 12:40, Yang Wang <danrtsey...@gmail.com> wrote: > > You could have a try the new plugin mechanism. > Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then put > your filesystem related jars in it. > Different plugins will be loaded by separate classloader to avoid conflict. > > > Best, > Yang > > vino yang <yanghua1...@gmail.com <mailto:yanghua1...@gmail.com>> > 于2019年12月18日周三 下午6:46写道: > Hi ouywl, > > >> Thread.currentThread().getContextClassLoader(); > What does this statement mean in your program? > > In addition, can you share your implementation of the customized file system > plugin and the related exception? > > Best, > Vino > > ouywl <ou...@139.com <mailto:ou...@139.com>> 于2019年12月18日周三 下午4:59写道: > Hi all, > We have implemented a filesystem plugin for sink data to hdfs1, and the > yarn for flink running is used hdfs2. So when the job running, the jobmanager > use the conf of hdfs1 to create filesystem, the filesystem plugin is > conflict with flink component. > We implemeted step: > 1. ‘FileSystemEnhance’ is implement from “FileSystem” > 2. ‘FileSystemFactoryEnhance’ is implement from > “FileSystemFactory”,add kerberos auth in ”FileSystemFactoryEnhance" > 3. Add a service entry. Create a file > META-INF/services/org.apache.flink.core.fs.FileSystemFactory which contains > the class name of “ FileSystemFactoryEnhance.class” > > And the job mainclass is : > “ public static void main(String[] args) throws Exception{ > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(60*1000); > > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > env.getConfig().enableSysoutLogging(); > > > Properties props = new Properties(); > props.put("bootstrap.servers", SERVERS); > props.put("group.id <http://group.id/>", GROUPID); > props.put("enable.auto.commit", "true"); > // props.put("auto.commit.interval.ms <http://auto.commit.interval.ms/>", > "1000"); > props.put("session.timeout.ms <http://session.timeout.ms/>", "30000"); > props.put("auto.offset.reset", "latest"); > props.put("key.deserializer", > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName()); > props.put("value.deserializer", StringDeserializer.class.getName()); > FlinkKafkaConsumer010 consumer011 = new > FlinkKafkaConsumer010<String>("zyf_test_2", new SimpleStringSchema(), props); > DataStream<String> source = env.addSource(consumer011).setParallelism(1); > > source.print(); > Thread.currentThread().getContextClassLoader(); > > StreamingFileSink sink = StreamingFileSink > .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new > SimpleStringEncoder<>("UTF-8")) > .build(); > > source.addSink(sink); > > env.execute(); > }” > > And start the job, the jobmanager filesystem is error, the log means the > jobmananger use “FileSystemFactoryEnhance” filesystem and confict. > > As the url > https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems > > <https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems> > how to avoid use “Thread.currentThread().getContextClassLoader()" > > > > ouywl > ou...@139.com > > <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=ouywl&uid=ouywl%40139.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsma8dc7719018ba2517da7111b3db5a170.jpg&items=%5B%22ouywl%40139.com%22%5D>