If you remove --environment_type=LOOPBACK, the default is DOCKER, which
requires Docker to be installed on your Flink task managers (explained here:
https://beam.apache.org/documentation/runtime/sdk-harness-config/). You
could try Docker-in-Docker, but that is not the best way to do this.
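
For reference, here is a minimal runnable sketch of how the environment is
selected from the Python side (the job endpoint is the usual local default;
adjust it to your setup):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Without --environment_type, the portable runner defaults to DOCKER,
    # so every Flink task manager must be able to run `docker`.
    options = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=DOCKER",  # the default, spelled out for clarity
    ])

    with beam.Pipeline(options=options) as p:
        _ = p | beam.Create([1, 2, 3]) | beam.Map(print)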

Instead of trying to run everything inside a single VM, I recommend using
GKE. Instructions here:
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md

On Fri, Jun 5, 2020 at 6:22 AM Ashish Raghav <ashish.rag...@corecompete.com>
wrote:

> It does not work even after removing the LOOPBACK flag.
>
> Current setting:
>             "--runner=PortableRunner",
>             "--job_endpoint=localhost:8099",
>
> I have a job server and a Flink cluster running in Docker on the same
> GCP VM.
> CONTAINER ID        IMAGE                                     COMMAND                  CREATED             STATUS              PORTS                              NAMES
> 6e568bcc2167        apache/beam_flink1.10_job_server:latest   "./flink-job-server.…"   37 minutes ago      Up 37 minutes                                          reverent_keldysh
> 89df9f6e4b07        flink                                     "/docker-entrypoint.…"   6 days ago          Up 36 minutes       6121-6123/tcp, 8081/tcp            beam_taskmanager_1
> eaa5de653de9        flink                                     "/docker-entrypoint.…"   6 days ago          Up 36 minutes       6123/tcp, 0.0.0.0:8081->8081/tcp   beam_jobmanager_1
>
>
> error log:
>
> 2020-06-05 15:30:41
> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>         at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
>         at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>         at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>         at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>         at akka.actor.Actor.aroundReceive(Actor.scala:517)
>         at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>         at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:499)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:445)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:430)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:297)
>         at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:198)
>         at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:137)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:495)
>         ... 4 more
> Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>         at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:186)
>         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:168)
>         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:92)
>         at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:246)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:230)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
>         ... 12 more
> Caused by: java.io.IOException: error=2, No such file or directory
>         at java.lang.UNIXProcess.forkAndExec(Native Method)
>         at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>         at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>         at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>         ... 26 more
>
> Ashish Raghav | DE
> Core Compete | ashish.rag...@corecompete.com
> Accelerating Cloud Analytics
>
>
> -----Original Message-----
> From: Ashish Raghav <ashish.rag...@corecompete.com>
> Sent: 01 June 2020 16:48
> To: user@beam.apache.org
> Subject: RE: Issue while submitting python beam pipeline on flink - local
>
> Hello Maximilian,
>
> I am following the documentation below and doing local testing only. Both
> the job server and the Flink cluster are on the same machine, and the
> pipeline is submitted from that machine as well.
> I will drop the `--environment_type=LOOPBACK` flag and test again.
>
> https://beam.apache.org/documentation/runners/flink/#dependencies
>
>
> -----Original Message-----
> From: Maximilian Michels <m...@apache.org>
> Sent: 01 June 2020 15:37
> To: user@beam.apache.org
> Subject: Re: Issue while submitting python beam pipeline on flink - local
>
> The logs indicate that you are not running the Docker-based execution but
> the `LOOPBACK` mode. In this mode the Flink cluster needs to connect back
> to the machine that started the pipeline. That will not be possible unless
> the Flink cluster runs on the same machine (we bind to `localhost`, which
> prevents non-local connections).
>
> Please try the Docker-based execution, i.e. drop the
> `--environment_type=LOOPBACK` flag.
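>
> With that flag dropped, the option set would look something like this
> (assuming the usual local job endpoint; with no --environment_type the
> default DOCKER environment is used):
>
>             "--runner=PortableRunner",
>             "--job_endpoint=localhost:8099",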
>
> On 31.05.20 20:25, Ashish Raghav wrote:
> > Hello Kyle,
> >
> >
> >
> > I did finally manage to run this setup on GCP after spinning up a
> > Debian server and doing the necessary setup to submit a Python Beam
> > pipeline to the Flink job server and Flink cluster (I still cannot run
> > it directly using FlinkRunner).
> >
> > The jobs are getting submitted to the Flink cluster, but they are
> > failing with these errors (logs attached).
> >
> > I have set the --artifacts-dir to gs://<bucket>/prefix while running
> > the Flink job server, but it still fails. I don't think this is an
> > access issue, as the instance on which the Flink cluster is running has
> > full access to GCS.
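> >
> > For reference, the job server was started roughly like this (the bucket
> > name is a placeholder; --net=host and the exact flag spellings are
> > assumptions and can vary by Beam version):
> >
> >     docker run --net=host apache/beam_flink1.10_job_server:latest \
> >         --flink-master=localhost:8081 \
> >         --artifacts-dir=gs://<bucket>/prefix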
> >
> >
> >
> > I tried following this:
> > https://stackoverflow.com/questions/59429897/beam-running-on-flink-with-python-sdk-and-using-google-cloud-storage-for-artifac
> >
> > But it seems there are other things needed to fix this.
> >
> >
> >
> > Please suggest.
> >
> >
> >
> >
> >
> > *From:* Ashish Raghav <ashish.rag...@corecompete.com>
> > *Sent:* 30 May 2020 10:12
> > *To:* user@beam.apache.org
> > *Subject:* Re: Issue while submitting python beam pipeline on flink -
> > local
> >
> >
> >
> > Hello Kyle, replies inline below.
> >
> > Also, what stack would you recommend to run this as a production setup
> > on Google Cloud? I can try that setup to see if this works.
> >
> >
> >
> > ------------------------------------------------------------------------
> >
> > *From:* Kyle Weaver <kcwea...@google.com>
> > *Sent:* Thursday, May 28, 2020, 10:34 PM
> > *To:* user@beam.apache.org
> > *Subject:* Re: Issue while submitting python beam pipeline on flink -
> > local
> >
> >
> >
> >> You are using the LOOPBACK environment which requires that the Flink
> >> cluster can connect back to your local machine. Since the loopback
> >> environment by default binds to localhost, that should not be possible.
> >
> >
> >
> > On the Flink runner page, we recommend using --net=host to avoid the
> > kinds of networking issues Ashish is experiencing. But the Docker
> > documentation says the host networking option is only available on Linux
> > (https://docs.docker.com/network/host/). You will either have to:
> >
> > a) expose the required ports, or ==> I tried exposing 8099 without
> > --net=host on Docker Desktop for Windows and it did not work. I also
> > tried running the job server on gcloud shell and submitting the pipeline
> > over gcloud shell after setting up all requirements; it fails with the
> > same errors as locally.
> >
> > b) run the job server in a local process instead of a Docker container,
> > as described in older versions of the documentation
> > (https://web.archive.org/web/20191228110311/https://beam.apache.org/documentation/runners/flink/)
> > ==> I haven't tried it yet; see the sketch below.
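> >
> > For reference, rough sketches of these options (the image tag, port
> > numbers, and Gradle property follow the docs of that era; treat the
> > details as assumptions). The --net=host variant recommended on the
> > Flink runner page, Linux only:
> >
> >     docker run --net=host apache/beam_flink1.10_job_server:latest
> >
> > For (a), publishing the job server's default ports instead:
> >
> >     docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 \
> >         apache/beam_flink1.10_job_server:latest
> >
> > For (b), running the job server from a Beam source checkout (adjust the
> > Flink version to match your cluster):
> >
> >     ./gradlew :runners:flink:1.10:job-server:runShadow \
> >         -PflinkMasterUrl=localhost:8081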
> >
> >
> >
>
>
