Hi,

Can you share the full stack trace, or just attach the job manager and task 
manager logs? This exception should have had its cause logged below it.

Piotrek

> On 19 Dec 2019, at 04:06, ouywl <ou...@139.com> wrote:
> 
> Hi Piotr Nowojski,
>    I have moved my filesystem plugin to FLINK_HOME/plugins in Flink 1.9.1. The 
> JobManager doesn't start up, and it loads the filesystem plugin from my own 
> plugin jar. The log is:
> 2019-12-19 10:58:32,394 WARN  org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'high-availability.zookeeper.storageDir' instead of proper key 'high-availability.storageDir'
> 2019-12-19 10:58:32,398 INFO  com.filesystem.plugin.FileSystemFactoryEnhance - trying to get hadoopEnv, hadoopPath = /conf/hadoop_conf
> 2019-12-19 10:58:32,434 WARN  org.apache.hadoop.conf.Configuration - /tmp/mammut-core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.
> 2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.
> 2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.failed.volumes.tolerated;  Ignoring.
> 2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.
> 2019-12-19 10:58:32,878 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
>       at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)
>       at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
>       at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:120)
>       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:292)
>       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:257)
>       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)
>       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>       at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
>       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
>       at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)
> 
>       
> ouywl
> ou...@139.com
>  
> On 12/19/2019 00:01, Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
> 
> As Yang Wang pointed out, you should use the new plugins mechanism.
> 
> If it doesn't work, first make sure that you are shipping/distributing the 
> plugin jars correctly, with the correct plugins directory structure on the 
> client machine. Next, make sure that the cluster has the same correct setup. 
> This is especially true for the standalone/cluster execution modes. For YARN, 
> Mesos, and Docker, the plugins dir should be shipped to the cluster by Flink 
> itself; however, plugin support in YARN is currently semi-broken [1]. This is 
> already fixed, but the fix is waiting to be released in 1.9.2 and 1.10.
> 
> If it still doesn't work, check the TaskManager logs to see which plugins/file 
> systems are being loaded during startup. If none are, that's the problem.
> 
> Piotrek
> 
> [1] https://issues.apache.org/jira/browse/FLINK-14382
> 
>> On 18 Dec 2019, at 12:40, Yang Wang <danrtsey...@gmail.com> wrote:
>> 
>> You could try the new plugin mechanism.
>> Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then 
>> put your filesystem-related jars in it.
>> Each plugin is loaded by a separate classloader to avoid conflicts.
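
To make the layout above concrete, here is a minimal Java sketch of the "one class loader per plugin directory" idea. It only illustrates the isolation principle and is not Flink's actual plugin loader; the names PluginDirSketch and loadersPerPlugin are made up for this example, and the fallback root "plugins" is an assumption.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration, not Flink code.
final class PluginDirSketch {

    /** Builds one URLClassLoader per sub-directory of pluginsRoot that contains jars. */
    static Map<String, URLClassLoader> loadersPerPlugin(Path pluginsRoot, ClassLoader parent)
            throws IOException {
        Map<String, URLClassLoader> loaders = new TreeMap<>();
        try (DirectoryStream<Path> dirs =
                Files.newDirectoryStream(pluginsRoot, p -> Files.isDirectory(p))) {
            for (Path dir : dirs) {
                // Collect the jars of this plugin only, e.g. plugins/myhdfs/*.jar.
                List<URL> jars = new ArrayList<>();
                try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, "*.jar")) {
                    for (Path jar : files) {
                        jars.add(jar.toUri().toURL());
                    }
                }
                // Directories without jars are skipped.
                if (!jars.isEmpty()) {
                    loaders.put(dir.getFileName().toString(),
                            new URLClassLoader(jars.toArray(new URL[0]), parent));
                }
            }
        }
        return loaders;
    }

    public static void main(String[] args) throws IOException {
        Path root = Paths.get(args.length > 0 ? args[0] : "plugins");
        if (Files.isDirectory(root)) {
            loadersPerPlugin(root, PluginDirSketch.class.getClassLoader())
                    .forEach((name, loader) ->
                            System.out.println(name + " -> " + loader.getURLs().length + " jar(s)"));
        }
    }
}
```

Jars placed directly in the root directory, rather than in a sub-directory such as "myhdfs", are not picked up by this sketch.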
>> 
>> 
>> Best,
>> Yang
>> 
>> vino yang <yanghua1...@gmail.com> wrote on Wed, 18 Dec 2019 at 6:46 PM:
>> Hi ouywl,
>> 
>> >>    Thread.currentThread().getContextClassLoader();
>> What does this statement mean in your program?
>> 
>> In addition, can you share your implementation of the customized file system 
>> plugin and the related exception?
>> 
>> Best,
>> Vino
>> 
>> ouywl <ou...@139.com> wrote on Wed, 18 Dec 2019 at 4:59 PM:
>> Hi all,
>>     We have implemented a filesystem plugin for sinking data to hdfs1, while 
>> the YARN cluster that Flink runs on uses hdfs2. When the job runs, the 
>> JobManager uses the hdfs1 configuration to create its filesystem, so the 
>> filesystem plugin conflicts with the Flink components. 
>>     The steps we implemented:
>>       1. 'FileSystemEnhance' extends "FileSystem".
>>       2. 'FileSystemFactoryEnhance' implements "FileSystemFactory" and adds 
>> Kerberos auth.
>>       3. Add a service entry: create a file 
>> META-INF/services/org.apache.flink.core.fs.FileSystemFactory which contains 
>> the class name of "FileSystemFactoryEnhance".
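
For reference, the service-entry step can be sketched with plain java.util.ServiceLoader. This is a self-contained illustration under assumptions: DemoFactory and DemoFactoryImpl are hypothetical stand-ins for the factory interface and its implementation (they are not Flink classes), and the service file is written to a temporary directory only so that the example runs without a jar.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical illustration of a META-INF/services entry, not Flink code.
final class ServiceEntrySketch {

    /** Stand-in for a factory interface (assumption, not Flink's FileSystemFactory). */
    public interface DemoFactory {
        String getScheme();
    }

    /** Stand-in for the implementation; needs a public no-arg constructor. */
    public static class DemoFactoryImpl implements DemoFactory {
        public DemoFactoryImpl() {}

        @Override
        public String getScheme() {
            return "hdfs";
        }
    }

    /** Writes a service entry, then lets ServiceLoader discover the implementation. */
    static List<String> discoverSchemes() throws IOException {
        Path root = Files.createTempDirectory("service-entry");
        Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
        // File name = binary name of the interface; content = binary name of the implementation.
        Files.write(services.resolve(DemoFactory.class.getName()),
                DemoFactoryImpl.class.getName().getBytes(StandardCharsets.UTF_8));

        List<String> schemes = new ArrayList<>();
        try (URLClassLoader loader = new URLClassLoader(
                new URL[] {root.toUri().toURL()}, ServiceEntrySketch.class.getClassLoader())) {
            for (DemoFactory factory : ServiceLoader.load(DemoFactory.class, loader)) {
                schemes.add(factory.getScheme());
            }
        }
        return schemes;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(discoverSchemes());
    }
}
```

The entry in the service file has to be the fully qualified class name; a bare simple name would not be found by ServiceLoader.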
>> 
>> And the job main class is:
>> public static void main(String[] args) throws Exception {
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.enableCheckpointing(60 * 1000);
>>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>     env.getCheckpointConfig().enableExternalizedCheckpoints(
>>             CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>     env.getConfig().enableSysoutLogging();
>> 
>>     Properties props = new Properties();
>>     props.put("bootstrap.servers", SERVERS);
>>     props.put("group.id", GROUPID);
>>     props.put("enable.auto.commit", "true");
>>     // props.put("auto.commit.interval.ms", "1000");
>>     props.put("session.timeout.ms", "30000");
>>     props.put("auto.offset.reset", "latest");
>>     props.put("key.deserializer",
>>             org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
>>     props.put("value.deserializer", StringDeserializer.class.getName());
>>     FlinkKafkaConsumer010<String> consumer011 =
>>             new FlinkKafkaConsumer010<String>("zyf_test_2", new SimpleStringSchema(), props);
>>     DataStream<String> source = env.addSource(consumer011).setParallelism(1);
>> 
>>     source.print();
>>     Thread.currentThread().getContextClassLoader();
>> 
>>     StreamingFileSink<String> sink = StreamingFileSink
>>             .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<String>("UTF-8"))
>>             .build();
>> 
>>     source.addSink(sink);
>> 
>>     env.execute();
>> }
>> 
>> When the job starts, the JobManager fails with a filesystem error. The log 
>> suggests that the JobManager uses the "FileSystemFactoryEnhance" filesystem, 
>> which causes the conflict.
>> 
>> According to 
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems
>> how can we avoid using "Thread.currentThread().getContextClassLoader()"?
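
One common way to avoid depending on the thread context class loader is to resolve classes and resources through the loader that defined your own class. The sketch below only contrasts the two lookups; it assumes the concern is that the context loader is chosen by whoever calls into the plugin. ClassLoaderChoiceSketch and its methods are hypothetical names, not part of Flink.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

// Hypothetical illustration, not Flink code.
final class ClassLoaderChoiceSketch {

    /** Loader that defined this class; independent of the calling thread. */
    static ClassLoader owningLoader() {
        return ClassLoaderChoiceSketch.class.getClassLoader();
    }

    /** Loader attached to the calling thread; the caller decides what it is. */
    static ClassLoader contextLoader() {
        return Thread.currentThread().getContextClassLoader();
    }

    /** Runs the action under a different context loader and restores the old one. */
    static <T> T withContextLoader(ClassLoader loader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClassLoader other = new URLClassLoader(new URL[0], null);
        // The context loader follows the thread, the owning loader does not.
        System.out.println(withContextLoader(other, ClassLoaderChoiceSketch::contextLoader) == other);
        System.out.println(withContextLoader(other, ClassLoaderChoiceSketch::owningLoader) == owningLoader());
    }
}
```

In a filesystem implementation this would mean calling getClass().getClassLoader() (or passing the loader in explicitly) wherever the context class loader is read today.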
>> 
>>      
>> ouywl
>> ou...@139.com
>>  
> 
> 
