We had something like this when we were setting it in our code (now we’re passing it via config). There’s likely a better/cleaner way:

    private def configureCheckpoints(env: StreamExecutionEnvironment,
                                     checkpointPath: String): Unit = {
      if (checkpointPath.startsWith("wasb")) {
        import org.apache.hadoop.fs.{Path => HPath}
        import org.apache.flink.configuration.Configuration
        import org.apache.flink.core.fs.FileSystem

        val jobCheckpointsPath = new HPath(checkpointPath)

        val conf = new Configuration()
        conf.setString(
          "fs.azure.account.key.storage-account.blob.core.windows.net",
          "access-key"
        )
        FileSystem.initialize(conf) // this ensures the AzureFS is initialized and with correct creds
      }
      // other checkpoint config stuff
    }

-- Piyush

From: Boris Lublinsky <boris.lublin...@lightbend.com>
Date: Saturday, August 22, 2020 at 10:08 PM
To: Yun Tang <myas...@live.com>
Cc: user <user@flink.apache.org>
Subject: Re: Flink checkpointing with Azure block storage

Thanks Yun,
I made it work, but now I want to set the appropriate config programmatically.
I can set state.checkpoints.dir by:

    val fsStateBackend = new FsStateBackend(new URI("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>"))
    env.setStateBackend(fsStateBackend.asInstanceOf[StateBackend])

But I can’t update the configuration to add the credentials

    fs.azure.account.key.<account_name>.blob.core.windows.net: <azure_storage_key>

because getConfiguration is a private method. Any suggestions?

On Aug 20, 2020, at 9:29 PM, Yun Tang <myas...@live.com> wrote:

Hi Boris

I think the official guide [1] should be enough to tell you how to configure it.
However, I think your changes to flink-conf.yaml might not have taken effect: you have configured the state backend as 'filesystem', while the logs still tell us "No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend".
You can search the log for "Loading configuration property" to see whether your changes were loaded.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/azure.html#credentials-configuration

Best
Yun Tang

________________________________
From: Boris Lublinsky <boris.lublin...@lightbend.com>
Sent: Friday, August 21, 2020 7:18
To: user <user@flink.apache.org>
Subject: Re: Flink checkpointing with Azure block storage

To test it, I created a flink-conf.yaml file and put it in the resources directory of my project.
The file contains the following:

    #==============================================================================
    # Fault tolerance and checkpointing
    #==============================================================================

    # The backend that will be used to store operator state checkpoints if
    # checkpointing is enabled.
    #
    # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
    # <class-name-of-factory>.
    #
    state.backend: filesystem

    # Directory for checkpoints filesystem, when using any of the default bundled
    # state backends.
    #
    state.checkpoints.dir: wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>

    fs.azure.account.key.<account_name>.blob.core.windows.net: <azure_storage_key>

    # Default target directory for savepoints, optional.
    #
    # state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

    # Flag to enable/disable incremental checkpoints for backends that

Which should have produced an error, but what I see is that it does not seem to take effect:

    313 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.jobmaster.JobMaster - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
    3327 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.jobmaster.JobMaster - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5ef4eacb for Flink Streaming Job (427dae12e8f7243742ae8bd152467edc).
    3329 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Proposing leadership to contender akka://flink/user/rpc/jobmanager_3

On Aug 20, 2020, at 5:14 PM, Boris Lublinsky <boris.lublin...@lightbend.com> wrote:

Is there a complete configuration example for this option somewhere?
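
For reference, the settings discussed in this thread can be collected into a single flink-conf.yaml fragment. This is only a sketch: the three keys are the ones quoted above, every value in angle brackets is a placeholder to fill in, and it assumes the file is the one the Flink process actually loads (for example from the conf directory of the distribution) rather than a copy on the project classpath, which this thread does not confirm.

```yaml
# Sketch only: replace every <...> placeholder with a real value.

# Store checkpoints on a file system instead of in JobManager memory.
state.backend: filesystem

# Checkpoint directory on Azure Blob Storage (wasb scheme).
state.checkpoints.dir: wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>

# Storage account access key; the account name in the key must match the one in the URI above.
fs.azure.account.key.<your-azure-account>.blob.core.windows.net: <azure_storage_key>
```

As Yun suggests, searching the startup log for "Loading configuration property" is one way to check whether these entries were read.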