Thanks for figuring this out, Shuyi!

> On 9. Oct 2018, at 09:09, Shuyi Chen <suez1...@gmail.com> wrote:
> 
> I think the bug was introduced in FLINK-7643 (Rework FileSystem loading to use 
> factories). In YarnApplicationMasterRunner, after that JIRA, FileSystem is no 
> longer properly initialized with the correct Flink configuration before 
> runApplicationMaster() is called. Without that initialization, a call to 
> FileSystem.get("hdfs://nameservice0/") causes the HadoopFsFactory to be 
> initialized with an empty Flink Configuration, so it is unable to recognize 
> nameservice0.
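
To illustrate the initialization-order problem in isolation, here is a simplified, self-contained model (not Flink's actual classes; all names here are made up for illustration). The point it demonstrates: a factory that caches the configuration it was created with cannot resolve settings that are only applied afterwards, which matches the "empty Flink Configuration" symptom above.

```java
import java.util.HashMap;
import java.util.Map;

public class FsFactoryCacheDemo {

    // Stand-in for a filesystem factory: it resolves logical names using only
    // the configuration snapshot it was given at creation time.
    static class FsFactory {
        private final Map<String, String> config;

        FsFactory(Map<String, String> config) {
            this.config = new HashMap<>(config); // snapshot taken here
        }

        boolean canResolve(String nameservice) {
            return config.containsKey("nameservice." + nameservice);
        }
    }

    // Factory is created eagerly with an empty configuration; the real
    // settings arrive afterwards -- too late for the cached factory.
    static boolean resolvesWhenCreatedWithEmptyConfig() {
        FsFactory cached = new FsFactory(new HashMap<>());
        Map<String, String> flinkConfig = new HashMap<>();
        flinkConfig.put("nameservice.nameservice0", "nn1:8020,nn2:8020");
        return cached.canResolve("nameservice0"); // false
    }

    // Configuration is applied before the factory is created, so the
    // logical nameservice resolves fine.
    static boolean resolvesWhenInitializedFirst() {
        Map<String, String> flinkConfig = new HashMap<>();
        flinkConfig.put("nameservice.nameservice0", "nn1:8020,nn2:8020");
        FsFactory factory = new FsFactory(flinkConfig);
        return factory.canResolve("nameservice0"); // true
    }

    public static void main(String[] args) {
        System.out.println("created empty, configured later: "
                + resolvesWhenCreatedWithEmptyConfig());
        System.out.println("configured before creation: "
                + resolvesWhenInitializedFirst());
    }
}
```

The second case mirrors what the correctly behaving runners do: apply the configuration before any FileSystem is requested.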
> 
> I've written unit tests for YarnApplicationMasterRunner, 
> YarnTaskManagerRunnerFactory, and YarnTaskExecutorRunner, and verified that 
> YarnApplicationMasterRunner fails to initialize the FileSystem properly, 
> while YarnTaskManagerRunnerFactory and YarnTaskExecutorRunner both do the 
> right thing.
> 
> I'll create a JIRA to track the fix.
> 
> 
> On Thu, Oct 4, 2018 at 11:23 AM Yan Yan <yanyan300...@gmail.com 
> <mailto:yanyan300...@gmail.com>> wrote:
> Hi Aljoscha,
> 
> Thanks for looking into this! Yes, we rolled Flink 1.4 back to Flink 1.3.2 
> and it works. So it does seem to be a genuine behavior difference between 
> 1.3.2 and 1.4.
> 
> Best,
> Yan
> 
> 
> On Thu, Oct 4, 2018 at 6:36 AM Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>> wrote:
> Another thing: when you retry this with Flink 1.3.2, does it work? I'm 
> trying to rule out another problem in the setup.
> 
> 
>> On 4. Oct 2018, at 15:17, Aljoscha Krettek <aljos...@apache.org 
>> <mailto:aljos...@apache.org>> wrote:
>> 
>> Hi Yan,
>> 
>> This seems to be a bug in the FileSystems and how they're initialized. I'm 
>> looking into this myself, but I'm also looping in Stephan and Stefan, who 
>> have worked on this the most in the past. Maybe they have some valuable input.
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 4. Oct 2018, at 01:18, Yan Yan <yanyan300...@g.mail.com 
>>> <mailto:yanyan300...@g.mail.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> We recently bumped from Flink 1.3.2 to 1.4 and found an issue with HDFS 
>>> configuration.
>>> 
>>> We are using FlinkYarnSessionCli to start the cluster and submit the job.
>>> 
>>> In 1.3.2, we set below Flink properties when using checkpoints:
>>> state.backend.fs.checkpointdir = hdfs://nameservice0/.../..
>>> state.checkpoints.dir = hdfs://nameservice0/../..
>>> 
>>> The mapping between the logical nameservice (nameservice0) and the actual 
>>> NameNode host:ports is passed to Flink via yarnship/core-site.xml (by 
>>> providing the -yt option), and we set fs.hdfs.hadoopconf=yarnship/
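
For context, a shipped nameservice mapping of this kind looks roughly like the sketch below (hostnames and NameNode IDs are placeholders; the properties are the standard Hadoop HDFS HA client settings, and the HA ones are often kept in hdfs-site.xml rather than core-site.xml):

```xml
<!-- yarnship/core-site.xml (illustrative sketch) -->
<configuration>
  <!-- Make the logical nameservice the default filesystem -->
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://nameservice0</value>
  </property>
  <!-- Declare the logical nameservice -->
  <property>
    <name>dfs.nameservices</name>
    <value>nameservice0</value>
  </property>
  <!-- Map the nameservice to its NameNodes -->
  <property>
    <name>dfs.ha.namenodes.nameservice0</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameservice0.nn1</name>
    <value>namenode1.example.com:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameservice0.nn2</name>
    <value>namenode2.example.com:8020</value>
  </property>
  <!-- Client-side failover between the two NameNodes -->
  <property>
    <name>dfs.client.failover.proxy.provider.nameservice0</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
</configuration>
```

If Flink never reads this file, the HDFS client falls back to whatever the host's $HADOOP_CONF_DIR declares, and nameservice0 is an unknown host.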
>>> 
>>> However, after bumping to 1.4 we encountered the error below, which caused 
>>> checkpointing to fail.
>>> 
>>> 2018-09-20 01:01:00.041 [yarn-jobmanager-io-thread-18] WARN  org.apache.flink.runtime.checkpoint.OperatorSubtaskState  - Error while discarding operator states.
>>> java.io.IOException: Cannot instantiate file system for URI: hdfs://nameservice0/app/athena_checkpoint/rt_data/gairos-athena-flink-demand-processor-opticclient-flink-job/flink/checkpoints/ckpt_1537402213581/data/ebb30a2d3b26e4d63682538a9bcdc752/chk-3204/da1d44d3-d3eb-4e57-9145-bdf30c96993a
>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>>>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>>>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.getFileSystem(FileStateHandle.java:109)
>>>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:81)
>>>     at org.apache.flink.runtime.state.OperatorStateHandle.discardState(OperatorStateHandle.java:65)
>>>     at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
>>>     at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:53)
>>>     at org.apache.flink.runtime.checkpoint.OperatorSubtaskState.discardState(OperatorSubtaskState.java:207)
>>>     at org.apache.flink.runtime.checkpoint.OperatorState.discardState(OperatorState.java:108)
>>>     at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
>>>     at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:53)
>>>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint$1.run(PendingCheckpoint.java:530)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice0
>>>     at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
>>>     at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
>>>     at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>>>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
>>>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
>>>     ... 15 common frames omitted
>>> Caused by: java.net.UnknownHostException: nameservice0
>>>     ... 22 common frames omitted
>>> 
>>> It does not recognize nameservice0 because the core-site.xml on the actual 
>>> machine (read by Flink via $HADOOP_CONF_DIR) does not use nameservice0 but 
>>> something else for fs.defaultFS.
>>> 
>>> Digging a little bit, I found that the hadoopConfig (code 
>>> <https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L159>)
>>> does not have the properties we set via yarnship/core-site.xml. In 
>>> particular, I suspect the cached HadoopFsFactory is initialized with a 
>>> dummy Configuration (code 
>>> <https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L387>),
>>> which prevents the Flink configuration from being passed in later (code 
>>> <https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L84>).
>>> 
>>> I am not sure whether this is intentional, or whether it has been fixed in 
>>> later releases. Has anyone encountered the same problem? I would appreciate 
>>> any suggestions.
>>> 
>>> Thanks,
>>> Yan
>> 
> 
> -- 
> Best,
> Yan
> 
> 
> -- 
> "So you have to trust that the dots will somehow connect in your future."
