Hello Kyle, I did finally manage to run this setup on gcp after spinning up a Debian server and doing the necessary setup to submit python beam pipeline on flink job server and flink cluster ( I still cannot run directly using FlinkRunner). The jobs are getting submitted on flink cluster , but they are failing with these errors( attached logs). I have setup the -artifacts-dir as gs://<bucket>/prefix while running the flink job server, but still it fails. I don't think this is access issue as the instance on which the flink cluster is running has full access to gcs.
I tried following this https://stackoverflow.com/questions/59429897/beam-running-on-flink-with-python-sdk-and-using-google-cloud-storage-for-artifac But it seems, there are other things to do to fix this. Please suggest. From: Ashish Raghav <ashish.rag...@corecompete.com> Sent: 30 May 2020 10:12 To: user@beam.apache.org Subject: Re: Issue while submitting python beam pipeline on flink - local Hello Kyle, reply below. Also, what is the stack to run this as production setup on gcloud?? I can try that setup to see if this works. Get Outlook for Android<https://aka.ms/ghei36> ________________________________ From: Kyle Weaver <kcwea...@google.com<mailto:kcwea...@google.com>> Sent: Thursday, May 28, 2020, 10:34 PM To: user@beam.apache.org<mailto:user@beam.apache.org> Subject: Re: Issue while submitting python beam pipeline on flink - local EXTERNAL EMAIL Do not click links or open attachments unless you recognise the sender and know the content is safe. Report suspicious email to info...@corecompete.com<mailto:info...@corecompete.com>. > You are using the LOOPBACK environment which requires that the Flink > cluster can connect back to your local machine. Since the loopback > environment by defaults binds to localhost that should not be possible. On the Flink runner page, we recommend using --net=host to avoid the kinds of networking issues Ashish is experiencing. But the Docker documentation says the host networking option is only available on Linux (https://docs.docker.com/network/host/). You will either have to: a) expose the required ports, or ==> I tried exposing 8099 and not using --net=host on docker desktop for windows and it did not work. I also tried running job server on gcloud shell and submitting pipeline over gcloud shell after setting up all requirements. it fails with same errors as local. b) run the job server in a local process instead of a Docker container, as described in older versions of the documentation ==> I havent tried it yet. (https://web.archive.org/web/20191228110311/https://beam.apache.org/documentation/runners/flink/<https://web.archive.org/web/20191228110311/https:/beam.apache.org/documentation/runners/flink/>)
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - LegacyArtifactStagingService started on localhost:8098 [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:8097 [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:8099 [grpc-default-executor-1] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job beam_to_flink_cluster_f66fa319-20ac-41a8-ae78-576f795e6c41 with pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@5ea48610 [grpc-default-executor-1] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation beam_to_flink_cluster_f66fa319-20ac-41a8-ae78-576f795e6c41 [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program. [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment. [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081. [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation beam_to_flink_cluster_f66fa319-20ac-41a8-ae78-576f795e6c41. java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 7ade4a3688b2e671b42db146cff1c218) at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:864) at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator$BatchTranslationContext.execute(FlinkBatchPortablePipelineTranslator.java:194) at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:116) at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:83) at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:83) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 7ade4a3688b2e671b42db146cff1c218) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ... 3 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110) ... 19 more Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380) at sun.reflect.GeneratedMethodAccessor72.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:517) at akka.actor.Actor.aroundReceive$(Actor.scala:515) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:499) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:445) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:430) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:297) at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38) at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:198) at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:137) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:495) ... 4 more Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140) at org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.startWorker(BeamFnExternalWorkerPoolGrpc.java:237) at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory.createEnvironment(ExternalEnvironmentFactory.java:116) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:246) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:230) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044) ... 16 more Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:35055 Caused by: java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:688) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) May 31, 2020 5:52:46 PM com.google.api.client.googleapis.services.AbstractGoogleClient <init> WARNING: Application name is not set. Call Builder#setApplicationName. [flink-runner-job-invoker] INFO org.apache.beam.runners.fnexecution.artifact.AbstractLegacyArtifactRetrievalService - Manifest at gs://mybucket/artifacts/job_294cb0e0-7316-485c-80a4-24ee2e7ce28c/MANIFEST has 1 artifact locations [pool-15-thread-1] INFO org.apache.beam.sdk.extensions.gcp.util.GcsUtil - Ignoring failed deletion of file gs://mybucket/artifacts/job_294cb0e0-7316-485c-80a4-24ee2e7ce28c/artifacts/ which already does not exist: {"code":404,"errors":[{"domain":"global","message":"No such object: mybucket/artifacts/job_294cb0e0-7316-485c-80a4-24ee2e7ce28c/artifacts/","reason":"notFound"}],"message":"No such object: mybucket/artifacts/job_294cb0e0-7316-485c-80a4-24ee2e7ce28c/artifacts/"}
2020-05-31 20:54:05 org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:517) at akka.actor.Actor.aroundReceive$(Actor.scala:515) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:499) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:445) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:430) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:297) at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38) at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:198) at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:137) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:495) ... 4 more Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140) at org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.startWorker(BeamFnExternalWorkerPoolGrpc.java:237) at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory.createEnvironment(ExternalEnvironmentFactory.java:116) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:246) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:230) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044) ... 16 more Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:46513 Caused by: java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:688) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748)