Thanks Yun, I make it work, but now I want to set appropriate config programmatically. I can set state.checkpointing.dir by:
val fsStateBackend = new FsStateBackend(new URI("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>")) env.setStateBackend(fsStateBackend.asInstanceOf[StateBackend]) But, I can’t update configuration to add credentials fs.azure.account.key.<account_name>.blob.core.windows.net: <azure_storage_key> Because getConfiguration is a private method. Any suggestions? > On Aug 20, 2020, at 9:29 PM, Yun Tang <myas...@live.com> wrote: > > Hi Boris > > I think the official guide [1] should be enough to tell you how to configure. > However, I think your changes to flink-conf.ymal might not take effect as you > have configured the state backend as 'filesystem' while logs still tell us > that "No state backend has been configured, using default (Memory / > JobManager) MemoryStateBackend". > > You can view the log to see whether your changes printed to search for > "Loading configuration property". > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/azure.html#credentials-configuration > > <https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/azure.html#credentials-configuration> > > Best > Yun Tang > > From: Boris Lublinsky <boris.lublin...@lightbend.com> > Sent: Friday, August 21, 2020 7:18 > To: user <user@flink.apache.org> > Subject: Re: Flink checkpointing with Azure block storage > > To test it, I created flink-conf.yaml file and put it in resource directory > of my project > The file contains the following: > > #============================================================================== > # Fault tolerance and checkpointing > #============================================================================== > > # The backend that will be used to store operator state checkpoints if > # checkpointing is enabled. > # > # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the > # <class-name-of-factory>. > # > state.backend: filesystem > > # Directory for checkpoints filesystem, when using any of the default bundled > # state backends. > # > state.checkpoints.dir: > wasb://<your-container>@$<your-azure-account>.blob.core.windows.net > <http://blob.core.windows.net/>/<object-path> > > fs.azure.account.key.<account_name>.blob.core.windows.net > <http://blob.core.windows.net/>: <azure_storage_key> > > # Default target directory for savepoints, optional. > # > # state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints > <applewebdata://8BADD1A2-B986-4F22-A39B-2CF3F1E02EBB> > > # Flag to enable/disable incremental checkpoints for backends that > > Which should of produce error, > > But what I see is that it does not seen to take effect: > > > 313 [flink-akka.actor.default-dispatcher-3] INFO > org.apache.flink.runtime.jobmaster.JobMaster - No state backend has been > configured, using default (Memory / JobManager) MemoryStateBackend (data in > heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: > 'null', asynchronous: TRUE, maxStateSize: 5242880) > 3327 [flink-akka.actor.default-dispatcher-3] INFO > org.apache.flink.runtime.jobmaster.JobMaster - Using failover strategy > org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5ef4eacb > for Flink Streaming Job (427dae12e8f7243742ae8bd152467edc). > 3329 [flink-akka.actor.default-dispatcher-3] INFO > org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService > - Proposing leadership to contender akka://flink/user/rpc/jobmanager_3 > <applewebdata://8BADD1A2-B986-4F22-A39B-2CF3F1E02EBB> > > >> On Aug 20, 2020, at 5:14 PM, Boris Lublinsky <boris.lublin...@lightbend.com >> <mailto:boris.lublin...@lightbend.com>> wrote: >> >> Is there somewhere a complete configuration example for such option?