> When I check the expansion service docker container, normally it downloads a JAR file and starts SDK Fn Harness
To clarify the terminology here, I think you meant the Java SDK harness container, not the expansion service. The expansion service is only needed during job submission, and your failure is at execution: it seems your job gets submitted but is unable to execute. May I know how you started the local Flink cluster? Please note that since you are using a cross-language transform (SqlTransform) here, you need to start a portable Flink runner, not the legacy Flink runner [1].

Thanks,
Cham

[1] https://beam.apache.org/documentation/runners/flink/#prerequisites-and-setup

On Tue, Mar 12, 2024 at 12:53 PM Jaehyeon Kim <dott...@gmail.com> wrote: > Hello, > > Attached includes more logging details. > > WARNING:root:Waiting for grpc channel to be ready at localhost:45371. > INFO:root:Building pipeline ... > INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at > localhost:36959 > INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== > <function pack_combiners at 0x7fd0ac33d630> ==================== > INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== > <function lift_combiners at 0x7fd0ac33d6c0> ==================== > INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== > <function sort_stages at 0x7fd0ac33de10> ==================== > INFO:apache_beam.runners.portability.flink_runner:Adding HTTP protocol > scheme to flink_master parameter: http://localhost:8081 > INFO:apache_beam.utils.subprocess_server:Using cached job server jar from > https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.16-job-server/2.53.0/beam-runners-flink-1.16-job-server-2.53.0.jar > INFO:apache_beam.utils.subprocess_server:Starting service with ('java' > '-jar' > '/home/jaehyeon/.apache_beam/cache/jars/beam-runners-flink-1.16-job-server-2.53.0.jar' > '--flink-master' 'http://localhost:8081' > '--artifacts-dir' > '/tmp/beam-temp80sx9jtu/artifactsv2s2mbvo' '--job-port' '50353' > 
'--artifact-port' '0' '--expansion-port' '0') > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:33 AM > org.apache.beam.runners.jobsubmission.JobServerDriver > createArtifactStagingService > INFO:apache_beam.utils.subprocess_server:INFO: ArtifactStagingService > started on localhost:38121 > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:33 AM > org.apache.beam.runners.jobsubmission.JobServerDriver > createExpansionService > INFO:apache_beam.utils.subprocess_server:INFO: Java ExpansionService > started on localhost:36431 > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:33 AM > org.apache.beam.runners.jobsubmission.JobServerDriver createJobServer > INFO:apache_beam.utils.subprocess_server:INFO: JobService started on > localhost:50353 > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:33 AM > org.apache.beam.runners.jobsubmission.JobServerDriver run > INFO:apache_beam.utils.subprocess_server:INFO: Job server now running, > terminate with Ctrl+C > WARNING:root:Waiting for grpc channel to be ready at localhost:50353. > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM > org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 > onNext > INFO:apache_beam.utils.subprocess_server:INFO: Staging artifacts for > job_ddf6a897-788d-49ca-8d14-51029ce17f58. > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM > org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 > resolveNextEnvironment > INFO:apache_beam.utils.subprocess_server:INFO: Resolving artifacts for > job_ddf6a897-788d-49ca-8d14-51029ce17f58.0 > :ref_Environment_default_environment_2. > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM > org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 > onNext > INFO:apache_beam.utils.subprocess_server:INFO: Getting 1 artifacts for > job_ddf6a897-788d-49ca-8d14-51029ce17f58.0:external_1beam:env:docker:v1. 
> INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM > org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 > resolveNextEnvironment > INFO:apache_beam.utils.subprocess_server:INFO: Resolving artifacts for > job_ddf6a897-788d-49ca-8d14-51029ce17f58.0:external_1beam:env:docker:v1. > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM > org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 > onNext > INFO:apache_beam.utils.subprocess_server:INFO: Getting 1 artifacts for > job_ddf6a897-788d-49ca-8d14-51029ce17f58.null. > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:36 AM > org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 > finishStaging > INFO:apache_beam.utils.subprocess_server:INFO: Artifacts fully staged for > job_ddf6a897-788d-49ca-8d14-51029ce17f58. > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:37 AM > org.apache.beam.runners.flink.FlinkJobInvoker invokeWithExecutor > INFO:apache_beam.utils.subprocess_server:INFO: Invoking job > sql-transform_4f61dc80-9aeb-4e85-9bfb-3df1b6e3db48 with pipeline runner > org.apache.beam.runners.flink.FlinkPipelineRunner@2f4dfe06 > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:37 AM > org.apache.beam.runners.jobsubmission.JobInvocation start > INFO:apache_beam.utils.subprocess_server:INFO: Starting job invocation > sql-transform_4f61dc80-9aeb-4e85-9bfb-3df1b6e3db48 > INFO:apache_beam.runners.portability.portable_runner:Environment > "LOOPBACK" has started a component necessary for the execution. Be sure > to run the pipeline using > with Pipeline() as p: > p.apply(..) > This ensures that the pipeline finishes before this program exits. > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:37 AM > org.apache.beam.runners.flink.FlinkPipelineRunner > runPipelineWithTranslator > INFO:apache_beam.utils.subprocess_server:INFO: Translating pipeline to > Flink program. 
> INFO:apache_beam.runners.portability.portable_runner:Job state changed to > STOPPED > INFO:apache_beam.runners.portability.portable_runner:Job state changed to > STARTING > INFO:apache_beam.runners.portability.portable_runner:Job state changed to > RUNNING > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:38 AM > org.apache.beam.runners.flink.FlinkExecutionEnvironments > createBatchExecutionEnvironment > INFO:apache_beam.utils.subprocess_server:INFO: Creating a Batch Execution > Environment. > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:38 AM > org.apache.beam.runners.flink.FlinkExecutionEnvironments > createBatchExecutionEnvironment > INFO:apache_beam.utils.subprocess_server:INFO: Using Flink Master URL > localhost:8081. > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:38 AM > org.apache.flink.api.java.utils.PlanGenerator logTypeRegistrationDetails > INFO:apache_beam.utils.subprocess_server:INFO: The job has 0 registered > types and 0 default Kryo serializers > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:39 AM > org.apache.flink.client.program.rest.RestClusterClient lambda$submitJob$7 > INFO:apache_beam.utils.subprocess_server:INFO: Submitting job > 'sql-transform' (772417f6ae0736da9f7156d334c4b8e7). > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:49:44 AM > org.apache.flink.client.program.rest.RestClusterClient lambda$null$6 > INFO:apache_beam.utils.subprocess_server:INFO: Successfully submitted job > 'sql-transform' (772417f6ae0736da9f7156d334c4b8e7) to ' > http://localhost:8081'. > INFO:apache_beam.runners.worker.statecache:Creating state cache with size > 104857600 > INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control > channel for localhost:34207. > INFO:apache_beam.runners.worker.sdk_worker:Control channel established. > INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with > unbounded number of workers. 
> INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state > channel for localhost:37411. > INFO:apache_beam.runners.worker.sdk_worker:State channel established. > INFO:apache_beam.runners.worker.data_plane:Creating client data channel > for localhost:34955 > INFO:apache_beam.utils.subprocess_server:Mar 13, 2024 6:50:09 AM > org.apache.beam.runners.jobsubmission.JobInvocation$1 onFailure > INFO:apache_beam.utils.subprocess_server:SEVERE: Error during job > invocation sql-transform_4f61dc80-9aeb-4e85-9bfb-3df1b6e3db48. > INFO:apache_beam.utils.subprocess_server: > org.apache.flink.client.program.ProgramInvocationException: Job failed > (JobID: 772417f6ae0736da9f7156d334c4b8e7) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$ > null$6(ClusterClientJobClientAdapter.java:130) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java: > 642) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.util.concurrent.FutureUtils.lambda > $retryOperationWithDelay$6(FutureUtils.java:301) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java: > 859) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture$UniWhenComplete.tryFire( > CompletableFuture.java:837) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > INFO:apache_beam.utils.subprocess_server: at java.base/java. 
> util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.client.program.rest.RestClusterClient.lambda > $pollResourceAsync$31(RestClusterClient.java:772) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java: > 859) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture$UniWhenComplete.tryFire( > CompletableFuture.java:837) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.util.concurrent.FutureUtils.lambda > $retryOperationWithDelay$6(FutureUtils.java:301) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java: > 859) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture$UniWhenComplete.tryFire( > CompletableFuture.java:837) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture$UniCompose.tryFire( > CompletableFuture.java:1085) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java: > 478) > INFO:apache_beam.utils.subprocess_server: at java.base/java. 
> util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > lang.Thread.run(Thread.java:829) > INFO:apache_beam.utils.subprocess_server:Caused by: > org.apache.flink.runtime.client.JobExecutionException: Job execution > failed. > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult( > JobResult.java:144) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$ > null$6(ClusterClientJobClientAdapter.java:128) > INFO:apache_beam.utils.subprocess_server: ... 23 more > INFO:apache_beam.utils.subprocess_server:Caused by: > org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure > (ExecutionFailureHandler.java:139) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult > (ExecutionFailureHandler.java:83) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure( > DefaultScheduler.java:256) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure( > DefaultScheduler.java:247) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed( > DefaultScheduler.java:240) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate > (SchedulerBase.java:738) > INFO:apache_beam.utils.subprocess_server: at > 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState( > SchedulerBase.java:715) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState( > SchedulerNG.java:78) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState( > JobMaster.java:477) > INFO:apache_beam.utils.subprocess_server: at > jdk.internal.reflect.GeneratedMethodAccessor9.invoke(Unknown Source) > INFO:apache_beam.utils.subprocess_server: at java.base/jdk. > internal.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > lang.reflect.Method.invoke(Method.java:566) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$ > 1(AkkaRpcActor.java:309) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader > (ClassLoadingUtils.java:83) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation( > AkkaRpcActor.java:307) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage( > AkkaRpcActor.java:222) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage( > FencedAkkaRpcActor.java:84) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage( > AkkaRpcActor.java:168) > INFO:apache_beam.utils.subprocess_server: at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) > INFO:apache_beam.utils.subprocess_server: at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) > INFO:apache_beam.utils.subprocess_server: at > scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > 
INFO:apache_beam.utils.subprocess_server: at > scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > INFO:apache_beam.utils.subprocess_server: at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) > INFO:apache_beam.utils.subprocess_server: at scala.PartialFunction$ > OrElse.applyOrElse(PartialFunction.scala:171) > INFO:apache_beam.utils.subprocess_server: at scala.PartialFunction$ > OrElse.applyOrElse(PartialFunction.scala:172) > INFO:apache_beam.utils.subprocess_server: at scala.PartialFunction$ > OrElse.applyOrElse(PartialFunction.scala:172) > INFO:apache_beam.utils.subprocess_server: at > akka.actor.Actor.aroundReceive(Actor.scala:537) > INFO:apache_beam.utils.subprocess_server: at > akka.actor.Actor.aroundReceive$(Actor.scala:535) > INFO:apache_beam.utils.subprocess_server: at > akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) > INFO:apache_beam.utils.subprocess_server: at > akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) > INFO:apache_beam.utils.subprocess_server: at > akka.actor.ActorCell.invoke(ActorCell.scala:548) > INFO:apache_beam.utils.subprocess_server: at > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > INFO:apache_beam.utils.subprocess_server: at > akka.dispatch.Mailbox.run(Mailbox.scala:231) > INFO:apache_beam.utils.subprocess_server: at > akka.dispatch.Mailbox.exec(Mailbox.scala:243) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020 > ) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) > INFO:apache_beam.utils.subprocess_server: at java.base/java. > util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) > INFO:apache_beam.utils.subprocess_server: at java.base/java. 
> util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) > INFO:apache_beam.utils.subprocess_server:Caused by: java.lang.Exception: > The user defined 'open()' method caused an exception: > java.lang.IllegalStateException: No container running for id > c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800 > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:508) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring( > Task.java:935) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > INFO:apache_beam.utils.subprocess_server: at java.base/java. 
> lang.Thread.run(Thread.java:829) > INFO:apache_beam.utils.subprocess_server:Caused by: > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: > java.lang.IllegalStateException: No container running for id > c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800 > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache > $Segment.get(LocalCache.java:2086) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get > (LocalCache.java:4012) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad > (LocalCache.java:4035) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache > $LocalLoadingCache.get(LocalCache.java:5013) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache > $LocalLoadingCache.getUnchecked(LocalCache.java:5020) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory > $SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory > $SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage > (DefaultJobBundleFactory.java:310) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory > (DefaultExecutableStageContext.java:38) > INFO:apache_beam.utils.subprocess_server: at > 
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory > $WrappedContext.getStageBundleFactory( > ReferenceCountingExecutableStageContextFactory.java:207) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open > (FlinkExecutableStageFunction.java:157) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction( > FunctionUtils.java:34) > INFO:apache_beam.utils.subprocess_server: at > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503) > INFO:apache_beam.utils.subprocess_server: ... 6 more > INFO:apache_beam.utils.subprocess_server:Caused by: > java.lang.IllegalStateException: No container running for id > c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800 > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment > (DockerEnvironmentFactory.java:137) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1 > .load(DefaultJobBundleFactory.java:259) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1 > .load(DefaultJobBundleFactory.java:232) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache > $LoadingValueReference.loadFuture(LocalCache.java:3571) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache > $Segment.loadSync(LocalCache.java:2313) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache > $Segment.lockedGetOrLoad(LocalCache.java:2190) > INFO:apache_beam.utils.subprocess_server: at > 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache > $Segment.get(LocalCache.java:2080) > INFO:apache_beam.utils.subprocess_server: ... 18 more > INFO:apache_beam.utils.subprocess_server: Suppressed: > java.io.IOException: Received exit code 1 for command 'docker kill > c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800'. > stderr: Error response from daemon: Cannot kill container: > c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800: Container > c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800 is not > running > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand > (DockerCommand.java:255) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand > (DockerCommand.java:181) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer > (DockerCommand.java:161) > INFO:apache_beam.utils.subprocess_server: at > org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment > (DockerEnvironmentFactory.java:161) > INFO:apache_beam.utils.subprocess_server: ... 
24 more > INFO:apache_beam.utils.subprocess_server: > ERROR:root:java.lang.IllegalStateException: No container running for id > c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800 > INFO:apache_beam.runners.portability.portable_runner:Job state changed to > FAILED
> Traceback (most recent call last):
>   File "/home/jaehyeon/projects/general-demos/beam-dev-env/section3/sql_transform.py", line 72, in <module>
>     run()
>   File "/home/jaehyeon/projects/general-demos/beam-dev-env/section3/sql_transform.py", line 68, in run
>     p.run().wait_until_finish()
>   File "/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 576, in wait_until_finish
>     raise self._runtime_exception
> RuntimeError: Pipeline sql-transform_4f61dc80-9aeb-4e85-9bfb-3df1b6e3db48 failed in state FAILED: java.lang.IllegalStateException: No container running for id c03fe103bd0c2498b66945608709e6488dabbae9ba359c1e7600c1a9a9c63800
>
> On Tue, 12 Mar 2024 at 15:07, Jaehyeon Kim <dott...@gmail.com> wrote:
>
>> Hello,
>>
>> ✔️ I have a simple pipeline that transforms data with *SqlTransform*. I use the *FlinkRunner* and, when I don't specify the *flink_master* option and use an embedded Flink cluster, it works fine. However, if I use a local Flink cluster and set the *flink_master* option to *localhost:8081*, the expansion service running on Docker doesn't work. The Flink cluster is started locally without Docker (*./setup/flink-1.16.3/bin/start-cluster.sh*).
>>
>> ✔️ The pipeline code can be found below, and I added some troubleshooting details below it. 
>>
>> import argparse
>> import logging
>> import typing
>>
>> import apache_beam as beam
>> from apache_beam.transforms.sql import SqlTransform
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.options.pipeline_options import SetupOptions
>>
>> class MyItem(typing.NamedTuple):
>>     id: int
>>     name: str
>>     value: float
>>
>> beam.coders.registry.register_coder(MyItem, beam.coders.RowCoder)
>>
>> def convert_to_item(row: list):
>>     cols = ["id", "name", "value"]
>>     return MyItem(**dict(zip(cols, row)))
>>
>> def run():
>>     parser = argparse.ArgumentParser(
>>         description="Process statistics by user from website visit event"
>>     )
>>     parser.add_argument(
>>         "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
>>     )
>>     parser.add_argument(
>>         "--use_own",
>>         action="store_true",
>>         help="Flag to indicate whether to use an own local cluster",
>>     )
>>     opts = parser.parse_args()
>>
>>     pipeline_opts = {
>>         "runner": opts.runner,
>>         "job_name": "sql-transform",
>>         "environment_type": "LOOPBACK",
>>     }
>>     if opts.use_own is True:
>>         pipeline_opts = {**pipeline_opts, **{"flink_master": "localhost:8081"}}
>>     print(pipeline_opts)
>>     options = PipelineOptions([], **pipeline_opts)
>>     # Required, else it will complain when importing worker functions
>>     options.view_as(SetupOptions).save_main_session = True
>>
>>     query = """
>>     SELECT * FROM PCOLLECTION WHERE name = 'jack'
>>     """
>>
>>     p = beam.Pipeline(options=options)
>>     (
>>         p
>>         | beam.Create([[1, "john", 123], [2, "jane", 234], [3, "jack", 345]])
>>         | beam.Map(convert_to_item).with_output_types(MyItem)
>>         | SqlTransform(query)
>>         | beam.Map(print)
>>     )
>>
>>     logging.getLogger().setLevel(logging.WARN)
>>     logging.info("Building pipeline ...")
>>
>>     p.run().wait_until_finish()
>>
>> if __name__ == "__main__":
>>     run()
>>
>> ✔️ When I check the expansion service docker container, normally it downloads a JAR file and starts SDK Fn Harness. However, it doesn't move into the download step when I set *flink_master* to *localhost:8081*. As the service container gets stuck, the Flink task manager considers it lost and the container gets killed.
>>
>> 2024/03/12 03:49:23 Provision info:
>> pipeline_options:{fields:{key:"beam:option:allow_non_deterministic_key_coders:v1" ...
>> runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
>> *2024/03/12 03:49:24 Downloaded: /tmp/1-2/staged/beam-sdks-java-extensions-sql-expansion-service-2.53.0--5mGwmjENLc1fPWWdDg_S2ASPB8WOYTnUARk_IhI-_A.jar (sha256:fb9986c268c434b7357cf59674383f4b60123c1f163984e7500464fc8848fbf0, size: 281440385)*
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in [jar:file:/opt/apache/beam/jars/slf4j-jdk14.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in [jar:file:/tmp/1-2/staged/beam-sdks-java-extensions-sql-expansion-service-2.53.0--5mGwmjENLc1fPWWdDg_S2ASPB8WOYTnUARk_IhI-_A.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.JDK14LoggerFactory]
>> SDK Fn Harness started
>> Harness ID 1-2
>> Logging location url: "localhost:42133"
>> Control location url: "localhost:36449"
>> Status location null
>> Pipeline Options File pipeline_options.json
>> Pipeline Options File pipeline_options.json exists. Overriding existing options.
>> Pipeline options {"beam:option:allow_non_deterministic_key_coders:v1":false,..."beam:option:verify_row_values:v1":false}
>>
>> ✔️ The difference with/without the *flink_master* option is quite minimal in the pipeline options, as shown below. However, I'm not sure what makes it fail to run the pipeline successfully. 
>>
>> without flink_master - fields:{key:"beam:option:flink_master:v1" value:{string_value:"[auto]"}}
>> with flink_master - fields:{key:"beam:option:flink_master:v1" value:{string_value:"http://localhost:8081"}}
>>
>> Can you please advise how to fix the issue?
>>
>> Cheers,
>> Jaehyeon
>
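[Editor's note] For readers following this thread, the option handling at issue can be sketched in plain Python. This is an illustrative sketch only: `normalize_flink_master` and `build_pipeline_args` are hypothetical helpers, and the scheme normalization merely mimics the behavior visible in the "Adding HTTP protocol scheme to flink_master parameter" log line above; it is not Beam's actual implementation.

```python
def normalize_flink_master(value: str) -> str:
    # Sentinel values such as "[auto]" or "[local]" are left for the
    # runner to resolve; a bare host:port gets an http:// scheme, as
    # the flink_runner log line in this thread shows.
    if value.startswith("["):
        return value
    if "://" not in value:
        return "http://" + value
    return value


def build_pipeline_args(use_own_cluster: bool) -> list:
    # Mirror the options the reported pipeline passes: FlinkRunner with
    # LOOPBACK workers, plus flink_master only when submitting to an
    # externally started session cluster.
    args = [
        "--runner=FlinkRunner",
        "--job_name=sql-transform",
        "--environment_type=LOOPBACK",
    ]
    if use_own_cluster:
        args.append("--flink_master=" + normalize_flink_master("localhost:8081"))
    return args
```

The key point from the discussion above is that when `flink_master` points at an external cluster, that cluster must be started per the portable Flink runner prerequisites, since cross-language transforms like SqlTransform are not supported by the legacy runner.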