Hi, Yuval The issue you're encountering arises because Flink's FileSystem is designed to operate with cluster-level configuration. And this configuration is sourced from the flink-conf.yaml file. Consequently, when the FileSystem is initialized, it isn't able to access the configuration objects that are passed through StreamExecutionEnvironment.getExecutionEnvironment. Therefore, it's better to specify any related configuration entries directly in the flink-conf.yaml file to ensure they are recognized by the FileSystem upon initialization.
Best regards, Junrui Yuval Itzchakov <yuva...@gmail.com> 于2023年12月19日周二 15:38写道: > Flink: 1.17.1 > > Hi, > I've encountered a weird issue when passing a configuration object > to StreamExecutionEnvironment.getExecutionEnvironment does not propagate to > the hadoop file system being initialized when running Flink locally in an > IDE. > > I am passing credentials in order to connect to Azure Data Lake Storage > Gen2 using the ABFS. However, until I explicitly initialize Flink > FileSystem: FileSystem.initialize(conf, null), the configuration object > does not get propagated automatically. > > Thus, the following code does not work: > > val props = new Properties > props.put( > "fs.azure.account.key.<storage>.dfs.core.windows.net", > "..." > ) > props.put("fs.azure.account.auth.type.<storage>.dfs.core.windows.net", > "SharedKey") > val conf = ConfigurationUtils.createConfiguration(props) > val env = StreamExecutionEnvironment.getExecutionEnvironment(conf) > > But adding the following line makes everything work: > > FileSystem.initialize(conf, null) > > Has anyone encountered this issue? I am wondering why explicit > initialization of the file system is required for this to work. > > -- > Best Regards, > Yuval Itzchakov. >