Hi, Yuval

The issue arises because Flink's FileSystem is designed to work with
cluster-level configuration, which is read from the flink-conf.yaml file.
As a result, when the FileSystem is initialized, it cannot see the
configuration object passed to
StreamExecutionEnvironment.getExecutionEnvironment. It is therefore better
to put any file-system-related entries directly in flink-conf.yaml, so that
the FileSystem picks them up on initialization.
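
As a rough sketch, the two entries from your snippet might look like this in
flink-conf.yaml. The key names are copied from your message; <storage> and
the key value are the same placeholders you used, and I have not verified
this against your setup:

```yaml
# flink-conf.yaml (sketch; <storage> and "..." are placeholders from the original message)
fs.azure.account.key.<storage>.dfs.core.windows.net: "..."
fs.azure.account.auth.type.<storage>.dfs.core.windows.net: SharedKey
```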

Best regards,
Junrui

On Tue, Dec 19, 2023 at 15:38, Yuval Itzchakov <yuva...@gmail.com> wrote:

> Flink: 1.17.1
>
> Hi,
> I've encountered a weird issue: a configuration object passed
> to StreamExecutionEnvironment.getExecutionEnvironment does not propagate to
> the Hadoop file system being initialized when running Flink locally in an
> IDE.
>
> I am passing credentials in order to connect to Azure Data Lake Storage
> Gen2 using ABFS. However, unless I explicitly initialize Flink's
> FileSystem with FileSystem.initialize(conf, null), the configuration
> object is not propagated.
>
> Thus, the following code does not work:
>
>     val props = new Properties
>     props.put(
>       "fs.azure.account.key.<storage>.dfs.core.windows.net",
>       "..."
>     )
>     props.put(
>       "fs.azure.account.auth.type.<storage>.dfs.core.windows.net",
>       "SharedKey"
>     )
>     val conf = ConfigurationUtils.createConfiguration(props)
>     val env = StreamExecutionEnvironment.getExecutionEnvironment(conf)
>
> But adding the following line makes everything work:
>
>     FileSystem.initialize(conf, null)
>
> Has anyone encountered this issue? I am wondering why explicit
> initialization of the file system is required for this to work.
>
> --
> Best Regards,
> Yuval Itzchakov.
>
