I think you also need to specify a path for the checkpoint directory. Try
setting

state.checkpoints.dir: swift://spout-checkpoints.magellan/flink/checkpoints

Cheers,
Till

On Wed, Apr 24, 2019 at 2:58 PM PoolakkalMukkath, Shakir <shakir_poolakkalmukk...@comcast.com> wrote:

> Hi Till,
>
> Thanks for the response. Yes, I looked at the documentation, but I am still
> trying to figure it out.
>
> Let me summarize my config and what I did:
>
> 1. Copied flink-swift-fs-hadoop-1.6.2.jar to lib
> 2. *flink-conf.yaml*
>
> #==============================================================================
> # Fault tolerance and checkpointing
> #==============================================================================
>
> # The backend that will be used to store operator state checkpoints if
> # checkpointing is enabled.
> #
> # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
> # <class-name-of-factory>.
> #
> state.backend: filesystem
>
> # Directory for checkpoints filesystem, when using any of the default bundled
> # state backends.
> state.checkpoints.dir: swift://spout-checkpoints.magellan
>
> #==============================================================================
> # Hadoop
> #==============================================================================
> fs.hdfs.hadoopconf: /app/stream/flink-standalone/hadoop/
>
> OR
>
> export HADOOP_CONF_DIR=/app/stream/flink-standalone/hadoop/
>
> 3. And have the core-site.xml in HADOOP_CONF_DIR=/app/stream/flink-standalone/hadoop/
>
> <?xml version="1.0"?>
> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
>
> <configuration>
>
>   <property>
>     <name>fs.swift.impl</name>
>     <value>org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem</value>
>   </property>
>
>   <property>
>     <name>fs.swift.service.magellan.auth.url</name>
>     <value>https://osvip-as-c01.ece.***.net:5000/v3</value>
>   </property>
>
>   <property>
>     <name>fs.swift.service.magellan.username</name>
>     <value>***</value>
>   </property>
>
>   <property>
>     <name>fs.swift.service.magellan.password</name>
>     <value>***</value>
>   </property>
>
>   <property>
>     <name>fs.swift.service.magellan.public</name>
>     <value>true</value>
>   </property>
>
> </configuration>
>
> When I submit a job with checkpointing enabled, I get the error below:
>
> java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
>     at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>     ... 7 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate configured state backend
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:308)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
>     ... 10 more
> Caused by: org.apache.flink.configuration.IllegalConfigurationException: Invalid configuration for the state backend
>     at org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:48)
>     at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:121)
>     at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:222)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
>     ... 15 more
> Caused by: java.lang.IllegalArgumentException: Cannot use the root directory for checkpoints.
>     at org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.validatePath(AbstractFileStateBackend.java:195)
>     at org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.<init>(AbstractFileStateBackend.java:109)
>     at org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.<init>(AbstractFileStateBackend.java:95)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:319)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:200)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:163)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:126)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:45)
>     ... 18 more
> 2019-04-24 12:55:35,630 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception occurred in REST handler.
> org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>     at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
>     at akka.dispatch.OnComplete.internal(Future.scala:258)
>     at akka.dispatch.OnComplete.internal(Future.scala:256)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>     at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>     at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     ... 4 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>     ... 24 more
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     ... 4 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
>     at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>     ... 7 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate configured state backend
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:308)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
>     ... 10 more
> Caused by: org.apache.flink.configuration.IllegalConfigurationException: Invalid configuration for the state backend
>     at org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:48)
>     at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:121)
>     at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:222)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
>     ... 15 more
> Caused by: java.lang.IllegalArgumentException: Cannot use the root directory for checkpoints.
>     at org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.validatePath(AbstractFileStateBackend.java:195)
>     at org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.<init>(AbstractFileStateBackend.java:109)
>     at org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.<init>(AbstractFileStateBackend.java:95)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:319)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:200)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:163)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:126)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:45)
>     ... 18 more
>
> Thanks for helping me.
>
> Thanks
>
> *From: *Till Rohrmann <trohrm...@apache.org>
> *Date: *Wednesday, April 24, 2019 at 6:05 AM
> *To: *"PoolakkalMukkath, Shakir" <shakir_poolakkalmukk...@comcast.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *[EXTERNAL] Re: Looking for help in configuring Swift as State Backend
>
> Hi Shakir,
>
> have you checked out Flink's documentation for Filesystems [1]? What is the
> problem you are observing?
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems.html
>
> Cheers,
> Till
>
> On Tue, Apr 23, 2019 at 9:30 PM PoolakkalMukkath, Shakir <shakir_poolakkalmukk...@comcast.com> wrote:
>
> Hi,
>
> I am looking for some help in configuring the Swift FS as a state backend. I
> have been unable to configure it; let me know if anyone has done this before
> or has the knowledge to help me.
>
> Do we still need to run an HDFS to use this feature?
>
> Thanks,
> Shakir
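
To illustrate the "Cannot use the root directory for checkpoints" message in the trace above: the two values of state.checkpoints.dir in this thread differ only in whether the URI carries a path after the container/service part. The following is a minimal sketch of that kind of check using only java.net.URI. The class and method names (CheckpointDirCheck, hasNonRootPath) are hypothetical and are not part of Flink; the actual validation inside Flink may differ in detail.

```java
import java.net.URI;

public class CheckpointDirCheck {

    // Hypothetical helper, not Flink API: returns true when the URI has a
    // scheme and a path component that is neither empty nor just "/".
    public static boolean hasNonRootPath(String checkpointDir) {
        URI uri = URI.create(checkpointDir);
        String scheme = uri.getScheme();
        String path = uri.getPath();
        return scheme != null
                && path != null
                && !path.isEmpty()
                && !path.equals("/");
    }

    public static void main(String[] args) {
        String[] candidates = {
            // Value from the original flink-conf.yaml: container only, no path.
            "swift://spout-checkpoints.magellan",
            // Same container with only a trailing slash: still the root.
            "swift://spout-checkpoints.magellan/",
            // Value suggested in the reply: container plus a directory path.
            "swift://spout-checkpoints.magellan/flink/checkpoints"
        };
        for (String dir : candidates) {
            String verdict = hasNonRootPath(dir) ? "has a directory path" : "points at the root";
            System.out.println(dir + " -> " + verdict);
        }
    }
}
```

Under these assumptions, only the third value passes, which matches the suggestion to append a directory such as /flink/checkpoints to the container URI.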