Hi Yan,

This seems to be a bug in the FileSystems and how they're initialized. I'm looking into this myself, but I'm also looping in Stephan and Stefan, who have worked on this the most in the past. Maybe they have some valuable input.
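For context while we dig in: a logical nameservice like nameservice0 is normally resolved through HA entries along the following lines in the shipped Hadoop configuration (host names below are placeholders, not your actual setup), and the UnknownHostException suggests these entries never reach the HDFS client:

```xml
<!-- Illustrative HA nameservice mapping; host names are placeholders. -->
<configuration>
  <property>
    <name>dfs.nameservices</name>
    <value>nameservice0</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.nameservice0</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameservice0.nn1</name>
    <value>namenode1.example.com:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameservice0.nn2</name>
    <value>namenode2.example.com:8020</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.nameservice0</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
</configuration>
```

If the Hadoop Configuration handed to the HDFS client lacks these properties, the client treats nameservice0 as a plain host name and fails exactly as in your trace.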
Best,
Aljoscha

> On 4. Oct 2018, at 01:18, Yan Yan <yanyan300...@g.mail.com> wrote:
>
> Hi,
>
> We recently bumped to Flink 1.4 from 1.3.2 and found an issue with the HDFS
> configuration.
>
> We are using FlinkYarnSessionCli to start the cluster and submit the job.
>
> In 1.3.2, we set the following Flink properties when using checkpoints:
>
>     state.backend.fs.checkpointdir = hdfs://nameservice0/.../..
>     state.checkpoints.dir = hdfs://nameservice0/../..
>
> The mapping between the logical nameservice (nameservice0) and the actual
> namenode host:ports is passed to Flink via yarnship/core-site.xml (by
> providing the -yt option), and we set fs.hdfs.hadoopconf=yarnship/.
>
> However, after bumping to 1.4 we encountered the error below, which caused
> checkpointing to fail:
>
> 2018-09-20 01:01:00.041 [yarn-jobmanager-io-thread-18] WARN  org.apache.flink.runtime.checkpoint.OperatorSubtaskState - Error while discarding operator states.
> java.io.IOException: Cannot instantiate file system for URI: hdfs://nameservice0/app/athena_checkpoint/rt_data/gairos-athena-flink-demand-processor-opticclient-flink-job/flink/checkpoints/ckpt_1537402213581/data/ebb30a2d3b26e4d63682538a9bcdc752/chk-3204/da1d44d3-d3eb-4e57-9145-bdf30c96993a
>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.getFileSystem(FileStateHandle.java:109)
>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:81)
>     at org.apache.flink.runtime.state.OperatorStateHandle.discardState(OperatorStateHandle.java:65)
>     at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
>     at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:53)
>     at org.apache.flink.runtime.checkpoint.OperatorSubtaskState.discardState(OperatorSubtaskState.java:207)
>     at org.apache.flink.runtime.checkpoint.OperatorState.discardState(OperatorState.java:108)
>     at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
>     at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:53)
>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint$1.run(PendingCheckpoint.java:530)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice0
>     at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
>     at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
>     at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
>     ... 15 common frames omitted
> Caused by: java.net.UnknownHostException: nameservice0
>     ... 22 common frames omitted
>
> It does not recognize nameservice0 because the core-site.xml on the actual
> machine (read by Flink via $HADOOP_CONF_DIR) does not use nameservice0 but
> something else for fs.defaultFS.
>
> Digging a little, I found that the hadoopConfig (code
> <https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L159>)
> does not have the properties we set via yarnship/core-site.xml. In
> particular, I suspect it is because the cached HadoopFsFactory is
> initialized with a dummy Configuration (code
> <https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L387>),
> which prevents the Flink configuration from being passed in later (code
> <https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L84>).
>
> I am not sure whether this is intentional or has been solved in later
> releases. Has anyone encountered the same problem? I would appreciate any
> suggestions.
>
> Thanks,
> Yan
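To illustrate the caching hypothesis Yan describes, here is a minimal, self-contained sketch. It is plain Java with made-up names, not Flink's actual HadoopFsFactory code: it only shows how a factory that is configured once with an empty configuration and then cached ends up ignoring any configuration supplied afterwards.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java analogy of the suspected failure mode (all names here are
 * illustrative, not Flink's actual API): a filesystem factory that is
 * configured once with an empty "dummy" configuration and then cached
 * silently ignores configuration supplied later.
 */
public class CachedFactoryDemo {

    static final class FsFactory {
        private final Map<String, String> config = new HashMap<>();

        void configure(Map<String, String> flinkConfig) {
            config.putAll(flinkConfig);
        }

        String resolve(String logicalName) {
            // Without a mapping, the logical name leaks through unchanged,
            // which later surfaces as UnknownHostException: nameservice0.
            return config.getOrDefault("nameservice." + logicalName, logicalName);
        }
    }

    // The factory is configured exactly once, with an empty map, then cached.
    static final FsFactory CACHED_FACTORY = new FsFactory();
    static {
        CACHED_FACTORY.configure(new HashMap<>()); // the "dummy" configuration
    }

    /** Returns the cached factory WITHOUT re-applying the user's config. */
    static String resolveWithCachedFactory(Map<String, String> userConfig, String name) {
        // Note: userConfig is never forwarded to the cached instance.
        return CACHED_FACTORY.resolve(name);
    }

    public static void main(String[] args) {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("nameservice.nameservice0", "namenode1.example.com:8020");

        // The user's mapping never reaches the cached instance:
        System.out.println(resolveWithCachedFactory(userConfig, "nameservice0"));
        // prints "nameservice0" instead of the configured address
    }
}
```

If this mirrors what happens in FileSystem's factory cache, the fix would presumably be to (re)configure the cached factory with the effective Flink configuration before use, rather than at class-initialization time.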