Hi,

As Yang Wang pointed out, you should use the new plugins mechanism.

If it doesn’t work, first make sure that you are shipping/distributing the 
plugin jars correctly, i.e. with the correct plugins directory structure on the 
client machine. Next, make sure that the cluster has the same setup. This is 
especially important for the standalone/cluster execution modes. For YARN, 
Mesos and Docker the plugins directory should be shipped to the cluster by 
Flink itself; however, plugin support on YARN is currently partially broken [1]. 
This has already been fixed, but the fix is waiting to be released in 1.9.2 
and 1.10.
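
For reference, the layout should look roughly like this on both the client and 
every cluster node (the plugin folder name “myhdfs” and the jar name below are 
just examples, matching Yang Wang’s suggestion further down):

    $FLINK_HOME/
        lib/
        plugins/
            myhdfs/
                my-filesystem-plugin.jar   (contains your FileSystemFactory and
                                            its META-INF/services entry)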

If it still doesn’t work, check the TaskManager logs for which plugins/file 
systems are loaded during startup. If none are, that is the problem.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-14382

> On 18 Dec 2019, at 12:40, Yang Wang <danrtsey...@gmail.com> wrote:
> 
> You could give the new plugin mechanism a try.
> Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then put 
> your filesystem-related jars in it.
> Different plugins will be loaded by separate classloaders to avoid conflicts.
> 
> 
> Best,
> Yang
> 
> vino yang <yanghua1...@gmail.com> wrote on Wed, 18 Dec 2019 at 18:46:
> Hi ouywl,
> 
> >>    Thread.currentThread().getContextClassLoader();
> What does this statement mean in your program?
> 
> In addition, can you share your implementation of the customized file system 
> plugin and the related exception?
> 
> Best,
> Vino
> 
> ouywl <ou...@139.com> wrote on Wed, 18 Dec 2019 at 16:59:
> Hi all,
>     We have implemented a filesystem plugin to sink data to hdfs1, while the 
> YARN cluster that Flink runs on uses hdfs2. When the job runs, the JobManager 
> uses the configuration of hdfs1 to create the filesystem, and the filesystem 
> plugin conflicts with the Flink components.
>     We implemented the following steps (a rough sketch of the factory follows 
> the list):
>       1. “FileSystemEnhance” implements “FileSystem”.
>       2. “FileSystemFactoryEnhance” implements “FileSystemFactory” and adds 
> Kerberos auth in “FileSystemFactoryEnhance”.
>       3. Add a service entry: create a file 
> META-INF/services/org.apache.flink.core.fs.FileSystemFactory which contains 
> the class name of “FileSystemFactoryEnhance”.
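> 
>     Roughly, the factory looks like the sketch below (the scheme, config file 
> paths, principal/keytab and the FileSystemEnhance constructor are placeholders, 
> not our real values):
> 
>     import java.io.IOException;
>     import java.net.URI;
> 
>     import org.apache.flink.core.fs.FileSystem;
>     import org.apache.flink.core.fs.FileSystemFactory;
> 
>     import org.apache.hadoop.security.UserGroupInformation;
> 
>     // Registered via META-INF/services/org.apache.flink.core.fs.FileSystemFactory,
>     // which contains a single line: the fully qualified name of this class.
>     public class FileSystemFactoryEnhance implements FileSystemFactory {
> 
>         private org.apache.flink.configuration.Configuration flinkConfig;
> 
>         @Override
>         public String getScheme() {
>             // the URI scheme this factory handles
>             return "hdfs";
>         }
> 
>         @Override
>         public void configure(org.apache.flink.configuration.Configuration config) {
>             this.flinkConfig = config;
>         }
> 
>         @Override
>         public FileSystem create(URI fsUri) throws IOException {
>             // Hadoop configuration pointing at hdfs1 (paths are placeholders)
>             org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
>             hadoopConf.addResource(new org.apache.hadoop.fs.Path("/path/to/hdfs1/core-site.xml"));
>             hadoopConf.addResource(new org.apache.hadoop.fs.Path("/path/to/hdfs1/hdfs-site.xml"));
> 
>             // Kerberos auth (principal and keytab are placeholders)
>             UserGroupInformation.setConfiguration(hadoopConf);
>             UserGroupInformation.loginUserFromKeytab("principal@REALM", "/path/to/user.keytab");
> 
>             org.apache.hadoop.fs.FileSystem hadoopFs =
>                     org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConf);
>             // FileSystemEnhance is our FileSystem implementation from step 1,
>             // assumed here to wrap a Hadoop FileSystem.
>             return new FileSystemEnhance(hadoopFs);
>         }
>     }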
> 
> And the job main class is:
>    “ public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.enableCheckpointing(60 * 1000);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>     env.getConfig().enableSysoutLogging();
> 
>     Properties props = new Properties();
>     props.put("bootstrap.servers", SERVERS);
>     props.put("group.id", GROUPID);
>     props.put("enable.auto.commit", "true");
>     // props.put("auto.commit.interval.ms", "1000");
>     props.put("session.timeout.ms", "30000");
>     props.put("auto.offset.reset", "latest");
>     // use the Kafka StringDeserializer here, not the flink-shaded Jackson one
>     props.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
>     props.put("value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
> 
>     FlinkKafkaConsumer010<String> consumer011 =
>             new FlinkKafkaConsumer010<>("zyf_test_2", new SimpleStringSchema(), props);
>     DataStream<String> source = env.addSource(consumer011).setParallelism(1);
> 
>     source.print();
>     Thread.currentThread().getContextClassLoader();
> 
>     StreamingFileSink<String> sink = StreamingFileSink
>             .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<String>("UTF-8"))
>             .build();
> 
>     source.addSink(sink);
> 
>     env.execute();
> }”
> 
> When we start the job, the JobManager filesystem setup fails; the log shows 
> that the JobManager uses the “FileSystemFactoryEnhance” filesystem and 
> conflicts with the Flink components.
> 
> According to 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems
> how can we avoid using “Thread.currentThread().getContextClassLoader()”?
> 
> 
> ouywl
> ou...@139.com
