Thanks. Will try.

From: Kyle Weaver <kcwea...@google.com>
Sent: 05 June 2020 20:46
To: user@beam.apache.org
Subject: Re: Issue while submitting python beam pipeline on flink - local
EXTERNAL EMAIL: Do not click links or open attachments unless you recognise the sender and know the content is safe. Report suspicious email to info...@corecompete.com.

If you remove environment_type=LOOPBACK, the default is DOCKER, which requires your Flink task managers to have Docker installed (explained here: https://beam.apache.org/documentation/runtime/sdk-harness-config/). You can try Docker in Docker if you want, but that's not the best way to do this. Instead of trying to run everything inside a single VM, I recommend using GKE. Instructions here: https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md

On Fri, Jun 5, 2020 at 6:22 AM Ashish Raghav <ashish.rag...@corecompete.com> wrote:

It does not work even after removing the LOOPBACK flag. Current settings:

    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",

I have a job server and a Flink cluster running on Docker on the same machine, a GCP VM.
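[Editor's sketch: a full submission with the options above would look roughly like the following. The pipeline module and the input/output paths are illustrative placeholders, not taken from this thread.]

```shell
# Sketch: submit a Python pipeline to the portable job server on
# localhost:8099. With no --environment_type flag, the SDK harness
# environment defaults to DOCKER, so every Flink task manager must be
# able to run the `docker` binary.
python -m apache_beam.examples.wordcount \
    --input=/tmp/input.txt \
    --output=/tmp/counts \
    --runner=PortableRunner \
    --job_endpoint=localhost:8099
```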
CONTAINER ID   IMAGE                                     COMMAND                  CREATED          STATUS          PORTS                              NAMES
6e568bcc2167   apache/beam_flink1.10_job_server:latest   "./flink-job-server.…"   37 minutes ago   Up 37 minutes                                      reverent_keldysh
89df9f6e4b07   flink                                     "/docker-entrypoint.…"   6 days ago       Up 36 minutes   6121-6123/tcp, 8081/tcp            beam_taskmanager_1
eaa5de653de9   flink                                     "/docker-entrypoint.…"   6 days ago       Up 36 minutes   6123/tcp, 0.0.0.0:8081->8081/tcp   beam_jobmanager_1

Error log:

2020-06-05 15:30:41
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:499)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:445)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:430)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:297)
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:198)
    at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:137)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:495)
    ... 4 more
Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:186)
    at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:168)
    at org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:92)
    at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:246)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:230)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
    ... 12 more
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 26 more

Ashish Raghav | DE
Core Compete | ashish.rag...@corecompete.com
Accelerating Cloud Analytics

-----Original Message-----
From: Ashish Raghav <ashish.rag...@corecompete.com>
Sent: 01 June 2020 16:48
To: user@beam.apache.org
Subject: RE: Issue while submitting python beam pipeline on flink - local

Hello Michels,

I am following the documentation below and doing local testing only. Both the job server and the Flink cluster are on the same machine, and the pipeline is submitted from that same machine. I will drop the `--environment_type=LOOPBACK` flag and test again.

https://beam.apache.org/documentation/runners/flink/#dependencies

-----Original Message-----
From: Maximilian Michels <m...@apache.org>
Sent: 01 June 2020 15:37
To: user@beam.apache.org
Subject: Re: Issue while submitting python beam pipeline on flink - local

The logs indicate that you are not running the Docker-based execution but the `LOOPBACK` mode. In this mode the Flink cluster needs to connect to the machine that started the pipeline.
That will not be possible unless you are running the Flink cluster on the same machine (we bind to `localhost`, which prevents non-local connections). Please try the Docker-based execution, i.e. drop the `--environment_type=LOOPBACK` flag.

On 31.05.20 20:25, Ashish Raghav wrote:
> Hello Kyle,
>
> I did finally manage to run this setup on GCP after spinning up a
> Debian server and doing the necessary setup to submit a Python Beam
> pipeline via the Flink job server and Flink cluster (I still cannot run
> it directly using FlinkRunner).
>
> The jobs are getting submitted to the Flink cluster, but they are failing
> with these errors (logs attached).
>
> I have set --artifacts-dir to gs://<bucket>/prefix while running
> the Flink job server, but it still fails. I don't think this is an access
> issue, as the instance on which the Flink cluster is running has full
> access to GCS.
>
> I tried following this:
> https://stackoverflow.com/questions/59429897/beam-running-on-flink-with-python-sdk-and-using-google-cloud-storage-for-artifac
>
> But it seems there are other things to do to fix this.
>
> Please suggest.
>
> *From:* Ashish Raghav <ashish.rag...@corecompete.com>
> *Sent:* 30 May 2020 10:12
> *To:* user@beam.apache.org
> *Subject:* Re: Issue while submitting python beam pipeline on flink - local
>
> Hello Kyle, reply below.
>
> Also, what is the stack to run this as a production setup on Google Cloud?
> I can try that setup to see if this works.
> Get Outlook for Android <https://aka.ms/ghei36>
>
> ------------------------------------------------------------------------
>
> *From:* Kyle Weaver <kcwea...@google.com>
> *Sent:* Thursday, May 28, 2020, 10:34 PM
> *To:* user@beam.apache.org
> *Subject:* Re: Issue while submitting python beam pipeline on flink - local
>
>> You are using the LOOPBACK environment, which requires that the Flink
>> cluster can connect back to your local machine. Since the loopback
>> environment by default binds to localhost, that should not be possible.
>
> On the Flink runner page, we recommend using --net=host to avoid the
> kinds of networking issues Ashish is experiencing. But the Docker
> documentation says the host networking option is only available on
> Linux (https://docs.docker.com/network/host/). You will either have to:
>
> a) expose the required ports, or ==> I tried exposing 8099 without
> --net=host on Docker Desktop for Windows and it did not work. I also
> tried running the job server on Cloud Shell and submitting the pipeline
> from Cloud Shell after setting up all the requirements; it fails with
> the same errors as locally.
>
> b) run the job server in a local process instead of a Docker
> container, as described in older versions of the documentation ==> I
> haven't tried it yet.
> (https://web.archive.org/web/20191228110311/https://beam.apache.org/documentation/runners/flink/)
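[Editor's sketch: the --net=host approach recommended above, using the job server image from the `docker ps` listing earlier in the thread. The --flink-master value is an assumed placeholder for a Flink master on the same host; it is not confirmed by the thread.]

```shell
# Sketch: run the Beam Flink job server with host networking (Linux only).
# With --net=host, the ports the job server opens (job endpoint 8099,
# artifact endpoint 8098, expansion service 8097) are reachable on the
# host directly, so no explicit -p port mappings are needed.
docker run --net=host apache/beam_flink1.10_job_server:latest \
    --flink-master=localhost:8081
```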