Hi,

Can you share the full stack trace, or just attach the JobManager and TaskManager logs? This exception should have had some cause logged below it.

Piotrek

> On 19 Dec 2019, at 04:06, ouywl <ou...@139.com> wrote:
>
> Hi Piotr Nowojski,
> I have moved my filesystem plugin to FLINK_HOME/pulgins in Flink 1.9.1. The
> JobManager doesn't start up, and it loads the filesystem plugin in my own
> plugin jar. The log is:
> "2019-12-19 10:58:32,394 WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'high-availability.zookeeper.storageDir' instead of proper key 'high-availability.storageDir'
> 2019-12-19 10:58:32,398 INFO com.filesystem.plugin.FileSystemFactoryEnhance - trying to get hadoopEnv, hadoopPath = /conf/hadoop_conf
> 2019-12-19 10:58:32,434 WARN org.apache.hadoop.conf.Configuration - /tmp/mammut-core-site.xml:an attempt to override final parameter: fs.defaultFS; Ignoring.
> 2019-12-19 10:58:32,436 WARN org.apache.hadoop.conf.Configuration - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir; Ignoring.
> 2019-12-19 10:58:32,436 WARN org.apache.hadoop.conf.Configuration - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.failed.volumes.tolerated; Ignoring.
> 2019-12-19 10:58:32,436 WARN org.apache.hadoop.conf.Configuration - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir; Ignoring.
> 2019-12-19 10:58:32,878 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting YarnJobClusterEntrypoint down with application status FAILED.
> Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
>     at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)
>     at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
>     at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:120)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:292)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:257)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
>     at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)"
>
> ouywl
> ou...@139.com
>
> On 12/19/2019 00:01, Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
>
> As Yang Wang pointed out, you should use the new plugins mechanism.
>
> If it doesn't work, first make sure that you are shipping/distributing the
> plugin jars correctly, with the correct plugins directory structure, on the
> client machine. Next, make sure that the cluster has the same correct setup.
> This is especially true for the standalone/cluster execution modes. For YARN,
> Mesos, and Docker the plugins dir should be shipped to the cluster by Flink
> itself; however, plugin support in YARN is currently semi-broken [1]. This is
> already fixed, but waiting to be released in 1.9.2 and 1.10.
>
> If it still doesn't work, check the TaskManager logs to see which plugins/file
> systems are being loaded during startup. If none, that's the problem.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-14382
>
>> On 18 Dec 2019, at 12:40, Yang Wang <danrtsey...@gmail.com> wrote:
>>
>> You could try the new plugin mechanism.
>> Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then
>> put your filesystem-related jars in it.
>> Different plugins will be loaded by separate classloaders to avoid conflicts.
>>
>> Best,
>> Yang
>>
>> vino yang <yanghua1...@gmail.com> wrote on Wed, 18 Dec 2019 at 6:46 PM:
>> Hi ouywl,
>>
>> >> Thread.currentThread().getContextClassLoader();
>> What does this statement mean in your program?
>>
>> In addition, can you share your implementation of the customized file system
>> plugin and the related exception?
>>
>> Best,
>> Vino
>>
>> ouywl <ou...@139.com> wrote on Wed, 18 Dec 2019 at 4:59 PM:
>> Hi all,
>> We have implemented a filesystem plugin for sinking data to hdfs1, while the
>> YARN cluster that Flink runs on uses hdfs2. So when the job runs, the
>> JobManager uses the conf of hdfs1 to create the filesystem, and the filesystem
>> plugin conflicts with the Flink component.
>> The steps we implemented:
>> 1. 'FileSystemEnhance' implements "FileSystem"
>> 2. 'FileSystemFactoryEnhance' implements "FileSystemFactory", and adds
>> Kerberos auth in "FileSystemFactoryEnhance"
>> 3. Add a service entry. Create a file
>> META-INF/services/org.apache.flink.core.fs.FileSystemFactory which contains
>> the class name of "FileSystemFactoryEnhance.class"
>>
>> And the job main class is:
>> " public static void main(String[] args) throws Exception {
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.enableCheckpointing(60*1000);
>>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>     env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>     env.getConfig().enableSysoutLogging();
>>
>>     Properties props = new Properties();
>>     props.put("bootstrap.servers", SERVERS);
>>     props.put("group.id", GROUPID);
>>     props.put("enable.auto.commit", "true");
>>     // props.put("auto.commit.interval.ms", "1000");
>>     props.put("session.timeout.ms", "30000");
>>     props.put("auto.offset.reset", "latest");
>>     props.put("key.deserializer", org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
>>     props.put("value.deserializer", StringDeserializer.class.getName());
>>     FlinkKafkaConsumer010 consumer011 = new FlinkKafkaConsumer010<String>("zyf_test_2", new SimpleStringSchema(), props);
>>     DataStream<String> source = env.addSource(consumer011).setParallelism(1);
>>
>>     source.print();
>>     Thread.currentThread().getContextClassLoader();
>>
>>     StreamingFileSink sink = StreamingFileSink
>>         .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8"))
>>         .build();
>>
>>     source.addSink(sink);
>>
>>     env.execute();
>> }"
>>
>> When the job starts, the JobManager filesystem fails; the log means the
>> JobManager uses the "FileSystemFactoryEnhance" filesystem and conflicts.
>>
>> According to
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems
>> how can I avoid using "Thread.currentThread().getContextClassLoader()"?
>>
>> ouywl
>> ou...@139.com
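
The thread asks to verify that the plugins directory has the expected layout (one subdirectory per plugin, such as "myhdfs", with the plugin's jars inside) on both the client and the cluster. A minimal stand-alone Java sketch for checking that layout could look like the following. The class name PluginLayoutCheck and the method listPluginJars are hypothetical and not part of Flink; the sketch only inspects the directory and does not reproduce how Flink itself discovers or loads plugins.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not part of Flink): lists the jars found in each plugin subdirectory. */
public class PluginLayoutCheck {

    /**
     * Maps each subdirectory of pluginsDir to the sorted names of the *.jar
     * files located directly inside it. Files placed directly under
     * pluginsDir are skipped, since they are not inside a plugin subdirectory.
     */
    public static Map<String, List<String>> listPluginJars(Path pluginsDir) throws IOException {
        Map<String, List<String>> result = new TreeMap<>();
        try (DirectoryStream<Path> subDirs = Files.newDirectoryStream(pluginsDir)) {
            for (Path sub : subDirs) {
                if (!Files.isDirectory(sub)) {
                    continue;
                }
                List<String> jars = new ArrayList<>();
                try (DirectoryStream<Path> files = Files.newDirectoryStream(sub, "*.jar")) {
                    for (Path jar : files) {
                        jars.add(jar.getFileName().toString());
                    }
                }
                Collections.sort(jars);
                result.put(sub.getFileName().toString(), jars);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the plugins directory is passed as the first argument,
        // e.g. the expanded value of $FLINK_HOME/plugins.
        Path pluginsDir = Paths.get(args.length > 0 ? args[0] : "plugins");
        if (!Files.isDirectory(pluginsDir)) {
            System.out.println("Not a directory: " + pluginsDir);
            return;
        }
        listPluginJars(pluginsDir).forEach((name, jars) ->
                System.out.println(name + " -> " + (jars.isEmpty() ? "(no jars)" : jars)));
    }
}
```

Running it against the plugins directory on the client machine and again on a cluster node should show the same subdirectories and jars; a subdirectory reported with no jars, or a jar sitting directly under the plugins directory, would be worth checking first.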