Thanks! Apologies for the late response. All good advice. I put the flink-hadoop-fs jar in /lib, as Chesnay suggested, and when running from the IDE I simply use file://, as Yun suggested.
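
For anyone who finds this thread later, here is a minimal sketch of the idea: pick the checkpoint URI by where the job runs, and check that it actually carries a scheme before handing it to the state backend. The object name CheckpointUri, both paths, and the namenode host/port are placeholders I made up for illustration, not values from my setup. The Flink wiring is left as comments so the snippet compiles without Flink on the classpath.

```scala
import java.net.URI

// Hypothetical helper; names and paths below are illustrative assumptions.
object CheckpointUri {
  // Local file system: works inside the IDE without any Hadoop jars.
  val LocalUri: String = "file:///tmp/flink-checkpoints"
  // HDFS: needs Hadoop on the cluster classpath (placeholder host and port).
  val ClusterUri: String = "hdfs://namenode:8020/flink/checkpoints"

  // Choose the URI depending on where the job is started.
  def choose(runningInIde: Boolean): String =
    if (runningInIde) LocalUri else ClusterUri

  // Returns the URI scheme, or None when the string has no scheme at all.
  def schemeOf(uri: String): Option[String] =
    Option(new URI(uri).getScheme)
}

// Wiring into the job (requires flink-statebackend-rocksdb on the classpath):
//   val backend: StateBackend =
//     new RocksDBStateBackend(CheckpointUri.choose(runningInIde = true), true)
//   env.setStateBackend(backend)
```

On the cluster the hdfs:// variant still depends on Hadoop being visible to Flink, either through a Flink download bundled with Hadoop, the file system jar in /lib, or an exported HADOOP_CLASSPATH, as discussed below. The schemeOf check is only a guard against a path written without any scheme.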

On Mon, Dec 24, 2018 at 6:32 AM Yun Tang <myas...@live.com> wrote:

> Hi Avi
>
> For application running in your IDE, please set the checkpoint path schema as "file://", you could refer to source code of ITcases using rocksDBStateBackend.
>
> For application running in your cluster, please choose Flink with Hadoop to download, or choose Flink without hadoop and export your HADOOP_CLASSPATH [1]
>
> [1] https://flink.apache.org/downloads.html#latest-stable-release-v171
>
> Best
> Yun Tang
> ------------------------------
> From: Avi Levi <avi.l...@bluevoyant.com>
> Sent: Thursday, December 20, 2018 2:11
> To: Steven Nelson
> Cc: Chesnay Schepler; user@flink.apache.org
> Subject: Re: getting an error when configuring state backend to hdfs
>
> when I try running from my IDE (intellij) I am getting this exception
>
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>     at com.bluevoyant.StreamingJob$.delayedEndpoint$com$bluevoyant$StreamingJob$1(StreamingJob.scala:41)
>     at com.bluevoyant.StreamingJob$delayedInit$body.apply(StreamingJob.scala:15)
>     at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>     at scala.collection.immutable.List.foreach(List.scala:392)
>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>     at scala.App$class.main(App.scala:76)
>     at com.bluevoyant.StreamingJob$.main(StreamingJob.scala:15)
>     at com.bluevoyant.StreamingJob.main(StreamingJob.scala)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     ... 4 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
>     at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>     ... 7 more
> Caused by: java.lang.RuntimeException: Failed to start checkpoint ID counter: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:255)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1166)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1146)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
>     ... 10 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
>     ... 17 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
>     at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
>     ... 23 more
>
> On Wed, Dec 19, 2018 at 6:50 PM Steven Nelson <snel...@sourceallies.com> wrote:
>
> What image are you using?
>
> Sent from my iPhone
>
> On Dec 19, 2018, at 9:44 AM, Avi Levi <avi.l...@bluevoyant.com> wrote:
>
> Hi Chesnay,
> What do you mean? I am creating a fat jar with all dependencies (using sbt assembly). which jar I should place in the /lib directory ?
>
> On Wed, Dec 19, 2018 at 4:44 PM Chesnay Schepler <ches...@apache.org> wrote:
>
> Are you including the filesystems in your jar? Filesystem jars must be placed in the /lib directory of the flink distribution.
>
> On 19.12.2018 15:03, Avi Levi wrote:
>
> Hi,
> I am trying to set the backend state to hdfs
>
>     val stateUri = "hdfs/path_to_dir"
>     val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
>     env.setStateBackend(backend)
>
> I am running with flink 1.7.0 with the following dependencies (tried them with different combinations) :
>
>     "org.apache.flink" %% "flink-connector-filesystem" % flinkV
>     "org.apache.flink" % "flink-hadoop-fs" % flinkV
>     "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion
>     "org.apache.hadoop" % "hadoop-common" % hadoopVersion
>
> however when running the jar I am getting this error:
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
>     ... 17 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
>     at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
>     ... 23 more
>
> any help will be greatly appreciated