Hi, Is Google Pub/Sub supported with Beam running on Flink? I'm having a hard time getting a pipeline that consumes from Google Pub/Sub to work. It works perfectly with Beam's DirectRunner, but fails with FlinkRunner. After a lot of debugging, I came to the conclusion that the problem is triggered when my Python pipeline is translated for the Flink runner.
I've found that the job is not even submitted to Flink, so as far as I can see the culprit is the Flink job server. This is the full log when I run it locally:

$ python shares_pipeline.py -batch_size 10 --runner=FlinkRunner --flink_master=localhost:8081 --flink_version=1.13 --streaming --environment_type=LOOPBACK
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:33805
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.31.0
INFO:apache_beam.runners.portability.flink_runner:Adding HTTP protocol scheme to flink_master parameter: http://localhost:8081
INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.13-job-server/2.31.0/beam-runners-flink-1.13-job-server-2.31.0.jar
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/home/comtom/.apache_beam/cache/jars/beam-runners-flink-1.13-job-server-2.31.0.jar' '--flink-master' 'http://localhost:8081' '--artifacts-dir' '/tmp/beam-temp3izv3a04/artifactsg943fqea' '--job-port' '48261' '--artifact-port' '0' '--expansion-port' '0']
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:47 PM org.apache.beam.runners.jobsubmission.JobServerDriver createArtifactStagingService'
INFO:apache_beam.utils.subprocess_server:b'INFO: ArtifactStagingService started on localhost:41579'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:47 PM org.apache.beam.runners.jobsubmission.JobServerDriver createExpansionService'
INFO:apache_beam.utils.subprocess_server:b'INFO: Java ExpansionService started on localhost:41357'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:47 PM org.apache.beam.runners.jobsubmission.JobServerDriver createJobServer'
INFO:apache_beam.utils.subprocess_server:b'INFO: JobService started on localhost:48261'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:47 PM org.apache.beam.runners.jobsubmission.JobServerDriver run'
INFO:apache_beam.utils.subprocess_server:b'INFO: Job server now running, terminate with Ctrl+C'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:48 PM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 onNext'
INFO:apache_beam.utils.subprocess_server:b'INFO: Staging artifacts for job_b9969ac8-40a0-4f0e-aa8a-a868cb1e3cfc.'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:48 PM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 resolveNextEnvironment'
INFO:apache_beam.utils.subprocess_server:b'INFO: Resolving artifacts for job_b9969ac8-40a0-4f0e-aa8a-a868cb1e3cfc.ref_Environment_default_environment_1.'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:48 PM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 onNext'
INFO:apache_beam.utils.subprocess_server:b'INFO: Getting 0 artifacts for job_b9969ac8-40a0-4f0e-aa8a-a868cb1e3cfc.null.'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:48 PM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 finishStaging'
INFO:apache_beam.utils.subprocess_server:b'INFO: Artifacts fully staged for job_b9969ac8-40a0-4f0e-aa8a-a868cb1e3cfc.'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:48 PM org.apache.beam.runners.flink.FlinkJobInvoker invokeWithExecutor'
INFO:apache_beam.utils.subprocess_server:b'INFO: Invoking job BeamApp-comtom-0810175348-36b1a680_95330817-9401-4e1d-90d3-9354d2045250 with pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@75975728'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:49 PM org.apache.beam.runners.jobsubmission.JobInvocation start'
INFO:apache_beam.utils.subprocess_server:b'INFO: Starting job invocation BeamApp-comtom-0810175348-36b1a680_95330817-9401-4e1d-90d3-9354d2045250'
INFO:apache_beam.runners.portability.portable_runner:Environment "LOOPBACK" has started a component necessary for the execution. Be sure to run the pipeline using
  with Pipeline() as p:
    p.apply(..)
This ensures that the pipeline finishes before this program exits.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:49 PM org.apache.beam.runners.flink.FlinkPipelineRunner runPipelineWithTranslator'
INFO:apache_beam.utils.subprocess_server:b'INFO: Translating pipeline to Flink program.'
INFO:apache_beam.utils.subprocess_server:b'Aug 10, 2021 7:53:49 PM org.apache.beam.runners.jobsubmission.JobInvocation$1 onFailure'
INFO:apache_beam.utils.subprocess_server:b'SEVERE: Error during job invocation BeamApp-comtom-0810175348-36b1a680_95330817-9401-4e1d-90d3-9354d2045250.'
INFO:apache_beam.utils.subprocess_server:b'java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "22Read From Pub/Sub/Read.None"'
INFO:apache_beam.utils.subprocess_server:b'coder_id: "ref_Coder_BytesCoder_1"'
INFO:apache_beam.utils.subprocess_server:b'is_bounded: UNBOUNDED'
INFO:apache_beam.utils.subprocess_server:b'windowing_strategy_id: "ref_Windowing_Windowing_1"'
INFO:apache_beam.utils.subprocess_server:b'}] were consumed but never produced'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.core.construction.graph.QueryablePipeline.buildNetwork(QueryablePipeline.java:234)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.core.construction.graph.QueryablePipeline.<init>(QueryablePipeline.java:127)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.core.construction.graph.QueryablePipeline.forPrimitivesIn(QueryablePipeline.java:90)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.<init>(GreedyPipelineFuser.java:70)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.fuse(GreedyPipelineFuser.java:93)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:112)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:85)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)'
INFO:apache_beam.utils.subprocess_server:b'\tat org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)'
INFO:apache_beam.utils.subprocess_server:b'\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)'
INFO:apache_beam.utils.subprocess_server:b'\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)'
INFO:apache_beam.utils.subprocess_server:b'\tat java.base/java.lang.Thread.run(Thread.java:832)'
INFO:apache_beam.utils.subprocess_server:b''
ERROR:root:java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "22Read From Pub/Sub/Read.None" coder_id: "ref_Coder_BytesCoder_1" is_bounded: UNBOUNDED windowing_strategy_id: "ref_Windowing_Windowing_1" }] were consumed but never produced
INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
Traceback (most recent call last):
  File "shares_pipeline.py", line 80, in <module>
    run(known_args, pipeline_args)
  File "shares_pipeline.py", line 51, in run
    p = create_pipeline(pipeline_options, known_args)
  File "shares_pipeline.py", line 38, in create_pipeline
    | "print" >> beam.Map(print)
  File "/home/comtom/.pyenv/versions/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 586, in __exit__
    self.result.wait_until_finish()
  File "/home/comtom/.pyenv/versions/venv/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 602, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline BeamApp-comtom-0810175348-36b1a680_95330817-9401-4e1d-90d3-9354d2045250 failed in state FAILED: java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "22Read From Pub/Sub/Read.None" coder_id: "ref_Coder_BytesCoder_1" is_bounded: UNBOUNDED windowing_strategy_id: "ref_Windowing_Windowing_1" }] were consumed but never produced

I've even simplified the pipeline, and the same error is still triggered. This is the pipeline definition:

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=known_args.topic)
            | "print" >> beam.Map(print)
        )

Could you give me a hint on what to look for?
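
For context, the simplified script is laid out roughly like the sketch below. It is not the exact file: the `--topic` and `--batch_size` argument definitions and the placeholder topic path are assumptions for illustration. Only the `run` / `create_pipeline` split and the two-step pipeline come from the traceback and the pipeline definition. It assumes `apache-beam[gcp]` is installed.

```python
import argparse


def parse_args(argv):
    """Split script-specific flags from the flags meant for Beam."""
    parser = argparse.ArgumentParser()
    # Hypothetical arguments; the real script's definitions are not shown.
    parser.add_argument("--topic", default="projects/my-project/topics/my-topic")
    parser.add_argument("--batch_size", type=int, default=10)
    # Anything argparse does not recognize (--runner, --flink_master, ...)
    # is returned untouched as the second element of the tuple.
    return parser.parse_known_args(argv)


def create_pipeline(pipeline_options, known_args):
    # Imported here so that argument parsing works without Beam installed.
    import apache_beam as beam

    # Leaving the `with` block runs the pipeline and waits for it to finish,
    # which is where the RuntimeError in the traceback is raised.
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=known_args.topic)
            | "print" >> beam.Map(print)
        )
    return p


def run(known_args, pipeline_args):
    from apache_beam.options.pipeline_options import PipelineOptions

    # Runner flags such as --streaming are read from pipeline_args.
    pipeline_options = PipelineOptions(pipeline_args)
    return create_pipeline(pipeline_options, known_args)
```

Calling `known_args, pipeline_args = parse_args(sys.argv[1:])` and then `run(known_args, pipeline_args)` (with `import sys`) corresponds to the invocation shown in the log.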