If you remove `environment_type=LOOPBACK`, the default is Docker, which requires your Flink task managers to have Docker installed (explained here: https://beam.apache.org/documentation/runtime/sdk-harness-config/). You could try Docker-in-Docker if you want, but that's not the best way to do this.
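To make the flag difference concrete, here is a minimal sketch of assembling the portable-runner options discussed in this thread. The helper function is hypothetical; the flag names come from the thread and the Beam portable runner docs, and the endpoint value is just an example:

```python
# Sketch: build the option list for a portable Beam pipeline.
# portable_pipeline_options is a hypothetical helper, not a Beam API.
def portable_pipeline_options(environment_type=None, job_endpoint="localhost:8099"):
    opts = [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
    ]
    if environment_type is not None:
        # e.g. "LOOPBACK" for purely local testing. Omitting the flag
        # falls back to the DOCKER default, which needs a docker binary
        # on every Flink task manager.
        opts.append(f"--environment_type={environment_type}")
    return opts
```

These strings would typically be passed to `PipelineOptions` when constructing the pipeline.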
Instead of trying to run everything inside a single VM, I recommend using GKE. Instructions here: https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md

On Fri, Jun 5, 2020 at 6:22 AM Ashish Raghav <ashish.rag...@corecompete.com> wrote:

> It does not work even after removing the LOOPBACK flag.
>
> Current setting:
>
> "--runner=PortableRunner",
> "--job_endpoint=localhost:8099",
>
> I have a job server and a Flink cluster running on Docker on the same machine, on a GCP VM.
>
> CONTAINER ID  IMAGE                                    COMMAND                 CREATED         STATUS         PORTS                             NAMES
> 6e568bcc2167  apache/beam_flink1.10_job_server:latest  "./flink-job-server.…"  37 minutes ago  Up 37 minutes                                   reverent_keldysh
> 89df9f6e4b07  flink                                    "/docker-entrypoint.…"  6 days ago      Up 36 minutes  6121-6123/tcp, 8081/tcp           beam_taskmanager_1
> eaa5de653de9  flink                                    "/docker-entrypoint.…"  6 days ago      Up 36 minutes  6123/tcp, 0.0.0.0:8081->8081/tcp  beam_jobmanager_1
>
> Error log:
>
> 2020-06-05 15:30:41
> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:499)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:445)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:430)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:297)
>     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:198)
>     at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:137)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:495)
>     ... 4 more
> Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:186)
>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:168)
>     at org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:92)
>     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:246)
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:230)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
>     ... 12 more
> Caused by: java.io.IOException: error=2, No such file or directory
>     at java.lang.UNIXProcess.forkAndExec(Native Method)
>     at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>     at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>     ... 26 more
>
> Ashish Raghav | DE
> Core Compete | ashish.rag...@corecompete.com
> Accelerating Cloud Analytics
>
> -----Original Message-----
> From: Ashish Raghav <ashish.rag...@corecompete.com>
> Sent: 01 June 2020 16:48
> To: user@beam.apache.org
> Subject: RE: Issue while submitting python beam pipeline on flink - local
>
> EXTERNAL EMAIL
> Do not click links or open attachments unless you recognise the sender and know the content is safe. Report suspicious email to info...@corecompete.com.
>
> Hello Michels,
>
> I am following the documentation below and doing local testing only. Both the job server and the Flink cluster are on the same machine, and the pipeline is submitted from the same machine. I will drop the `--environment_type=LOOPBACK` flag and test again.
>
> https://beam.apache.org/documentation/runners/flink/#dependencies
>
> -----Original Message-----
> From: Maximilian Michels <m...@apache.org>
> Sent: 01 June 2020 15:37
> To: user@beam.apache.org
> Subject: Re: Issue while submitting python beam pipeline on flink - local
>
> The logs indicate that you are not running the Docker-based execution but the `LOOPBACK` mode. In this mode the Flink cluster needs to connect to the machine that started the pipeline. That will not be possible unless you are running the Flink cluster on the same machine (we bind to `localhost`, which prevents non-local connections).
>
> Please try the Docker-based execution, i.e. drop the `--environment_type=LOOPBACK` flag.
> On 31.05.20 20:25, Ashish Raghav wrote:
> > Hello Kyle,
> >
> > I did finally manage to run this setup on GCP after spinning up a Debian server and doing the necessary setup to submit the Python Beam pipeline to the Flink job server and Flink cluster (I still cannot run it directly using FlinkRunner).
> >
> > The jobs are getting submitted to the Flink cluster, but they are failing with these errors (attached logs).
> >
> > I have set the --artifacts-dir as gs://<bucket>/prefix while running the Flink job server, but it still fails. I don't think this is an access issue, as the instance on which the Flink cluster is running has full access to GCS.
> >
> > I tried following this: https://stackoverflow.com/questions/59429897/beam-running-on-flink-with-python-sdk-and-using-google-cloud-storage-for-artifac
> >
> > But it seems there are other things to do to fix this. Please suggest.
> >
> > *From:* Ashish Raghav <ashish.rag...@corecompete.com>
> > *Sent:* 30 May 2020 10:12
> > *To:* user@beam.apache.org
> > *Subject:* Re: Issue while submitting python beam pipeline on flink - local
> >
> > Hello Kyle, reply below.
> >
> > Also, what is the stack to run this as a production setup on Google Cloud? I can try that setup to see if this works.
> >
> > ------------------------------------------------------------------------
> >
> > *From:* Kyle Weaver <kcwea...@google.com>
> > *Sent:* Thursday, May 28, 2020, 10:34 PM
> > *To:* user@beam.apache.org
> > *Subject:* Re: Issue while submitting python beam pipeline on flink - local
> >
> >> You are using the LOOPBACK environment which requires that the Flink cluster can connect back to your local machine. Since the loopback environment by default binds to localhost, that should not be possible.
> >
> > On the Flink runner page, we recommend using --net=host to avoid the kinds of networking issues Ashish is experiencing. But the Docker documentation says the host networking option is only available on Linux (https://docs.docker.com/network/host/). You will either have to:
> >
> > a) expose the required ports, or ==> I tried exposing 8099 and not using --net=host on Docker Desktop for Windows, and it did not work. I also tried running the job server on Cloud Shell and submitting the pipeline over Cloud Shell after setting up all requirements; it fails with the same errors as local.
> >
> > b) run the job server in a local process instead of a Docker container, as described in older versions of the documentation (https://web.archive.org/web/20191228110311/https://beam.apache.org/documentation/runners/flink/) ==> I haven't tried it yet.
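
For completeness, the `--net=host` variant discussed above might look like the following on a Linux host. This is a sketch only: the image tag matches the container listing earlier in the thread, the flag names are taken from the Beam Flink runner docs for that release, and the bucket path is a placeholder.

```shell
# Run the Beam Flink job server with host networking (Linux only),
# pointing it at a Flink master on the same host. Replace the
# gs://my-bucket path with a real bucket the task managers can access.
docker run --net=host apache/beam_flink1.10_job_server:latest \
  --flink-master=localhost:8081 \
  --artifacts-dir=gs://my-bucket/artifacts
```

With host networking, the job server's ports (e.g. 8099 for job submission) are reachable on localhost without explicit `-p` mappings, which avoids the port-exposure problems described in option (a).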