Hi all,
    We have implemented a filesystem plugin to sink data to one HDFS cluster (hdfs1), while the YARN cluster that Flink runs on uses a different HDFS cluster (hdfs2). When the job runs, the JobManager uses the hdfs1 configuration to create its file system, so the filesystem plugin conflicts with Flink's own components.
    Our implementation steps:
      1. "FileSystemEnhance" extends "FileSystem".
      2. "FileSystemFactoryEnhance" implements "FileSystemFactory"; Kerberos authentication is added in "FileSystemFactoryEnhance".
      3. Add a service entry: create a file META-INF/services/org.apache.flink.core.fs.FileSystemFactory that contains the class name of "FileSystemFactoryEnhance".

The main class of the job is:

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 60 seconds with exactly-once semantics.
        env.enableCheckpointing(60 * 1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Keep externalized checkpoints when the job is cancelled.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getConfig().enableSysoutLogging();

        // Kafka consumer settings; SERVERS and GROUPID are constants defined elsewhere.
        Properties props = new Properties();
        props.put("bootstrap.servers", SERVERS);
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        // props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        // Use Kafka's StringDeserializer, not the Jackson class of the same name.
        props.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        props.put("value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("zyf_test_2", new SimpleStringSchema(), props);
        DataStream<String> source = env.addSource(consumer).setParallelism(1);

        source.print();
        // The return value is unused; this is the call the question below refers to.
        Thread.currentThread().getContextClassLoader();

        // Write each record as a UTF-8 line to the target HDFS path.
        StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<String>("UTF-8"))
            .build();

        source.addSink(sink);

        env.execute();
    }

When we start the job, the JobManager fails with a file system error. The log indicates that the JobManager is using the "FileSystemFactoryEnhance" file system, which causes the conflict.
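
For illustration only, here is a minimal plain-Java sketch of how this kind of conflict can arise, assuming file system factories are looked up by URI scheme alone. It does not use Flink's classes. SchemeRegistrySketch, Factory, the labels "builtin" and "plugin", and the host hdfs2-cluster are hypothetical names, and "the last registration wins" is an assumption of the sketch, not a statement about Flink's actual loading order.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class SchemeRegistrySketch {

    /** Hypothetical stand-in for a file system factory: a scheme plus a create method. */
    interface Factory {
        String scheme();
        String create(URI uri);
    }

    /** Builds a factory that only reports which implementation handled the URI. */
    static Factory factory(final String scheme, final String label) {
        return new Factory() {
            @Override
            public String scheme() {
                return scheme;
            }

            @Override
            public String create(URI uri) {
                return label + " -> " + uri.getAuthority();
            }
        };
    }

    private final Map<String, Factory> byScheme = new HashMap<>();

    /** Assumption of this sketch: a later registration for a scheme replaces the earlier one. */
    void register(Factory f) {
        byScheme.put(f.scheme(), f);
    }

    /** Picks the factory by URI scheme only; the cluster name plays no part in the lookup. */
    String open(String uri) {
        URI parsed = URI.create(uri);
        Factory f = byScheme.get(parsed.getScheme());
        if (f == null) {
            throw new IllegalArgumentException("No factory for scheme: " + parsed.getScheme());
        }
        return f.create(parsed);
    }

    public static void main(String[] args) {
        SchemeRegistrySketch registry = new SchemeRegistrySketch();
        registry.register(factory("hdfs", "builtin"));
        registry.register(factory("hdfs", "plugin"));
        // Both URIs are served by the same factory because they share the "hdfs" scheme.
        System.out.println(registry.open("hdfs://bdms-test/user/sloth/zyf"));
        System.out.println(registry.open("hdfs://hdfs2-cluster/tmp"));
    }
}
```

In this sketch, every hdfs:// URI in the process ends up with the same factory, whichever cluster it points to, which matches the symptom described above.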

According to https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems, how can we avoid the use of "Thread.currentThread().getContextClassLoader()"?



