I think you also need to specify a path for the checkpoint directory; the
container root cannot be used (that is what the "Cannot use the root
directory for checkpoints" error in your trace is saying). Try setting

state.checkpoints.dir: swift://spout-checkpoints.magellan/flink/checkpoints
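
For example, together with the backend setting, the relevant flink-conf.yaml
entries would then look roughly like this (the /flink/checkpoints suffix is
just an illustrative sub-directory; any non-root path under the container
should work):

state.backend: filesystem
state.checkpoints.dir: swift://spout-checkpoints.magellan/flink/checkpoints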

Cheers,
Till

On Wed, Apr 24, 2019 at 2:58 PM PoolakkalMukkath, Shakir <
shakir_poolakkalmukk...@comcast.com> wrote:

> Hi Till, thanks for the response. Yes, I looked at the document, but I am
> still trying to figure it out.
>
>
>
> Let me summarize my config and what I did:
>
>
>
>    1. Copied flink-swift-fs-hadoop-1.6.2.jar to lib
>    2. *flink-conf.yaml*
>
>
>
>
> #==============================================================================
>
> # Fault tolerance and checkpointing
>
>
> #==============================================================================
>
>
>
> # The backend that will be used to store operator state checkpoints if
>
> # checkpointing is enabled.
>
> #
>
> # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
>
> # <class-name-of-factory>.
>
> #
>
> state.backend: filesystem
>
>
>
> # Directory for checkpoints filesystem, when using any of the default
> bundled
>
> # state backends.
>
> state.checkpoints.dir: swift://spout-checkpoints.magellan
>
>
>
>
> #==============================================================================
>
> # Hadoop
>
>
> #==============================================================================
>
> fs.hdfs.hadoopconf: /app/stream/flink-standalone/hadoop/
>
> OR
>
>
>
> export HADOOP_CONF_DIR=/app/stream/flink-standalone/hadoop/
>
>
>
>
>
>
>
>    3. And placed the core-site.xml in
>    HADOOP_CONF_DIR=/app/stream/flink-standalone/hadoop/
>
>
>
> <?xml version="1.0"?>
>
> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
>
>
>
> <configuration>
>
>
>
>   <property>
>
>     <name>fs.swift.impl</name>
>
>     <value>org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem</value>
>
>   </property>
>
>
>
>   <property>
>
>     <name>fs.swift.service.magellan.auth.url</name>
>
>     <value>https://osvip-as-c01.ece.***.net:5000/v3</value>
>
>   </property>
>
>
>
>   <property>
>
>     <name>fs.swift.service.magellan.username</name>
>
>     <value>***</value>
>
>   </property>
>
>
>
>   <property>
>
>     <name>fs.swift.service.magellan.password</name>
>
>     <value>*** </value>
>
>   </property>
>
>
>
>   <property>
>
>     <name>fs.swift.service.magellan.public</name>
>
>     <value>true</value>
>
>   </property>
>
>
>
> </configuration>
>
>
>
>
>
> When I submit a job with checkpointing enabled, I get the error below.
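>
> (For context, checkpointing is enabled in the job roughly as in this
> sketch; the 60-second interval is just an illustrative value, not my
> exact setting.)
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(60000); // take a checkpoint every 60 seconds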
>
>
>
> java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobExecutionException: Could not set up
> JobManager
>
>        at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>
>        at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>
>        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>
>        at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>
>        at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
>        at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
>        at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
>        at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
>
>        at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
>
>        at
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>
>        at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>
>        at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>
>        ... 7 more
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not instantiate configured state backend
>
>        at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:308)
>
>        at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>
>        at
> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
>
>        at
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
>
>        at
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
>
>        at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
>
>        ... 10 more
>
> Caused by: org.apache.flink.configuration.IllegalConfigurationException:
> Invalid configuration for the state backend
>
>        at
> org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:48)
>
>        at
> org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:121)
>
>        at
> org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:222)
>
>        at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
>
>        ... 15 more
>
> Caused by: java.lang.IllegalArgumentException: Cannot use the root
> directory for checkpoints.
>
>        at
> org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.validatePath(AbstractFileStateBackend.java:195)
>
>        at
> org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.<init>(AbstractFileStateBackend.java:109)
>
>        at
> org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.<init>(AbstractFileStateBackend.java:95)
>
>        at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:319)
>
>        at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:200)
>
>        at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:163)
>
>        at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:126)
>
>        at
> org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:45)
>
>        ... 18 more
>
> 2019-04-24 12:55:35,630 ERROR
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Exception
> occurred in REST handler.
>
> org.apache.flink.runtime.rest.handler.RestHandlerException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> job.
>
>        at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
>
>        at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>
>        at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>
>        at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>
>        at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>
>        at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
>
>        at akka.dispatch.OnComplete.internal(Future.scala:258)
>
>        at akka.dispatch.OnComplete.internal(Future.scala:256)
>
>        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>
>        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>
>        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>
>        at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>
>        at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>
>        at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>
>        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>
>        at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>
>        at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>
>        at
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>
>        at
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>
>        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>
>        at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>
>        at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>
>        at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>
>        at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>
>        at
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>
>        at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>
>        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>
>        at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>
>        at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
>        at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
>        at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
>        at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> job.
>
>        at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
>
>        at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>
>        at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>
>        at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>
>        at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>
>        at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
>
>        at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>
>        at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>
>        at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>
>        at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>
>        at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>
>        at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>
>        at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>
>        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>
>        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>
>        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>
>        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>
>        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>
>        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>
>        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>
>        ... 4 more
>
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit job.
>
>        ... 24 more
>
> Caused by: java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobExecutionException: Could not set up
> JobManager
>
>        at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>
>        at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>
>        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>
>        at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>
>        ... 4 more
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
>
>        at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
>
>        at
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>
>        at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>
>        at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>
>        ... 7 more
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not instantiate configured state backend
>
>        at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:308)
>
>        at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>
>        at
> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
>
>        at
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
>
>        at
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
>
>        at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
>
>        ... 10 more
>
> Caused by: org.apache.flink.configuration.IllegalConfigurationException:
> Invalid configuration for the state backend
>
>        at
> org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:48)
>
>        at
> org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:121)
>
>        at
> org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:222)
>
>        at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
>
>        ... 15 more
>
> Caused by: java.lang.IllegalArgumentException: Cannot use the root
> directory for checkpoints.
>
>        at
> org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.validatePath(AbstractFileStateBackend.java:195)
>
>        at
> org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.<init>(AbstractFileStateBackend.java:109)
>
>        at
> org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.<init>(AbstractFileStateBackend.java:95)
>
>        at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:319)
>
>        at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:200)
>
>        at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:163)
>
>        at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:126)
>
>        at
> org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:45)
>
>        ... 18 more
>
>
>
>
>
> Thanks for helping me
>
>
>
> Thanks
>
>
>
> *From: *Till Rohrmann <trohrm...@apache.org>
> *Date: *Wednesday, April 24, 2019 at 6:05 AM
> *To: *"PoolakkalMukkath, Shakir" <shakir_poolakkalmukk...@comcast.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *[EXTERNAL] Re: Looking for help in configuring Swift as State
> Backend
>
>
>
> Hi Shakir,
>
>
>
> have you checked out Flink's documentation for Filesystems [1]? What is
> the problem you are observing?
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems.html
>
>
>
> Cheers,
>
> Till
>
>
>
> On Tue, Apr 23, 2019 at 9:30 PM PoolakkalMukkath, Shakir <
> shakir_poolakkalmukk...@comcast.com> wrote:
>
> Hi,
>
>
>
> I am looking for some help in configuring the Swift FS as the state backend.
> I have not been able to configure it; please let me know if anyone has done
> this before or has knowledge that could help me.
>
> Do we still need to run HDFS to use this feature?
>
>
>
> Thanks,
>
> Shakir
>
>
