You could have a try the new plugin mechanism. Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then put your filesystem related jars in it. Different plugins will be loaded by separate classloader to avoid conflict.
Best, Yang vino yang <yanghua1...@gmail.com> 于2019年12月18日周三 下午6:46写道: > Hi ouywl, > > *>> Thread.currentThread().getContextClassLoader();* > > What does this statement mean in your program? > > In addition, can you share your implementation of the customized file > system plugin and the related exception? > > Best, > Vino > > ouywl <ou...@139.com> 于2019年12月18日周三 下午4:59写道: > >> Hi all, >> We have implemented a filesystem plugin for sink data to hdfs1, and >> the yarn for flink running is used hdfs2. So when the job running, the >> jobmanager use the conf of hdfs1 to create filesystem, the filesystem >> plugin is conflict with flink component. >> We implemeted step: >> 1. ‘FileSystemEnhance’ is implement from “FileSystem” >> 2. ‘FileSystemFactoryEnhance’ is implement from >> “FileSystemFactory”,add >> kerberos auth in ”FileSystemFactoryEnhance" >> 3. Add a service entry. Create a file >> META-INF/services/org.apache.flink.core.fs.FileSystemFactory which >> contains the class name of “ FileSystemFactoryEnhance.class” >> >> And the job mainclass is : >> “ *public static void main(String[] args) throws Exception{* >> >> * StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment();* >> >> >> >> >> >> >> >> >> * env.enableCheckpointing(60*1000); >> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); >> >> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); >> env.getConfig().enableSysoutLogging(); Properties props = new >> Properties(); props.put("bootstrap.servers", SERVERS); >> props.put("group.id <http://group.id>", GROUPID); >> props.put("enable.auto.commit", "true"); // >> props.put("auto.commit.interval.ms <http://auto.commit.interval.ms>", >> "1000"); props.put("session.timeout.ms <http://session.timeout.ms>", >> "30000"); props.put("auto.offset.reset", "latest"); >> props.put("key.deserializer", >> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName()); >> props.put("value.deserializer", StringDeserializer.class.getName()); >> FlinkKafkaConsumer010 consumer011 = new >> FlinkKafkaConsumer010<String>("zyf_test_2", new SimpleStringSchema(), >> props); DataStream<String> source = >> env.addSource(consumer011).setParallelism(1); source.print(); >> Thread.currentThread().getContextClassLoader(); StreamingFileSink sink = >> StreamingFileSink .forRowFormat(new >> Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8")) >> .build(); source.addSink(sink); env.execute();}”And start >> the job, the jobmanager filesystem is error, the log means the jobmananger >> use “FileSystemFactoryEnhance” filesystem and confict.As the url >> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems >> >> <https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems> >> how to avoid use “Thread.currentThread().getContextClassLoader()"* >> >> >> ouywl >> ou...@139.com >> >> <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=ouywl&uid=ouywl%40139.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsma8dc7719018ba2517da7111b3db5a170.jpg&items=%5B%22ouywl%40139.com%22%5D> >> >>